@ -40,6 +40,9 @@ public class OnlineAccountsManager extends Thread {
private long onlineAccountsTasksTimestamp = Controller . startTime + ONLINE_ACCOUNTS_TASKS_INTERVAL ; // ms
private long onlineAccountsTasksTimestamp = Controller . startTime + ONLINE_ACCOUNTS_TASKS_INTERVAL ; // ms
private final List < OnlineAccountData > onlineAccountsImportQueue = Collections . synchronizedList ( new ArrayList < > ( ) ) ;
/** Cache of current 'online accounts' */
/** Cache of current 'online accounts' */
List < OnlineAccountData > onlineAccounts = new ArrayList < > ( ) ;
List < OnlineAccountData > onlineAccounts = new ArrayList < > ( ) ;
/** Cache of latest blocks' online accounts */
/** Cache of latest blocks' online accounts */
@ -60,7 +63,7 @@ public class OnlineAccountsManager extends Thread {
public void run ( ) {
public void run ( ) {
try {
try {
while ( ! Controller . isStopping ( ) ) {
while ( ! Controller . isStopping ( ) ) {
Thread . sleep ( 1000 L ) ;
Thread . sleep ( 100L ) ;
final Long now = NTP . getTime ( ) ;
final Long now = NTP . getTime ( ) ;
@ -69,6 +72,10 @@ public class OnlineAccountsManager extends Thread {
onlineAccountsTasksTimestamp = now + ONLINE_ACCOUNTS_TASKS_INTERVAL ;
onlineAccountsTasksTimestamp = now + ONLINE_ACCOUNTS_TASKS_INTERVAL ;
performOnlineAccountsTasks ( ) ;
performOnlineAccountsTasks ( ) ;
}
}
// Process queued online account verifications
this . processOnlineAccountsImportQueue ( ) ;
}
}
} catch ( InterruptedException e ) {
} catch ( InterruptedException e ) {
// Fall through to exit thread
// Fall through to exit thread
@ -81,6 +88,38 @@ public class OnlineAccountsManager extends Thread {
}
}
// Online accounts import queue
private void processOnlineAccountsImportQueue ( ) {
if ( this . onlineAccountsImportQueue . isEmpty ( ) ) {
// Nothing to do
return ;
}
LOGGER . debug ( "Processing online accounts import queue (size: {})" , this . onlineAccountsImportQueue . size ( ) ) ;
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
List < OnlineAccountData > onlineAccountDataCopy = new ArrayList < > ( this . onlineAccountsImportQueue ) ;
for ( OnlineAccountData onlineAccountData : onlineAccountDataCopy ) {
if ( isStopping ) {
return ;
}
this . verifyAndAddAccount ( repository , onlineAccountData ) ;
// Remove from queue
onlineAccountsImportQueue . remove ( onlineAccountData ) ;
}
LOGGER . debug ( "Finished processing online accounts import queue" ) ;
} catch ( DataException e ) {
LOGGER . error ( String . format ( "Repository issue while verifying online accounts" ) , e ) ;
}
}
// Utilities
// Utilities
private void verifyAndAddAccount ( Repository repository , OnlineAccountData onlineAccountData ) throws DataException {
private void verifyAndAddAccount ( Repository repository , OnlineAccountData onlineAccountData ) throws DataException {
@ -432,13 +471,29 @@ public class OnlineAccountsManager extends Thread {
OnlineAccountsV2Message onlineAccountsMessage = ( OnlineAccountsV2Message ) message ;
OnlineAccountsV2Message onlineAccountsMessage = ( OnlineAccountsV2Message ) message ;
List < OnlineAccountData > peersOnlineAccounts = onlineAccountsMessage . getOnlineAccounts ( ) ;
List < OnlineAccountData > peersOnlineAccounts = onlineAccountsMessage . getOnlineAccounts ( ) ;
LOGGER . trace ( ( ) - > String . format ( "Received %d online accounts from %s" , peersOnlineAccounts . size ( ) , peer ) ) ;
LOGGER . debug ( String . format ( "Received %d online accounts from %s" , peersOnlineAccounts . size ( ) , peer ) ) ;
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
int importCount = 0 ;
for ( OnlineAccountData onlineAccountData : peersOnlineAccounts )
this . verifyAndAddAccount ( repository , onlineAccountData ) ;
synchronized ( onlineAccountsImportQueue ) {
} catch ( DataException e ) {
// Add any online accounts to the queue that aren't already present
LOGGER . error ( String . format ( "Repository issue while verifying online accounts from peer %s" , peer ) , e ) ;
for ( OnlineAccountData onlineAccountData : peersOnlineAccounts ) {
// Do we already know about this online account data?
if ( onlineAccounts . contains ( onlineAccountData ) ) {
continue ;
}
// Is it already in the import queue?
if ( onlineAccountsImportQueue . contains ( onlineAccountData ) ) {
continue ;
}
onlineAccountsImportQueue . add ( onlineAccountData ) ;
importCount + + ;
}
}
}
LOGGER . debug ( String . format ( "Added %d online accounts to queue" , importCount ) ) ;
}
}
}
}