diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 4d3ad391..c4429346 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -452,6 +452,7 @@ public class Controller extends Thread { // Arbitrary data controllers LOGGER.info("Starting arbitrary-transaction controllers"); ArbitraryDataManager.getInstance().start(); + ArbitraryDataFileManager.getInstance().start(); ArbitraryDataBuildManager.getInstance().start(); ArbitraryDataCleanupManager.getInstance().start(); ArbitraryDataStorageManager.getInstance().start(); @@ -840,6 +841,7 @@ public class Controller extends Thread { // Arbitrary data controllers LOGGER.info("Shutting down arbitrary-transaction controllers"); ArbitraryDataManager.getInstance().shutdown(); + ArbitraryDataFileManager.getInstance().shutdown(); ArbitraryDataBuildManager.getInstance().shutdown(); ArbitraryDataCleanupManager.getInstance().shutdown(); ArbitraryDataStorageManager.getInstance().shutdown(); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 008217b0..6e20499d 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -431,6 +431,14 @@ public class ArbitraryDataFileListManager { // } if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) { + // Keep track of the hashes this peer reports to have access to + Long now = NTP.getTime(); + for (byte[] hash : hashes) { + String hash58 = Base58.encode(hash); + String sig58 = Base58.encode(signature); + ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.put(hash58, new Triple<>(peer, sig58, now)); + } + // Go and fetch the actual data, since this isn't a relay request arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 339d9123..5c20585d 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -7,7 +7,6 @@ import org.qortal.controller.Controller; import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; -import org.qortal.data.transaction.TransactionData; import org.qortal.network.Network; import org.qortal.network.Peer; import org.qortal.network.message.*; @@ -24,11 +23,12 @@ import java.security.SecureRandom; import java.util.*; import java.util.stream.Collectors; -public class ArbitraryDataFileManager { +public class ArbitraryDataFileManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); private static ArbitraryDataFileManager instance; + private volatile boolean isStopping = false; /** @@ -42,6 +42,13 @@ public class ArbitraryDataFileManager { */ public Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + /** + * Map to keep track of any arbitrary data file hash responses + * Key: string - the hash encoded in base58 + * Value: Triple + */ + public Map> arbitraryDataFileHashResponses = Collections.synchronizedMap(new HashMap<>()); + private ArbitraryDataFileManager() { } @@ -53,6 +60,65 @@ public class ArbitraryDataFileManager { return instance; } + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Manager"); + + try { + while (!isStopping) { + Thread.sleep(1000); + + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + public void shutdown() { + isStopping = true; + this.interrupt(); + } + + private void processFileHashes(Long now) { + try (final Repository repository = RepositoryManager.getRepository()) { + + for (String hash58 : arbitraryDataFileHashResponses.keySet()) { + if (isStopping) { + return; + } + + Triple value = arbitraryDataFileHashResponses.get(hash58); + if (value != null) { + Peer peer = value.getA(); + String signature58 = value.getB(); + Long timestamp = value.getC(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + continue; + } + + byte[] hash = Base58.decode(hash58); + byte[] signature = Base58.decode(signature58); + + // Fetch the transaction data + ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + continue; + } + + LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer); + this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + } + } + + } catch (DataException e) { + LOGGER.info("Unable to process file hashes: {}", e.getMessage()); + } + } + public void cleanupRequestCache(Long now) { if (now == null) { @@ -63,6 +129,7 @@ public class ArbitraryDataFileManager { final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT; arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); + arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); } @@ -83,10 +150,14 @@ public class ArbitraryDataFileManager { // Now fetch actual data from this peer for (byte[] hash : hashes) { + if (isStopping) { + return false; + } + String hash58 = Base58.encode(hash); if (!arbitraryDataFile.chunkExists(hash)) { // Only request the file if we aren't already requesting it from someone else if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { - LOGGER.debug("Requesting data file {} from peer {}", Base58.encode(hash), peer); + LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); Long startTime = NTP.getTime(); ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null); Long endTime = NTP.getTime(); @@ -97,11 +168,18 @@ public class ArbitraryDataFileManager { else { LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime)); } + + // Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it + arbitraryDataFileHashResponses.remove(hash58); } else { LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); } } + else { + // Remove this hash from arbitraryDataFileHashResponses because we have a local copy + arbitraryDataFileHashResponses.remove(hash58); + } } if (receivedAtLeastOneFile) {