Browse Source

Add more latest block caching to reduce repository accesses, especially for requests from remote peers

AT-sleep-until-message
catbref 4 years ago
parent
commit
f3b8258067
  1. 2
      src/main/java/org/qortal/block/BlockChain.java
  2. 194
      src/main/java/org/qortal/controller/Controller.java
  3. 2
      src/main/java/org/qortal/controller/Synchronizer.java
  4. 1
      src/main/java/org/qortal/network/message/BlockMessage.java

2
src/main/java/org/qortal/block/BlockChain.java

@ -568,7 +568,7 @@ public class BlockChain {
orphanBlockData = repository.getBlockRepository().fromHeight(height);
repository.discardChanges(); // clear transaction status to prevent deadlocks
Controller.getInstance().onNewBlock(orphanBlockData);
Controller.getInstance().onOrphanedBlock(orphanBlockData);
}
return true;

194
src/main/java/org/qortal/controller/Controller.java

@ -16,6 +16,7 @@ import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -24,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -135,7 +137,10 @@ public class Controller extends Thread {
private ExecutorService callbackExecutor = Executors.newFixedThreadPool(3);
private volatile boolean notifyGroupMembershipChange = false;
private volatile BlockData chainTip = null;
private static final int LATEST_BLOCKS_SIZE = 10; // To cover typical Synchronizer request + a few spare
/** Latest blocks on our chain. Note: tail/last is the latest block. */
private final Deque<BlockData> latestBlocks = new LinkedList<>();
private volatile BlockMessage latestBlockMessage = null;
private long repositoryBackupTimestamp = startTime; // ms
private long ntpCheckTimestamp = startTime; // ms
@ -238,21 +243,36 @@ public class Controller extends Thread {
/** Returns current blockchain height, or 0 if it's not available. */
public int getChainHeight() {
BlockData blockData = this.chainTip;
if (blockData == null)
return 0;
synchronized (this.latestBlocks) {
BlockData blockData = this.latestBlocks.peekLast();
if (blockData == null)
return 0;
return blockData.getHeight();
return blockData.getHeight();
}
}
/** Returns highest block, or null if it's not available. */
public BlockData getChainTip() {
return this.chainTip;
synchronized (this.latestBlocks) {
return this.latestBlocks.peekLast();
}
}
/** Cache new blockchain tip. */
public void setChainTip(BlockData blockData) {
this.chainTip = blockData;
public void refillLatestBlocksCache() throws DataException {
// Set initial chain height/tip
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().getLastBlock();
synchronized (this.latestBlocks) {
this.latestBlocks.clear();
for (int i = 0; i < LATEST_BLOCKS_SIZE && blockData != null; ++i) {
this.latestBlocks.addFirst(blockData);
blockData = repository.getBlockRepository().fromHeight(blockData.getHeight() - 1);
}
}
}
}
public ReentrantLock getBlockchainLock() {
@ -334,13 +354,8 @@ public class Controller extends Thread {
try {
BlockChain.validate();
// Set initial chain height/tip
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().getLastBlock();
Controller.getInstance().setChainTip(blockData);
LOGGER.info(String.format("Our chain height at start-up: %d", blockData.getHeight()));
}
Controller.getInstance().refillLatestBlocksCache();
LOGGER.info(String.format("Our chain height at start-up: %d", Controller.getInstance().getChainHeight()));
} catch (DataException e) {
LOGGER.error("Couldn't validate blockchain", e);
Gui.getInstance().fatalError("Blockchain validation issue", e);
@ -572,9 +587,10 @@ public class Controller extends Thread {
public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException {
boolean hasStatusChanged = false;
BlockData priorChainTip = this.getChainTip();
synchronized (this.syncLock) {
this.syncPercent = (this.chainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight();
this.syncPercent = (priorChainTip.getHeight() * 100) / peer.getChainTipData().getLastHeight();
// Only update SysTray if we're potentially changing height
if (this.syncPercent < 100) {
@ -586,8 +602,6 @@ public class Controller extends Thread {
if (hasStatusChanged)
updateSysTray();
BlockData priorChainTip = this.chainTip;
try {
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force);
switch (syncResult) {
@ -850,11 +864,84 @@ public class Controller extends Thread {
// Protective copy
BlockData blockDataCopy = new BlockData(latestBlockData);
this.setChainTip(blockDataCopy);
synchronized (this.latestBlocks) {
BlockData cachedChainTip = this.latestBlocks.peekLast();
if (cachedChainTip != null && Arrays.equals(cachedChainTip.getSignature(), blockDataCopy.getReference())) {
// Chain tip is parent for new latest block, so we can safely add new latest block
this.latestBlocks.addLast(latestBlockData);
} else {
if (cachedChainTip != null)
// Chain tip didn't match - potentially abnormal behaviour?
LOGGER.debug(() -> String.format("Cached chain tip %.8s not parent for new latest block %.8s (reference %.8s)",
Base58.encode(cachedChainTip.getSignature()),
Base58.encode(blockDataCopy.getSignature()),
Base58.encode(blockDataCopy.getReference())));
// Protectively rebuild cache
try {
this.refillLatestBlocksCache();
} catch (DataException e) {
LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e);
}
}
}
this.onNewOrOrphanedBlock(blockDataCopy, NewBlockEvent::new);
}
public static class OrphanedBlockEvent implements Event {
private final BlockData blockData;
public OrphanedBlockEvent(BlockData blockData) {
this.blockData = blockData;
}
public BlockData getBlockData() {
return this.blockData;
}
}
/**
* Callback for when we've orphaned a block.
* <p>
* See <b>WARNING</b> for {@link EventBus#notify(Event)}
* to prevent deadlocks.
*/
public void onOrphanedBlock(BlockData latestBlockData) {
// Protective copy
BlockData blockDataCopy = new BlockData(latestBlockData);
synchronized (this.latestBlocks) {
BlockData cachedChainTip = this.latestBlocks.pollLast();
if (cachedChainTip != null && Arrays.equals(cachedChainTip.getReference(), blockDataCopy.getSignature())) {
// Chain tip was parent for new latest block that has been orphaned, so we're good
} else {
if (cachedChainTip != null)
// Chain tip didn't match - potentially abnormal behaviour?
LOGGER.debug(() -> String.format("Cached chain tip %.8s (reference %.8s) was not parent for new latest block %.8s",
Base58.encode(cachedChainTip.getSignature()),
Base58.encode(cachedChainTip.getReference()),
Base58.encode(blockDataCopy.getSignature())));
// Protectively rebuild cache
try {
this.refillLatestBlocksCache();
} catch (DataException e) {
LOGGER.warn(() -> "Couldn't refill latest blocks cache?", e);
}
}
}
this.onNewOrOrphanedBlock(blockDataCopy, OrphanedBlockEvent::new);
}
private void onNewOrOrphanedBlock(BlockData blockDataCopy, Function<BlockData, Event> eventConstructor) {
requestSysTrayUpdate = true;
// Notify listeners, trade-bot, etc.
EventBus.INSTANCE.notify(new NewBlockEvent(blockDataCopy));
EventBus.INSTANCE.notify(eventConstructor.apply(blockDataCopy));
if (this.notifyGroupMembershipChange) {
this.notifyGroupMembershipChange = false;
@ -955,8 +1042,21 @@ public class Controller extends Thread {
GetBlockMessage getBlockMessage = (GetBlockMessage) message;
byte[] signature = getBlockMessage.getSignature();
BlockMessage blockMessage = this.latestBlockMessage;
// Check cached latest block message
if (blockMessage != null && Arrays.equals(blockMessage.getBlockData().getSignature(), signature)) {
blockMessage.setId(message.getId());
if (!peer.sendMessage(blockMessage))
peer.disconnect("failed to send block");
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData == null) {
// We don't have this block
@ -973,10 +1073,16 @@ public class Controller extends Thread {
Block block = new Block(repository, blockData);
Message blockMessage = new BlockMessage(block);
blockMessage = new BlockMessage(block);
blockMessage.setId(message.getId());
// This call also causes the other needed data to be pulled in from repository
if (!peer.sendMessage(blockMessage))
peer.disconnect("failed to send block");
// If request is for latest block, cache it
if (Arrays.equals(this.getChainTip().getSignature(), signature))
this.latestBlockMessage = blockMessage;
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e);
}
@ -1023,32 +1129,38 @@ public class Controller extends Thread {
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
final byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
List<BlockSummaryData> blockSummaries = new ArrayList<>();
List<BlockSummaryData> blockSummaries = new ArrayList<>();
int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested());
// Attempt to serve from our cache of latest blocks
synchronized (this.latestBlocks) {
blockSummaries = this.latestBlocks.stream()
.dropWhile(cachedBlockData -> Arrays.equals(cachedBlockData.getSignature(), parentSignature))
.map(BlockSummaryData::new)
.collect(Collectors.toList());
}
if (blockSummaries.isEmpty())
try (final Repository repository = RepositoryManager.getRepository()) {
int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested());
do {
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData == null)
// No more blocks to send to peer
break;
while (blockData != null && blockSummaries.size() < numberRequested) {
BlockSummaryData blockSummary = new BlockSummaryData(blockData);
blockSummaries.add(blockSummary);
BlockSummaryData blockSummary = new BlockSummaryData(blockData);
blockSummaries.add(blockSummary);
parentSignature = blockData.getSignature();
} while (blockSummaries.size() < numberRequested);
blockData = repository.getBlockRepository().fromReference(blockData.getSignature());
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries);
blockSummariesMessage.setId(message.getId());
if (!peer.sendMessage(blockSummariesMessage))
peer.disconnect("failed to send block summaries");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries);
blockSummariesMessage.setId(message.getId());
if (!peer.sendMessage(blockSummariesMessage))
peer.disconnect("failed to send block summaries");
}
private void onNetworkGetSignaturesV2Message(Peer peer, Message message) {

2
src/main/java/org/qortal/controller/Synchronizer.java

@ -412,7 +412,7 @@ public class Synchronizer {
orphanBlockData = repository.getBlockRepository().fromHeight(ourHeight);
repository.discardChanges(); // clear transaction status to prevent deadlocks
Controller.getInstance().onNewBlock(orphanBlockData);
Controller.getInstance().onOrphanedBlock(orphanBlockData);
}
LOGGER.debug(String.format("Orphaned blocks back to height %d, sig %.8s - applying new blocks from peer %s", commonBlockHeight, commonBlockSig58, peer));

1
src/main/java/org/qortal/network/message/BlockMessage.java

@ -34,6 +34,7 @@ public class BlockMessage extends Message {
super(MessageType.BLOCK);
this.block = block;
this.blockData = block.getBlockData();
this.height = block.getBlockData().getHeight();
}

Loading…
Cancel
Save