diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 0c9e30f6..0026be41 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -830,6 +830,10 @@ public class Controller extends Thread { } } + private void removeIncomingTransaction(byte[] signature) { + incomingTransactions.removeIf(t -> Arrays.equals(t.getSignature(), signature)); + } + private void processIncomingTransactionsQueue() { if (this.incomingTransactions.size() == 0) { // Don't bother locking if there are no new transactions to process @@ -853,70 +857,73 @@ public class Controller extends Thread { } try (final Repository repository = RepositoryManager.getRepository()) { + LOGGER.debug("Processing incoming transactions queue (size {})...", this.incomingTransactions.size()); + + // Take a copy of incomingTransactions so we can release the lock + ListincomingTransactionsCopy = new ArrayList<>(this.incomingTransactions); // Iterate through incoming transactions list - synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList() - Iterator iterator = this.incomingTransactions.iterator(); - while (iterator.hasNext()) { - if (isStopping) { - return; - } + Iterator iterator = incomingTransactionsCopy.iterator(); + while (iterator.hasNext()) { + if (isStopping) { + return; + } - if (Synchronizer.getInstance().isSyncRequestPending()) { - LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", this.incomingTransactions.size()); - return; - } + if (Synchronizer.getInstance().isSyncRequestPending()) { + LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", incomingTransactionsCopy.size()); + return; + } - TransactionData transactionData = (TransactionData) iterator.next(); - Transaction transaction = Transaction.fromData(repository, transactionData); + TransactionData transactionData = (TransactionData) iterator.next(); + Transaction transaction = Transaction.fromData(repository, transactionData); - // Check signature - if (!transaction.isSignatureValid()) { - LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } + // Check signature + if (!transaction.isSignatureValid()) { + LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + removeIncomingTransaction(transactionData.getSignature()); + continue; + } - ValidationResult validationResult = transaction.importAsUnconfirmed(); + ValidationResult validationResult = transaction.importAsUnconfirmed(); - if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { - LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } + if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) { + LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature()))); + removeIncomingTransaction(transactionData.getSignature()); + continue; + } - if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { - LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); - iterator.remove(); - continue; - } + if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) { + LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature()))); + removeIncomingTransaction(transactionData.getSignature()); + continue; + } - if (validationResult != ValidationResult.OK) { - final String signature58 = Base58.encode(transactionData.getSignature()); - LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); - Long now = NTP.getTime(); - if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { - Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; - if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { - // Use shorter recheck interval for expired transactions - expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; - } - Long expiry = now + expiryLength; - LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); - // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it - invalidUnconfirmedTransactions.put(signature58, expiry); + if (validationResult != ValidationResult.OK) { + final String signature58 = Base58.encode(transactionData.getSignature()); + LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58)); + Long now = NTP.getTime(); + if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) { + Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL; + if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) { + // Use shorter recheck interval for expired transactions + expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL; } - iterator.remove(); - continue; + Long expiry = now + expiryLength; + LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58); + // Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it + invalidUnconfirmedTransactions.put(signature58, expiry); } - - LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); - iterator.remove(); + removeIncomingTransaction(transactionData.getSignature()); + continue; } + + LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()))); + removeIncomingTransaction(transactionData.getSignature()); } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing incoming transactions", e)); } finally { + LOGGER.debug("Finished processing incoming transactions queue"); blockchainLock.unlock(); } }