From 2f3123a315d11c32a4fbb1ec243a82f6359202a8 Mon Sep 17 00:00:00 2001 From: catbref Date: Tue, 4 Jun 2019 13:34:16 +0100 Subject: [PATCH] Add lock around some Peer.peerData actions to help sync --- .../java/org/qora/controller/Controller.java | 58 +++++++++++++------ src/main/java/org/qora/network/Peer.java | 15 ++++- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 973e3601..061a6077 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -413,42 +413,54 @@ public class Controller extends Thread { LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { - case HEIGHT: + case HEIGHT: { HeightMessage heightMessage = (HeightMessage) message; // Update our record of peer's height - try (final Repository repository = RepositoryManager.getRepository()) { - PeerData peerData = peer.getPeerData(); + PeerData peerData = peer.getPeerData(); + peer.getPeerData().setLastHeight(heightMessage.getHeight()); - peer.getPeerData().setLastHeight(heightMessage.getHeight()); - - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e); - } + // Only save to repository if outbound peer + if (peer.isOutbound()) + try (final Repository repository = RepositoryManager.getRepository()) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e); + } break; + } - case HEIGHT_V2: + case HEIGHT_V2: { HeightV2Message heightV2Message = (HeightV2Message) message; // Update our record for peer's blockchain info - try (final Repository repository = RepositoryManager.getRepository()) { - PeerData peerData = peer.getPeerData(); + PeerData peerData = peer.getPeerData(); + // We want to update atomically so use lock + ReentrantLock peerDataLock = peer.getPeerDataLock(); + peerDataLock.lock(); + try { peerData.setLastHeight(heightV2Message.getHeight()); peerData.setLastBlockSignature(heightV2Message.getSignature()); peerData.setLastBlockTimestamp(heightV2Message.getTimestamp()); peerData.setLastBlockGenerator(heightV2Message.getGenerator()); - - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e); + } finally { + peerDataLock.unlock(); } + // Only save to repository if outbound peer + if (peer.isOutbound()) + try (final Repository repository = RepositoryManager.getRepository()) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e); + } + break; + } case GET_SIGNATURES: { GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; @@ -550,7 +562,7 @@ public class Controller extends Thread { break; } - case TRANSACTION: + case TRANSACTION: { TransactionMessage transactionMessage = (TransactionMessage) message; TransactionData transactionData = transactionMessage.getTransactionData(); @@ -589,6 +601,7 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); } break; + } case GET_BLOCK_SUMMARIES: GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; @@ -663,9 +676,16 @@ public class Controller extends Thread { // Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature) peers.removeIf(hasShorterBlockchain()); + // Remove peers that within 1 block of our height (actually ourHeight + 1) + final int maxHeight = getChainHeight() + 1; + peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= maxHeight ); + // Remove peers that have "misbehaved" recently peers.removeIf(hasPeerMisbehaved); + for (Peer peer : peers) + LOGGER.debug(String.format("Not up to date due to peer %s at height %d with block sig %s", peer, peer.getPeerData().getLastHeight(), Base58.encode(peer.getPeerData().getLastBlockSignature()))); + // If we have any peers left, then they would be candidates for synchronization therefore we're not up to date. return peers.isEmpty(); } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 81554ac6..7d0eee4b 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -18,6 +18,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,6 +48,7 @@ public class Peer implements Runnable { private final boolean isOutbound; private Socket socket = null; private PeerData peerData = null; + private final ReentrantLock peerDataLock = new ReentrantLock(); private Long connectionTimestamp = null; private OutputStream out; private Handshake handshakeStatus = Handshake.STARTED; @@ -82,7 +84,13 @@ public class Peer implements Runnable { // Getters / setters public PeerData getPeerData() { - return this.peerData; + this.peerDataLock.lock(); + + try { + return this.peerData; + } finally { + this.peerDataLock.unlock(); + } } public boolean isOutbound() { @@ -160,6 +168,11 @@ public class Peer implements Runnable { this.verificationCodeExpected = expected; } + /** Returns the lock used for synchronizing access to peer's PeerData. */ + public ReentrantLock getPeerDataLock() { + return this.peerDataLock; + } + // Easier, and nicer output, than peer.getRemoteSocketAddress() @Override