diff --git a/src/main/java/org/qortal/api/resource/ArbitraryResource.java b/src/main/java/org/qortal/api/resource/ArbitraryResource.java
index f588e9c9..84e53200 100644
--- a/src/main/java/org/qortal/api/resource/ArbitraryResource.java
+++ b/src/main/java/org/qortal/api/resource/ArbitraryResource.java
@@ -576,14 +576,16 @@ public class ArbitraryResource {
 			@PathParam("service") Service service,
 			@PathParam("name") String name,
 			@QueryParam("filepath") String filepath,
-			@QueryParam("rebuild") boolean rebuild) {
+			@QueryParam("rebuild") boolean rebuild,
+			@QueryParam("async") boolean async,
+			@QueryParam("attempts") Integer attempts) {
 
 		// Authentication can be bypassed in the settings, for those running public QDN nodes
 		if (!Settings.getInstance().isQDNAuthBypassEnabled()) {
 			Security.checkApiCallAllowed(request);
 		}
 
-		return this.download(service, name, null, filepath, rebuild);
+		return this.download(service, name, null, filepath, rebuild, async, attempts);
 	}
 
 	@GET
@@ -609,14 +611,16 @@ public class ArbitraryResource {
 			@PathParam("name") String name,
 			@PathParam("identifier") String identifier,
 			@QueryParam("filepath") String filepath,
-			@QueryParam("rebuild") boolean rebuild) {
+			@QueryParam("rebuild") boolean rebuild,
+			@QueryParam("async") boolean async,
+			@QueryParam("attempts") Integer attempts) {
 
 		// Authentication can be bypassed in the settings, for those running public QDN nodes
 		if (!Settings.getInstance().isQDNAuthBypassEnabled()) {
 			Security.checkApiCallAllowed(request);
 		}
 
-		return this.download(service, name, identifier, filepath, rebuild);
+		return this.download(service, name, identifier, filepath, rebuild, async, attempts);
 	}
 
@@ -1027,30 +1031,45 @@ public class ArbitraryResource {
 		}
 	}
 
-	private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild) {
+	private HttpServletResponse download(Service service, String name, String identifier, String filepath, boolean rebuild, boolean async, Integer maxAttempts) {
 
 		ArbitraryDataReader arbitraryDataReader = new ArbitraryDataReader(name, ArbitraryDataFile.ResourceIdType.NAME, service, identifier);
 		try {
 
 			int attempts = 0;
+			if (maxAttempts == null) {
+				maxAttempts = 5;
+			}
 
 			// Loop until we have data
-			while (!Controller.isStopping()) {
-				attempts++;
-				if (!arbitraryDataReader.isBuilding()) {
-					try {
-						arbitraryDataReader.loadSynchronously(rebuild);
-						break;
-					} catch (MissingDataException e) {
-						if (attempts > 5) {
-							// Give up after 5 attempts
-							throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. Please try again later.");
+			if (async) {
+				// Asynchronous
+				arbitraryDataReader.loadAsynchronously(false);
+			}
+			else {
+				// Synchronous
+				while (!Controller.isStopping()) {
+					attempts++;
+					if (!arbitraryDataReader.isBuilding()) {
+						try {
+							arbitraryDataReader.loadSynchronously(rebuild);
+							break;
+						} catch (MissingDataException e) {
+							if (attempts > maxAttempts) {
+								// Give up after maxAttempts attempts (defaults to 5)
+								throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Data unavailable. 
Please try again later."); + } } } + Thread.sleep(3000L); } - Thread.sleep(3000L); } + java.nio.file.Path outputPath = arbitraryDataReader.getFilePath(); + if (outputPath == null) { + // Assume the resource doesn't exist + throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.FILE_NOT_FOUND, "File not found"); + } if (filepath == null || filepath.isEmpty()) { // No file path supplied - so check if this is a single file resource diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java index 619e5330..bb5641c2 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java @@ -122,9 +122,19 @@ public class ArbitraryDataReader { * This adds the build task to a queue, and the result will be cached when complete * To check the status of the build, periodically call isCachedDataAvailable() * Once it returns true, you can then use getFilePath() to access the data itself. + * + * @param overwrite - set to true to force rebuild an existing cache * @return true if added or already present in queue; false if not */ - public boolean loadAsynchronously() { + public boolean loadAsynchronously(boolean overwrite) { + ArbitraryDataCache cache = new ArbitraryDataCache(this.uncompressedPath, overwrite, + this.resourceId, this.resourceIdType, this.service, this.identifier); + if (cache.isCachedDataAvailable()) { + // Use cached data + this.filePath = this.uncompressedPath; + return true; + } + return ArbitraryDataBuildManager.getInstance().addToBuildQueue(this.createQueueItem()); } diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java index 445ff2f6..e4d90b79 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataRenderer.java @@ -76,7 +76,7 @@ public class ArbitraryDataRenderer { if (!arbitraryDataReader.isCachedDataAvailable()) { // If async is requested, show a loading screen whilst build is in progress if (async) { - arbitraryDataReader.loadAsynchronously(); + arbitraryDataReader.loadAsynchronously(false); return this.getLoadingResponse(service, resourceId); } diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 51f91970..7c3caad5 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -11,18 +11,7 @@ import java.security.Security; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -101,6 +90,14 @@ public class Controller extends Thread { private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final int MAX_INCOMING_TRANSACTIONS = 5000; + /** Minimum time before considering an invalid unconfirmed transaction as "stale" */ + public static final long INVALID_TRANSACTION_STALE_TIMEOUT = 
30 * 60 * 1000L; // ms
+	/** Minimum time before re-requesting a stale invalid unconfirmed transaction from peers, to recheck validity */
+	public static final long INVALID_TRANSACTION_RECHECK_INTERVAL = 60 * 60 * 1000L; // ms
+	/** Minimum time before re-requesting an expired unconfirmed transaction from peers, to recheck validity.
+	 * This mainly exists to stop expired transactions from bloating the list */
+	public static final long EXPIRED_TRANSACTION_RECHECK_INTERVAL = 10 * 60 * 1000L; // ms
+
 	// To do with online accounts list
 	private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
 	private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms
@@ -147,6 +144,9 @@ public class Controller extends Thread {
 	/** List of incoming transaction that are in the import queue */
 	private List incomingTransactions = Collections.synchronizedList(new ArrayList<>());
 
+	/** List of recent invalid unconfirmed transactions */
+	private Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>());
+
 	/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
 	private final ReentrantLock blockchainLock = new ReentrantLock();
 
@@ -557,6 +557,8 @@ public class Controller extends Thread {
 				// Process incoming transactions queue
 				processIncomingTransactionsQueue();
+				// Clean up invalid incoming transactions list
+				cleanupInvalidTransactionsList(now);
 
 				// Clean up arbitrary data request cache
 				ArbitraryDataManager.getInstance().cleanupRequestCache(now);
@@ -820,6 +822,103 @@ public class Controller extends Thread {
 		}
 	}
 
+	// Incoming transactions queue
+
+	private void processIncomingTransactionsQueue() {
+		if (this.incomingTransactions.size() == 0) {
+			// Don't bother locking if there are no new transactions to process
+			return;
+		}
+
+		if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
+			// Prioritize syncing, and don't attempt to lock
+			return;
+		}
+
+		try {
+			ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
+			if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
+				LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
+				return;
+			}
+		} catch (InterruptedException e) {
+			LOGGER.info("Interrupted when trying to acquire blockchain lock");
+			return;
+		}
+
+		try (final Repository repository = RepositoryManager.getRepository()) {
+
+			// Iterate through incoming transactions list
+			synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
+				Iterator iterator = this.incomingTransactions.iterator();
+				while (iterator.hasNext()) {
+					if (isStopping) {
+						return;
+					}
+
+					TransactionData transactionData = (TransactionData) iterator.next();
+					Transaction transaction = Transaction.fromData(repository, transactionData);
+
+					// Check signature
+					if (!transaction.isSignatureValid()) {
+						LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
+						iterator.remove();
+						continue;
+					}
+
+					ValidationResult validationResult = transaction.importAsUnconfirmed();
+
+					if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
+						LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
+						iterator.remove();
+						continue;
+					}
+
+					if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
+						LOGGER.trace(() -> 
String.format("Couldn't lock blockchain to import unconfirmed transaction %s", Base58.encode(transactionData.getSignature())));
+						iterator.remove();
+						continue;
+					}
+
+					if (validationResult != ValidationResult.OK) {
+						final String signature58 = Base58.encode(transactionData.getSignature());
+						LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
+
+						Long now = NTP.getTime();
+						if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
+							Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
+
+							if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
+								// Use shorter recheck interval for expired transactions
+								expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
+							}
+
+							Long expiry = now + expiryLength;
+							LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
+							// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
+							invalidUnconfirmedTransactions.put(signature58, expiry);
+						}
+
+						iterator.remove();
+						continue;
+					}
+
+					LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
+					iterator.remove();
+				}
+			}
+		} catch (DataException e) {
+			LOGGER.error("Repository issue while processing incoming transactions", e);
+		} finally {
+			blockchainLock.unlock();
+		}
+	}
+
+	private void cleanupInvalidTransactionsList(Long now) {
+		if (now == null) {
+			return;
+		}
+		// Periodically remove invalid unconfirmed transactions from the list, so that they can be fetched again
+		invalidUnconfirmedTransactions.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < now);
+	}
+
+
 	// Shutdown
 
 	public void shutdown() {
@@ -1293,79 +1392,6 @@ public class Controller extends Thread {
 		}
 	}
 
-	private void processIncomingTransactionsQueue() {
-		if (this.incomingTransactions.size() == 0) {
-			// Don't bother locking if there are no new transactions to process
-			return;
-		}
-
-		if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) {
-			// Prioritize syncing, and don't attempt to lock
-			return;
-		}
-
-		try {
-			ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
-			if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
-				LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
-				return;
-			}
-		} catch (InterruptedException e) {
-			LOGGER.info("Interrupted when trying to acquire blockchain lock");
-			return;
-		}
-
-		try (final Repository repository = RepositoryManager.getRepository()) {
-
-			// Iterate through incoming transactions list
-			synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
-				Iterator iterator = this.incomingTransactions.iterator();
-				while (iterator.hasNext()) {
-					if (isStopping) {
-						return;
-					}
-
-					TransactionData transactionData = (TransactionData) iterator.next();
-					Transaction transaction = Transaction.fromData(repository, transactionData);
-
-					// Check signature
-					if (!transaction.isSignatureValid()) {
-						LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
-						iterator.remove();
-						continue;
-					}
-
-					ValidationResult validationResult = transaction.importAsUnconfirmed();
-
-					if 
(validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - if (validationResult != ValidationResult.OK) { - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } - - LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); - } - } - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); - } finally { - blockchainLock.unlock(); - } - } - private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); @@ -1561,6 +1587,13 @@ public class Controller extends Thread { try (final Repository repository = RepositoryManager.getRepository()) { for (byte[] signature : signatures) { + String signature58 = Base58.encode(signature); + if (invalidUnconfirmedTransactions.containsKey(signature58)) { + // Previously invalid transaction - don't keep requesting it + // It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks + continue; + } + // Do we have it already? 
(Before requesting transaction data itself) if (repository.getTransactionRepository().exists(signature)) { LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer)); @@ -1754,88 +1787,94 @@ public class Controller extends Thread { private void sendOurOnlineAccountsInfo() { final Long now = NTP.getTime(); - if (now == null) - return; + if (now != null) { - List mintingAccounts; - try (final Repository repository = RepositoryManager.getRepository()) { - mintingAccounts = repository.getAccountRepository().getMintingAccounts(); + List mintingAccounts; + try (final Repository repository = RepositoryManager.getRepository()) { + mintingAccounts = repository.getAccountRepository().getMintingAccounts(); - // We have no accounts, but don't reset timestamp - if (mintingAccounts.isEmpty()) - return; + // We have no accounts, but don't reset timestamp + if (mintingAccounts.isEmpty()) + return; - // Only reward-share accounts allowed - Iterator iterator = mintingAccounts.iterator(); - while (iterator.hasNext()) { - MintingAccountData mintingAccountData = iterator.next(); - - RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey()); - if (rewardShareData == null) { - // Reward-share doesn't even exist - probably not a good sign - iterator.remove(); - continue; - } - - Account mintingAccount = new Account(repository, rewardShareData.getMinter()); - if (!mintingAccount.canMint()) { - // Minting-account component of reward-share can no longer mint - disregard - iterator.remove(); - continue; - } - } - } catch (DataException e) { - LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage())); - return; - } - - // 'current' timestamp - final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now); - boolean hasInfoChanged = false; - - byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); - List ourOnlineAccounts = new ArrayList<>(); - - MINTING_ACCOUNTS: - for (MintingAccountData mintingAccountData : mintingAccounts) { - PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey()); - - byte[] signature = mintingAccount.sign(timestampBytes); - byte[] publicKey = mintingAccount.getPublicKey(); - - // Our account is online - OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); - synchronized (this.onlineAccounts) { - Iterator iterator = this.onlineAccounts.iterator(); + // Only reward-share accounts allowed + Iterator iterator = mintingAccounts.iterator(); + int i = 0; while (iterator.hasNext()) { - OnlineAccountData existingOnlineAccountData = iterator.next(); + MintingAccountData mintingAccountData = iterator.next(); - if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) { - // If our online account is already present, with same timestamp, then move on to next mintingAccount - if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp) - continue MINTING_ACCOUNTS; - - // If our online account is already present, but with older timestamp, then remove it + RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(mintingAccountData.getPublicKey()); + if (rewardShareData == null) { + // Reward-share doesn't even exist - probably not a good sign iterator.remove(); - break; + continue; + } + + Account mintingAccount = new Account(repository, 
rewardShareData.getMinter()); + if (!mintingAccount.canMint()) { + // Minting-account component of reward-share can no longer mint - disregard + iterator.remove(); + continue; + } + + if (++i > 2) { + iterator.remove(); + continue; } } - - this.onlineAccounts.add(ourOnlineAccountData); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage())); + return; } - LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp)); - ourOnlineAccounts.add(ourOnlineAccountData); - hasInfoChanged = true; + // 'current' timestamp + final long onlineAccountsTimestamp = Controller.toOnlineAccountTimestamp(now); + boolean hasInfoChanged = false; + + byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); + List ourOnlineAccounts = new ArrayList<>(); + + MINTING_ACCOUNTS: + for (MintingAccountData mintingAccountData : mintingAccounts) { + PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey()); + + byte[] signature = mintingAccount.sign(timestampBytes); + byte[] publicKey = mintingAccount.getPublicKey(); + + // Our account is online + OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); + synchronized (this.onlineAccounts) { + Iterator iterator = this.onlineAccounts.iterator(); + while (iterator.hasNext()) { + OnlineAccountData existingOnlineAccountData = iterator.next(); + + if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) { + // If our online account is already present, with same timestamp, then move on to next mintingAccount + if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp) + continue MINTING_ACCOUNTS; + + // If our online account is already present, but with older timestamp, then remove it + iterator.remove(); + break; + } + } + + this.onlineAccounts.add(ourOnlineAccountData); + } + + LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp)); + ourOnlineAccounts.add(ourOnlineAccountData); + hasInfoChanged = true; + } + + if (!hasInfoChanged) + return; + + Message message = new OnlineAccountsMessage(ourOnlineAccounts); + Network.getInstance().broadcast(peer -> message); + + LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp)); } - - if (!hasInfoChanged) - return; - - Message message = new OnlineAccountsMessage(ourOnlineAccounts); - Network.getInstance().broadcast(peer -> message); - - LOGGER.trace(()-> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? 
"s" : ""), onlineAccountsTimestamp)); } public static long toOnlineAccountTimestamp(long timestamp) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java index da7c7293..8da18a2b 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataBuilderThread.java @@ -20,8 +20,9 @@ public class ArbitraryDataBuilderThread implements Runnable { } + @Override public void run() { - Thread.currentThread().setName("Arbitrary Data Build Manager"); + Thread.currentThread().setName("Arbitrary Data Builder Thread"); ArbitraryDataBuildManager buildManager = ArbitraryDataBuildManager.getInstance(); while (!Controller.isStopping()) { @@ -39,7 +40,7 @@ public class ArbitraryDataBuilderThread implements Runnable { Map.Entry next = buildManager.arbitraryDataBuildQueue .entrySet().stream() .filter(e -> e.getValue().isQueued()) - .findFirst().get(); + .findFirst().orElse(null); if (next == null) { continue; diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 6337fc7c..46c2ff15 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -5,6 +5,7 @@ import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.arbitrary.ArbitraryDataFileChunk; import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.ArbitraryRelayInfo; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.network.Network; @@ -477,10 +478,8 @@ public class ArbitraryDataFileListManager { Long now = NTP.getTime(); for (byte[] hash : hashes) { String hash58 = Base58.encode(hash); - Triple value = new Triple<>(signature58, peer, now); - if (arbitraryDataFileManager.arbitraryRelayMap.putIfAbsent(hash58, value) == null) { - LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now); - } + ArbitraryRelayInfo relayMap = new ArbitraryRelayInfo(hash58, signature58, peer, now); + ArbitraryDataFileManager.getInstance().addToRelayMap(relayMap); } // Forward to requesting peer diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 1b544434..8461448e 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -4,6 +4,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.ArbitraryRelayInfo; import org.qortal.data.network.ArbitraryPeerData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; @@ -21,6 +22,8 @@ import org.qortal.utils.Triple; import java.security.SecureRandom; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class ArbitraryDataFileManager extends Thread { @@ -37,10 +40,9 @@ public class 
ArbitraryDataFileManager extends Thread { private Map arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>()); /** - * Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded). - * Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added + * Map to keep track of hashes that we might need to relay */ - public Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + public List arbitraryRelayMap = Collections.synchronizedList(new ArrayList<>()); /** * Map to keep track of any arbitrary data file hash responses @@ -65,11 +67,16 @@ public class ArbitraryDataFileManager extends Thread { Thread.currentThread().setName("Arbitrary Data File Manager"); try { - while (!isStopping) { - Thread.sleep(1000); + // Use a fixed thread pool to execute the arbitrary data file requests + int threadCount = 10; + ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread()); + } - Long now = NTP.getTime(); - this.processFileHashes(now); + while (!isStopping) { + // Nothing to do yet + Thread.sleep(1000); } } catch (InterruptedException e) { // Fall-through to exit thread... @@ -81,66 +88,6 @@ public class ArbitraryDataFileManager extends Thread { this.interrupt(); } - private void processFileHashes(Long now) { - try (final Repository repository = RepositoryManager.getRepository()) { - - ArbitraryTransactionData arbitraryTransactionData = null; - byte[] signature = null; - byte[] hash = null; - Peer peer = null; - boolean shouldProcess = false; - - synchronized (arbitraryDataFileHashResponses) { - for (String hash58 : arbitraryDataFileHashResponses.keySet()) { - if (isStopping) { - return; - } - - Triple value = arbitraryDataFileHashResponses.get(hash58); - if (value != null) { - peer = value.getA(); - String signature58 = value.getB(); - Long timestamp = value.getC(); - - if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { - // Ignore - to be deleted - continue; - } - - hash = Base58.decode(hash58); - signature = Base58.decode(signature58); - - // Fetch the transaction data - arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - if (arbitraryTransactionData == null) { - continue; - } - - // We want to process this file - shouldProcess = true; - break; - } - } - } - - if (!shouldProcess) { - // Nothing to do - return; - } - - if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { - return; - } - - String hash58 = Base58.encode(hash); - LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer); - this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); - - } catch (DataException e) { - LOGGER.info("Unable to process file hashes: {}", e.getMessage()); - } - } - public void cleanupRequestCache(Long now) { if (now == null) { @@ -150,7 +97,7 @@ public class ArbitraryDataFileManager extends Thread { arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp); final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT; - arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || 
entry.getValue().getC() < relayMinimumTimestamp); + arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp); arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); } @@ -186,13 +133,19 @@ public class ArbitraryDataFileManager extends Thread { if (receivedArbitraryDataFileMessage != null) { LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer, (endTime-startTime)); receivedAtLeastOneFile = true; + + // Remove this hash from arbitraryDataFileHashResponses now that we have received it + arbitraryDataFileHashResponses.remove(hash58); } else { LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime)); - } - // Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it - arbitraryDataFileHashResponses.remove(hash58); + // Remove this hash from arbitraryDataFileHashResponses now that we have failed to receive it + arbitraryDataFileHashResponses.remove(hash58); + + // Stop asking for files from this peer + break; + } } else { LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature)); @@ -217,22 +170,23 @@ public class ArbitraryDataFileManager extends Thread { // Invalidate the hosted transactions cache as we are now hosting something new ArbitraryDataStorageManager.getInstance().invalidateHostedTransactionsCache(); - } - // Check if we have all the files we need for this transaction - if (arbitraryDataFile.allFilesExist()) { + // Check if we have all the files we need for this transaction + if (arbitraryDataFile.allFilesExist()) { - // We have all the chunks for this transaction, so we should invalidate the transaction's name's - // data cache so that it is rebuilt the next time we serve it - ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData); + // We have all the chunks for this transaction, so we should invalidate the transaction's name's + // data cache so that it is rebuilt the next time we serve it + ArbitraryDataManager.getInstance().invalidateCache(arbitraryTransactionData); - // We may also need to broadcast to the network that we are now hosting files for this transaction, - // but only if these files are in accordance with our storage policy - if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { - // Use a null peer address to indicate our own - Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature)); - Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + // We may also need to broadcast to the network that we are now hosting files for this transaction, + // but only if these files are in accordance with our storage policy + if (ArbitraryDataStorageManager.getInstance().canStoreData(arbitraryTransactionData)) { + // Use a null peer address to indicate our own + Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, 0, Arrays.asList(signature)); + Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + } } + } return receivedAtLeastOneFile; @@ -437,6 +391,48 @@ public class ArbitraryDataFileManager extends Thread { } + // Relays + + private List 
getRelayInfoListForHash(String hash58) { + synchronized (arbitraryRelayMap) { + return arbitraryRelayMap.stream() + .filter(relayInfo -> Objects.equals(relayInfo.getHash58(), hash58)) + .collect(Collectors.toList()); + } + } + + private ArbitraryRelayInfo getRandomRelayInfoEntryForHash(String hash58) { + LOGGER.info("Fetching random relay info for hash: {}", hash58); + List relayInfoList = this.getRelayInfoListForHash(hash58); + if (relayInfoList != null && !relayInfoList.isEmpty()) { + + // Pick random item + int index = new SecureRandom().nextInt(relayInfoList.size()); + LOGGER.info("Returning random relay info for hash: {} (index {})", hash58, index); + return relayInfoList.get(index); + } + LOGGER.info("No relay info exists for hash: {}", hash58); + return null; + } + + public void addToRelayMap(ArbitraryRelayInfo newEntry) { + if (newEntry == null || !newEntry.isValid()) { + return; + } + + // Remove existing entry for this peer if it exists, to renew the timestamp + this.removeFromRelayMap(newEntry); + + // Re-add + arbitraryRelayMap.add(newEntry); + LOGGER.debug("Added entry to relay map: {}", newEntry); + } + + private void removeFromRelayMap(ArbitraryRelayInfo entry) { + arbitraryRelayMap.removeIf(relayInfo -> relayInfo.equals(entry)); + } + + // Network handlers public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { @@ -455,7 +451,7 @@ public class ArbitraryDataFileManager extends Thread { try { ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); - Triple relayInfo = this.arbitraryRelayMap.get(hash58); + ArbitraryRelayInfo relayInfo = this.getRandomRelayInfoEntryForHash(hash58); if (arbitraryDataFile.exists()) { LOGGER.trace("Hash {} exists", hash58); @@ -472,15 +468,12 @@ public class ArbitraryDataFileManager extends Thread { else if (relayInfo != null) { LOGGER.debug("We have relay info for hash {}", Base58.encode(hash)); // We need to ask this peer for the file - Peer peerToAsk = relayInfo.getB(); + Peer peerToAsk = relayInfo.getPeer(); if (peerToAsk != null) { // Forward the message to this peer LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message); - - // Remove from the map regardless of outcome, as the relay attempt is now considered complete - arbitraryRelayMap.remove(hash58); } else { LOGGER.debug("Peer {} not found in relay info", peer); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java new file mode 100644 index 00000000..97704ae5 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -0,0 +1,117 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.network.Peer; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.ArbitraryTransactionUtils; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; + +public class ArbitraryDataFileRequestThread implements Runnable { + + private static final Logger LOGGER = 
LogManager.getLogger(ArbitraryDataFileRequestThread.class); + + public ArbitraryDataFileRequestThread() { + + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Request Thread"); + + try { + while (!Controller.isStopping()) { + Thread.sleep(1000); + + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + private void processFileHashes(Long now) { + try (final Repository repository = RepositoryManager.getRepository()) { + ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance(); + + ArbitraryTransactionData arbitraryTransactionData = null; + byte[] signature = null; + byte[] hash = null; + Peer peer = null; + boolean shouldProcess = false; + + synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { + Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.entrySet().iterator(); + while (iterator.hasNext()) { + if (Controller.isStopping()) { + return; + } + + Map.Entry entry = (Map.Entry) iterator.next(); + if (entry == null || entry.getKey() == null || entry.getValue() == null) { + iterator.remove(); + continue; + } + + String hash58 = (String) entry.getKey(); + Triple value = (Triple) entry.getValue(); + if (value == null) { + iterator.remove(); + continue; + } + + peer = value.getA(); + String signature58 = value.getB(); + Long timestamp = value.getC(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + iterator.remove(); + continue; + } + + hash = Base58.decode(hash58); + signature = Base58.decode(signature58); + + // We want to process this file + shouldProcess = true; + iterator.remove(); + break; + } + } + + if (!shouldProcess) { + // Nothing to do + return; + } + + // Fetch the transaction data + arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + return; + } + + if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { + return; + } + + String hash58 = Base58.encode(hash); + LOGGER.debug("Fetching file {} from peer {} via request thread...", hash58, peer); + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + + } catch (DataException e) { + LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java b/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java new file mode 100644 index 00000000..94f41d18 --- /dev/null +++ b/src/main/java/org/qortal/data/arbitrary/ArbitraryRelayInfo.java @@ -0,0 +1,60 @@ +package org.qortal.data.arbitrary; + +import org.qortal.network.Peer; +import java.util.Objects; + +public class ArbitraryRelayInfo { + + private final String hash58; + private final String signature58; + private final Peer peer; + private final Long timestamp; + + public ArbitraryRelayInfo(String hash58, String signature58, Peer peer, Long timestamp) { + this.hash58 = hash58; + this.signature58 = signature58; + this.peer = peer; + this.timestamp = timestamp; + } + + public boolean isValid() { + return this.getHash58() != null && this.getSignature58() != null + && this.getPeer() != null && this.getTimestamp() != null; + } + + public String getHash58() { + return this.hash58; + } + + public String 
getSignature58() { + return signature58; + } + + public Peer getPeer() { + return peer; + } + + public Long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return String.format("%s = %s, %s, %d", this.hash58, this.signature58, this.peer, this.timestamp); + } + + @Override + public boolean equals(Object other) { + if (other == this) + return true; + + if (!(other instanceof ArbitraryRelayInfo)) + return false; + + ArbitraryRelayInfo otherRelayInfo = (ArbitraryRelayInfo) other; + + return this.peer == otherRelayInfo.getPeer() + && Objects.equals(this.hash58, otherRelayInfo.getHash58()) + && Objects.equals(this.signature58, otherRelayInfo.getSignature58()); + } +} diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index 41b69114..dd62189f 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -202,9 +202,9 @@ public class Settings { private boolean allowConnectionsWithOlderPeerVersions = true; /** Minimum time (in seconds) that we should attempt to remain connected to a peer for */ - private int minPeerConnectionTime = 2 * 60; // seconds + private int minPeerConnectionTime = 5 * 60; // seconds /** Maximum time (in seconds) that we should attempt to remain connected to a peer for */ - private int maxPeerConnectionTime = 20 * 60; // seconds + private int maxPeerConnectionTime = 60 * 60; // seconds /** Whether to sync multiple blocks at once in normal operation */ private boolean fastSyncEnabled = true;
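
Reviewer note: a minimal sketch (not part of the patch) of how the new asynchronous load path is intended to be consumed. The ArbitraryDataReader constructor, loadAsynchronously(boolean), isCachedDataAvailable() and getFilePath() calls are taken from the diff above; the Service import path, the Service.WEBSITE constant, the polling interval and the AsyncLoadExample wrapper are illustrative assumptions.

import org.qortal.arbitrary.ArbitraryDataFile;
import org.qortal.arbitrary.ArbitraryDataReader;
import org.qortal.arbitrary.misc.Service; // assumed package for Service

import java.nio.file.Path;

public class AsyncLoadExample {

	/** Queue an async build for a name's website data, then poll for the result. */
	public static Path loadWebsite(String name) throws InterruptedException {
		// Same constructor call used by ArbitraryResource.download() above; null identifier = default resource
		ArbitraryDataReader reader = new ArbitraryDataReader(name,
				ArbitraryDataFile.ResourceIdType.NAME, Service.WEBSITE, null);

		// Add to the build queue without forcing a rebuild of any existing cache
		reader.loadAsynchronously(false);

		// Poll until the cached data is available, as described in the loadAsynchronously() javadoc,
		// bounded in the same spirit as the maxAttempts parameter added to ArbitraryResource.download()
		for (int attempts = 0; attempts < 30 && !reader.isCachedDataAvailable(); attempts++) {
			Thread.sleep(1000L);
		}

		// Null if the build has not completed (or the resource doesn't exist)
		return reader.isCachedDataAvailable() ? reader.getFilePath() : null;
	}
}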
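A second illustrative sketch, showing why addToRelayMap() can "renew" a relay entry's timestamp by removing and re-adding it: ArbitraryRelayInfo.equals() above deliberately ignores the timestamp and compares only hash, signature and peer. The literal strings and null peer below are placeholders for this example only; real entries carry base58 strings and a live Peer.

import org.qortal.data.arbitrary.ArbitraryRelayInfo;

public class RelayInfoEqualityExample {

	public static void main(String[] args) {
		// Same (hash58, signature58, peer) tuple, different timestamps
		ArbitraryRelayInfo first = new ArbitraryRelayInfo("hash58", "sig58", null, 1000L);
		ArbitraryRelayInfo renewed = new ArbitraryRelayInfo("hash58", "sig58", null, 2000L);

		// Prints "true": the timestamp is not part of equality, so removeFromRelayMap()
		// matches the stale entry and addToRelayMap() effectively refreshes its timestamp
		System.out.println(first.equals(renewed));
	}
}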