diff --git a/src/main/java/org/qortal/controller/TransactionImporter.java b/src/main/java/org/qortal/controller/TransactionImporter.java index 84198a7d..ea20e8d7 100644 --- a/src/main/java/org/qortal/controller/TransactionImporter.java +++ b/src/main/java/org/qortal/controller/TransactionImporter.java @@ -20,6 +20,7 @@ import org.qortal.utils.NTP; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class TransactionImporter extends Thread { @@ -63,7 +64,9 @@ public class TransactionImporter extends Thread { Thread.sleep(1000L); // Process incoming transactions queue - processIncomingTransactionsQueue(); + validateTransactionsInQueue(); + importTransactionsInQueue(); + // Clean up invalid incoming transactions list cleanupInvalidTransactionsList(NTP.getTime()); } @@ -90,7 +93,24 @@ public class TransactionImporter extends Thread { incomingTransactions.keySet().removeIf(t -> Arrays.equals(t.getSignature(), signature)); } - private void processIncomingTransactionsQueue() { + /** + * Retrieve all pending unconfirmed transactions that have had their signatures validated. + * @return a list of TransactionData objects, with valid signatures. + */ + private List getCachedSigValidTransactions() { + return this.incomingTransactions.entrySet().stream() + .filter(t -> Boolean.TRUE.equals(t.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + /** + * Validate the signatures of any transactions pending import, then update their + * entries in the queue to mark them as valid/invalid. + * + * No database lock is required. + */ + private void validateTransactionsInQueue() { if (this.incomingTransactions.isEmpty()) { // Nothing to do? return; @@ -127,6 +147,8 @@ public class TransactionImporter extends Thread { if (isLiteNode) { // Lite nodes can't easily validate transactions, so for now we will have to assume that everything is valid sigValidTransactions.add(transaction); + // Add mark signature as valid if transaction still exists in import queue + incomingTransactions.computeIfPresent(transactionData, (k, v) -> Boolean.TRUE); continue; } @@ -166,30 +188,42 @@ public class TransactionImporter extends Thread { LOGGER.debug("Finished validating signatures in incoming transactions queue (valid this round: {}, total pending import: {})...", validatedCount, sigValidTransactions.size()); } - if (sigValidTransactions.isEmpty()) { - // Don't bother locking if there are no new transactions to process - return; - } + } catch (DataException e) { + LOGGER.error("Repository issue while processing incoming transactions", e); + } + } - if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { - // Prioritize syncing, and don't attempt to lock - // Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted - return; - } + /** + * Import any transactions in the queue that have valid signatures. + * + * A database lock is required. + */ + private void importTransactionsInQueue() { + List sigValidTransactions = this.getCachedSigValidTransactions(); + if (sigValidTransactions.isEmpty()) { + // Don't bother locking if there are no new transactions to process + return; + } - try { - ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { - // Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted - LOGGER.debug("Too busy to process incoming transactions queue"); - return; - } - } catch (InterruptedException e) { - LOGGER.debug("Interrupted when trying to acquire blockchain lock"); + if (Synchronizer.getInstance().isSyncRequested() || Synchronizer.getInstance().isSynchronizing()) { + // Prioritize syncing, and don't attempt to lock + return; + } + + try { + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) { + LOGGER.debug("Too busy to process incoming transactions queue"); return; } + } catch (InterruptedException e) { + LOGGER.debug("Interrupted when trying to acquire blockchain lock"); + return; + } + + LOGGER.debug("Processing incoming transactions queue (size {})...", sigValidTransactions.size()); - LOGGER.debug("Processing incoming transactions queue (size {})...", sigValidTransactions.size()); + try (final Repository repository = RepositoryManager.getRepository()) { // Import transactions with valid signatures try { @@ -203,8 +237,8 @@ public class TransactionImporter extends Thread { return; } - Transaction transaction = sigValidTransactions.get(i); - TransactionData transactionData = transaction.getTransactionData(); + TransactionData transactionData = sigValidTransactions.get(i); + Transaction transaction = Transaction.fromData(repository, transactionData); Transaction.ValidationResult validationResult = transaction.importAsUnconfirmed();