From affd1002982a9cd7d3fc2dcbb7cc98b8fb3cddb7 Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 3 Mar 2022 20:09:28 +0000 Subject: [PATCH 1/2] Reworking of Controller.processIncomingTransactionsQueue() Main changes are: * Check transaction signature validity in initial round, without blockchain lock * Convert List of incoming transactions to Map so we can record whether we have validated transaction signature before to save rechecking effort * Add invalid signature transactions to invalidUnconfirmedTransactions map with INVALID_TRANSACTION_RECHECK_INTERVAL expiry (~60min) * Other minor changes related to List->Map change and Java object synchronization --- .../org/qortal/controller/Controller.java | 186 ++++++++++++------ 1 file changed, 121 insertions(+), 65 deletions(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 85901620..4e2d0b72 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -143,11 +143,11 @@ public class Controller extends Thread { /** Whether we can mint new blocks, as reported by BlockMinter. */ private volatile boolean isMintingPossible = false; - /** List of incoming transaction that are in the import queue */ - private List incomingTransactions = Collections.synchronizedList(new ArrayList<>()); + /** Map of incoming transaction that are in the import queue. Key is transaction data, value is whether signature has been validated. */ + private final Map incomingTransactions = Collections.synchronizedMap(new HashMap<>()); - /** List of recent invalid unconfirmed transactions */ - private Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>()); + /** Map of recent invalid unconfirmed transactions. Key is base58 transaction signature, value is do-not-request expiry timestamp. */ + private final Map invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>()); /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); @@ -837,16 +837,16 @@ public class Controller extends Thread { private boolean incomingTransactionQueueContains(byte[] signature) { synchronized (incomingTransactions) { - return incomingTransactions.stream().anyMatch(t -> Arrays.equals(t.getSignature(), signature)); + return incomingTransactions.keySet().stream().anyMatch(t -> Arrays.equals(t.getSignature(), signature)); } } private void removeIncomingTransaction(byte[] signature) { - incomingTransactions.removeIf(t -> Arrays.equals(t.getSignature(), signature)); + incomingTransactions.keySet().removeIf(t -> Arrays.equals(t.getSignature(), signature)); } private void processIncomingTransactionsQueue() { - if (this.incomingTransactions.size() == 0) { + if (this.incomingTransactions.isEmpty()) { // Don't bother locking if there are no new transactions to process return; } @@ -856,86 +856,139 @@ public class Controller extends Thread { return; } - try { - ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { - LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue")); - return; - } - } catch (InterruptedException e) { - LOGGER.info("Interrupted when trying to acquire blockchain lock"); - return; - } - try (final Repository repository = RepositoryManager.getRepository()) { - LOGGER.debug("Processing incoming transactions queue (size {})...", this.incomingTransactions.size()); + // Take a snapshot of incomingTransactions, so we don't need to lock it while processing + Map incomingTransactionsCopy = Map.copyOf(this.incomingTransactions); - // Take a copy of incomingTransactions so we can release the lock - ListincomingTransactionsCopy = new ArrayList<>(this.incomingTransactions); + LOGGER.debug("Processing incoming transactions queue (size {})...", incomingTransactionsCopy.size()); - // Iterate through incoming transactions list - Iterator iterator = incomingTransactionsCopy.iterator(); - while (iterator.hasNext()) { + List sigValidTransactions = new ArrayList<>(); + + // Signature validation round - does not require blockchain lock + for (Map.Entry transactionEntry : incomingTransactionsCopy.entrySet()) { + // Quick exit? if (isStopping) { return; } if (Synchronizer.getInstance().isSyncRequestPending()) { - LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", incomingTransactionsCopy.size()); - return; + LOGGER.debug("Breaking out of transaction signature validation with {} remaining, because a sync request is pending", incomingTransactionsCopy.size()); + + // Fall-through to importing, or we could not even attempt to import by changing following line to 'return' + break; } - TransactionData transactionData = (TransactionData) iterator.next(); + TransactionData transactionData = transactionEntry.getKey(); Transaction transaction = Transaction.fromData(repository, transactionData); - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - removeIncomingTransaction(transactionData.getSignature()); - continue; - } + // Only validate signature if we haven't already done so + Boolean isSigValid = transactionEntry.getValue(); + if (!Boolean.TRUE.equals(isSigValid)) { + if (!transaction.isSignatureValid()) { + String signature58 = Base58.encode(transactionData.getSignature()); + + LOGGER.trace("Ignoring {} transaction {} with invalid signature", transactionData.getType().name(), signature58); + removeIncomingTransaction(transactionData.getSignature()); + + // Also add to invalidIncomingTransactions map + Long now = NTP.getTime(); + if (now != null) { + Long expiry = now + INVALID_TRANSACTION_RECHECK_INTERVAL; + LOGGER.trace("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); + // Add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(signature58, expiry); + } - ValidationResult validationResult = transaction.importAsUnconfirmed(); + continue; + } - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); - removeIncomingTransaction(transactionData.getSignature()); - continue; + // Add mark signature as valid if transaction still exists in import queue + incomingTransactions.computeIfPresent(transactionData, (k, v) -> Boolean.TRUE); + } else { + LOGGER.trace(() -> String.format("Transaction %s known to have valid signature", Base58.encode(transactionData.getSignature()))); } - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); - removeIncomingTransaction(transactionData.getSignature()); - continue; + // Signature valid - add to shortlist + sigValidTransactions.add(transaction); + } + + try { + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { + // This is not great if we've just spent a while doing mem-PoW during signature validation round above + LOGGER.debug("Too busy to process incoming transactions queue"); + return; } + } catch (InterruptedException e) { + LOGGER.debug("Interrupted when trying to acquire blockchain lock"); + return; + } + + // Import transactions with valid signatures + try { + for (int i = 0; i < sigValidTransactions.size(); ++i) { + if (isStopping) { + return; + } + + if (Synchronizer.getInstance().isSyncRequestPending()) { + LOGGER.debug("Breaking out of transaction processing with {} remaining, because a sync request is pending", sigValidTransactions.size() - i); + return; + } + + Transaction transaction = sigValidTransactions.get(i); + TransactionData transactionData = transaction.getTransactionData(); + + ValidationResult validationResult = transaction.importAsUnconfirmed(); + + switch (validationResult) { + case TRANSACTION_ALREADY_EXISTS: { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); + break; + } + + case NO_BLOCKCHAIN_LOCK: { + // Is this even possible considering we acquired blockchain lock above? + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s", Base58.encode(transactionData.getSignature()))); + break; + } + + case OK: { + LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + break; + } - if (validationResult != ValidationResult.OK) { - final String signature58 = Base58.encode(transactionData.getSignature()); - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); - Long now = NTP.getTime(); - if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; - if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { - // Use shorter recheck interval for expired transactions - expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; + // All other invalid cases: + default: { + final String signature58 = Base58.encode(transactionData.getSignature()); + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); + + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; + + if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { + // Use shorter recheck interval for expired transactions + expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; + } + + Long expiry = now + expiryLength; + LOGGER.trace("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(signature58, expiry); + } } - Long expiry = now + expiryLength; - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); - // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(signature58, expiry); } + + // Transaction has been processed, even if only to reject it removeIncomingTransaction(transactionData.getSignature()); - continue; } - - LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - removeIncomingTransaction(transactionData.getSignature()); + } finally { + LOGGER.debug("Finished processing incoming transactions queue"); + blockchainLock.unlock(); } } catch (DataException e) { - LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); - } finally { - LOGGER.debug("Finished processing incoming transactions queue"); - blockchainLock.unlock(); + LOGGER.error("Repository issue while processing incoming transactions", e); } } @@ -1437,9 +1490,12 @@ public class Controller extends Thread { private void onNetworkTransactionMessage(Peer peer, Message message) { TransactionMessage transactionMessage = (TransactionMessage) message; TransactionData transactionData = transactionMessage.getTransactionData(); + if (this.incomingTransactions.size() < MAX_INCOMING_TRANSACTIONS) { - if (!this.incomingTransactions.contains(transactionData)) { - this.incomingTransactions.add(transactionData); + synchronized (this.incomingTransactions) { + if (!incomingTransactionQueueContains(transactionData.getSignature())) { + this.incomingTransactions.put(transactionData, Boolean.FALSE); + } } } } From a9371f0a9064e21fffb05a49119ff019fdbdcd19 Mon Sep 17 00:00:00 2001 From: catbref Date: Thu, 3 Mar 2022 20:32:27 +0000 Subject: [PATCH 2/2] In Controller.processIncomingTransactionsQueue(), don't bother with 2nd-phase of locking blockchain and importing if there are no valid signature transactions to actually import --- src/main/java/org/qortal/controller/Controller.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 4e2d0b72..ede953c2 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -847,7 +847,7 @@ public class Controller extends Thread { private void processIncomingTransactionsQueue() { if (this.incomingTransactions.isEmpty()) { - // Don't bother locking if there are no new transactions to process + // Nothing to do? return; } @@ -912,6 +912,11 @@ public class Controller extends Thread { sigValidTransactions.add(transaction); } + if (sigValidTransactions.isEmpty()) { + // Don't bother locking if there are no new transactions to process + return; + } + try { ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {