mirror of
synced 2025-03-15 03:52:31 +00:00
Added GetBlocksMessage and BlocksMessage, which allow multiple blocks to be transferred between peers in a single request.
When communicating with a peer that is running at least version 1.5.0, it will now sync multiple blocks at once in Synchronizer.syncToPeerChain(). This allows us to bypass the single block syncing (and retry mechanism), which has proven to be unviable when there are multiple active forks with several blocks in each chain. For peers below v1.5.0, the logic should remain unaffected and it will continue to sync blocks individually.
This commit is contained in:
@ -469,6 +469,16 @@ public class Block {
return this.minter;
public void setRepository(Repository repository) throws DataException {
this.repository = repository;
for (Transaction transaction : this.getTransactions()) {
// More information
@ -517,8 +527,10 @@ public class Block {
long nonAtTransactionCount = transactionsData.stream().filter(transactionData -> transactionData.getType() != TransactionType.AT).count();
// The number of non-AT transactions fetched from repository should correspond with Block's transactionCount
if (nonAtTransactionCount != this.blockData.getTransactionCount())
if (nonAtTransactionCount != this.blockData.getTransactionCount()) {
LOGGER.error(() -> String.format("Block's transactions from repository (%d) do not match block's transaction count (%d)", nonAtTransactionCount, this.blockData.getTransactionCount()));
throw new IllegalStateException("Block's transactions from repository do not match block's transaction count");
this.transactions = new ArrayList<>();
@ -68,9 +68,11 @@ import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.ArbitraryDataMessage;
import org.qortal.network.message.BlockSummariesMessage;
import org.qortal.network.message.BlocksMessage;
import org.qortal.network.message.CachedBlockMessage;
import org.qortal.network.message.GetArbitraryDataMessage;
import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlocksMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetOnlineAccountsMessage;
import org.qortal.network.message.GetPeersMessage;
@ -216,6 +218,18 @@ public class Controller extends Thread {
public GetBlockMessageStats getBlockMessageStats = new GetBlockMessageStats();
public static class GetBlocksMessageStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
public AtomicLong unknownBlocks = new AtomicLong();
public AtomicLong cacheFills = new AtomicLong();
public AtomicLong fullyFromCache = new AtomicLong();
public GetBlocksMessageStats() {
public GetBlocksMessageStats getBlocksMessageStats = new GetBlocksMessageStats();
public static class GetBlockSummariesStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong cacheHits = new AtomicLong();
@ -1094,6 +1108,10 @@ public class Controller extends Thread {
onNetworkGetBlockMessage(peer, message);
onNetworkGetBlocksMessage(peer, message);
onNetworkTransactionMessage(peer, message);
@ -1208,6 +1226,68 @@ public class Controller extends Thread {
private void onNetworkGetBlocksMessage(Peer peer, Message message) {
GetBlocksMessage getBlocksMessage = (GetBlocksMessage) message;
byte[] parentSignature = getBlocksMessage.getParentSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
// If peer's parent signature matches our latest block signature
// then we can short-circuit with an empty response
BlockData chainTip = getChainTip();
if (chainTip != null && Arrays.equals(parentSignature, chainTip.getSignature())) {
Message blocksMessage = new BlocksMessage(Collections.emptyList());
if (!peer.sendMessage(blocksMessage))
peer.disconnect("failed to send blocks");
List<BlockData> blockDataList = new ArrayList<>();
// Attempt to serve from our cache of latest blocks
synchronized (this.latestBlocks) {
blockDataList = this.latestBlocks.stream()
.dropWhile(cachedBlockData -> !Arrays.equals(cachedBlockData.getReference(), parentSignature))
if (blockDataList.isEmpty()) {
int numberRequested = Math.min(Network.MAX_BLOCKS_PER_REPLY, getBlocksMessage.getNumberRequested());
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
while (blockData != null && blockDataList.size() < numberRequested) {
blockData = repository.getBlockRepository().fromReference(blockData.getSignature());
} else {
if (blockDataList.size() >= getBlocksMessage.getNumberRequested())
List<Block> blocks = new ArrayList<>();
for (BlockData blockData : blockDataList) {
Block block = new Block(repository, blockData);
Message blocksMessage = new BlocksMessage(blocks);
if (!peer.sendMessage(blocksMessage))
peer.disconnect("failed to send blocks");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending blocks after %s to peer %s", Base58.encode(parentSignature), peer), e);
private void onNetworkTransactionMessage(Peer peer, Message message) {
TransactionMessage transactionMessage = (TransactionMessage) message;
TransactionData transactionData = transactionMessage.getTransactionData();
@ -22,8 +22,10 @@ import org.qortal.data.transaction.RewardShareTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Peer;
import org.qortal.network.message.BlockMessage;
import org.qortal.network.message.BlocksMessage;
import org.qortal.network.message.BlockSummariesMessage;
import org.qortal.network.message.GetBlockMessage;
import org.qortal.network.message.GetBlocksMessage;
import org.qortal.network.message.GetBlockSummariesMessage;
import org.qortal.network.message.GetSignaturesV2Message;
import org.qortal.network.message.Message;
@ -56,6 +58,9 @@ public class Synchronizer {
/** Number of retry attempts if a peer fails to respond with the requested data */
private static final int MAXIMUM_RETRIES = 3; // XXX move to Settings?
/* Minimum peer version that supports syncing multiple blocks at once via GetBlocksMessage */
private static final long PEER_VERSION_150 = 0x0100050000L;
private static Synchronizer instance;
@ -360,97 +365,161 @@ public class Synchronizer {
// Overall plan: fetch peer's blocks first, then orphan, then apply
// Convert any leftover (post-common) block summaries into signatures to request from peer
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
// Calculate the total number of additional blocks this peer has beyond the common block
int additionalPeerBlocksAfterCommonBlock = peerHeight - commonBlockHeight;
// Subtract the number of signatures that we already have, as we don't need to request them again
int numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
// Fetch remaining block signatures, if needed
int retryCount = 0;
while (numberSignaturesRequired > 0) {
byte[] latestPeerSignature = peerBlockSignatures.isEmpty() ? commonBlockSig : peerBlockSignatures.get(peerBlockSignatures.size() - 1);
int lastPeerHeight = commonBlockHeight + peerBlockSignatures.size();
int numberOfSignaturesToRequest = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberOfSignaturesToRequest, (numberOfSignaturesToRequest != 1 ? "s": ""), lastPeerHeight, Base58.encode(latestPeerSignature)));
List<byte[]> moreBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberOfSignaturesToRequest);
if (moreBlockSignatures == null || moreBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
lastPeerHeight, Base58.encode(latestPeerSignature)));
if (retryCount >= MAXIMUM_RETRIES) {
// Give up with this peer
return SynchronizationResult.NO_REPLY;
else {
// Retry until retryCount reaches MAXIMUM_RETRIES
int triesRemaining = MAXIMUM_RETRIES - retryCount;
LOGGER.info(String.format("Re-issuing request to peer %s (%d attempt%s remaining)", peer, triesRemaining, (triesRemaining != 1 ? "s": "")));
// Reset retryCount because the last request succeeded
retryCount = 0;
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
// Fetch blocks using signatures
LOGGER.debug(String.format("Fetching new blocks from peer %s after height %d", peer, commonBlockHeight));
// Firstly, attempt to retrieve the blocks themselves, rather than signatures. This is supported by newer peers.
// We could optionally check for a version here if we didn't want to make unnecessary requests
List<Block> peerBlocks = new ArrayList<>();
retryCount = 0;
while (peerBlocks.size() < peerBlockSignatures.size()) {
byte[] blockSignature = peerBlockSignatures.get(peerBlocks.size());
if (peer.getPeersVersion() >= PEER_VERSION_150) {
// This peer supports syncing multiple blocks at once via GetBlocksMessage
int numberBlocksRequired = additionalPeerBlocksAfterCommonBlock - peerBlocks.size();
while (numberBlocksRequired > 0) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
LOGGER.debug(String.format("Fetching block with signature %.8s", Base58.encode(blockSignature)));
int blockHeightToRequest = commonBlockHeight + peerBlocks.size() + 1; // +1 because we are requesting the next block, beyond what we already have in the peerBlocks array
Block newBlock = this.fetchBlock(repository, peer, blockSignature);
byte[] latestPeerSignature = peerBlocks.isEmpty() ? commonBlockSig : peerBlocks.get(peerBlocks.size() - 1).getSignature();
int lastPeerHeight = commonBlockHeight + peerBlocks.size();
int numberOfBlocksToRequest = Math.min(numberBlocksRequired, MAXIMUM_REQUEST_SIZE);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, blockHeightToRequest, Base58.encode(blockSignature)));
LOGGER.trace(String.format("Requesting %d block%s after height %d, sig %.8s",
numberOfBlocksToRequest, (numberOfBlocksToRequest != 1 ? "s" : ""), lastPeerHeight, Base58.encode(latestPeerSignature)));
if (retryCount >= MAXIMUM_RETRIES) {
// Give up with this peer
return SynchronizationResult.NO_REPLY;
List<Block> blocks = this.fetchBlocks(repository, peer, latestPeerSignature, numberOfBlocksToRequest);
if (blocks == null || blocks.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more blocks after height %d, sig %.8s", peer,
lastPeerHeight, Base58.encode(latestPeerSignature)));
if (peerBlocks.isEmpty()) {
return SynchronizationResult.NO_REPLY;
else {
// Retry until retryCount reaches MAXIMUM_RETRIES
int triesRemaining = MAXIMUM_RETRIES - retryCount;
LOGGER.info(String.format("Re-issuing request to peer %s (%d attempt%s remaining)", peer, triesRemaining, (triesRemaining != 1 ? "s": "")));
LOGGER.debug(String.format("Received %d blocks from peer %s", blocks.size(), peer));
try {
for (Block block : blocks) {
// Set the repository, because we couldn't do that when originally constructing the Block
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : block.getTransactions()) {
} catch (IllegalStateException e) {
LOGGER.error("Error processing transactions in block", e);
return SynchronizationResult.REPOSITORY_ISSUE;
numberBlocksRequired = additionalPeerBlocksAfterCommonBlock - peerBlocks.size();
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
blockHeightToRequest, Base58.encode(blockSignature)));
return SynchronizationResult.INVALID_DATA;
// Reset retryCount because the last request succeeded
retryCount = 0;
LOGGER.debug(String.format("Received block with height %d, sig: %.8s", newBlock.getBlockData().getHeight(), Base58.encode(blockSignature)));
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions())
else {
// Older peer version - use slow sync
// Convert any leftover (post-common) block summaries into signatures to request from peer
List<byte[]> peerBlockSignatures = peerBlockSummaries.stream().map(BlockSummaryData::getSignature).collect(Collectors.toList());
// Subtract the number of signatures that we already have, as we don't need to request them again
int numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
// Fetch remaining block signatures, if needed
int retryCount = 0;
while (numberSignaturesRequired > 0) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
byte[] latestPeerSignature = peerBlockSignatures.isEmpty() ? commonBlockSig : peerBlockSignatures.get(peerBlockSignatures.size() - 1);
int lastPeerHeight = commonBlockHeight + peerBlockSignatures.size();
int numberOfSignaturesToRequest = Math.min(numberSignaturesRequired, MAXIMUM_REQUEST_SIZE);
LOGGER.trace(String.format("Requesting %d signature%s after height %d, sig %.8s",
numberOfSignaturesToRequest, (numberOfSignaturesToRequest != 1 ? "s" : ""), lastPeerHeight, Base58.encode(latestPeerSignature)));
List<byte[]> moreBlockSignatures = this.getBlockSignatures(peer, latestPeerSignature, numberOfSignaturesToRequest);
if (moreBlockSignatures == null || moreBlockSignatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d, sig %.8s", peer,
lastPeerHeight, Base58.encode(latestPeerSignature)));
if (retryCount >= MAXIMUM_RETRIES) {
// Give up with this peer
return SynchronizationResult.NO_REPLY;
} else {
// Retry until retryCount reaches MAXIMUM_RETRIES
int triesRemaining = MAXIMUM_RETRIES - retryCount;
LOGGER.info(String.format("Re-issuing request to peer %s (%d attempt%s remaining)", peer, triesRemaining, (triesRemaining != 1 ? "s" : "")));
// Reset retryCount because the last request succeeded
retryCount = 0;
LOGGER.trace(String.format("Received %s signature%s", peerBlockSignatures.size(), (peerBlockSignatures.size() != 1 ? "s" : "")));
numberSignaturesRequired = additionalPeerBlocksAfterCommonBlock - peerBlockSignatures.size();
// Fetch blocks using signatures
LOGGER.debug(String.format("Fetching new blocks from peer %s after height %d", peer, commonBlockHeight));
retryCount = 0;
while (peerBlocks.size() < peerBlockSignatures.size()) {
if (Controller.isStopping())
return SynchronizationResult.SHUTTING_DOWN;
byte[] blockSignature = peerBlockSignatures.get(peerBlocks.size());
LOGGER.debug(String.format("Fetching block with signature %.8s", Base58.encode(blockSignature)));
int blockHeightToRequest = commonBlockHeight + peerBlocks.size() + 1; // +1 because we are requesting the next block, beyond what we already have in the peerBlocks array
Block newBlock = this.fetchBlock(repository, peer, blockSignature);
if (newBlock == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d, sig %.8s", peer, blockHeightToRequest, Base58.encode(blockSignature)));
if (retryCount >= MAXIMUM_RETRIES) {
// Give up with this peer
return SynchronizationResult.NO_REPLY;
} else {
// Retry until retryCount reaches MAXIMUM_RETRIES
int triesRemaining = MAXIMUM_RETRIES - retryCount;
LOGGER.info(String.format("Re-issuing request to peer %s (%d attempt%s remaining)", peer, triesRemaining, (triesRemaining != 1 ? "s" : "")));
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d, sig %.8s", peer,
blockHeightToRequest, Base58.encode(blockSignature)));
return SynchronizationResult.INVALID_DATA;
// Reset retryCount because the last request succeeded
retryCount = 0;
LOGGER.debug(String.format("Received block with height %d, sig: %.8s", newBlock.getBlockData().getHeight(), Base58.encode(blockSignature)));
// Transactions are transmitted without approval status so determine that now
for (Transaction transaction : newBlock.getTransactions())
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to common block height %d, sig %.8s", commonBlockHeight, commonBlockSig58));
@ -625,6 +694,22 @@ public class Synchronizer {
return new Block(repository, blockMessage.getBlockData(), blockMessage.getTransactions(), blockMessage.getAtStates());
private List<Block> fetchBlocks(Repository repository, Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
Message getBlocksMessage = new GetBlocksMessage(parentSignature, numberRequested);
Message message = peer.getResponse(getBlocksMessage);
if (message == null || message.getType() != MessageType.BLOCKS) {
return null;
BlocksMessage blocksMessage = (BlocksMessage) message;
if (blocksMessage == null || blocksMessage.getBlocks() == null) {
return null;
return blocksMessage.getBlocks();
private void populateBlockSummariesMinterLevels(Repository repository, List<BlockSummaryData> blockSummaries) throws DataException {
final int firstBlockHeight = blockSummaries.get(0).getHeight();
@ -89,6 +89,7 @@ public class Network {
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
public static final int MAX_BLOCKS_PER_REPLY = 500;
// Generate our node keys / ID
private final Ed25519PrivateKeyParameters edPrivateKeyParams = new Ed25519PrivateKeyParameters(new SecureRandom());
Normal file
Normal file
@ -0,0 +1,90 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.block.Block;
import org.qortal.data.at.ATStateData;
import org.qortal.data.block.BlockData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.transform.TransformationException;
import org.qortal.transform.block.BlockTransformer;
import org.qortal.utils.Triple;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class BlocksMessage extends Message {
private static final Logger LOGGER = LogManager.getLogger(BlocksMessage.class);
private List<Block> blocks;
public BlocksMessage(List<Block> blocks) {
this(-1, blocks);
private BlocksMessage(int id, List<Block> blocks) {
super(id, MessageType.BLOCKS);
this.blocks = blocks;
public List<Block> getBlocks() {
return this.blocks;
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
List<Block> blocks = new ArrayList<>();
for (int i = 0; i < count; ++i) {
int height = bytes.getInt();
try {
boolean finalBlockInBuffer = (i == count-1);
Triple<BlockData, List<TransactionData>, List<ATStateData>> blockInfo = null;
blockInfo = BlockTransformer.fromByteBuffer(bytes, finalBlockInBuffer);
BlockData blockData = blockInfo.getA();
// We are unable to obtain a valid Repository instance here, so set it to null and we will attach it later
Block block = new Block(null, blockData, blockInfo.getB(), blockInfo.getC());
} catch (TransformationException e) {
return null;
return new BlocksMessage(id, blocks);
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
for (Block block : this.blocks) {
return bytes.toByteArray();
} catch (IOException e) {
return null;
} catch (TransformationException e) {
return null;
@ -0,0 +1,65 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transform.Transformer;
import org.qortal.transform.block.BlockTransformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class GetBlocksMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private byte[] parentSignature;
private int numberRequested;
public GetBlocksMessage(byte[] parentSignature, int numberRequested) {
this(-1, parentSignature, numberRequested);
private GetBlocksMessage(int id, byte[] parentSignature, int numberRequested) {
super(id, MessageType.GET_BLOCKS);
this.parentSignature = parentSignature;
this.numberRequested = numberRequested;
public byte[] getParentSignature() {
return this.parentSignature;
public int getNumberRequested() {
return this.numberRequested;
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH + Transformer.INT_LENGTH)
return null;
byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH];
int numberRequested = bytes.getInt();
return new GetBlocksMessage(id, parentSignature, numberRequested);
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
return bytes.toByteArray();
} catch (IOException e) {
return null;
@ -80,7 +80,10 @@ public abstract class Message {
public final int value;
public final Method fromByteBufferMethod;
@ -315,6 +315,10 @@ public abstract class Transaction {
return this.transactionData;
public void setRepository(Repository repository) {
this.repository = repository;
// More information
public static long getDeadline(TransactionData transactionData) {
@ -74,19 +74,30 @@ public class BlockTransformer extends Transformer {
* Extract block data and transaction data from serialized bytes.
* Extract block data and transaction data from serialized bytes containing a single block.
* @param bytes
* @return BlockData and a List of transactions.
* @throws TransformationException
public static Triple<BlockData, List<TransactionData>, List<ATStateData>> fromByteBuffer(ByteBuffer byteBuffer) throws TransformationException {
return BlockTransformer.fromByteBuffer(byteBuffer, true);
* Extract block data and transaction data from serialized bytes containing one or more blocks.
* @param bytes
* @return the next block's BlockData and a List of transactions.
* @throws TransformationException
public static Triple<BlockData, List<TransactionData>, List<ATStateData>> fromByteBuffer(ByteBuffer byteBuffer, boolean finalBlockInBuffer) throws TransformationException {
int version = byteBuffer.getInt();
if (byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH)
if (finalBlockInBuffer && byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH)
throw new TransformationException("Byte data too short for Block");
if (byteBuffer.remaining() > BlockChain.getInstance().getMaxBlockSize())
if (finalBlockInBuffer && byteBuffer.remaining() > BlockChain.getInstance().getMaxBlockSize())
throw new TransformationException("Byte data too long for Block");
long timestamp = byteBuffer.getLong();
@ -210,7 +221,8 @@ public class BlockTransformer extends Transformer {
if (byteBuffer.hasRemaining())
// We should only complain about excess byte data if we aren't expecting more blocks in this ByteBuffer
if (finalBlockInBuffer && byteBuffer.hasRemaining())
throw new TransformationException("Excess byte data found after parsing Block");
// We don't have a height!
Reference in New Issue
Block a user