@ -20,6 +20,7 @@ import org.qortal.utils.NTP;
import java.util.* ;
import java.util.* ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.stream.Collectors ;
public class TransactionImporter extends Thread {
public class TransactionImporter extends Thread {
@ -63,7 +64,9 @@ public class TransactionImporter extends Thread {
Thread . sleep ( 1000L ) ;
Thread . sleep ( 1000L ) ;
// Process incoming transactions queue
// Process incoming transactions queue
processIncomingTransactionsQueue ( ) ;
validateTransactionsInQueue ( ) ;
importTransactionsInQueue ( ) ;
// Clean up invalid incoming transactions list
// Clean up invalid incoming transactions list
cleanupInvalidTransactionsList ( NTP . getTime ( ) ) ;
cleanupInvalidTransactionsList ( NTP . getTime ( ) ) ;
}
}
@ -90,7 +93,24 @@ public class TransactionImporter extends Thread {
incomingTransactions . keySet ( ) . removeIf ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
incomingTransactions . keySet ( ) . removeIf ( t - > Arrays . equals ( t . getSignature ( ) , signature ) ) ;
}
}
private void processIncomingTransactionsQueue ( ) {
/ * *
* Retrieve all pending unconfirmed transactions that have had their signatures validated .
* @return a list of TransactionData objects , with valid signatures .
* /
private List < TransactionData > getCachedSigValidTransactions ( ) {
return this . incomingTransactions . entrySet ( ) . stream ( )
. filter ( t - > Boolean . TRUE . equals ( t . getValue ( ) ) )
. map ( Map . Entry : : getKey )
. collect ( Collectors . toList ( ) ) ;
}
/ * *
* Validate the signatures of any transactions pending import , then update their
* entries in the queue to mark them as valid / invalid .
*
* No database lock is required .
* /
private void validateTransactionsInQueue ( ) {
if ( this . incomingTransactions . isEmpty ( ) ) {
if ( this . incomingTransactions . isEmpty ( ) ) {
// Nothing to do?
// Nothing to do?
return ;
return ;
@ -127,6 +147,8 @@ public class TransactionImporter extends Thread {
if ( isLiteNode ) {
if ( isLiteNode ) {
// Lite nodes can't easily validate transactions, so for now we will have to assume that everything is valid
// Lite nodes can't easily validate transactions, so for now we will have to assume that everything is valid
sigValidTransactions . add ( transaction ) ;
sigValidTransactions . add ( transaction ) ;
// Add mark signature as valid if transaction still exists in import queue
incomingTransactions . computeIfPresent ( transactionData , ( k , v ) - > Boolean . TRUE ) ;
continue ;
continue ;
}
}
@ -166,6 +188,18 @@ public class TransactionImporter extends Thread {
LOGGER . debug ( "Finished validating signatures in incoming transactions queue (valid this round: {}, total pending import: {})..." , validatedCount , sigValidTransactions . size ( ) ) ;
LOGGER . debug ( "Finished validating signatures in incoming transactions queue (valid this round: {}, total pending import: {})..." , validatedCount , sigValidTransactions . size ( ) ) ;
}
}
} catch ( DataException e ) {
LOGGER . error ( "Repository issue while processing incoming transactions" , e ) ;
}
}
/ * *
* Import any transactions in the queue that have valid signatures .
*
* A database lock is required .
* /
private void importTransactionsInQueue ( ) {
List < TransactionData > sigValidTransactions = this . getCachedSigValidTransactions ( ) ;
if ( sigValidTransactions . isEmpty ( ) ) {
if ( sigValidTransactions . isEmpty ( ) ) {
// Don't bother locking if there are no new transactions to process
// Don't bother locking if there are no new transactions to process
return ;
return ;
@ -173,14 +207,12 @@ public class TransactionImporter extends Thread {
if ( Synchronizer . getInstance ( ) . isSyncRequested ( ) | | Synchronizer . getInstance ( ) . isSynchronizing ( ) ) {
if ( Synchronizer . getInstance ( ) . isSyncRequested ( ) | | Synchronizer . getInstance ( ) . isSynchronizing ( ) ) {
// Prioritize syncing, and don't attempt to lock
// Prioritize syncing, and don't attempt to lock
// Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted
return ;
return ;
}
}
try {
try {
ReentrantLock blockchainLock = Controller . getInstance ( ) . getBlockchainLock ( ) ;
ReentrantLock blockchainLock = Controller . getInstance ( ) . getBlockchainLock ( ) ;
if ( ! blockchainLock . tryLock ( 2 , TimeUnit . SECONDS ) ) {
if ( ! blockchainLock . tryLock ( 2 , TimeUnit . SECONDS ) ) {
// Signature validity is retained in the incomingTransactions map, to avoid the above work being wasted
LOGGER . debug ( "Too busy to process incoming transactions queue" ) ;
LOGGER . debug ( "Too busy to process incoming transactions queue" ) ;
return ;
return ;
}
}
@ -191,6 +223,8 @@ public class TransactionImporter extends Thread {
LOGGER . debug ( "Processing incoming transactions queue (size {})..." , sigValidTransactions . size ( ) ) ;
LOGGER . debug ( "Processing incoming transactions queue (size {})..." , sigValidTransactions . size ( ) ) ;
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
// Import transactions with valid signatures
// Import transactions with valid signatures
try {
try {
for ( int i = 0 ; i < sigValidTransactions . size ( ) ; + + i ) {
for ( int i = 0 ; i < sigValidTransactions . size ( ) ; + + i ) {
@ -203,8 +237,8 @@ public class TransactionImporter extends Thread {
return ;
return ;
}
}
Transaction transaction = sigValidTransactions . get ( i ) ;
TransactionData transactionData = sigValidTransactions . get ( i ) ;
TransactionData transactionData = transaction . getTransactionData ( ) ;
Transaction transaction = Transaction . fromData ( repository , transactionData ) ;
Transaction . ValidationResult validationResult = transaction . importAsUnconfirmed ( ) ;
Transaction . ValidationResult validationResult = transaction . importAsUnconfirmed ( ) ;