From e7cb33d8e28e819eb815b8562caeeb5d2a59d155 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Fri, 29 Oct 2021 17:46:58 +0100 Subject: [PATCH] Synchronize peer data lookups. Without this we could broadcast the same data multiple times, due to more than one thread processing the same message from different peers. --- .../arbitrary/ArbitraryDataManager.java | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 9541a17e..d322a970 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -36,6 +36,7 @@ public class ArbitraryDataManager extends Thread { private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms private static ArbitraryDataManager instance; + private final Object peerDataLock = new Object(); private boolean buildInProgress = false; @@ -679,39 +680,42 @@ public class ArbitraryDataManager extends Thread { boolean containsNewEntry = false; - try (final Repository repository = RepositoryManager.getRepository()) { - for (byte[] signature : signatures) { - - // Check if a record already exists for this hash/peer combination - ArbitraryPeerData existingEntry = repository.getArbitraryRepository() - .getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString()); - - if (existingEntry == null) { - // We haven't got a record of this mapping yet, so add it - LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); - ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); - repository.getArbitraryRepository().save(arbitraryPeerData); - repository.saveChanges(); + // Synchronize peer data lookups to make this process thread safe. Otherwise we could broadcast + // the same data multiple times, due to more than one thread processing the same message from different peers + synchronized (this.peerDataLock) { + try (final Repository repository = RepositoryManager.getRepository()) { + for (byte[] signature : signatures) { + + // Check if a record already exists for this hash/peer combination + ArbitraryPeerData existingEntry = repository.getArbitraryRepository() + .getArbitraryPeerDataForSignatureAndPeer(signature, peer.getPeerData().getAddress().toString()); + + if (existingEntry == null) { + // We haven't got a record of this mapping yet, so add it + LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.getArbitraryRepository().save(arbitraryPeerData); + repository.saveChanges(); + + // Remember that this data is new, so that it can be re-broadcast later + containsNewEntry = true; + } + } - // Remember that this data is new, so that it can be re-broadcast later - containsNewEntry = true; + // If at least one signature in this batch was new to us, we should re-broadcast the message to the + // network in case some peers haven't received it yet + if (containsNewEntry) { + LOGGER.info("Rebroadcasting arbitrary signature list for peer {}", peerAddress); + Network.getInstance().broadcast(broadcastPeer -> arbitrarySignaturesMessage); + } else { + // Don't re-broadcast as otherwise we could get into a loop } - } - // If at least one signature in this batch was new to us, we should re-broadcast the message to the - // network in case some peers haven't received it yet - if (containsNewEntry) { - LOGGER.info("Rebroadcasting arbitrary signature list for peer {}", peerAddress); - Network.getInstance().broadcast(broadcastPeer -> arbitrarySignaturesMessage); - } - else { - // Don't re-broadcast as otherwise we could get into a loop + // If anything needed saving, it would already have called saveChanges() above + repository.discardChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e); } - - // If anything needed saving, it would already have called saveChanges() above - repository.discardChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing arbitrary transaction signature list from peer %s", peer), e); } }