Browse Source

Merge pull request #73 from catbref/incoming-txns-rework

Reworking of Controller.processIncomingTransactionsQueue()
name-fixes
CalDescent 3 years ago committed by GitHub
parent
commit
a19e1f06c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 151
      src/main/java/org/qortal/controller/Controller.java

151
src/main/java/org/qortal/controller/Controller.java

@ -143,11 +143,11 @@ public class Controller extends Thread {
/** Whether we can mint new blocks, as reported by BlockMinter. */
private volatile boolean isMintingPossible = false;
/** List of incoming transaction that are in the import queue */
private List<TransactionData> incomingTransactions = Collections.synchronizedList(new ArrayList<>());
/** Map of incoming transaction that are in the import queue. Key is transaction data, value is whether signature has been validated. */
private final Map<TransactionData, Boolean> incomingTransactions = Collections.synchronizedMap(new HashMap<>());
/** List of recent invalid unconfirmed transactions */
private Map<String, Long> invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>());
/** Map of recent invalid unconfirmed transactions. Key is base58 transaction signature, value is do-not-request expiry timestamp. */
private final Map<String, Long> invalidUnconfirmedTransactions = Collections.synchronizedMap(new HashMap<>());
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock();
@ -837,17 +837,17 @@ public class Controller extends Thread {
private boolean incomingTransactionQueueContains(byte[] signature) {
synchronized (incomingTransactions) {
return incomingTransactions.stream().anyMatch(t -> Arrays.equals(t.getSignature(), signature));
return incomingTransactions.keySet().stream().anyMatch(t -> Arrays.equals(t.getSignature(), signature));
}
}
private void removeIncomingTransaction(byte[] signature) {
incomingTransactions.removeIf(t -> Arrays.equals(t.getSignature(), signature));
incomingTransactions.keySet().removeIf(t -> Arrays.equals(t.getSignature(), signature));
}
private void processIncomingTransactionsQueue() {
if (this.incomingTransactions.size() == 0) {
// Don't bother locking if there are no new transactions to process
if (this.incomingTransactions.isEmpty()) {
// Nothing to do?
return;
}
@ -856,87 +856,145 @@ public class Controller extends Thread {
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
// Take a snapshot of incomingTransactions, so we don't need to lock it while processing
Map<TransactionData, Boolean> incomingTransactionsCopy = Map.copyOf(this.incomingTransactions);
LOGGER.debug("Processing incoming transactions queue (size {})...", incomingTransactionsCopy.size());
List<Transaction> sigValidTransactions = new ArrayList<>();
// Signature validation round - does not require blockchain lock
for (Map.Entry<TransactionData, Boolean> transactionEntry : incomingTransactionsCopy.entrySet()) {
// Quick exit?
if (isStopping) {
return;
}
if (Synchronizer.getInstance().isSyncRequestPending()) {
LOGGER.debug("Breaking out of transaction signature validation with {} remaining, because a sync request is pending", incomingTransactionsCopy.size());
// Fall-through to importing, or we could not even attempt to import by changing following line to 'return'
break;
}
TransactionData transactionData = transactionEntry.getKey();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Only validate signature if we haven't already done so
Boolean isSigValid = transactionEntry.getValue();
if (!Boolean.TRUE.equals(isSigValid)) {
if (!transaction.isSignatureValid()) {
String signature58 = Base58.encode(transactionData.getSignature());
LOGGER.trace("Ignoring {} transaction {} with invalid signature", transactionData.getType().name(), signature58);
removeIncomingTransaction(transactionData.getSignature());
// Also add to invalidIncomingTransactions map
Long now = NTP.getTime();
if (now != null) {
Long expiry = now + INVALID_TRANSACTION_RECHECK_INTERVAL;
LOGGER.trace("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
// Add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions.put(signature58, expiry);
}
continue;
}
// Add mark signature as valid if transaction still exists in import queue
incomingTransactions.computeIfPresent(transactionData, (k, v) -> Boolean.TRUE);
} else {
LOGGER.trace(() -> String.format("Transaction %s known to have valid signature", Base58.encode(transactionData.getSignature())));
}
// Signature valid - add to shortlist
sigValidTransactions.add(transaction);
}
if (sigValidTransactions.isEmpty()) {
// Don't bother locking if there are no new transactions to process
return;
}
try {
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (!blockchainLock.tryLock(2, TimeUnit.SECONDS)) {
LOGGER.trace(() -> String.format("Too busy to process incoming transactions queue"));
// This is not great if we've just spent a while doing mem-PoW during signature validation round above
LOGGER.debug("Too busy to process incoming transactions queue");
return;
}
} catch (InterruptedException e) {
LOGGER.info("Interrupted when trying to acquire blockchain lock");
LOGGER.debug("Interrupted when trying to acquire blockchain lock");
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
LOGGER.debug("Processing incoming transactions queue (size {})...", this.incomingTransactions.size());
// Take a copy of incomingTransactions so we can release the lock
List<TransactionData>incomingTransactionsCopy = new ArrayList<>(this.incomingTransactions);
// Iterate through incoming transactions list
Iterator iterator = incomingTransactionsCopy.iterator();
while (iterator.hasNext()) {
// Import transactions with valid signatures
try {
for (int i = 0; i < sigValidTransactions.size(); ++i) {
if (isStopping) {
return;
}
if (Synchronizer.getInstance().isSyncRequestPending()) {
LOGGER.debug("Breaking out of transaction processing loop with {} remaining, because a sync request is pending", incomingTransactionsCopy.size());
LOGGER.debug("Breaking out of transaction processing with {} remaining, because a sync request is pending", sigValidTransactions.size() - i);
return;
}
TransactionData transactionData = (TransactionData) iterator.next();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
removeIncomingTransaction(transactionData.getSignature());
continue;
}
Transaction transaction = sigValidTransactions.get(i);
TransactionData transactionData = transaction.getTransactionData();
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
switch (validationResult) {
case TRANSACTION_ALREADY_EXISTS: {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s", Base58.encode(transactionData.getSignature())));
removeIncomingTransaction(transactionData.getSignature());
continue;
break;
}
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction", Base58.encode(transactionData.getSignature())));
removeIncomingTransaction(transactionData.getSignature());
continue;
case NO_BLOCKCHAIN_LOCK: {
// Is this even possible considering we acquired blockchain lock above?
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s", Base58.encode(transactionData.getSignature())));
break;
}
case OK: {
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
break;
}
if (validationResult != ValidationResult.OK) {
// All other invalid cases:
default: {
final String signature58 = Base58.encode(transactionData.getSignature());
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s", validationResult.name(), transactionData.getType().name(), signature58));
Long now = NTP.getTime();
if (now != null && now - transactionData.getTimestamp() > INVALID_TRANSACTION_STALE_TIMEOUT) {
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL;
if (validationResult == ValidationResult.TIMESTAMP_TOO_OLD) {
// Use shorter recheck interval for expired transactions
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL;
}
Long expiry = now + expiryLength;
LOGGER.debug("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
LOGGER.trace("Adding stale invalid transaction {} to invalidUnconfirmedTransactions...", signature58);
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions.put(signature58, expiry);
}
removeIncomingTransaction(transactionData.getSignature());
continue;
}
}
LOGGER.debug(() -> String.format("Imported %s transaction %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature())));
// Transaction has been processed, even if only to reject it
removeIncomingTransaction(transactionData.getSignature());
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing incoming transactions", e));
} finally {
LOGGER.debug("Finished processing incoming transactions queue");
blockchainLock.unlock();
}
} catch (DataException e) {
LOGGER.error("Repository issue while processing incoming transactions", e);
}
}
private void cleanupInvalidTransactionsList(Long now) {
@ -1437,9 +1495,12 @@ public class Controller extends Thread {
private void onNetworkTransactionMessage(Peer peer, Message message) {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData();
if (this.incomingTransactions.size() < MAX_INCOMING_TRANSACTIONS) {
if (!this.incomingTransactions.contains(transactionData)) {
this.incomingTransactions.add(transactionData);
synchronized (this.incomingTransactions) {
if (!incomingTransactionQueueContains(transactionData.getSignature())) {
this.incomingTransactions.put(transactionData, Boolean.FALSE);
}
}
}
}

Loading…
Cancel
Save