Add lock around some Peer.peerData actions to help sync

This commit is contained in:
catbref 2019-06-04 13:34:16 +01:00
parent 6a969f9473
commit 2f3123a315
2 changed files with 53 additions and 20 deletions

View File

@ -413,42 +413,54 @@ public class Controller extends Thread {
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer));
switch (message.getType()) {
case HEIGHT:
case HEIGHT: {
HeightMessage heightMessage = (HeightMessage) message;
// Update our record of peer's height
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
PeerData peerData = peer.getPeerData();
peer.getPeerData().setLastHeight(heightMessage.getHeight());
peer.getPeerData().setLastHeight(heightMessage.getHeight());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e);
}
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e);
}
break;
}
case HEIGHT_V2:
case HEIGHT_V2: {
HeightV2Message heightV2Message = (HeightV2Message) message;
// Update our record for peer's blockchain info
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
PeerData peerData = peer.getPeerData();
// We want to update atomically so use lock
ReentrantLock peerDataLock = peer.getPeerDataLock();
peerDataLock.lock();
try {
peerData.setLastHeight(heightV2Message.getHeight());
peerData.setLastBlockSignature(heightV2Message.getSignature());
peerData.setLastBlockTimestamp(heightV2Message.getTimestamp());
peerData.setLastBlockGenerator(heightV2Message.getGenerator());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e);
} finally {
peerDataLock.unlock();
}
// Only save to repository if outbound peer
if (peer.isOutbound())
try (final Repository repository = RepositoryManager.getRepository()) {
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e);
}
break;
}
case GET_SIGNATURES: {
GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message;
@ -550,7 +562,7 @@ public class Controller extends Thread {
break;
}
case TRANSACTION:
case TRANSACTION: {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData();
@ -589,6 +601,7 @@ public class Controller extends Thread {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
}
break;
}
case GET_BLOCK_SUMMARIES:
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
@ -663,9 +676,16 @@ public class Controller extends Thread {
// Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature)
peers.removeIf(hasShorterBlockchain());
// Remove peers that within 1 block of our height (actually ourHeight + 1)
final int maxHeight = getChainHeight() + 1;
peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= maxHeight );
// Remove peers that have "misbehaved" recently
peers.removeIf(hasPeerMisbehaved);
for (Peer peer : peers)
LOGGER.debug(String.format("Not up to date due to peer %s at height %d with block sig %s", peer, peer.getPeerData().getLastHeight(), Base58.encode(peer.getPeerData().getLastBlockSignature())));
// If we have any peers left, then they would be candidates for synchronization therefore we're not up to date.
return peers.isEmpty();
}

View File

@ -18,6 +18,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -47,6 +48,7 @@ public class Peer implements Runnable {
private final boolean isOutbound;
private Socket socket = null;
private PeerData peerData = null;
private final ReentrantLock peerDataLock = new ReentrantLock();
private Long connectionTimestamp = null;
private OutputStream out;
private Handshake handshakeStatus = Handshake.STARTED;
@ -82,7 +84,13 @@ public class Peer implements Runnable {
// Getters / setters
public PeerData getPeerData() {
return this.peerData;
this.peerDataLock.lock();
try {
return this.peerData;
} finally {
this.peerDataLock.unlock();
}
}
public boolean isOutbound() {
@ -160,6 +168,11 @@ public class Peer implements Runnable {
this.verificationCodeExpected = expected;
}
/** Returns the lock used for synchronizing access to peer's PeerData. */
public ReentrantLock getPeerDataLock() {
return this.peerDataLock;
}
// Easier, and nicer output, than peer.getRemoteSocketAddress()
@Override