@ -143,11 +143,11 @@ public class Controller extends Thread {
/** Whether we can mint new blocks, as reported by BlockMinter. */
private volatile boolean isMintingPossible = false ;
/** List of incoming transaction that are in the import queue */
private List < TransactionData > incomingTransactions = Collections . synchronizedList ( new ArrayList < > ( ) ) ;
/** Map of incoming transaction that are in the import queue. Key is transaction data, value is whether signature has been validated. */
private final Map < TransactionData , Boolean > incomingTransactions = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/** List of recent invalid unconfirmed transactions */
private Map < String , Long > invalidUnconfirmedTransactions = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/** Map of recent invalid unconfirmed transactions. Key is base58 transaction signature, value is do-not-request expiry timestamp. */
private final Map < String , Long > invalidUnconfirmedTransactions = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock ( ) ;
@ -837,16 +837,16 @@ public class Controller extends Thread {
private boolean incomingTransactionQueueContains ( byte [ ] signature ) {
synchronized ( incomingTransactions ) {
return incomingTransactions . stream ( ) . anyMatch ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
return incomingTransactions . keySet ( ) . stream ( ) . anyMatch ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
}
}
private void removeIncomingTransaction ( byte [ ] signature ) {
incomingTransactions . removeIf ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
incomingTransactions . keySet ( ) . removeIf ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
}
private void processIncomingTransactionsQueue ( ) {
if ( this . incomingTransactions . size ( ) = = 0 ) {
if ( this . incomingTransactions . isEmpty ( ) ) {
// Don't bother locking if there are no new transactions to process
return ;
}
@ -856,86 +856,139 @@ public class Controller extends Thread {
return ;
}
try {
ReentrantLock blockchainLock = Controller . getInstance ( ) . getBlockchainLock ( ) ;
if ( ! blockchainLock . tryLock ( 2 , TimeUnit . SECONDS ) ) {
LOGGER . trace ( ( ) - > String . format ( "Too busy to process incoming transactions queue" ) ) ;
return ;
}
} catch ( InterruptedException e ) {
LOGGER . info ( "Interrupted when trying to acquire blockchain lock" ) ;
return ;
}
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
LOGGER . debug ( "Processing incoming transactions queue (size {})..." , this . incomingTransactions . size ( ) ) ;
// Take a snapshot of incomingTransactions, so we don't need to lock it while processing
Map < TransactionData , Boolean > incomingTransactionsCopy = Map . copyOf ( this . incomingTransactions ) ;
// Take a copy of incomingTransactions so we can release the lock
List < TransactionData > incomingTransactionsCopy = new ArrayList < > ( this . incomingTransactions ) ;
LOGGER . debug ( "Processing incoming transactions queue (size {})..." , incomingTransactionsCopy . size ( ) ) ;
// Iterate through incoming transactions list
Iterator iterator = incomingTransactionsCopy . iterator ( ) ;
while ( iterator . hasNext ( ) ) {
List < Transaction > sigValidTransactions = new ArrayList < > ( ) ;
// Signature validation round - does not require blockchain lock
for ( Map . Entry < TransactionData , Boolean > transactionEntry : incomingTransactionsCopy . entrySet ( ) ) {
// Quick exit?
if ( isStopping ) {
return ;
}
if ( Synchronizer . getInstance ( ) . isSyncRequestPending ( ) ) {
LOGGER . debug ( "Breaking out of transaction processing loop with {} remaining, because a sync request is pending" , incomingTransactionsCopy . size ( ) ) ;
return ;
LOGGER . debug ( "Breaking out of transaction signature validation with {} remaining, because a sync request is pending" , incomingTransactionsCopy . size ( ) ) ;
// Fall-through to importing, or we could not even attempt to import by changing following line to 'return'
break ;
}
TransactionData transactionData = ( TransactionData ) iterator . next ( ) ;
TransactionData transactionData = transactionEntry . getKey ( ) ;
Transaction transaction = Transaction . fromData ( repository , transactionData ) ;
// Check signature
if ( ! transaction . isSignatureValid ( ) ) {
LOGGER . trace ( ( ) - > String . format ( "Ignoring %s transaction %s with invalid signature" , transactionData . getType ( ) . name ( ) , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
continue ;
}
// Only validate signature if we haven't already done so
Boolean isSigValid = transactionEntry . getValue ( ) ;
if ( ! Boolean . TRUE . equals ( isSigValid ) ) {
if ( ! transaction . isSignatureValid ( ) ) {
String signature58 = Base58 . encode ( transactionData . getSignature ( ) ) ;
LOGGER . trace ( "Ignoring {} transaction {} with invalid signature" , transactionData . getType ( ) . name ( ) , signature58 ) ;
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
// Also add to invalidIncomingTransactions map
Long now = NTP . getTime ( ) ;
if ( now ! = null ) {
Long expiry = now + INVALID_TRANSACTION_RECHECK_INTERVAL ;
LOGGER . trace ( "Adding stale invalid transaction {} to invalidUnconfirmedTransactions..." , signature58 ) ;
// Add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions . put ( signature58 , expiry ) ;
}
ValidationResult validationResult = transaction . importAsUnconfirmed ( ) ;
continue ;
}
if ( validationResult = = ValidationResult . TRANSACTION_ALREADY_EXISTS ) {
LOGGER . trace ( ( ) - > String . format ( "Ignoring existing transaction %s" , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
continue ;
// Add mark signature as valid if transaction still exists in import queue
incomingTransactions . computeIfPresent ( transactionData , ( k , v ) - > Boolean . TRUE ) ;
} else {
LOGGER . trace ( ( ) - > String . format ( "Transaction %s known to have valid signature" , Base58 . encode ( transac tio nData . getSignat ur e( ) ) ) ) ;
}
if ( validationResult = = ValidationResult . NO_BLOCKCHAIN_LOCK ) {
LOGGER . trace ( ( ) - > String . format ( "Couldn't lock blockchain to import unconfirmed transaction" , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
continue ;
// Signature valid - add to shortlist
sigValidTransactions . add ( transaction ) ;
}
try {
ReentrantLock blockchainLock = Controller . getInstance ( ) . getBlockchainLock ( ) ;
if ( ! blockchainLock . tryLock ( 2 , TimeUnit . SECONDS ) ) {
// This is not great if we've just spent a while doing mem-PoW during signature validation round above
LOGGER . debug ( "Too busy to process incoming transactions queue" ) ;
return ;
}
} catch ( InterruptedException e ) {
LOGGER . debug ( "Interrupted when trying to acquire blockchain lock" ) ;
return ;
}
// Import transactions with valid signatures
try {
for ( int i = 0 ; i < sigValidTransactions . size ( ) ; + + i ) {
if ( isStopping ) {
return ;
}
if ( Synchronizer . getInstance ( ) . isSyncRequestPending ( ) ) {
LOGGER . debug ( "Breaking out of transaction processing with {} remaining, because a sync request is pending" , sigValidTransactions . size ( ) - i ) ;
return ;
}
Transaction transaction = sigValidTransactions . get ( i ) ;
TransactionData transactionData = transaction . getTransactionData ( ) ;
ValidationResult validationResult = transaction . importAsUnconfirmed ( ) ;
switch ( validationResult ) {
case TRANSACTION_ALREADY_EXISTS : {
LOGGER . trace ( ( ) - > String . format ( "Ignoring existing transaction %s" , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
break ;
}
case NO_BLOCKCHAIN_LOCK : {
// Is this even possible considering we acquired blockchain lock above?
LOGGER . trace ( ( ) - > String . format ( "Couldn't lock blockchain to import unconfirmed transaction %s" , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
break ;
}
case OK : {
LOGGER . debug ( ( ) - > String . format ( "Imported %s transaction %s" , transactionData . getType ( ) . name ( ) , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
break ;
}
if ( validationResult ! = ValidationResult . OK ) {
final String signature58 = Base58 . encode ( transactionData . getSignature ( ) ) ;
LOGGER . trace ( ( ) - > String . format ( "Ignoring invalid (%s) %s transaction %s" , validationResult . name ( ) , transactionData . getType ( ) . name ( ) , signature58 ) ) ;
Long now = NTP . getTime ( ) ;
if ( now ! = null & & now - transactionData . getTimestamp ( ) > INVALID_TRANSACTION_STALE_TIMEOUT ) {
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL ;
if ( validationResult = = ValidationResult . TIMESTAMP_TOO_OLD ) {
// Use shorter recheck interval for expired transactions
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL ;
// All other invalid cases:
default : {
final String signature58 = Base58 . encode ( transactionData . getSignature ( ) ) ;
LOGGER . trace ( ( ) - > String . format ( "Ignoring invalid (%s) %s transaction %s" , validationResult . name ( ) , transactionData . getType ( ) . name ( ) , signature58 ) ) ;
Long now = NTP . getTime ( ) ;
if ( now ! = null & & now - transactionData . getTimestamp ( ) > INVALID_TRANSACTION_STALE_TIMEOUT ) {
Long expiryLength = INVALID_TRANSACTION_RECHECK_INTERVAL ;
if ( validationResult = = ValidationResult . TIMESTAMP_TOO_OLD ) {
// Use shorter recheck interval for expired transactions
expiryLength = EXPIRED_TRANSACTION_RECHECK_INTERVAL ;
}
Long expiry = now + expiryLength ;
LOGGER . trace ( "Adding stale invalid transaction {} to invalidUnconfirmedTransactions..." , signature58 ) ;
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions . put ( signature58 , expiry ) ;
}
}
Long expiry = now + expiryLength ;
LOGGER . debug ( "Adding stale invalid transaction {} to invalidUnconfirmedTransactions..." , signature58 ) ;
// Invalid, unconfirmed transaction has become stale - add to invalidUnconfirmedTransactions so that we don't keep requesting it
invalidUnconfirmedTransactions . put ( signature58 , expiry ) ;
}
// Transaction has been processed, even if only to reject it
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
continue ;
}
LOGGER . debug ( ( ) - > String . format ( "Imported %s transaction %s" , transactionData . getType ( ) . name ( ) , Base58 . encode ( transactionData . getSignature ( ) ) ) ) ;
removeIncomingTransaction ( transactionData . getSignature ( ) ) ;
} finally {
LOGGER . debug ( "Finished processing incoming transactions queue" ) ;
blockchainLock . unlock ( ) ;
}
} catch ( DataException e ) {
LOGGER . error ( String . format ( "Repository issue while processing incoming transactions" , e ) ) ;
} finally {
LOGGER . debug ( "Finished processing incoming transactions queue" ) ;
blockchainLock . unlock ( ) ;
LOGGER . error ( "Repository issue while processing incoming transactions" , e ) ;
}
}
@ -1437,9 +1490,12 @@ public class Controller extends Thread {
private void onNetworkTransactionMessage ( Peer peer , Message message ) {
TransactionMessage transactionMessage = ( TransactionMessage ) message ;
TransactionData transactionData = transactionMessage . getTransactionData ( ) ;
if ( this . incomingTransactions . size ( ) < MAX_INCOMING_TRANSACTIONS ) {
if ( ! this . incomingTransactions . contains ( transactionData ) ) {
this . incomingTransactions . add ( transactionData ) ;
synchronized ( this . incomingTransactions ) {
if ( ! incomingTransactionQueueContains ( transactionData . getSignature ( ) ) ) {
this . incomingTransactions . put ( transactionData , Boolean . FALSE ) ;
}
}
}
}