mirror of
https://github.com/Qortal/qortal.git
synced 2025-03-13 11:12:31 +00:00
Rework of processIncomingTransactionsQueue() so that it no longer holds the lock while processing.
This should fix an issue where network threads could be blocked when new transactions arrived, due to waiting for the incomingTransactions lock to free up.
This commit is contained in:
parent
b72153f62b
commit
9630625449
@ -830,6 +830,10 @@ public class Controller extends Thread {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void removeIncomingTransaction(byte[] signature) {
|
||||||
|
incomingTransactions.removeIf(t -> Arrays.equals(t.getSignature(), signature));
|
||||||
|
}
|
||||||
|
|
||||||
private void processIncomingTransactionsQueue() {
|
private void processIncomingTransactionsQueue() {
|
||||||
if (this.incomingTransactions.size() == 0) {
|
if (this.incomingTransactions.size() == 0) {
|
||||||
// Don't bother locking if there are no new transactions to process
|
// Don't bother locking if there are no new transactions to process
|
||||||
@ -853,70 +857,73 @@ public class Controller extends Thread {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
|
LOGGER.debug("Processing incoming transactions queue (size {})...", this.incomingTransactions.size());
|
||||||
|
|
||||||
|
// Take a copy of incomingTransactions so we can release the lock
|
||||||
|
List<TransactionData>incomingTransactionsCopy = new ArrayList<>(this.incomingTransactions);
|
||||||
|
|
||||||
// Iterate through incoming transactions list
|
// Iterate through incoming transactions list
|
||||||
synchronized (this.incomingTransactions) { // Required in order to safely iterate a synchronizedList()
|
Iterator iterator = incomingTransactionsCopy.iterator();
|
||||||
Iterator iterator = this.incomingTransactions.iterator();
|
while (iterator.hasNext()) {
|
||||||
while (iterator.hasNext()) {
|
if (isStopping) {
|
||||||
if (isStopping) {
|
return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Synchronizer.getInstance().isSyncRequestPending()) {
|
|
||||||
LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", this.incomingTransactions.size());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionData transactionData = (TransactionData) iterator.next();
|
|
||||||
Transaction transaction = Transaction.fromData(repository, transactionData);
|
|
||||||
|
|
||||||
// Check signature
|
|
||||||
if (!transaction.isSignatureValid()) {
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ValidationResult validationResult = transaction.importAsUnconfirmed();
|
|
||||||
|
|
||||||
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
|
|
||||||
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (validationResult != ValidationResult.OK) {
|
|
||||||
final String signature58 = Base58.encode(transactionData.getSignature());
|
|
||||||
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
|
|
||||||
Long now = NTP.getTime();
|
|
||||||
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
|
|
||||||
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
|
|
||||||
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
|
|
||||||
// Use shorter recheck interval for expired transactions
|
|
||||||
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
|
|
||||||
}
|
|
||||||
Long expiry = now + expiryLength;
|
|
||||||
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
|
|
||||||
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
|
|
||||||
invalidUnconfirmedTransactions.put(signature58, expiry);
|
|
||||||
}
|
|
||||||
iterator.remove();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
|
||||||
iterator.remove();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Synchronizer.getInstance().isSyncRequestPending()) {
|
||||||
|
LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", incomingTransactionsCopy.size());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TransactionData transactionData = (TransactionData) iterator.next();
|
||||||
|
Transaction transaction = Transaction.fromData(repository, transactionData);
|
||||||
|
|
||||||
|
// Check signature
|
||||||
|
if (!transaction.isSignatureValid()) {
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
||||||
|
removeIncomingTransaction(transactionData.getSignature());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ValidationResult validationResult = transaction.importAsUnconfirmed();
|
||||||
|
|
||||||
|
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
|
||||||
|
removeIncomingTransaction(transactionData.getSignature());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
|
||||||
|
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
|
||||||
|
removeIncomingTransaction(transactionData.getSignature());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (validationResult != ValidationResult.OK) {
|
||||||
|
final String signature58 = Base58.encode(transactionData.getSignature());
|
||||||
|
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
|
||||||
|
Long now = NTP.getTime();
|
||||||
|
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
|
||||||
|
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
|
||||||
|
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
|
||||||
|
// Use shorter recheck interval for expired transactions
|
||||||
|
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
|
||||||
|
}
|
||||||
|
Long expiry = now + expiryLength;
|
||||||
|
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
|
||||||
|
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
|
||||||
|
invalidUnconfirmedTransactions.put(signature58, expiry);
|
||||||
|
}
|
||||||
|
removeIncomingTransaction(transactionData.getSignature());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
|
||||||
|
removeIncomingTransaction(transactionData.getSignature());
|
||||||
}
|
}
|
||||||
} catch (DataException e) {
|
} catch (DataException e) {
|
||||||
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
|
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
|
||||||
} finally {
|
} finally {
|
||||||
|
LOGGER.debug("Finished processing incoming transactions queue");
|
||||||
blockchainLock.unlock();
|
blockchainLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user