From 20200b844e2911c4251e9659a9e32677bda78315 Mon Sep 17 00:00:00 2001 From: catbref Date: Wed, 5 Jun 2019 11:57:40 +0100 Subject: [PATCH] Networking and synchronization improvements Controller now sets (volatile) requestSync flag when a peer sends new height info. This allows much quicker response to new blocks which might hopefully improve synchronization compared with the old periodic sync method. "Unsolicited" network messages are now added to a BlockingQueue, and a separate unsolicited message processing thread (one per peer) deals with these messages in turn. This allows "reply" network messages to propagate up to the threads that are waiting for them, preventing deadlocks and peer disconnections due to lost pings. Controller tries to do as much new transaction processing outside of the blockchain lock as possible, and only broadcasts new transaction's signature if we successfully import transaction to our unconfirmed pile. Synchronizer.findSignaturesFromCommonBlock now returns null to indicate some sort of connection issue (no cool-off) and an empty list to indicate NO COMMON BLOCK. That method also tries to work back to genesis block instead of giving up too early if test block height becomes negative. Network.createConnection additionally filters out candidates if their addresses resolve to the same IP+port as an existing connection. So now it won't connect to localhost:1234 if it has an existing connection with 127.0.0.1:1234. Network.broadcast only considers unique peers, i.e. prefers outbound connection if a peer has corresponding inbound connection. Added Thread.currentThread().setName() where possible. 
--- .../java/org/qora/controller/AutoUpdate.java | 2 + .../java/org/qora/controller/Controller.java | 109 ++++++++---------- .../org/qora/controller/Synchronizer.java | 23 ++-- src/main/java/org/qora/network/Network.java | 25 +++- src/main/java/org/qora/network/Peer.java | 98 +++++++++++++--- 5 files changed, 165 insertions(+), 92 deletions(-) diff --git a/src/main/java/org/qora/controller/AutoUpdate.java b/src/main/java/org/qora/controller/AutoUpdate.java index c398c453..ea077d7b 100644 --- a/src/main/java/org/qora/controller/AutoUpdate.java +++ b/src/main/java/org/qora/controller/AutoUpdate.java @@ -64,6 +64,8 @@ public class AutoUpdate extends Thread { } public void run() { + Thread.currentThread().setName("Auto-update"); + long buildTimestamp = Controller.getInstance().getBuildTimestamp() * 1000L; boolean attemptedUpdate = false; diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index f5a9d8a7..1b000f44 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -11,9 +11,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -75,8 +72,9 @@ public class Controller extends Thread { private static final Object shutdownLock = new Object(); private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true"; - private static boolean isStopping = false; + private static volatile boolean isStopping = false; private static BlockGenerator blockGenerator = null; + private static volatile boolean requestSync = false; private static Controller instance; private final String buildVersion; private final long buildTimestamp; // seconds @@ 
-84,9 +82,6 @@ public class Controller extends Thread { /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); - /** Executor for processing network messages. */ - private final ExecutorService networkMessageExecutor = Executors.newCachedThreadPool(); - private Controller() { Properties properties = new Properties(); try (InputStream in = this.getClass().getResourceAsStream("/build.properties")) { @@ -256,17 +251,17 @@ public class Controller extends Thread { public void run() { Thread.currentThread().setName("Controller"); - try { - while (!isStopping) { - Thread.sleep(14 * 1000); + while (!isStopping) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + return; + } + if (requestSync) { + requestSync = false; potentiallySynchronize(); - - // Query random connections for unconfirmed transactions? } - } catch (InterruptedException e) { - // time to exit - return; } } @@ -360,7 +355,6 @@ public class Controller extends Thread { } LOGGER.info("Shutting down networking"); - networkMessageExecutor.shutdown(); Network.getInstance().shutdown(); LOGGER.info("Shutting down API"); @@ -457,29 +451,6 @@ public class Controller extends Thread { } public void onNetworkMessage(Peer peer, Message message) { - class NetworkMessageProcessor implements Runnable { - private Peer peer; - private Message message; - - public NetworkMessageProcessor(Peer peer, Message message) { - this.peer = peer; - this.message = message; - } - - @Override - public void run() { - Controller.getInstance().processNetworkMessage(peer, message); - } - } - - try { - networkMessageExecutor.execute(new NetworkMessageProcessor(peer, message)); - } catch (RejectedExecutionException e) { - // Can't execute - probably because we're shutting down, so ignore - } - } - - private void processNetworkMessage(Peer peer, Message message) { 
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { @@ -499,6 +470,9 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e); } + // Potentially synchronize + requestSync = true; + break; } @@ -529,6 +503,9 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e); } + // Potentially synchronize + requestSync = true; + break; } @@ -556,6 +533,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } @@ -583,6 +561,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } @@ -607,6 +586,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); } + break; } @@ -629,6 +609,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e); } + break; } @@ -664,12 +645,15 @@ public class Controller extends Thread { // Seems ok - add to unconfirmed pile transaction.importAsUnconfirmed(); + + LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); } finally { blockchainLock.unlock(); } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); } + break; } 
@@ -699,29 +683,29 @@ public class Controller extends Thread { break; } + // Fetch actual transaction data from peer + Message getTransactionMessage = new GetTransactionMessage(signature); + Message responseMessage = peer.getResponse(getTransactionMessage); + if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) { + peer.disconnect("failed to fetch unconfirmed transaction"); + break; + } + + TransactionMessage transactionMessage = (TransactionMessage) responseMessage; + TransactionData transactionData = transactionMessage.getTransactionData(); + Transaction transaction = Transaction.fromData(repository, transactionData); + + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer)); + break; + } + // Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); if (blockchainLock.tryLock()) try { - // Fetch actual transaction data from peer - Message getTransactionMessage = new GetTransactionMessage(signature); - Message responseMessage = peer.getResponse(getTransactionMessage); - if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) { - peer.disconnect("failed to fetch unconfirmed transaction"); - break; - } - - TransactionMessage transactionMessage = (TransactionMessage) responseMessage; - TransactionData transactionData = transactionMessage.getTransactionData(); - Transaction transaction = Transaction.fromData(repository, transactionData); - - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer)); - break; - } - - // Do we have it already? + // Do we have it already? 
Rechecking in case it has appeared since previous check above if (repository.getTransactionRepository().exists(transactionData.getSignature())) { LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); break; @@ -739,12 +723,14 @@ public class Controller extends Thread { // Seems ok - add to unconfirmed pile transaction.importAsUnconfirmed(); + + LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + + // We could collate signatures that are new to us and broadcast them to our peers too + newSignatures.add(signature); } finally { blockchainLock.unlock(); } - - // We could collate signatures that are new to us and broadcast them to our peers too - newSignatures.add(signature); } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); @@ -786,6 +772,7 @@ public class Controller extends Thread { } catch (DataException e) { LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); } + break; } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index 732f4382..29d844c5 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -2,6 +2,7 @@ package org.qora.controller; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -10,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.qora.block.Block; import org.qora.block.Block.ValidationResult; import org.qora.block.BlockChain; -import org.qora.block.GenesisBlock; import org.qora.data.block.BlockData; import 
org.qora.data.network.BlockSummaryData; import org.qora.data.transaction.TransactionData; @@ -107,6 +107,10 @@ public class Synchronizer { List signatures = findSignaturesFromCommonBlock(peer, ourHeight); if (signatures == null) { + LOGGER.info(String.format("Error while trying to find common block with peer %s", peer)); + return SynchronizationResult.NO_REPLY; + } + if (signatures.isEmpty()) { LOGGER.info(String.format("Failure to find common block with peer %s", peer)); return SynchronizationResult.NO_COMMON_BLOCK; } @@ -290,7 +294,7 @@ public class Synchronizer { * Returns list of peer's block signatures starting with common block with peer. * * @param peer - * @return block signatures + * @return block signatures, or empty list if no common block, or null if there was an issue * @throws DataException */ private List findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException { @@ -299,10 +303,10 @@ public class Synchronizer { int step = INITIAL_BLOCK_STEP; List blockSignatures = null; - int testHeight = ourHeight - step; + int testHeight = Math.max(ourHeight - step, 1); byte[] testSignature = null; - while (testHeight > 1) { + while (testHeight >= 1) { // Fetch our block signature at this height BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight); if (testBlockData == null) { @@ -328,6 +332,11 @@ public class Synchronizer { // We have entries so we have found a common block break; + // No blocks after genesis block? 
+ // We don't get called for a peer at genesis height so this means NO blocks in common + if (testHeight == 1) + return Collections.emptyList(); + if (peer.getVersion() >= 2) { step <<= 1; } else { @@ -336,13 +345,9 @@ public class Synchronizer { } step = Math.min(step, MAXIMUM_BLOCK_STEP); - testHeight -= step; + testHeight = Math.max(testHeight - step, 1); } - if (testHeight <= 1) - // Can't go back any further - return Genesis block - return new ArrayList(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature())); - // Prepend common block's signature as first block sig blockSignatures.add(0, testSignature); diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index e73c5d6f..76fb5c21 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -300,7 +300,7 @@ public class Network extends Thread { peers.removeIf(isSelfPeer); } - // Don't consider already connected peers + // Don't consider already connected peers (simple address match) Predicate isConnectedPeer = peerData -> { PeerAddress peerAddress = peerData.getAddress(); return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress)); @@ -310,6 +310,21 @@ public class Network extends Thread { peers.removeIf(isConnectedPeer); } + // Don't consider already connected peers (resolved address match) + Predicate isResolvedAsConnectedPeer = peerData -> { + try { + InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress(); + return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress)); + } catch (UnknownHostException e) { + // Can't resolve - no point even trying to connect + return true; + } + }; + + synchronized (this.connectedPeers) { + peers.removeIf(isResolvedAsConnectedPeer); + } + // Any left? 
if (peers.isEmpty()) return; @@ -642,7 +657,7 @@ public class Network extends Thread { return peers; } - /** Returns list of connected peers that have completed handshaking, with unbound duplicates removed. */ + /** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */ public List getUniqueHandshakedPeers() { final List peers; @@ -702,6 +717,8 @@ public class Network extends Thread { @Override public void run() { + Thread.currentThread().setName("Merging peers"); + // Serialize using lock to prevent repository deadlocks mergePeersLock.lock(); @@ -751,6 +768,8 @@ public class Network extends Thread { @Override public void run() { + Thread.currentThread().setName("Network Broadcast"); + for (Peer peer : targetPeers) { Message message = peerMessageBuilder.apply(peer); @@ -764,7 +783,7 @@ public class Network extends Thread { } try { - peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder)); + peerExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder)); } catch (RejectedExecutionException e) { // Can't execute - probably because we're shutting down, so ignore } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 7d0eee4b..763f3e77 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -44,6 +45,7 @@ public class Peer implements Runnable { private static final int RESPONSE_TIMEOUT = 5000; // ms private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed private static final 
int INACTIVITY_TIMEOUT = 30000; // ms + private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; private final boolean isOutbound; private Socket socket = null; @@ -52,11 +54,14 @@ public class Peer implements Runnable { private Long connectionTimestamp = null; private OutputStream out; private Handshake handshakeStatus = Handshake.STARTED; - private Map> messages; + private Map> replyQueues; + private BlockingQueue unsolicitedQueue; + private ExecutorService messageExecutor; private VersionMessage versionMessage = null; private Integer version; - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService pingExecutor; private Long lastPing = null; + private InetSocketAddress resolvedAddress = null; private boolean isLocal; private byte[] peerId; @@ -75,7 +80,8 @@ public class Peer implements Runnable { this.isOutbound = false; this.socket = socket; - this.isLocal = isAddressLocal(((InetSocketAddress) socket.getRemoteSocketAddress()).getAddress()); + this.resolvedAddress = ((InetSocketAddress) socket.getRemoteSocketAddress()); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); PeerAddress peerAddress = PeerAddress.fromSocket(socket); this.peerData = new PeerData(peerAddress); @@ -135,6 +141,10 @@ public class Peer implements Runnable { this.lastPing = lastPing; } + public InetSocketAddress getResolvedAddress() { + return this.resolvedAddress; + } + public boolean getIsLocal() { return this.isLocal; } @@ -190,11 +200,41 @@ public class Peer implements Runnable { new SecureRandom().nextBytes(verificationCodeExpected); } + class MessageProcessor implements Runnable { + private Peer peer; + private BlockingQueue blockingQueue; + + public MessageProcessor(Peer peer, BlockingQueue blockingQueue) { + this.peer = peer; + this.blockingQueue = blockingQueue; + } + + @Override + public void run() { + Thread.currentThread().setName("Peer UMP " + this.peer); + + while (true) { + try { + 
Message message = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS); + if (message != null) + Network.getInstance().onMessage(peer, message); + } catch (InterruptedException e) { + // Shutdown + return; + } + } + } + } + private void setup() throws IOException { this.socket.setSoTimeout(INACTIVITY_TIMEOUT); this.out = this.socket.getOutputStream(); this.connectionTimestamp = NTP.getTime(); - this.messages = Collections.synchronizedMap(new HashMap>()); + this.replyQueues = Collections.synchronizedMap(new HashMap>()); + + this.unsolicitedQueue = new ArrayBlockingQueue<>(UNSOLICITED_MESSAGE_QUEUE_CAPACITY); + this.messageExecutor = Executors.newSingleThreadExecutor(); + this.messageExecutor.execute(new MessageProcessor(this, this.unsolicitedQueue)); } public boolean connect() { @@ -202,11 +242,10 @@ public class Peer implements Runnable { this.socket = new Socket(); try { - InetSocketAddress resolvedSocketAddress = this.peerData.getAddress().toSocketAddress(); + this.resolvedAddress = this.peerData.getAddress().toSocketAddress(); + this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); - this.isLocal = isAddressLocal(resolvedSocketAddress.getAddress()); - - this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); + this.socket.connect(resolvedAddress, CONNECT_TIMEOUT); LOGGER.debug(String.format("Connected to peer %s", this)); } catch (SocketTimeoutException e) { LOGGER.trace(String.format("Connection timed out to peer %s", this)); @@ -242,13 +281,20 @@ public class Peer implements Runnable { LOGGER.trace(String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); // Find potential blocking queue for this id (expect null if id is -1) - BlockingQueue queue = this.messages.get(message.getId()); + BlockingQueue queue = this.replyQueues.get(message.getId()); if (queue != null) { // Adding message to queue will unblock thread waiting for response - this.messages.get(message.getId()).add(message); + 
this.replyQueues.get(message.getId()).add(message); } else { - // Nothing waiting for this message - pass up to network - Network.getInstance().onMessage(this, message); + // Nothing waiting for this message (unsolicited) - queue up for network + + // Queue full? + if (unsolicitedQueue.remainingCapacity() == 0) { + LOGGER.debug(String.format("No room for %s message with ID %s from peer %s", message.getType().name(), message.getId(), this)); + continue; + } + + unsolicitedQueue.add(message); } } } catch (MessageException e) { @@ -313,12 +359,12 @@ public class Peer implements Runnable { message.setId(id); // Put queue into map (keyed by message ID) so we can poll for a response - // If putIfAbsent() doesn't return null, then this id is already taken - } while (this.messages.putIfAbsent(id, blockingQueue) != null); + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); // Try to send message if (!this.sendMessage(message)) { - this.messages.remove(id); + this.replyQueues.remove(id); return null; } @@ -329,7 +375,7 @@ public class Peer implements Runnable { // Our thread was interrupted. Probably in shutdown scenario. 
return null; } finally { - this.messages.remove(id); + this.replyQueues.remove(id); } } @@ -343,6 +389,8 @@ public class Peer implements Runnable { @Override public void run() { + Thread.currentThread().setName("Pinger " + this.peer); + PingMessage pingMessage = new PingMessage(); long before = System.currentTimeMillis(); @@ -358,16 +406,28 @@ public class Peer implements Runnable { Random random = new Random(); long initialDelay = random.nextInt(PING_INTERVAL); - this.executor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); + this.pingExecutor = Executors.newSingleThreadScheduledExecutor(); + this.pingExecutor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); } public void disconnect(String reason) { + LOGGER.trace(String.format("Disconnecting peer %s: %s", this, reason)); + // Shut down pinger - this.executor.shutdownNow(); + if (this.pingExecutor != null) { + this.pingExecutor.shutdownNow(); + this.pingExecutor = null; + } + + // Shut down unsolicited message processor + if (this.messageExecutor != null) { + this.messageExecutor.shutdownNow(); + this.messageExecutor = null; + } // Close socket if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason)); + LOGGER.debug(String.format("Closing socket with peer %s: %s", this, reason)); try { this.socket.close();