From a83d8bf1d5d38a9c6090d964da4a76257e41313f Mon Sep 17 00:00:00 2001 From: catbref Date: Mon, 10 Aug 2020 14:00:45 +0100 Subject: [PATCH] Split Synchronizer into two strategies depending on whether swapping chains or simply adding new blocks --- .../org/qortal/controller/Synchronizer.java | 429 +++++++++++------- 1 file changed, 273 insertions(+), 156 deletions(-) diff --git a/src/main/java/org/qortal/controller/Synchronizer.java b/src/main/java/org/qortal/controller/Synchronizer.java index e7dc6dd2..fa98ea73 100644 --- a/src/main/java/org/qortal/controller/Synchronizer.java +++ b/src/main/java/org/qortal/controller/Synchronizer.java @@ -126,167 +126,22 @@ public class Synchronizer { // Unless we're doing a forced sync, we might need to compare blocks after common block if (!force && ourInitialHeight > commonBlockHeight) { - // If our latest block is very old, we're very behind and should ditch our fork. - final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); - if (minLatestBlockTimestamp == null) - return SynchronizationResult.REPOSITORY_ISSUE; - - if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) { - LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight)); - } else { - // Compare chain weights - - LOGGER.debug(String.format("Comparing chains from block %d with peer %s", commonBlockHeight + 1, peer)); - - // Fetch remaining peer's block summaries (which we also use to fill signatures list) - int peerBlockCount = peerHeight - commonBlockHeight; - - while (peerBlockSummaries.size() < peerBlockCount) { - if (Controller.isStopping()) - return SynchronizationResult.SHUTTING_DOWN; - - int lastSummaryHeight = commonBlockHeight + peerBlockSummaries.size(); - byte[] previousSignature; - if (peerBlockSummaries.isEmpty()) - previousSignature = commonBlockSig; - else - previousSignature = peerBlockSummaries.get(peerBlockSummaries.size() - 1).getSignature(); - - List moreBlockSummaries = this.getBlockSummaries(peer, previousSignature, peerBlockCount - peerBlockSummaries.size()); - - if (moreBlockSummaries == null || moreBlockSummaries.isEmpty()) { - LOGGER.info(String.format("Peer %s failed to respond with block summaries after height %d, sig %.8s", peer, - lastSummaryHeight, Base58.encode(previousSignature))); - return SynchronizationResult.NO_REPLY; - } - - // Check peer sent valid heights - for (int i = 0; i < moreBlockSummaries.size(); ++i) { - ++lastSummaryHeight; - - BlockSummaryData blockSummary = moreBlockSummaries.get(i); - - if (blockSummary.getHeight() != lastSummaryHeight) { - LOGGER.info(String.format("Peer %s responded with invalid block summary for height %d, sig %.8s", peer, - lastSummaryHeight, Base58.encode(blockSummary.getSignature()))); - return SynchronizationResult.NO_REPLY; - } - } - - peerBlockSummaries.addAll(moreBlockSummaries); - } - - // Fetch our corresponding block summaries - List ourBlockSummaries = repository.getBlockRepository().getBlockSummaries(commonBlockHeight + 1, ourInitialHeight); - - // Populate minter account levels for both lists of block summaries - populateBlockSummariesMinterLevels(repository, peerBlockSummaries); - populateBlockSummariesMinterLevels(repository, ourBlockSummaries); - - // Calculate cumulative chain weights of both blockchain subsets, from common block to highest mutual block. - BigInteger ourChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, ourBlockSummaries); - BigInteger peerChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, peerBlockSummaries); - - // If our blockchain has greater weight then don't synchronize with peer - if (ourChainWeight.compareTo(peerChainWeight) >= 0) { - LOGGER.debug(String.format("Not synchronizing with peer %s as we have better blockchain", peer)); - NumberFormat formatter = new DecimalFormat("0.###E0"); - LOGGER.debug(String.format("Our chain weight: %s, peer's chain weight: %s (higher is better)", formatter.format(ourChainWeight), formatter.format(peerChainWeight))); - return SynchronizationResult.INFERIOR_CHAIN; - } - } + SynchronizationResult chainCompareResult = compareChains(repository, commonBlockData, ourLatestBlockData, peer, peerHeight, peerBlockSummaries); + if (chainCompareResult != SynchronizationResult.OK) + return chainCompareResult; } - int ourHeight = ourInitialHeight; - if (ourHeight > commonBlockHeight) { - // Unwind to common block (unless common block is our latest block) - LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58)); - - while (ourHeight > commonBlockHeight) { - if (Controller.isStopping()) - return SynchronizationResult.SHUTTING_DOWN; - - BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight); - Block block = new Block(repository, blockData); - block.orphan(); - - --ourHeight; - } - - LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - fetching blocks from peer %s", commonBlockHeight, commonBlockSig58, peer)); + SynchronizationResult syncResult = null; + if (commonBlockHeight < ourInitialHeight) { + // Peer's chain is better, sync to that one + syncResult = syncToPeerChain(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries); } else { - LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); + // Simply fetch and apply blocks as they arrive + syncResult = applyNewBlocks(repository, commonBlockData, ourInitialHeight, peer, peerHeight, peerBlockSummaries); } - // Fetch, and apply, blocks from peer - byte[] latestPeerSignature = commonBlockSig; - int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE; - - // Convert any block summaries from above into signatures to request from peer - List peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList()); - - while (ourHeight < peerHeight && ourHeight < maxBatchHeight) { - if (Controller.isStopping()) - return SynchronizationResult.SHUTTING_DOWN; - - // Do we need more signatures? - if (peerBlockSignatures.isEmpty()) { - int numberRequested = maxBatchHeight - ourHeight; - LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s", - numberRequested, (numberRequested != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature))); - - peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested); - - if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) { - LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer, - ourHeight, Base58.encode(latestPeerSignature))); - return SynchronizationResult.NO_REPLY; - } - - LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : ""))); - } - - latestPeerSignature = peerBlockSignatures.get(0); - peerBlockSignatures.remove(0); - ++ourHeight; - - Block newBlock = this.fetchBlock(repository, peer, latestPeerSignature); - - if (newBlock == null) { - LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, - ourHeight, Base58.encode(latestPeerSignature))); - return SynchronizationResult.NO_REPLY; - } - - if (!newBlock.isSignatureValid()) { - LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer, - ourHeight, Base58.encode(latestPeerSignature))); - return SynchronizationResult.INVALID_DATA; - } - - // Transactions are transmitted without approval status so determine that now - for (Transaction transaction : newBlock.getTransactions()) - transaction.setInitialApprovalStatus(); - - ValidationResult blockResult = newBlock.isValid(); - if (blockResult != ValidationResult.OK) { - LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer, - ourHeight, Base58.encode(latestPeerSignature), blockResult.name())); - return SynchronizationResult.INVALID_DATA; - } - - // Save transactions attached to this block - for (Transaction transaction : newBlock.getTransactions()) { - TransactionData transactionData = transaction.getTransactionData(); - repository.getTransactionRepository().save(transactionData); - } - - newBlock.process(); - - // If we've grown our blockchain then at least save progress so far - if (ourHeight > ourInitialHeight) - repository.saveChanges(); - } + if (syncResult != SynchronizationResult.OK) + return syncResult; // Commit repository.saveChanges(); @@ -396,6 +251,268 @@ public class Synchronizer { return SynchronizationResult.OK; } + private SynchronizationResult compareChains(Repository repository, BlockData commonBlockData, BlockData ourLatestBlockData, + Peer peer, int peerHeight, List peerBlockSummaries) throws DataException, InterruptedException { + final int commonBlockHeight = commonBlockData.getHeight(); + final byte[] commonBlockSig = commonBlockData.getSignature(); + + // If our latest block is very old, we're very behind and should ditch our fork. + final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp(); + if (minLatestBlockTimestamp == null) + return SynchronizationResult.REPOSITORY_ISSUE; + + if (ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) { + LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight)); + } else { + // Compare chain weights + + LOGGER.debug(String.format("Comparing chains from block %d with peer %s", commonBlockHeight + 1, peer)); + + // Fetch remaining peer's block summaries (which we also use to fill signatures list) + int peerBlockCount = peerHeight - commonBlockHeight; + + while (peerBlockSummaries.size() < peerBlockCount) { + if (Controller.isStopping()) + return SynchronizationResult.SHUTTING_DOWN; + + int lastSummaryHeight = commonBlockHeight + peerBlockSummaries.size(); + byte[] previousSignature; + if (peerBlockSummaries.isEmpty()) + previousSignature = commonBlockSig; + else + previousSignature = peerBlockSummaries.get(peerBlockSummaries.size() - 1).getSignature(); + + List moreBlockSummaries = this.getBlockSummaries(peer, previousSignature, peerBlockCount - peerBlockSummaries.size()); + + if (moreBlockSummaries == null || moreBlockSummaries.isEmpty()) { + LOGGER.info(String.format("Peer %s failed to respond with block summaries after height %d, sig %.8s", peer, + lastSummaryHeight, Base58.encode(previousSignature))); + return SynchronizationResult.NO_REPLY; + } + + // Check peer sent valid heights + for (int i = 0; i < moreBlockSummaries.size(); ++i) { + ++lastSummaryHeight; + + BlockSummaryData blockSummary = moreBlockSummaries.get(i); + + if (blockSummary.getHeight() != lastSummaryHeight) { + LOGGER.info(String.format("Peer %s responded with invalid block summary for height %d, sig %.8s", peer, + lastSummaryHeight, Base58.encode(blockSummary.getSignature()))); + return SynchronizationResult.NO_REPLY; + } + } + + peerBlockSummaries.addAll(moreBlockSummaries); + } + + // Fetch our corresponding block summaries + List ourBlockSummaries = repository.getBlockRepository().getBlockSummaries(commonBlockHeight + 1, ourLatestBlockData.getHeight()); + + // Populate minter account levels for both lists of block summaries + populateBlockSummariesMinterLevels(repository, peerBlockSummaries); + populateBlockSummariesMinterLevels(repository, ourBlockSummaries); + + // Calculate cumulative chain weights of both blockchain subsets, from common block to highest mutual block. + BigInteger ourChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, ourBlockSummaries); + BigInteger peerChainWeight = Block.calcChainWeight(commonBlockHeight, commonBlockSig, peerBlockSummaries); + + // If our blockchain has greater weight then don't synchronize with peer + if (ourChainWeight.compareTo(peerChainWeight) >= 0) { + LOGGER.debug(String.format("Not synchronizing with peer %s as we have better blockchain", peer)); + NumberFormat formatter = new DecimalFormat("0.###E0"); + LOGGER.debug(String.format("Our chain weight: %s, peer's chain weight: %s (higher is better)", formatter.format(ourChainWeight), formatter.format(peerChainWeight))); + return SynchronizationResult.INFERIOR_CHAIN; + } + } + + return SynchronizationResult.OK; + } + + private SynchronizationResult syncToPeerChain(Repository repository, BlockData commonBlockData, int ourInitialHeight, + Peer peer, final int peerHeight, List peerBlockSummaries) throws DataException, InterruptedException { + final int commonBlockHeight = commonBlockData.getHeight(); + final byte[] commonBlockSig = commonBlockData.getSignature(); + String commonBlockSig58 = Base58.encode(commonBlockSig); + + LOGGER.debug(() -> String.format("Fetching peer %s chain from height %d, sig %.8s", peer, commonBlockHeight, commonBlockSig58)); + + int ourHeight = ourInitialHeight; + + // Overall plan: fetch peer's blocks first, then orphan, then apply + + // Convert any leftover (post-common) block summaries into signatures to request from peer + List peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList()); + + // Fetch remaining block signatures, if needed + int numberSignaturesRequired = peerBlockSignatures.size() - (peerHeight - commonBlockHeight - 1); + if (numberSignaturesRequired > 0) { + byte[] latestPeerSignature = peerBlockSignatures.isEmpty() ? commonBlockSig : peerBlockSignatures.get(peerBlockSignatures.size() - 1); + + LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s", + numberSignaturesRequired, (numberSignaturesRequired != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature))); + + List moreBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberSignaturesRequired); + + if (moreBlockSignatures == null || moreBlockSignatures.isEmpty()) { + LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer, + ourHeight, Base58.encode(latestPeerSignature))); + return SynchronizationResult.NO_REPLY; + } + + LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : ""))); + + peerBlockSignatures.addAll(moreBlockSignatures); + } + + // Fetch blocks using signatures + LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); + List peerBlocks = new ArrayList<>(); + + for (byte[] blockSignature : peerBlockSignatures) { + Block newBlock = this.fetchBlock(repository, peer, blockSignature); + + if (newBlock == null) { + LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, + ourHeight, Base58.encode(blockSignature))); + return SynchronizationResult.NO_REPLY; + } + + if (!newBlock.isSignatureValid()) { + LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer, + ourHeight, Base58.encode(blockSignature))); + return SynchronizationResult.INVALID_DATA; + } + + // Transactions are transmitted without approval status so determine that now + for (Transaction transaction : newBlock.getTransactions()) + transaction.setInitialApprovalStatus(); + + peerBlocks.add(newBlock); + } + + // Unwind to common block (unless common block is our latest block) + LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58)); + + while (ourHeight > commonBlockHeight) { + if (Controller.isStopping()) + return SynchronizationResult.SHUTTING_DOWN; + + BlockData blockData = repository.getBlockRepository().fromHeight(ourHeight); + Block block = new Block(repository, blockData); + block.orphan(); + + --ourHeight; + } + + LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - applying new blocks from peer %s", commonBlockHeight, commonBlockSig58, peer)); + + for (Block newBlock : peerBlocks) { + ValidationResult blockResult = newBlock.isValid(); + if (blockResult != ValidationResult.OK) { + LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer, + ourHeight, Base58.encode(newBlock.getSignature()), blockResult.name())); + return SynchronizationResult.INVALID_DATA; + } + + // Save transactions attached to this block + for (Transaction transaction : newBlock.getTransactions()) { + TransactionData transactionData = transaction.getTransactionData(); + repository.getTransactionRepository().save(transactionData); + } + + newBlock.process(); + + // If we've grown our blockchain then at least save progress so far + if (ourHeight > ourInitialHeight) + repository.saveChanges(); + } + + return SynchronizationResult.OK; + } + + private SynchronizationResult applyNewBlocks(Repository repository, BlockData commonBlockData, int ourInitialHeight, + Peer peer, int peerHeight, List peerBlockSummaries) throws InterruptedException, DataException { + LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); + + final int commonBlockHeight = commonBlockData.getHeight(); + final byte[] commonBlockSig = commonBlockData.getSignature(); + + int ourHeight = ourInitialHeight; + + // Fetch, and apply, blocks from peer + byte[] latestPeerSignature = commonBlockSig; + int maxBatchHeight = commonBlockHeight + SYNC_BATCH_SIZE; + + // Convert any block summaries from above into signatures to request from peer + List peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList()); + + while (ourHeight < peerHeight && ourHeight < maxBatchHeight) { + if (Controller.isStopping()) + return SynchronizationResult.SHUTTING_DOWN; + + // Do we need more signatures? + if (peerBlockSignatures.isEmpty()) { + int numberRequested = maxBatchHeight - ourHeight; + LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s", + numberRequested, (numberRequested != 1 ? "s": ""), ourHeight, Base58.encode(latestPeerSignature))); + + peerBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberRequested); + + if (peerBlockSignatures == null || peerBlockSignatures.isEmpty()) { + LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer, + ourHeight, Base58.encode(latestPeerSignature))); + return SynchronizationResult.NO_REPLY; + } + + LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : ""))); + } + + latestPeerSignature = peerBlockSignatures.get(0); + peerBlockSignatures.remove(0); + ++ourHeight; + + Block newBlock = this.fetchBlock(repository, peer, latestPeerSignature); + + if (newBlock == null) { + LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, + ourHeight, Base58.encode(latestPeerSignature))); + return SynchronizationResult.NO_REPLY; + } + + if (!newBlock.isSignatureValid()) { + LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer, + ourHeight, Base58.encode(latestPeerSignature))); + return SynchronizationResult.INVALID_DATA; + } + + // Transactions are transmitted without approval status so determine that now + for (Transaction transaction : newBlock.getTransactions()) + transaction.setInitialApprovalStatus(); + + ValidationResult blockResult = newBlock.isValid(); + if (blockResult != ValidationResult.OK) { + LOGGER.info(String.format("Peer %s sent invalid block for height %d, sig %.8s: %s", peer, + ourHeight, Base58.encode(latestPeerSignature), blockResult.name())); + return SynchronizationResult.INVALID_DATA; + } + + // Save transactions attached to this block + for (Transaction transaction : newBlock.getTransactions()) { + TransactionData transactionData = transaction.getTransactionData(); + repository.getTransactionRepository().save(transactionData); + } + + newBlock.process(); + + // If we've grown our blockchain then at least save progress so far + if (ourHeight > ourInitialHeight) + repository.saveChanges(); + } + + return SynchronizationResult.OK; + } + private List getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException { Message getBlockSummariesMessage = new GetBlockSummariesMessage(parentSignature, numberRequested);