Browse Source

Synchronize peer data lookups.

Without this we could broadcast the same data multiple times, due to more than one thread processing the same message from different peers.
qdn
CalDescent 3 years ago
parent
commit
e7cb33d8e2
  1. 62
      src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java

62
src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java

@ -36,6 +36,7 @@ public class ArbitraryDataManager extends Thread {
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms
private static ArbitraryDataManager instance;
private final Object peerDataLock = new Object();
private boolean buildInProgress = false;
@ -679,39 +680,42 @@ public class ArbitraryDataManager extends Thread {
boolean containsNewEntry = false;
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Check if a record already exists for this hash/peer combination
ArbitraryPeerData existingEntry = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString());
if (existingEntry == null) {
// We haven't got a record of this mapping yet, so add it
LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
repository.saveChanges();
// Synchronize peer data lookups to make this process thread safe. Otherwise we could broadcast
// the same data multiple times, due to more than one thread processing the same message from different peers
synchronized (this.peerDataLock) {
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Check if a record already exists for this hash/peer combination
ArbitraryPeerData existingEntry = repository.getArbitraryRepository()
.getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString());
if (existingEntry == null) {
// We haven't got a record of this mapping yet, so add it
LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
repository.saveChanges();
// Remember that this data is new, so that it can be re-broadcast later
containsNewEntry = true;
}
}
// Remember that this data is new, so that it can be re-broadcast later
containsNewEntry = true;
// If at least one signature in this batch was new to us, we should re-broadcast the message to the
// network in case some peers haven't received it yet
if (containsNewEntry) {
LOGGER.info("Rebroadcasting arbitrary signature list for peer {}", peerAddress);
Network.getInstance().broadcast(broadcastPeer -> arbitrarySignaturesMessage);
} else {
// Don't re-broadcast as otherwise we could get into a loop
}
}
// If at least one signature in this batch was new to us, we should re-broadcast the message to the
// network in case some peers haven't received it yet
if (containsNewEntry) {
LOGGER.info("Rebroadcasting arbitrary signature list for peer {}", peerAddress);
Network.getInstance().broadcast(broadcastPeer -> arbitrarySignaturesMessage);
}
else {
// Don't re-broadcast as otherwise we could get into a loop
// If anything needed saving, it would already have called saveChanges() above
repository.discardChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e);
}
// If anything needed saving, it would already have called saveChanges() above
repository.discardChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e);
}
}

Loading…
Cancel
Save