From ffffb508841cc730d8ea87d9856831173bc021ca Mon Sep 17 00:00:00 2001 From: catbref Date: Tue, 4 Jun 2019 17:35:03 +0100 Subject: [PATCH] Networking/Controller changes to aid broadcast of unconfirmed transactions. Notably: network messages passed up to Controller are now processed in their own thread, as opposed to peer's thread. So each message processor in Controller needs to thread-safe. V2 network protocol asks for unconfirmed transactions, can send round lists of transaction signatures and ask for individual transactions, to save bandwidth/processing. --- .../java/org/qora/controller/Controller.java | 159 +++++++++++++++++- src/main/java/org/qora/network/Network.java | 58 ++++--- .../GetUnconfirmedTransactionsMessage.java | 25 +++ .../org/qora/network/message/Message.java | 4 +- .../message/TransactionSignaturesMessage.java | 66 ++++++++ .../repository/TransactionRepository.java | 8 + .../HSQLDBTransactionRepository.java | 23 +++ 7 files changed, 317 insertions(+), 26 deletions(-) create mode 100644 src/main/java/org/qora/network/message/GetUnconfirmedTransactionsMessage.java create mode 100644 src/main/java/org/qora/network/message/TransactionSignaturesMessage.java diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 942c9f1c..f5a9d8a7 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -11,6 +11,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -38,11 +41,13 @@ import org.qora.network.message.GetPeersMessage; import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.GetSignaturesV2Message; import org.qora.network.message.GetTransactionMessage; +import org.qora.network.message.GetUnconfirmedTransactionsMessage; import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightV2Message; import org.qora.network.message.Message; import org.qora.network.message.SignaturesMessage; import org.qora.network.message.TransactionMessage; +import org.qora.network.message.TransactionSignaturesMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryFactory; @@ -77,7 +82,10 @@ public class Controller extends Thread { private final long buildTimestamp; // seconds /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */ - private final ReentrantLock blockchainLock = new ReentrantLock();; + private final ReentrantLock blockchainLock = new ReentrantLock(); + + /** Executor for processing network messages. */ + private final ExecutorService networkMessageExecutor = Executors.newCachedThreadPool(); private Controller() { Properties properties = new Properties(); @@ -352,6 +360,7 @@ public class Controller extends Thread { } LOGGER.info("Shutting down networking"); + networkMessageExecutor.shutdown(); Network.getInstance().shutdown(); LOGGER.info("Shutting down API"); @@ -398,6 +407,9 @@ public class Controller extends Thread { // Request peers lists network.broadcast(peer -> new GetPeersMessage()); + + // Request unconfirmed transaction signatures + network.broadcast(peer -> network.buildGetUnconfirmedTransactionsMessage(peer)); } public void onGeneratedBlock() { @@ -410,10 +422,64 @@ public class Controller extends Thread { public void onNewTransaction(TransactionData transactionData) { // Send round to all peers - Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData)); + Network network = Network.getInstance(); + network.broadcast(peer -> network.buildNewTransactionMessage(peer, transactionData)); + } + + public void onPeerHandshakeCompleted(Peer peer) { + if (peer.getVersion() < 2) { + // Legacy mode + + // Send our unconfirmed transactions + try (final Repository repository = RepositoryManager.getRepository()) { + List transactions = repository.getTransactionRepository().getUnconfirmedTransactions(); + + for (TransactionData transactionData : transactions) { + Message transactionMessage = new TransactionMessage(transactionData); + if (!peer.sendMessage(transactionMessage)) { + peer.disconnect("failed to send unconfirmed transaction"); + return; + } + } + } catch (DataException e) { + LOGGER.error("Repository issue while sending unconfirmed transactions", e); + } + } else { + // V2 protocol + + // Request peer's unconfirmed transactions + Message message = new GetUnconfirmedTransactionsMessage(); + if (!peer.sendMessage(message)) { + peer.disconnect("failed to send request for unconfirmed transactions"); + return; + } + } } public void onNetworkMessage(Peer peer, Message message) { + class NetworkMessageProcessor implements Runnable { + private Peer peer; + private Message message; + + public NetworkMessageProcessor(Peer peer, Message message) { + this.peer = peer; + this.message = message; + } + + @Override + public void run() { + Controller.getInstance().processNetworkMessage(peer, message); + } + } + + try { + networkMessageExecutor.execute(new NetworkMessageProcessor(peer, message)); + } catch (RejectedExecutionException e) { + // Can't execute - probably because we're shutting down, so ignore + } + } + + private void processNetworkMessage(Peer peer, Message message) { LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { @@ -607,7 +673,93 @@ public class Controller extends Thread { break; } - case GET_BLOCK_SUMMARIES: + case GET_UNCONFIRMED_TRANSACTIONS: { + try (final Repository repository = RepositoryManager.getRepository()) { + List signatures = repository.getTransactionRepository().getUnconfirmedTransactionSignatures(); + + Message transactionSignaturesMessage = new TransactionSignaturesMessage(signatures); + if (!peer.sendMessage(transactionSignaturesMessage)) + peer.disconnect("failed to send unconfirmed transaction signatures"); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending unconfirmed transaction signatures to peer %s", peer), e); + } + break; + } + + case TRANSACTION_SIGNATURES: { + TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message; + List signatures = transactionSignaturesMessage.getSignatures(); + List newSignatures = new ArrayList<>(); + + try (final Repository repository = RepositoryManager.getRepository()) { + for (byte[] signature : signatures) { + // Do we have it already? + if (repository.getTransactionRepository().exists(signature)) { + LOGGER.trace(String.format("Ignoring unconfirmed transaction %s from peer %s", Base58.encode(signature), peer)); + break; + } + + // Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (blockchainLock.tryLock()) + try { + // Fetch actual transaction data from peer + Message getTransactionMessage = new GetTransactionMessage(signature); + Message responseMessage = peer.getResponse(getTransactionMessage); + if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) { + peer.disconnect("failed to fetch unconfirmed transaction"); + break; + } + + TransactionMessage transactionMessage = (TransactionMessage) responseMessage; + TransactionData transactionData = transactionMessage.getTransactionData(); + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer)); + break; + } + + // Do we have it already? + if (repository.getTransactionRepository().exists(transactionData.getSignature())) { + LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + break; + } + + // Is it valid? + ValidationResult validationResult = transaction.isValidUnconfirmed(); + if (validationResult != ValidationResult.OK) { + LOGGER.trace(String.format("Ignoring invalid (%s) unconfirmed transaction %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer)); + break; + } + + // Clean repository state before import + repository.discardChanges(); + + // Seems ok - add to unconfirmed pile + transaction.importAsUnconfirmed(); + } finally { + blockchainLock.unlock(); + } + + // We could collate signatures that are new to us and broadcast them to our peers too + newSignatures.add(signature); + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); + } + + if (newSignatures.isEmpty()) + break; + + // Broadcast signatures that are new to us + Network.getInstance().broadcast(broadcastPeer -> new TransactionSignaturesMessage(newSignatures)); + + break; + } + + case GET_BLOCK_SUMMARIES: { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); @@ -635,6 +787,7 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); } break; + } default: LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index d68115c4..e73c5d6f 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -29,6 +29,7 @@ import org.qora.data.block.BlockData; import org.qora.data.network.PeerData; import org.qora.data.transaction.TransactionData; import org.qora.network.message.GetPeersMessage; +import org.qora.network.message.GetUnconfirmedTransactionsMessage; import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightV2Message; import org.qora.network.message.Message; @@ -38,6 +39,7 @@ import org.qora.network.message.PeersMessage; import org.qora.network.message.PeersV2Message; import org.qora.network.message.PingMessage; import org.qora.network.message.TransactionMessage; +import org.qora.network.message.TransactionSignaturesMessage; import org.qora.network.message.VerificationCodesMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; @@ -529,25 +531,13 @@ public class Network extends Thread { if (!peer.sendMessage(peersMessage)) peer.disconnect("failed to send peers list"); - // Send our unconfirmed transactions - try (final Repository repository = RepositoryManager.getRepository()) { - List transactions = repository.getTransactionRepository().getUnconfirmedTransactions(); - - for (TransactionData transactionData : transactions) { - Message transactionMessage = new TransactionMessage(transactionData); - if (!peer.sendMessage(transactionMessage)) { - peer.disconnect("failed to send unconfirmed transaction"); - return; - } - } - } catch (DataException e) { - LOGGER.error("Repository issue while sending unconfirmed transactions", e); - } - // Request their peers list Message getPeersMessage = new GetPeersMessage(); if (!peer.sendMessage(getPeersMessage)) peer.disconnect("failed to request peers list"); + + // Ask Controller if they want to send anything + Controller.getInstance().onPeerHandshakeCompleted(peer); } /** Returns PEERS message made from peers we've connected to recently, and this node's details */ @@ -621,6 +611,24 @@ public class Network extends Thread { return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getGeneratorPublicKey()); } + public Message buildNewTransactionMessage(Peer peer, TransactionData transactionData) { + if (peer.getVersion() < 2) { + // Legacy TRANSACTION message + return new TransactionMessage(transactionData); + } + + // In V2 we send out transaction signature only and peers can decide whether to request the full transaction + return new TransactionSignaturesMessage(Collections.singletonList(transactionData.getSignature())); + } + + public Message buildGetUnconfirmedTransactionsMessage(Peer peer) { + // V2 only + if (peer.getVersion() < 2) + return null; + + return new GetUnconfirmedTransactionsMessage(); + } + // Network-wide calls /** Returns list of connected peers that have completed handshaking. */ @@ -731,26 +739,32 @@ public class Network extends Thread { mergePeersExecutor.execute(new PeersMerger(peerAddresses)); } - public void broadcast(Function peerMessage) { + public void broadcast(Function peerMessageBuilder) { class Broadcaster implements Runnable { private List targetPeers; - private Function peerMessage; + private Function peerMessageBuilder; - public Broadcaster(List targetPeers, Function peerMessage) { + public Broadcaster(List targetPeers, Function peerMessageBuilder) { this.targetPeers = targetPeers; - this.peerMessage = peerMessage; + this.peerMessageBuilder = peerMessageBuilder; } @Override public void run() { - for (Peer peer : targetPeers) - if (!peer.sendMessage(peerMessage.apply(peer))) + for (Peer peer : targetPeers) { + Message message = peerMessageBuilder.apply(peer); + + if (message == null) + continue; + + if (!peer.sendMessage(message)) peer.disconnect("failed to broadcast message"); + } } } try { - peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessage)); + peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder)); } catch (RejectedExecutionException e) { // Can't execute - probably because we're shutting down, so ignore } diff --git a/src/main/java/org/qora/network/message/GetUnconfirmedTransactionsMessage.java b/src/main/java/org/qora/network/message/GetUnconfirmedTransactionsMessage.java new file mode 100644 index 00000000..8c6e2b27 --- /dev/null +++ b/src/main/java/org/qora/network/message/GetUnconfirmedTransactionsMessage.java @@ -0,0 +1,25 @@ +package org.qora.network.message; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class GetUnconfirmedTransactionsMessage extends Message { + + public GetUnconfirmedTransactionsMessage() { + this(-1); + } + + private GetUnconfirmedTransactionsMessage(int id) { + super(id, MessageType.GET_UNCONFIRMED_TRANSACTIONS); + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + return new GetUnconfirmedTransactionsMessage(id); + } + + @Override + protected byte[] toData() { + return new byte[0]; + } + +} diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java index d806b2e6..467bea28 100644 --- a/src/main/java/org/qora/network/message/Message.java +++ b/src/main/java/org/qora/network/message/Message.java @@ -65,7 +65,9 @@ public abstract class Message { PEER_VERIFY(17), VERIFICATION_CODES(18), HEIGHT_V2(19), - GET_TRANSACTION(20); + GET_TRANSACTION(20), + GET_UNCONFIRMED_TRANSACTIONS(21), + TRANSACTION_SIGNATURES(22); public final int value; public final Method fromByteBuffer; diff --git a/src/main/java/org/qora/network/message/TransactionSignaturesMessage.java b/src/main/java/org/qora/network/message/TransactionSignaturesMessage.java new file mode 100644 index 00000000..69cdc412 --- /dev/null +++ b/src/main/java/org/qora/network/message/TransactionSignaturesMessage.java @@ -0,0 +1,66 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.transform.Transformer; + +import com.google.common.primitives.Ints; + +public class TransactionSignaturesMessage extends Message { + + private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH; + + private List signatures; + + public TransactionSignaturesMessage(List signatures) { + this(-1, signatures); + } + + private TransactionSignaturesMessage(int id, List signatures) { + super(id, MessageType.TRANSACTION_SIGNATURES); + + this.signatures = signatures; + } + + public List getSignatures() { + return this.signatures; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int count = bytes.getInt(); + + if (bytes.remaining() != count * SIGNATURE_LENGTH) + return null; + + List signatures = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + byte[] signature = new byte[SIGNATURE_LENGTH]; + bytes.get(signature); + signatures.add(signature); + } + + return new TransactionSignaturesMessage(id, signatures); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.signatures.size())); + + for (byte[] signature : this.signatures) + bytes.write(signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/repository/TransactionRepository.java b/src/main/java/org/qora/repository/TransactionRepository.java index d2830c8a..960069b3 100644 --- a/src/main/java/org/qora/repository/TransactionRepository.java +++ b/src/main/java/org/qora/repository/TransactionRepository.java @@ -136,6 +136,14 @@ public interface TransactionRepository { */ public boolean isConfirmed(byte[] signature) throws DataException; + /** + * Returns list of unconfirmed transaction signatures in timestamp-else-signature order. + * + * @return list of transaction signatures, or empty if none. + * @throws DataException + */ + public List getUnconfirmedTransactionSignatures() throws DataException; + /** * Returns list of unconfirmed transactions in timestamp-else-signature order. *

diff --git a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java index 9c8abb12..35d080c5 100644 --- a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java @@ -833,6 +833,29 @@ public class HSQLDBTransactionRepository implements TransactionRepository { } } + @Override + public List getUnconfirmedTransactionSignatures() throws DataException { + String sql = "SELECT signature FROM UnconfirmedTransactions ORDER by creation DESC, signature DESC"; + + List signatures = new ArrayList<>(); + + // Find transactions with no corresponding row in BlockTransactions + try (ResultSet resultSet = this.repository.checkedExecute(sql)) { + if (resultSet == null) + return signatures; + + do { + byte[] signature = resultSet.getBytes(1); + + signatures.add(signature); + } while (resultSet.next()); + + return signatures; + } catch (SQLException e) { + throw new DataException("Unable to fetch unconfirmed transaction signatures from repository", e); + } + } + @Override public List getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException { String sql = "SELECT signature FROM UnconfirmedTransactions ";