diff --git a/src/main/java/org/qortal/block/BlockChain.java b/src/main/java/org/qortal/block/BlockChain.java index 95ecc41b..520f8952 100644 --- a/src/main/java/org/qortal/block/BlockChain.java +++ b/src/main/java/org/qortal/block/BlockChain.java @@ -568,7 +568,7 @@ public class BlockChain { orphanBlockData = repository.getBlockRepository().fromHeight(height); repository.discardChanges(); // clear transaction status to prevent deadlocks - Controller.getInstance().onNewBlock(orphanBlockData); + Controller.getInstance().onOrphanedBlock(orphanBlockData); } return true; diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 4f4ae6b1..6e52ae2c 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -16,6 +16,7 @@ import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -24,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -135,7 +137,10 @@ public class Controller extends Thread { private ExecutorService callbackExecutor = Executors.newFixedThreadPool(3); private volatile boolean notifyGroupMembershipChange = false; - private volatile BlockData chainTip = null; + private static final int LATEST_BLOCKS_SIZE = 10; // To cover typical Synchronizer request + a few spare + /** Latest blocks on our chain. Note: tail/last is the latest block. */ + private final Deque latestBlocks = new LinkedList<>(); + private volatile BlockMessage latestBlockMessage = null; private long repositoryBackupTimestamp = startTime; // ms private long ntpCheckTimestamp = startTime; // ms @@ -238,21 +243,36 @@ public class Controller extends Thread { /** Returns current blockchain height, or 0 if it's not available. */ public int getChainHeight() { - BlockData blockData = this.chainTip; - if (blockData == null) - return 0; + synchronized (this.latestBlocks) { + BlockData blockData = this.latestBlocks.peekLast(); + if (blockData == null) + return 0; - return blockData.getHeight(); + return blockData.getHeight(); + } } /** Returns highest block, or null if it's not available. */ public BlockData getChainTip() { - return this.chainTip; + synchronized (this.latestBlocks) { + return this.latestBlocks.peekLast(); + } } - /** Cache new blockchain tip. */ - public void setChainTip(BlockData blockData) { - this.chainTip = blockData; + public void refillLatestBlocksCache() throws DataException { + // Set initial chain height/tip + try (final Repository repository = RepositoryManager.getRepository()) { + BlockData blockData = repository.getBlockRepository().getLastBlock(); + + synchronized (this.latestBlocks) { + this.latestBlocks.clear(); + + for (int i = 0; i < LATEST_BLOCKS_SIZE && blockData != null; ++i) { + this.latestBlocks.addFirst(blockData); + blockData = repository.getBlockRepository().fromHeight(blockData.getHeight() - 1); + } + } + } } public ReentrantLock getBlockchainLock() { @@ -334,13 +354,8 @@ public class Controller extends Thread { try { BlockChain.validate(); - // Set initial chain height/tip - try (final Repository repository = RepositoryManager.getRepository()) { - BlockData blockData = repository.getBlockRepository().getLastBlock(); - - Controller.getInstance().setChainTip(blockData); - LOGGER.info(String.format("Our chain height at start-up: %d", blockData.getHeight())); - } + Controller.getInstance().refillLatestBlocksCache(); + LOGGER.info(String.format("Our chain height at start-up: %d", Controller.getInstance().getChainHeight())); } catch (DataException e) { LOGGER.error("Couldn't validate blockchain", e); Gui.getInstance().fatalError("Blockchain validation issue", e); @@ -572,9 +587,10 @@ public class Controller extends Thread { public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException { boolean hasStatusChanged = false; + BlockData priorChainTip = this.getChainTip(); synchronized (this.syncLock) { - this.syncPercent = (this.chainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight(); + this.syncPercent = (priorChainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight(); // Only update SysTray if we're potentially changing height if (this.syncPercent < 100) { @@ -586,8 +602,6 @@ public class Controller extends Thread { if (hasStatusChanged) updateSysTray(); - BlockData priorChainTip = this.chainTip; - try { SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force); switch (syncResult) { @@ -850,11 +864,84 @@ public class Controller extends Thread { // Protective copy BlockData blockDataCopy = new BlockData(latestBlockData); - this.setChainTip(blockDataCopy); + synchronized (this.latestBlocks) { + BlockData cachedChainTip = this.latestBlocks.peekLast(); + + if (cachedChainTip != null && Arrays.equals(cachedChainTip.getSignature(), blockDataCopy.getReference())) { + // Chain tip is parent for new latest block, so we can safely add new latest block + this.latestBlocks.addLast(latestBlockData); + } else { + if (cachedChainTip != null) + // Chain tip didn't match - potentially abnormal behaviour? + LOGGER.debug(() -> String.format("Cached chain tip %.8s not parent for new latest block %.8s (reference %.8s)", + Base58.encode(cachedChainTip.getSignature()), + Base58.encode(blockDataCopy.getSignature()), + Base58.encode(blockDataCopy.getReference()))); + + // Protectively rebuild cache + try { + this.refillLatestBlocksCache(); + } catch (DataException e) { + LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e); + } + } + } + + this.onNewOrOrphanedBlock(blockDataCopy, NewBlockEvent::new); + } + + public static class OrphanedBlockEvent implements Event { + private final BlockData blockData; + + public OrphanedBlockEvent(BlockData blockData) { + this.blockData = blockData; + } + + public BlockData getBlockData() { + return this.blockData; + } + } + + /** + * Callback for when we've orphaned a block. + *

+ * See WARNING for {@link EventBus#notify(Event)} + * to prevent deadlocks. + */ + public void onOrphanedBlock(BlockData latestBlockData) { + // Protective copy + BlockData blockDataCopy = new BlockData(latestBlockData); + + synchronized (this.latestBlocks) { + BlockData cachedChainTip = this.latestBlocks.pollLast(); + + if (cachedChainTip != null && Arrays.equals(cachedChainTip.getReference(), blockDataCopy.getSignature())) { + // Chain tip was parent for new latest block that has been orphaned, so we're good + } else { + if (cachedChainTip != null) + // Chain tip didn't match - potentially abnormal behaviour? + LOGGER.debug(() -> String.format("Cached chain tip %.8s (reference %.8s) was not parent for new latest block %.8s", + Base58.encode(cachedChainTip.getSignature()), + Base58.encode(cachedChainTip.getReference()), + Base58.encode(blockDataCopy.getSignature()))); + + // Protectively rebuild cache + try { + this.refillLatestBlocksCache(); + } catch (DataException e) { + LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e); + } + } + } + + this.onNewOrOrphanedBlock(blockDataCopy, OrphanedBlockEvent::new); + } + + private void onNewOrOrphanedBlock(BlockData blockDataCopy, Function eventConstructor) { requestSysTrayUpdate = true; // Notify listeners, trade-bot, etc. - EventBus.INSTANCE.notify(new NewBlockEvent(blockDataCopy)); + EventBus.INSTANCE.notify(eventConstructor.apply(blockDataCopy)); if (this.notifyGroupMembershipChange) { this.notifyGroupMembershipChange = false; @@ -955,8 +1042,21 @@ public class Controller extends Thread { GetBlockMessage getBlockMessage = (GetBlockMessage) message; byte[] signature = getBlockMessage.getSignature(); + BlockMessage blockMessage = this.latestBlockMessage; + + // Check cached latest block message + if (blockMessage != null && Arrays.equals(blockMessage.getBlockData().getSignature(), signature)) { + blockMessage.setId(message.getId()); + + if (!peer.sendMessage(blockMessage)) + peer.disconnect("failed to send block"); + + return; + } + try (final Repository repository = RepositoryManager.getRepository()) { BlockData blockData = repository.getBlockRepository().fromSignature(signature); + if (blockData == null) { // We don't have this block @@ -973,10 +1073,16 @@ public class Controller extends Thread { Block block = new Block(repository, blockData); - Message blockMessage = new BlockMessage(block); + blockMessage = new BlockMessage(block); blockMessage.setId(message.getId()); + + // This call also causes the other needed data to be pulled in from repository if (!peer.sendMessage(blockMessage)) peer.disconnect("failed to send block"); + + // If request is for latest block, cache it + if (Arrays.equals(this.getChainTip().getSignature(), signature)) + this.latestBlockMessage = blockMessage; } catch (DataException e) { LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); } @@ -1023,32 +1129,38 @@ public class Controller extends Thread { private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; - byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + final byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); - try (final Repository repository = RepositoryManager.getRepository()) { - List blockSummaries = new ArrayList<>(); + List blockSummaries = new ArrayList<>(); - int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); + // Attempt to serve from our cache of latest blocks + synchronized (this.latestBlocks) { + blockSummaries = this.latestBlocks.stream() + .dropWhile(cachedBlockData -> Arrays.equals(cachedBlockData.getSignature(), parentSignature)) + .map(BlockSummaryData::new) + .collect(Collectors.toList()); + } + + if (blockSummaries.isEmpty()) + try (final Repository repository = RepositoryManager.getRepository()) { + int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); - do { BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); - if (blockData == null) - // No more blocks to send to peer - break; + while (blockData != null && blockSummaries.size() < numberRequested) { + BlockSummaryData blockSummary = new BlockSummaryData(blockData); + blockSummaries.add(blockSummary); - BlockSummaryData blockSummary = new BlockSummaryData(blockData); - blockSummaries.add(blockSummary); - parentSignature = blockData.getSignature(); - } while (blockSummaries.size() < numberRequested); + blockData = repository.getBlockRepository().fromReference(blockData.getSignature()); + } + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); + } - Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); - blockSummariesMessage.setId(message.getId()); - if (!peer.sendMessage(blockSummariesMessage)) - peer.disconnect("failed to send block summaries"); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); - } + Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); + blockSummariesMessage.setId(message.getId()); + if (!peer.sendMessage(blockSummariesMessage)) + peer.disconnect("failed to send block summaries"); } private void onNetworkGetSignaturesV2Message(Peer peer, Message message) { diff --git a/src/main/java/org/qortal/controller/Synchronizer.java b/src/main/java/org/qortal/controller/Synchronizer.java index 7aede4f2..747711b2 100644 --- a/src/main/java/org/qortal/controller/Synchronizer.java +++ b/src/main/java/org/qortal/controller/Synchronizer.java @@ -412,7 +412,7 @@ public class Synchronizer { orphanBlockData = repository.getBlockRepository().fromHeight(ourHeight); repository.discardChanges(); // clear transaction status to prevent deadlocks - Controller.getInstance().onNewBlock(orphanBlockData); + Controller.getInstance().onOrphanedBlock(orphanBlockData); } LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - applying new blocks from peer %s", commonBlockHeight, commonBlockSig58, peer)); diff --git a/src/main/java/org/qortal/network/message/BlockMessage.java b/src/main/java/org/qortal/network/message/BlockMessage.java index 8ca86ee6..e63dce92 100644 --- a/src/main/java/org/qortal/network/message/BlockMessage.java +++ b/src/main/java/org/qortal/network/message/BlockMessage.java @@ -34,6 +34,7 @@ public class BlockMessage extends Message { super(MessageType.BLOCK); this.block = block; + this.blockData = block.getBlockData(); this.height = block.getBlockData().getHeight(); }