More work on synchronization

Various fixes to synchronization

Added missing code for processing incoming block summaries in Network.

Fixed block summaries serialization and removed references to BlockData.

Fixed bug in transaction transformation where base transaction length
didn't include reference or fee lengths.

Original commit was ebbab7b
This commit is contained in:
catbref 2019-05-06 09:17:54 +01:00
parent 57b982d2fb
commit 06e6802d97
6 changed files with 72 additions and 24 deletions

View File

@ -21,12 +21,15 @@ import org.qora.block.Block;
import org.qora.block.BlockChain; import org.qora.block.BlockChain;
import org.qora.block.BlockGenerator; import org.qora.block.BlockGenerator;
import org.qora.data.block.BlockData; import org.qora.data.block.BlockData;
import org.qora.data.network.BlockSummaryData;
import org.qora.data.network.PeerData; import org.qora.data.network.PeerData;
import org.qora.data.transaction.TransactionData; import org.qora.data.transaction.TransactionData;
import org.qora.network.Network; import org.qora.network.Network;
import org.qora.network.Peer; import org.qora.network.Peer;
import org.qora.network.message.BlockMessage; import org.qora.network.message.BlockMessage;
import org.qora.network.message.BlockSummariesMessage;
import org.qora.network.message.GetBlockMessage; import org.qora.network.message.GetBlockMessage;
import org.qora.network.message.GetBlockSummariesMessage;
import org.qora.network.message.GetPeersMessage; import org.qora.network.message.GetPeersMessage;
import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.GetSignaturesMessage;
import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightMessage;
@ -204,7 +207,7 @@ public class Controller extends Thread {
potentiallySynchronize(); potentiallySynchronize();
// Query random connections for unconfirmed transactions // Query random connections for unconfirmed transactions?
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// time to exit // time to exit
@ -225,10 +228,10 @@ public class Controller extends Thread {
for(Peer peer : peers) for(Peer peer : peers)
LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight())); LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight()));
// Remove peers with lower, or unknown, height // Remove peers with unknown, or same, height
peers.removeIf(peer -> { peers.removeIf(peer -> {
Integer peerHeight = peer.getPeerData().getLastHeight(); Integer peerHeight = peer.getPeerData().getLastHeight();
return peerHeight == null || peerHeight <= ourHeight; return peerHeight == null;
}); });
// Remove peers that have "misbehaved" recently // Remove peers that have "misbehaved" recently
@ -260,8 +263,10 @@ public class Controller extends Thread {
LOGGER.debug(String.format("Synchronized with peer %s", peer)); LOGGER.debug(String.format("Synchronized with peer %s", peer));
// Broadcast our new height // Broadcast our new height (if changed)
Network.getInstance().broadcast(recipientPeer -> new HeightMessage(getChainHeight())); int updatedHeight = getChainHeight();
if (updatedHeight != ourHeight)
Network.getInstance().broadcast(recipientPeer -> new HeightMessage(updatedHeight));
} }
} }
@ -363,7 +368,7 @@ public class Controller extends Thread {
parentSignature = blockData.getSignature(); parentSignature = blockData.getSignature();
signatures.add(parentSignature); signatures.add(parentSignature);
} while (signatures.size() < 500); } while (signatures.size() < Network.MAX_SIGNATURES_PER_REPLY);
Message signaturesMessage = new SignaturesMessage(signatures); Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId()); signaturesMessage.setId(message.getId());
@ -432,6 +437,35 @@ public class Controller extends Thread {
} }
break; break;
case GET_BLOCK_SUMMARIES:
try (final Repository repository = RepositoryManager.getRepository()) {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
List<BlockSummaryData> blockSummaries = new ArrayList<>();
int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested());
do {
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData == null)
break;
BlockSummaryData blockSummary = new BlockSummaryData(blockData);
blockSummaries.add(blockSummary);
parentSignature = blockData.getSignature();
} while (blockSummaries.size() < numberRequested);
Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries);
blockSummariesMessage.setId(message.getId());
if (!peer.sendMessage(blockSummariesMessage))
peer.disconnect();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e);
}
break;
default: default:
break; break;
} }

View File

@ -48,6 +48,15 @@ public class Synchronizer {
return instance; return instance;
} }
/**
* Attempt to synchronize blockchain with peer.
* <p>
* Will return <tt>true</tt> if synchronization succeeded,
* even if no changes were made to our blockchain.
* <p>
* @param peer
* @return false if something went wrong, true otherwise.
*/
public boolean synchronize(Peer peer) { public boolean synchronize(Peer peer) {
// Make sure we're the only thread modifying the blockchain // Make sure we're the only thread modifying the blockchain
// If we're already synchronizing with another peer then this will also return fast // If we're already synchronizing with another peer then this will also return fast
@ -58,9 +67,9 @@ public class Synchronizer {
try { try {
this.repository = repository; this.repository = repository;
this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight(); this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight();
int peerHeight = peer.getPeerData().getLastHeight(); final int peerHeight = peer.getPeerData().getLastHeight();
LOGGER.info(String.format("Synchronizing with peer %s from height %d to height %d", peer, this.ourHeight, peerHeight)); LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, this.ourHeight));
List<byte[]> signatures = findSignaturesFromCommonBlock(peer); List<byte[]> signatures = findSignaturesFromCommonBlock(peer);
if (signatures == null) { if (signatures == null) {
@ -70,13 +79,17 @@ public class Synchronizer {
// First signature is common block // First signature is common block
BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0)); BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0));
int commonBlockHeight = commonBlockData.getHeight(); final int commonBlockHeight = commonBlockData.getHeight();
LOGGER.info(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight)); LOGGER.info(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight));
signatures.remove(0); signatures.remove(0);
// If common block is peer's latest block then we simply have a longer chain to peer, so exit now
if (commonBlockHeight == peerHeight)
return true;
// If common block is too far behind us then we're on massively different forks so give up. // If common block is too far behind us then we're on massively different forks so give up.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (commonBlockData.getHeight() < minHeight) { if (commonBlockHeight < minHeight) {
LOGGER.info(String.format("Blockchain too divergent with peer %s", peer)); LOGGER.info(String.format("Blockchain too divergent with peer %s", peer));
return false; return false;
} }
@ -117,11 +130,11 @@ public class Synchronizer {
} }
} }
if (this.ourHeight > commonBlockData.getHeight()) { if (this.ourHeight > commonBlockHeight) {
// Unwind to common block (unless common block is our latest block) // Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockData.getHeight())); LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockHeight));
while (this.ourHeight > commonBlockData.getHeight()) { while (this.ourHeight > commonBlockHeight) {
BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight); BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight);
Block block = new Block(repository, blockData); Block block = new Block(repository, blockData);
block.orphan(); block.orphan();
@ -129,7 +142,7 @@ public class Synchronizer {
--this.ourHeight; --this.ourHeight;
} }
LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockData.getHeight(), peer)); LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockHeight, peer));
} else { } else {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
} }
@ -193,7 +206,7 @@ public class Synchronizer {
} }
/** /**
* Returns list of block signatures start with common block with peer. * Returns list of peer's block signatures starting with common block with peer.
* *
* @param peer * @param peer
* @return block signatures * @return block signatures

View File

@ -55,6 +55,8 @@ public class Network extends Thread {
/** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */
private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
public static final int PEER_ID_LENGTH = 128; public static final int PEER_ID_LENGTH = 128;
private final byte[] ourPeerId; private final byte[] ourPeerId;

View File

@ -6,9 +6,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.qora.data.block.BlockData;
import org.qora.data.network.BlockSummaryData; import org.qora.data.network.BlockSummaryData;
import org.qora.transform.Transformer; import org.qora.transform.Transformer;
import org.qora.transform.block.BlockTransformer; import org.qora.transform.block.BlockTransformer;
@ -21,9 +19,8 @@ public class BlockSummariesMessage extends Message {
private List<BlockSummaryData> blockSummaries; private List<BlockSummaryData> blockSummaries;
public BlockSummariesMessage(List<BlockData> blocksData) { public BlockSummariesMessage(List<BlockSummaryData> blockSummaries) {
// Extract what we need from block data this(-1, blockSummaries);
this(-1, blocksData.stream().map(blockData -> new BlockSummaryData(blockData)).collect(Collectors.toList()));
} }
private BlockSummariesMessage(int id, List<BlockSummaryData> blockSummaries) { private BlockSummariesMessage(int id, List<BlockSummaryData> blockSummaries) {
@ -67,7 +64,7 @@ public class BlockSummariesMessage extends Message {
bytes.write(Ints.toByteArray(this.blockSummaries.size())); bytes.write(Ints.toByteArray(this.blockSummaries.size()));
for (BlockSummaryData blockSummary : this.blockSummaries) { for (BlockSummaryData blockSummary : this.blockSummaries) {
bytes.write(blockSummary.getHeight()); bytes.write(Ints.toByteArray(blockSummary.getHeight()));
bytes.write(blockSummary.getSignature()); bytes.write(blockSummary.getSignature());
bytes.write(blockSummary.getGeneratorPublicKey()); bytes.write(blockSummary.getGeneratorPublicKey());
} }

View File

@ -8,6 +8,8 @@ import java.nio.ByteBuffer;
import org.qora.transform.Transformer; import org.qora.transform.Transformer;
import org.qora.transform.block.BlockTransformer; import org.qora.transform.block.BlockTransformer;
import com.google.common.primitives.Ints;
public class GetBlockSummariesMessage extends Message { public class GetBlockSummariesMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
@ -53,7 +55,7 @@ public class GetBlockSummariesMessage extends Message {
bytes.write(this.parentSignature); bytes.write(this.parentSignature);
bytes.write(this.numberRequested); bytes.write(Ints.toByteArray(this.numberRequested));
return bytes.toByteArray(); return bytes.toByteArray();
} catch (IOException e) { } catch (IOException e) {

View File

@ -222,8 +222,8 @@ public abstract class TransactionTransformer extends Transformer {
} }
protected static int getBaseLength(TransactionData transactionData) { protected static int getBaseLength(TransactionData transactionData) {
// All transactions have at least txType, timestamp, maybe txGroupId, tx creator's public key and finally signature (on the end) // All transactions have at least txType, timestamp, reference, tx creator's public key and also fee and signature (on the end)
int baseLength = TYPE_LENGTH + TIMESTAMP_LENGTH + PUBLIC_KEY_LENGTH + SIGNATURE_LENGTH; int baseLength = TYPE_LENGTH + TIMESTAMP_LENGTH + REFERENCE_LENGTH + PUBLIC_KEY_LENGTH + FEE_LENGTH + SIGNATURE_LENGTH;
if (transactionData.getTimestamp() >= BlockChain.getInstance().getQoraV2Timestamp()) if (transactionData.getTimestamp() >= BlockChain.getInstance().getQoraV2Timestamp())
baseLength += GROUPID_LENGTH; baseLength += GROUPID_LENGTH;