From 114b1aac76ac3322a28290e1b881634b11e443d0 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Thu, 3 Feb 2022 19:51:02 +0000 Subject: [PATCH] Added arbitrary data file manager thread, which will ensure that all file list responses are tried until we receive the files. Previously we would only try the first response and then discard the others due to being duplicates. They are now added to a queue and retried by the dedicated thread (up to the 60 second timeout). --- .../org/qortal/controller/Controller.java | 2 + .../ArbitraryDataFileListManager.java | 8 ++ .../arbitrary/ArbitraryDataFileManager.java | 84 ++++++++++++++++++- 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 4d3ad391..c4429346 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -452,6 +452,7 @@ public class Controller extends Thread { // Arbitrary data controllers LOGGER.info("Starting arbitrary-transaction controllers"); ArbitraryDataManager.getInstance().start(); + ArbitraryDataFileManager.getInstance().start(); ArbitraryDataBuildManager.getInstance().start(); ArbitraryDataCleanupManager.getInstance().start(); ArbitraryDataStorageManager.getInstance().start(); @@ -840,6 +841,7 @@ public class Controller extends Thread { // Arbitrary data controllers LOGGER.info("Shutting down arbitrary-transaction controllers"); ArbitraryDataManager.getInstance().shutdown(); + ArbitraryDataFileManager.getInstance().shutdown(); ArbitraryDataBuildManager.getInstance().shutdown(); ArbitraryDataCleanupManager.getInstance().shutdown(); ArbitraryDataStorageManager.getInstance().shutdown(); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 008217b0..6e20499d 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -431,6 +431,14 @@ public class ArbitraryDataFileListManager { // } if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) { + // Keep track of the hashes this peer reports to have access to + Long now = NTP.getTime(); + for (byte[] hash : hashes) { + String hash58 = Base58.encode(hash); + String sig58 = Base58.encode(signature); + ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.put(hash58, new Triple<>(peer, sig58, now)); + } + // Go and fetch the actual data, since this isn't a relay request arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 339d9123..5c20585d 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -7,7 +7,6 @@ import org.qortal.controller.Controller; import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; -import org.qortal.data.transaction.TransactionData; import org.qortal.network.Network; import org.qortal.network.Peer; import org.qortal.network.message.*; @@ -24,11 +23,12 @@ import java.security.SecureRandom; import java.util.*; import java.util.stream.Collectors; -public class ArbitraryDataFileManager { +public class ArbitraryDataFileManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); private static ArbitraryDataFileManager instance; + private volatile boolean isStopping = false; /** @@ -42,6 +42,13 @@ public class ArbitraryDataFileManager { */ public Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + /** + * Map to keep track of any arbitrary data file hash responses + * Key: string - the hash encoded in base58 + * Value: Triple + */ + public Map> arbitraryDataFileHashResponses = Collections.synchronizedMap(new HashMap<>()); + private ArbitraryDataFileManager() { } @@ -53,6 +60,65 @@ public class ArbitraryDataFileManager { return instance; } + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Manager"); + + try { + while (!isStopping) { + Thread.sleep(1000); + + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + public void shutdown() { + isStopping = true; + this.interrupt(); + } + + private void processFileHashes(Long now) { + try (final Repository repository = RepositoryManager.getRepository()) { + + for (String hash58 : arbitraryDataFileHashResponses.keySet()) { + if (isStopping) { + return; + } + + Triple value = arbitraryDataFileHashResponses.get(hash58); + if (value != null) { + Peer peer = value.getA(); + String signature58 = value.getB(); + Long timestamp = value.getC(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + continue; + } + + byte[] hash = Base58.decode(hash58); + byte[] signature = Base58.decode(signature58); + + // Fetch the transaction data + ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + continue; + } + + LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer); + this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + } + } + + } catch (DataException e) { + LOGGER.info("Unable to process file hashes: {}", e.getMessage()); + } + } + public void cleanupRequestCache(Long now) { if (now == null) { @@ -63,6 +129,7 @@ public class ArbitraryDataFileManager { final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT; arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); + arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); } @@ -83,10 +150,14 @@ public class ArbitraryDataFileManager { // Now fetch actual data from this peer for (byte[] hash : hashes) { + if (isStopping) { + return false; + } + String hash58 = Base58.encode(hash); if (!arbitraryDataFile.chunkExists(hash)) { // Only request the file if we aren't already requesting it from someone else if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { - LOGGER.debug("Requesting data file {} from peer {}", Base58.encode(hash), peer); + LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); Long startTime = NTP.getTime(); ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null); Long endTime = NTP.getTime(); @@ -97,11 +168,18 @@ public class ArbitraryDataFileManager { else { LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime)); } + + // Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it + arbitraryDataFileHashResponses.remove(hash58); } else { LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); } } + else { + // Remove this hash from arbitraryDataFileHashResponses because we have a local copy + arbitraryDataFileHashResponses.remove(hash58); + } } if (receivedAtLeastOneFile) {