Split Synchronizer into two strategies depending on whether swapping chains or simply adding new blocks

This commit is contained in:
catbref 2020-08-10 14:00:45 +01:00
parent 1e4432b1f3
commit a83d8bf1d5

View File

@ -126,167 +126,22 @@ public class Synchronizer {
// Unless we're doing a forced sync, we might need to compare blocks after common block
if (!force && ourInitialHeight > commonBlockHeight) {
// If our latest block is very old, we're very behind and should ditch our fork.
final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
return SynchronizationResult.REPOSITORY_ISSUE;
SynchronizationResult chainCompareResult = compareChains(repository, commonBlockData, ourLatestBlockData, peer, peerHeight, peerBlockSummaries);
if (chainCompareResult != SynchronizationResult.OK)
return chainCompareResult;
}
if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
SynchronizationResult syncResult = null;
if (commonBlockHeight < ourInitialHeight) {
// Peer's chain is better, sync to that one
syncResult = syncToPeerChain(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
} else {
// Compare chain weights
LOGGER.debug(String.format("Comparing chains from block %d with peer %s", commonBlockHeight + 1, peer));
// Fetch remaining peer's block summaries (which we also use to fill signatures list)
int peerBlockCount = peerHeight - commonBlockHeight;
while (peerBlockSummaries.size() < peerBlockCount) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
int lastSummaryHeight = commonBlockHeight + peerBlockSummaries.size();
byte[] previousSignature;
if (peerBlockSummaries.isEmpty())
previousSignature = commonBlockSig;
else
previousSignature = peerBlockSummaries.get(peerBlockSummaries.size() - 1).getSignature();
List<BlockSummaryData> moreBlockSummaries = this.getBlockSummaries(peer, previousSignature, peerBlockCount - peerBlockSummaries.size());
if (moreBlockSummaries == null || moreBlockSummaries.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with block summaries after height %d, sig %.8s", peer,
lastSummaryHeight, Base58.encode(previousSignature)));
return SynchronizationResult.NO_REPLY;
// Simply fetch and apply blocks as they arrive
syncResult = applyNewBlocks(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries);
}
// Check peer sent valid heights
for (int i = 0; i < moreBlockSummaries.size(); ++i) {
++lastSummaryHeight;
BlockSummaryData blockSummary = moreBlockSummaries.get(i);
if (blockSummary.getHeight() != lastSummaryHeight) {
LOGGER.info(String.format("Peer %s responded with invalid block summary for height %d, sig %.8s", peer,
lastSummaryHeight, Base58.encode(blockSummary.getSignature())));
return SynchronizationResult.NO_REPLY;
}
}
peerBlockSummaries.addAll(moreBlockSummaries);
}
// Fetch our corresponding block summaries
List<BlockSummaryData> ourBlockSummaries = repository.getBlockRepository().getBlockSummaries(commonBlockHeight + 1, ourInitialHeight);
// Populate minter account levels for both lists of block summaries
populateBlockSummariesMinterLevels(repository, peerBlockSummaries);
populateBlockSummariesMinterLevels(repository, ourBlockSummaries);
// Calculate cumulative chain weights of both blockchain subsets, from common block to highest mutual block.
BigInteger ourChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, ourBlockSummaries);
BigInteger peerChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, peerBlockSummaries);
// If our blockchain has greater weight then don't synchronize with peer
if (ourChainWeight.compareTo(peerChainWeight) >= 0) {
LOGGER.debug(String.format("Not synchronizing with peer %s as we have better blockchain", peer));
NumberFormat formatter = new DecimalFormat("0.###E0");
LOGGER.debug(String.format("Our chain weight: %s, peer's chain weight: %s (higher is better)", formatter.format(ourChainWeight), formatter.format(peerChainWeight)));
return SynchronizationResult.INFERIOR_CHAIN;
}
}
}
int ourHeight = ourInitialHeight;
if (ourHeight > commonBlockHeight) {
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58));
while (ourHeight > commonBlockHeight) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
--ourHeight;
}
LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - fetching blocks from peer %s", commonBlockHeight, commonBlockSig58, peer));
} else {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
}
// Fetch, and apply, blocks from peer
byte[] latestPeerSignature = commonBlockSig;
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
// Convert any block summaries from above into signatures to request from peer
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
// Do we need more signatures?
if (peerBlockSignatures.isEmpty()) {
int numberRequested = maxBatchHeight - ourHeight;
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberRequested, (numberRequested != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature)));
peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested);
if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
}
latestPeerSignature = peerBlockSignatures.get(0);
peerBlockSignatures.remove(0);
++ourHeight;
Block newBlock = this.fetchBlock(repository, peer, latestPeerSignature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.INVALID_DATA;
}
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions())
transaction.setInitialApprovalStatus();
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
ourHeight, Base58.encode(latestPeerSignature), blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
}
if (syncResult != SynchronizationResult.OK)
return syncResult;
// Commit
repository.saveChanges();
@ -396,6 +251,268 @@ public class Synchronizer {
return SynchronizationResult.OK;
}
private SynchronizationResult compareChains(Repository repository, BlockData commonBlockData, BlockData ourLatestBlockData,
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws DataException, InterruptedException {
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
// If our latest block is very old, we're very behind and should ditch our fork.
final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
return SynchronizationResult.REPOSITORY_ISSUE;
if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
} else {
// Compare chain weights
LOGGER.debug(String.format("Comparing chains from block %d with peer %s", commonBlockHeight + 1, peer));
// Fetch remaining peer's block summaries (which we also use to fill signatures list)
int peerBlockCount = peerHeight - commonBlockHeight;
while (peerBlockSummaries.size() < peerBlockCount) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
int lastSummaryHeight = commonBlockHeight + peerBlockSummaries.size();
byte[] previousSignature;
if (peerBlockSummaries.isEmpty())
previousSignature = commonBlockSig;
else
previousSignature = peerBlockSummaries.get(peerBlockSummaries.size() - 1).getSignature();
List<BlockSummaryData> moreBlockSummaries = this.getBlockSummaries(peer, previousSignature, peerBlockCount - peerBlockSummaries.size());
if (moreBlockSummaries == null || moreBlockSummaries.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with block summaries after height %d, sig %.8s", peer,
lastSummaryHeight, Base58.encode(previousSignature)));
return SynchronizationResult.NO_REPLY;
}
// Check peer sent valid heights
for (int i = 0; i < moreBlockSummaries.size(); ++i) {
++lastSummaryHeight;
BlockSummaryData blockSummary = moreBlockSummaries.get(i);
if (blockSummary.getHeight() != lastSummaryHeight) {
LOGGER.info(String.format("Peer %s responded with invalid block summary for height %d, sig %.8s", peer,
lastSummaryHeight, Base58.encode(blockSummary.getSignature())));
return SynchronizationResult.NO_REPLY;
}
}
peerBlockSummaries.addAll(moreBlockSummaries);
}
// Fetch our corresponding block summaries
List<BlockSummaryData> ourBlockSummaries = repository.getBlockRepository().getBlockSummaries(commonBlockHeight + 1, ourLatestBlockData.getHeight());
// Populate minter account levels for both lists of block summaries
populateBlockSummariesMinterLevels(repository, peerBlockSummaries);
populateBlockSummariesMinterLevels(repository, ourBlockSummaries);
// Calculate cumulative chain weights of both blockchain subsets, from common block to highest mutual block.
BigInteger ourChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, ourBlockSummaries);
BigInteger peerChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, peerBlockSummaries);
// If our blockchain has greater weight then don't synchronize with peer
if (ourChainWeight.compareTo(peerChainWeight) >= 0) {
LOGGER.debug(String.format("Not synchronizing with peer %s as we have better blockchain", peer));
NumberFormat formatter = new DecimalFormat("0.###E0");
LOGGER.debug(String.format("Our chain weight: %s, peer's chain weight: %s (higher is better)", formatter.format(ourChainWeight), formatter.format(peerChainWeight)));
return SynchronizationResult.INFERIOR_CHAIN;
}
}
return SynchronizationResult.OK;
}
private SynchronizationResult syncToPeerChain(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, final int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws DataException, InterruptedException {
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
String commonBlockSig58 = Base58.encode(commonBlockSig);
LOGGER.debug(() -> String.format("Fetching peer %s chain from height %d, sig %.8s", peer, commonBlockHeight, commonBlockSig58));
int ourHeight = ourInitialHeight;
// Overall plan: fetch peer's blocks first, then orphan, then apply
// Convert any leftover (post-common) block summaries into signatures to request from peer
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
// Fetch remaining block signatures, if needed
int numberSignaturesRequired = peerBlockSignatures.size() - (peerHeight - commonBlockHeight - 1);
if (numberSignaturesRequired > 0) {
byte[] latestPeerSignature = peerBlockSignatures.isEmpty() ? commonBlockSig : peerBlockSignatures.get(peerBlockSignatures.size() - 1);
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberSignaturesRequired, (numberSignaturesRequired != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature)));
List<byte[]> moreBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberSignaturesRequired);
if (moreBlockSignatures == null || moreBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
peerBlockSignatures.addAll(moreBlockSignatures);
}
// Fetch blocks using signatures
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
List<Block> peerBlocks = new ArrayList<>();
for (byte[] blockSignature : peerBlockSignatures) {
Block newBlock = this.fetchBlock(repository, peer, blockSignature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
ourHeight, Base58.encode(blockSignature)));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
ourHeight, Base58.encode(blockSignature)));
return SynchronizationResult.INVALID_DATA;
}
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions())
transaction.setInitialApprovalStatus();
peerBlocks.add(newBlock);
}
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58));
while (ourHeight > commonBlockHeight) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
--ourHeight;
}
LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - applying new blocks from peer %s", commonBlockHeight, commonBlockSig58, peer));
for (Block newBlock : peerBlocks) {
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
ourHeight, Base58.encode(newBlock.getSignature()), blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
}
return SynchronizationResult.OK;
}
private SynchronizationResult applyNewBlocks(Repository repository, BlockData commonBlockData, int ourInitialHeight,
Peer peer, int peerHeight, List<BlockSummaryData> peerBlockSummaries) throws InterruptedException, DataException {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
final int commonBlockHeight = commonBlockData.getHeight();
final byte[] commonBlockSig = commonBlockData.getSignature();
int ourHeight = ourInitialHeight;
// Fetch, and apply, blocks from peer
byte[] latestPeerSignature = commonBlockSig;
int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE;
// Convert any block summaries from above into signatures to request from peer
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
while (ourHeight < peerHeight && ourHeight < maxBatchHeight) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
// Do we need more signatures?
if (peerBlockSignatures.isEmpty()) {
int numberRequested = maxBatchHeight - ourHeight;
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberRequested, (numberRequested != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature)));
peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested);
if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
}
latestPeerSignature = peerBlockSignatures.get(0);
peerBlockSignatures.remove(0);
++ourHeight;
Block newBlock = this.fetchBlock(repository, peer, latestPeerSignature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.NO_REPLY;
}
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
ourHeight, Base58.encode(latestPeerSignature)));
return SynchronizationResult.INVALID_DATA;
}
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions())
transaction.setInitialApprovalStatus();
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer,
ourHeight, Base58.encode(latestPeerSignature), blockResult.name()));
return SynchronizationResult.INVALID_DATA;
}
// Save transactions attached to this block
for (Transaction transaction : newBlock.getTransactions()) {
TransactionData transactionData = transaction.getTransactionData();
repository.getTransactionRepository().save(transactionData);
}
newBlock.process();
// If we've grown our blockchain then at least save progress so far
if (ourHeight > ourInitialHeight)
repository.saveChanges();
}
return SynchronizationResult.OK;
}
private List<BlockSummaryData> getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
Message getBlockSummariesMessage = new GetBlockSummariesMessage(parentSignature, numberRequested);