From b8aaf14cdc9187f9702bc035a34d96ba99e67a8a Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 6 Feb 2022 15:34:06 +0000 Subject: [PATCH] Introduced ArbitraryDataFileRequestThread to allow for multiple concurrent file requests. This is likely a short term solution (to allow existing code to be repurposed) until replaced with a task-based approach, as this will allow for a much greater number of threads. --- .../arbitrary/ArbitraryDataFileManager.java | 75 ++--------- .../ArbitraryDataFileRequestThread.java | 117 ++++++++++++++++++ 2 files changed, 128 insertions(+), 64 deletions(-) create mode 100644 src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 1b544434..27433180 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -21,6 +21,8 @@ import org.qortal.utils.Triple; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class ArbitraryDataFileManager extends Thread { @@ -65,11 +67,16 @@ public class ArbitraryDataFileManager extends Thread { Thread.currentThread().setName("Arbitrary Data File Manager"); try { - while (!isStopping) { - Thread.sleep(1000); + // Use a fixed thread pool to execute the arbitrary data file requests + int threadCount = 10; + ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread()); + } - Long now = NTP.getTime(); - this.processFileHashes(now); + while (!isStopping) { + // Nothing to do yet + Thread.sleep(1000); } } catch (InterruptedException e) { // Fall-through to exit thread... @@ -81,66 +88,6 @@ public class ArbitraryDataFileManager extends Thread { this.interrupt(); } - private void processFileHashes(Long now) { - try (final Repository repository = RepositoryManager.getRepository()) { - - ArbitraryTransactionData arbitraryTransactionData = null; - byte[] signature = null; - byte[] hash = null; - Peer peer = null; - boolean shouldProcess = false; - - synchronized (arbitraryDataFileHashResponses) { - for (String hash58 : arbitraryDataFileHashResponses.keySet()) { - if (isStopping) { - return; - } - - Triple value = arbitraryDataFileHashResponses.get(hash58); - if (value != null) { - peer = value.getA(); - String signature58 = value.getB(); - Long timestamp = value.getC(); - - if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { - // Ignore - to be deleted - continue; - } - - hash = Base58.decode(hash58); - signature = Base58.decode(signature58); - - // Fetch the transaction data - arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { - continue; - } - - // We want to process this file - shouldProcess = true; - break; - } - } - } - - if (!shouldProcess) { - // Nothing to do - return; - } - - if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { - return; - } - - String hash58 = Base58.encode(hash); - LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer); - this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); - - } catch (DataException e) { - LOGGER.info("Unable to process file hashes: {}", e.getMessage()); - } - } - public void cleanupRequestCache(Long now) { if (now == null) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java new file mode 100644 index 00000000..97704ae5 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -0,0 +1,117 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.network.Peer; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.ArbitraryTransactionUtils; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +public class ArbitraryDataFileRequestThread implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + + public ArbitraryDataFileRequestThread() { + + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Request Thread"); + + try { + while (!Controller.isStopping()) { + Thread.sleep(1000); + + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + private void processFileHashes(Long now) { + try (final Repository repository = RepositoryManager.getRepository()) { + ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance(); + + ArbitraryTransactionData arbitraryTransactionData = null; + byte[] signature = null; + byte[] hash = null; + Peer peer = null; + boolean shouldProcess = false; + + synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { + Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.entrySet().iterator(); + while (iterator.hasNext()) { + if (Controller.isStopping()) { + return; + } + + Map.Entry entry = (Map.Entry) iterator.next(); + if (entry == null || entry.getKey() == null || entry.getValue() == null) { + iterator.remove(); + continue; + } + + String hash58 = (String) entry.getKey(); + Triple value = (Triple) entry.getValue(); + if (value == null) { + iterator.remove(); + continue; + } + + peer = value.getA(); + String signature58 = value.getB(); + Long timestamp = value.getC(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + iterator.remove(); + continue; + } + + hash = Base58.decode(hash58); + signature = Base58.decode(signature58); + + // We want to process this file + shouldProcess = true; + iterator.remove(); + break; + } + } + + if (!shouldProcess) { + // Nothing to do + return; + } + + // Fetch the transaction data + arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + return; + } + + if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { + return; + } + + String hash58 = Base58.encode(hash); + LOGGER.debug("Fetching file {} from peer {} via request thread...", hash58, peer); + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + + } catch (DataException e) { + LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); + } + } +}