@ -119,6 +119,7 @@ public class Synchronizer {
LOGGER . debug ( String . format ( "Searching for common blocks with %d peers..." , peers . size ( ) ) ) ;
final long startTime = System . currentTimeMillis ( ) ;
int commonBlocksFound = 0 ;
boolean wereNewRequestsMade = false ;
for ( Peer peer : peers ) {
// Are we shutting down?
@ -139,10 +140,15 @@ public class Synchronizer {
Synchronizer . getInstance ( ) . findCommonBlockWithPeer ( peer , repository ) ;
if ( peer . getCommonBlockData ( ) ! = null )
commonBlocksFound + + ;
// This round wasn't served entirely from the cache, so we may want to log the results
wereNewRequestsMade = true ;
}
final long totalTimeTaken = System . currentTimeMillis ( ) - startTime ;
LOGGER . info ( String . format ( "Finished searching for common blocks with %d peer%s. Found: %d. Total time taken: %d ms" , peers . size ( ) , ( peers . size ( ) ! = 1 ? "s" : "" ) , commonBlocksFound , totalTimeTaken ) ) ;
if ( wereNewRequestsMade ) {
final long totalTimeTaken = System . currentTimeMillis ( ) - startTime ;
LOGGER . info ( String . format ( "Finished searching for common blocks with %d peer%s. Found: %d. Total time taken: %d ms" , peers . size ( ) , ( peers . size ( ) ! = 1 ? "s" : "" ) , commonBlocksFound , totalTimeTaken ) ) ;
}
return SynchronizationResult . OK ;
} finally {
@ -180,7 +186,7 @@ public class Synchronizer {
ourInitialHeight , Base58 . encode ( ourLastBlockSignature ) , ourLatestBlockData . getTimestamp ( ) ) ) ;
List < BlockSummaryData > peerBlockSummaries = new ArrayList < > ( ) ;
SynchronizationResult findCommonBlockResult = fetchSummariesFromCommonBlock ( repository , peer , ourInitialHeight , false , peerBlockSummaries ) ;
SynchronizationResult findCommonBlockResult = fetchSummariesFromCommonBlock ( repository , peer , ourInitialHeight , false , peerBlockSummaries , false ) ;
if ( findCommonBlockResult ! = SynchronizationResult . OK ) {
// Logging performed by fetchSummariesFromCommonBlock() above
peer . setCommonBlockData ( null ) ;
@ -290,7 +296,9 @@ public class Synchronizer {
return peers ;
// Count the number of blocks this peer has beyond our common block
final int peerHeight = peer . getChainTipData ( ) . getLastHeight ( ) ;
final PeerChainTipData peerChainTipData = peer . getChainTipData ( ) ;
final int peerHeight = peerChainTipData . getLastHeight ( ) ;
final byte [ ] peerLastBlockSignature = peerChainTipData . getLastBlockSignature ( ) ;
final int peerAdditionalBlocksAfterCommonBlock = peerHeight - commonBlockSummary . getHeight ( ) ;
// Limit the number of blocks we are comparing. FUTURE: we could request more in batches, but there may not be a case when this is needed
int summariesRequired = Math . min ( peerAdditionalBlocksAfterCommonBlock , MAXIMUM_REQUEST_SIZE ) ;
@ -300,7 +308,7 @@ public class Synchronizer {
if ( peer . canUseCachedCommonBlockData ( ) ) {
if ( peer . getCommonBlockData ( ) . getBlockSummariesAfterCommonBlock ( ) ! = null ) {
if ( peer . getCommonBlockData ( ) . getBlockSummariesAfterCommonBlock ( ) . size ( ) = = summariesRequired ) {
LOGGER . debug ( String . format ( "Using cached block summaries for peer %s" , peer ) ) ;
LOGGER . trace ( String . format ( "Using cached block summaries for peer %s" , peer ) ) ;
useCachedSummaries = true ;
}
}
@ -310,15 +318,23 @@ public class Synchronizer {
if ( summariesRequired > 0 ) {
LOGGER . trace ( String . format ( "Requesting %d block summar%s from peer %s after common block %.8s. Peer height: %d" , summariesRequired , ( summariesRequired ! = 1 ? "ies" : "y" ) , peer , Base58 . encode ( commonBlockSummary . getSignature ( ) ) , peerHeight ) ) ;
List < BlockSummaryData > blockSummaries = this . getBlockSummaries ( peer , commonBlockSummary . getSignature ( ) , summariesRequired ) ;
peer . getCommonBlockData ( ) . setBlockSummariesAfterCommonBlock ( blockSummaries ) ;
// Forget any cached summaries
peer . getCommonBlockData ( ) . setBlockSummariesAfterCommonBlock ( null ) ;
// Request new block summaries
List < BlockSummaryData > blockSummaries = this . getBlockSummaries ( peer , commonBlockSummary . getSignature ( ) , summariesRequired ) ;
if ( blockSummaries ! = null ) {
LOGGER . trace ( String . format ( "Peer %s returned %d block summar%s" , peer , blockSummaries . size ( ) , ( blockSummaries . size ( ) ! = 1 ? "ies" : "y" ) ) ) ;
if ( blockSummaries . size ( ) < summariesRequired )
// This could mean that the peer has re-orged. But we still have the same common block, so it's safe to proceed with this set of signatures instead.
LOGGER . debug ( String . format ( "Peer %s returned %d block summar%s instead of expected %d" , peer , blockSummaries . size ( ) , ( blockSummaries . size ( ) ! = 1 ? "ies" : "y" ) , summariesRequired ) ) ;
// This could mean that the peer has re-orged. Exclude this peer until they return the summaries we expect.
LOGGER . debug ( String . format ( "Peer %s returned %d block summar%s instead of expected %d - excluding them from this round" , peer , blockSummaries . size ( ) , ( blockSummaries . size ( ) ! = 1 ? "ies" : "y" ) , summariesRequired ) ) ;
else if ( blockSummaryWithSignature ( peerLastBlockSignature , blockSummaries ) = = null )
// We don't have a block summary for the peer's reported chain tip, so should exclude it
LOGGER . debug ( String . format ( "Peer %s didn't return a block summary with signature %.8s - excluding them from this round" , peer , Base58 . encode ( peerLastBlockSignature ) ) ) ;
else
// All looks good, so store the retrieved block summaries in the peer's cache
peer . getCommonBlockData ( ) . setBlockSummariesAfterCommonBlock ( blockSummaries ) ;
}
} else {
// There are no block summaries after this common block
@ -391,8 +407,8 @@ public class Synchronizer {
peers . remove ( peer ) ;
}
else {
// Our chain is inferior
LOGGER . debug ( String . format ( "Peer %s is on a better chain to us. We will compare the other peers sharing this common block against each other, and drop all peers sharing higher common blocks." , peer ) ) ;
// Our chain is inferior or equal
LOGGER . debug ( String . format ( "Peer %s is on an equal or better chain to us. We will compare the other peers sharing this common block against each other, and drop all peers sharing higher common blocks." , peer ) ) ;
dropPeersAfterCommonBlockHeight = commonBlockSummary . getHeight ( ) ;
superiorPeersForComparison . add ( peer ) ;
}
@ -414,6 +430,9 @@ public class Synchronizer {
peers . remove ( peer ) ;
}
}
// FUTURE: we may want to prefer peers with additional blocks, and compare the additional blocks against each other.
// This would fast track us to the best candidate for the latest block.
// Right now, peers with the exact same chain as us are treated equally to those with an additional block.
}
}
@ -432,14 +451,14 @@ public class Synchronizer {
for ( Peer peer : peers ) {
if ( peer . getCommonBlockData ( ) ! = null & & peer . getCommonBlockData ( ) . getCommonBlockSummary ( ) ! = null ) {
LOGGER . debug ( String . format ( "Peer %s has common block %.8s" , peer , Base58 . encode ( peer . getCommonBlockData ( ) . getCommonBlockSummary ( ) . getSignature ( ) ) ) ) ;
LOGGER . trace ( String . format ( "Peer %s has common block %.8s" , peer , Base58 . encode ( peer . getCommonBlockData ( ) . getCommonBlockSummary ( ) . getSignature ( ) ) ) ) ;
BlockSummaryData commonBlockSummary = peer . getCommonBlockData ( ) . getCommonBlockSummary ( ) ;
if ( ! commonBlocks . contains ( commonBlockSummary ) )
commonBlocks . add ( commonBlockSummary ) ;
}
else {
LOGGER . debug ( String . format ( "Peer %s has no common block data. Skipping..." , peer ) ) ;
LOGGER . trace ( String . format ( "Peer %s has no common block data. Skipping..." , peer ) ) ;
}
}
@ -459,6 +478,12 @@ public class Synchronizer {
return minChainLength ;
}
private BlockSummaryData blockSummaryWithSignature ( byte [ ] signature , List < BlockSummaryData > blockSummaries ) {
if ( blockSummaries ! = null )
return blockSummaries . stream ( ) . filter ( blockSummary - > Arrays . equals ( blockSummary . getSignature ( ) , signature ) ) . findAny ( ) . orElse ( null ) ;
return null ;
}
/ * *
* Attempt to synchronize blockchain with peer .
@ -494,7 +519,7 @@ public class Synchronizer {
ourInitialHeight , Base58 . encode ( ourLastBlockSignature ) , ourLatestBlockData . getTimestamp ( ) ) ) ;
List < BlockSummaryData > peerBlockSummaries = new ArrayList < > ( ) ;
SynchronizationResult findCommonBlockResult = fetchSummariesFromCommonBlock ( repository , peer , ourInitialHeight , force , peerBlockSummaries ) ;
SynchronizationResult findCommonBlockResult = fetchSummariesFromCommonBlock ( repository , peer , ourInitialHeight , force , peerBlockSummaries , true ) ;
if ( findCommonBlockResult ! = SynchronizationResult . OK ) {
// Logging performed by fetchSummariesFromCommonBlock() above
// Clear our common block cache for this peer
@ -576,7 +601,7 @@ public class Synchronizer {
* @throws DataException
* @throws InterruptedException
* /
public SynchronizationResult fetchSummariesFromCommonBlock ( Repository repository , Peer peer , int ourHeight , boolean force , List < BlockSummaryData > blockSummariesFromCommon ) throws DataException , InterruptedException {
public SynchronizationResult fetchSummariesFromCommonBlock ( Repository repository , Peer peer , int ourHeight , boolean force , List < BlockSummaryData > blockSummariesFromCommon , boolean infoLogWhenNotFound ) throws DataException , InterruptedException {
// Start by asking for a few recent block hashes as this will cover a majority of reorgs
// Failing that, back off exponentially
int step = INITIAL_BLOCK_STEP ;
@ -605,8 +630,12 @@ public class Synchronizer {
blockSummariesBatch = this . getBlockSummaries ( peer , testSignature , step ) ;
if ( blockSummariesBatch = = null ) {
if ( infoLogWhenNotFound )
LOGGER . info ( String . format ( "Error while trying to find common block with peer %s" , peer ) ) ;
else
LOGGER . debug ( String . format ( "Error while trying to find common block with peer %s" , peer ) ) ;
// No response - give up this time
LOGGER . info ( String . format ( "Error while trying to find common block with peer %s" , peer ) ) ;
return SynchronizationResult . NO_REPLY ;
}
@ -793,19 +822,13 @@ public class Synchronizer {
if ( cachedCommonBlockData ! = null )
cachedCommonBlockData . setBlockSummariesAfterCommonBlock ( null ) ;
// If we have already received recent or newer blocks from this peer , go ahead and apply them
if ( peerBlocks . size ( ) > 0 ) {
// If we have already received newer blocks from this peer that what we have already , go ahead and apply them
if ( peerBlocks . size ( ) > 0 ) {
final BlockData ourLatestBlockData = repository . getBlockRepository ( ) . getLastBlock ( ) ;
final Block peerLatestBlock = peerBlocks . get ( peerBlocks . size ( ) - 1 ) ;
final Long minLatestBlockTimestamp = Controller . getMinimumLatestBlockTimestamp ( ) ;
if ( ourLatestBlockData ! = null & & peerLatestBlock ! = null & & minLatestBlockTimestamp ! = null ) {
// If we have received at least one recent block, we can apply them
if ( peerLatestBlock . getBlockData ( ) . getTimestamp ( ) > minLatestBlockTimestamp ) {
LOGGER . debug ( "Newly received blocks are recent, so we will apply them" ) ;
break ;
}
// If our latest block is very old....
if ( ourLatestBlockData . getTimestamp ( ) < minLatestBlockTimestamp ) {
// ... and we have received a block that is more recent than our latest block ...
@ -820,10 +843,10 @@ public class Synchronizer {
}
}
}
}
// Otherwise, give up and move on to the next peer, to avoid putting our chain into an outdated state
return SynchronizationResult . NO_REPLY ;
}
}
// Otherwise, give up and move on to the next peer, to avoid putting our chain into an outdated or incomplete state
return SynchronizationResult . NO_REPLY ;
}
numberSignaturesRequired = peerHeight - height - peerBlockSignatures . size ( ) ;
LOGGER . trace ( String . format ( "Received %s signature%s" , peerBlockSignatures . size ( ) , ( peerBlockSignatures . size ( ) ! = 1 ? "s" : "" ) ) ) ;
@ -845,20 +868,13 @@ public class Synchronizer {
nextHeight , Base58 . encode ( nextPeerSignature ) ) ) ;
if ( retryCount > = maxRetries ) {
// If we have already received recent or newer blocks from this peer, go ahead and apply them
// If we have already received newer blocks from this peer that what we have already, go ahead and apply them
if ( peerBlocks . size ( ) > 0 ) {
final BlockData ourLatestBlockData = repository . getBlockRepository ( ) . getLastBlock ( ) ;
final Block peerLatestBlock = peerBlocks . get ( peerBlocks . size ( ) - 1 ) ;
final Long minLatestBlockTimestamp = Controller . getMinimumLatestBlockTimestamp ( ) ;
if ( ourLatestBlockData ! = null & & peerLatestBlock ! = null & & minLatestBlockTimestamp ! = null ) {
// If we have received at least one recent block, we can apply them
if ( peerLatestBlock . getBlockData ( ) . getTimestamp ( ) > minLatestBlockTimestamp ) {
LOGGER . debug ( "Newly received blocks are recent, so we will apply them" ) ;
break ;
}
// If our latest block is very old....
if ( ourLatestBlockData . getTimestamp ( ) < minLatestBlockTimestamp ) {
// ... and we have received a block that is more recent than our latest block ...
@ -874,7 +890,7 @@ public class Synchronizer {
}
}
}
// Otherwise, give up and move on to the next peer, to avoid putting our chain into an outdated state
// Otherwise, give up and move on to the next peer, to avoid putting our chain into an outdated or incomplete state
return SynchronizationResult . NO_REPLY ;
} else {