From bd60c793bea54f3ef8f3a80308291b4db21755d2 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Sun, 30 Jan 2022 19:03:31 +0000 Subject: [PATCH] Incoming transactions are now added to a queue, and then processed soon after. This solves a problem where incoming transactions could rarely obtain a blockchain lock (due to multiple transactions arriving at once) and therefore most messages were thrown away. It was also causing constant blockchain locks to be acquired, which would often prevent the synchronizer from running. --- .../org/qortal/controller/Controller.java | 90 ++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 2bfc80c2..90ac25d9 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -101,6 +102,7 @@ public class Controller extends Thread { private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final long RECOVERY_MODE_TIMEOUT = 10 * 60 * 1000L; // ms + private static final int MAX_INCOMING_TRANSACTIONS = 5000; // To do with online accounts list private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms @@ -153,6 +155,9 @@ public class Controller extends Thread { /** Temporary estimate of synchronization progress for SysTray use. */ private volatile int syncPercent = 0; + /** List of incoming transaction that are in the import queue */ + private List incomingTransactions = Collections.synchronizedList(new ArrayList<>()); + /** Latest block signatures from other peers that we know are on inferior chains. */ List inferiorChainSignatures = new ArrayList<>(); @@ -584,6 +589,9 @@ public class Controller extends Thread { potentiallySynchronize(); } + // Process incoming transactions queue + processIncomingTransactionsQueue(); + // Clean up arbitrary data request cache ArbitraryDataManager.getInstance().cleanupRequestCache(now); // Clean up arbitrary data queues and lists @@ -1497,50 +1505,72 @@ public class Controller extends Thread { private void onNetworkTransactionMessage(Peer peer, Message message) { TransactionMessage transactionMessage = (TransactionMessage) message; TransactionData transactionData = transactionMessage.getTransactionData(); + if (this.incomingTransactions.size() < MAX_INCOMING_TRANSACTIONS) { + this.incomingTransactions.add(transactionData); + } + } - /* - * If we can't obtain blockchain lock immediately, - * e.g. Synchronizer is active, or another transaction is taking a while to validate, - * then we're using up a network thread for ages and clogging things up - * so bail out early - */ - ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (!blockchainLock.tryLock()) { - LOGGER.trace(() -> String.format("Too busy to import %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + private void processIncomingTransactionsQueue() { + try { + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { + LOGGER.info(() -> String.format("Too busy to process incoming transactions queue")); + return; + } + } catch (InterruptedException e) { + LOGGER.info("Interrupted when trying to acquire blockchain lock"); return; } try (final Repository repository = RepositoryManager.getRepository()) { - Transaction transaction = Transaction.fromData(repository, transactionData); - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - return; - } + // Iterate through incoming transactions list + synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList() + Iterator iterator = this.incomingTransactions.iterator(); + while (iterator.hasNext()) { + if (isStopping) { + return; + } - ValidationResult validationResult = transaction.importAsUnconfirmed(); + TransactionData transactionData = (TransactionData) iterator.next(); + Transaction transaction = Transaction.fromData(repository, transactionData); - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - return; - } + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - return; - } + ValidationResult validationResult = transaction.importAsUnconfirmed(); - if (validationResult != ValidationResult.OK) { - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); - return; - } + if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } - LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer)); + if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } + + if (validationResult != ValidationResult.OK) { + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + iterator.remove(); + continue; + } + + LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + iterator.remove(); + } + } } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); + LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); } finally { blockchainLock.unlock(); + LOGGER.info("[processIncomingTransactionsQueue] Released blockchain lock"); } }