From ab0fc07ee974e618a4c1af4638a443e80b0af8fa Mon Sep 17 00:00:00 2001 From: CalDescent Date: Fri, 13 May 2022 12:31:19 +0100 Subject: [PATCH] Refactored transaction importer, to separate signature validation from importing. Importing has to be single threaded since it requires the database lock, but there's nothing to stop us from validating signatures on multiple threads, as no lock is required. So it makes sense to separate these two functions to allow for possible multi threaded signature validation in the future, to speed up the process. Everything remains single threaded in this commit. It should be functionally the same as before, to reduce risk. --- .../controller/TransactionImporter.java | 80 +++++++++++++------ 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/qortal/controller/TransactionImporter.java b/src/main/java/org/qortal/controller/TransactionImporter.java index 84198a7d..ea20e8d7 100644 --- a/src/main/java/org/qortal/controller/TransactionImporter.java +++ b/src/main/java/org/qortal/controller/TransactionImporter.java @@ -20,6 +20,7 @@ import org.qortal.utils.NTP; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class TransactionImporter extends Thread { @@ -63,7 +64,9 @@ public class TransactionImporter extends Thread { Thread.sleep(1000L); // Process incoming transactions queue - processIncomingTransactionsQueue(); + validateTransactionsInQueue(); + importTransactionsInQueue(); + // Clean up invalid incoming transactions list cleanupInvalidTransactionsList(NTP.getTime()); } @@ -90,7 +93,24 @@ public class TransactionImporter extends Thread { incomingTransactions.keySet().removeIf(t -> Arrays.equals(t.getSignature(), signature)); } - private void processIncomingTransactionsQueue() { + /** + * Retrieve all pending unconfirmed transactions that have had their signatures validated. + * @return a list of TransactionData objects, with valid signatures. + */ + private List getCachedSigValidTransactions() { + return this.incomingTransactions.entrySet().stream() + .filter(t -> Boolean.TRUE.equals(t.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Validate the signatures of any transactions pending import, then update their + * entries in the queue to mark them as valid/invalid. + * + * No database lock is required. + */ + private void validateTransactionsInQueue() { if (this.incomingTransactions.isEmpty()) { // Nothing to do? return; @@ -127,6 +147,8 @@ public class TransactionImporter extends Thread { if (isLiteNode) { // Lite nodes can't easily validate transactions, so for now we will have to assume that everything is valid sigValidTransactions.add(transaction); + // Add mark signature as valid if transaction still exists in import queue + incomingTransactions.computeIfPresent(transactionData, (k, v) -> Boolean.TRUE); continue; } @@ -166,30 +188,42 @@ public class TransactionImporter extends Thread { LOGGER.debug("Finished validating signatures in incoming transactions queue (valid this round: {}, total pending import: {})...", validatedCount, sigValidTransactions.size()); } - if (sigValidTransactions.isEmpty()) { - // Don't bother locking if there are no new transactions to process - return; - } + } catch (DataException e) { + LOGGER.error("Repository issue while processing incoming transactions", e); + } + } - if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { - // Prioritize syncing, and don't attempt to lock - // Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted - return; - } + /** + * Import any transactions in the queue that have valid signatures. + * + * A database lock is required. + */ + private void importTransactionsInQueue() { + List sigValidTransactions = this.getCachedSigValidTransactions(); + if (sigValidTransactions.isEmpty()) { + // Don't bother locking if there are no new transactions to process + return; + } - try { - ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { - // Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted - LOGGER.debug("Too busy to process incoming transactions queue"); - return; - } - } catch (InterruptedException e) { - LOGGER.debug("Interrupted when trying to acquire blockchain lock"); + if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { + // Prioritize syncing, and don't attempt to lock + return; + } + + try { + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { + LOGGER.debug("Too busy to process incoming transactions queue"); return; } + } catch (InterruptedException e) { + LOGGER.debug("Interrupted when trying to acquire blockchain lock"); + return; + } + + LOGGER.debug("Processing incoming transactions queue (size {})...", sigValidTransactions.size()); - LOGGER.debug("Processing incoming transactions queue (size {})...", sigValidTransactions.size()); + try (final Repository repository = RepositoryManager.getRepository()) { // Import transactions with valid signatures try { @@ -203,8 +237,8 @@ public class TransactionImporter extends Thread { return; } - Transaction transaction = sigValidTransactions.get(i); - TransactionData transactionData = transaction.getTransactionData(); + TransactionData transactionData = sigValidTransactions.get(i); + Transaction transaction = Transaction.fromData(repository, transactionData); Transaction.ValidationResult validationResult = transaction.importAsUnconfirmed();