diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index c3e5a412..b6ab9a69 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -70,12 +70,20 @@ public class BlockGenerator extends Thread { // Sleep for a while try { repository.discardChanges(); // Free repository locks, if any - Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same + Thread.sleep(1000); } catch (InterruptedException e) { // We've been interrupted - time to exit return; } + // Don't generate if we don't have enough connected peers as where would the transactions/consensus come from? + if (Network.getInstance().getUniqueHandshakedPeers().size() < Settings.getInstance().getMinBlockchainPeers()) + continue; + + // Don't generate if it looks like we're behind + if (!Controller.getInstance().isUpToDate()) + continue; + // Check blockchain hasn't changed BlockData lastBlockData = blockRepository.getLastBlock(); if (previousBlock == null || !Arrays.equals(previousBlock.getSignature(), lastBlockData.getSignature())) { @@ -83,10 +91,6 @@ public class BlockGenerator extends Thread { newBlocks.clear(); } - // Don't generate if we don't have enough connected peers as where would the transactions/consensus come from? - if (Network.getInstance().getUniqueHandshakedPeers().size() < Settings.getInstance().getMinBlockchainPeers()) - continue; - // Do we need to build any potential new blocks? List forgingAccountsData = repository.getAccountRepository().getForgingAccounts(); List forgingAccounts = forgingAccountsData.stream().map(accountData -> new PrivateKeyAccount(repository, accountData.getSeed())).collect(Collectors.toList()); @@ -108,7 +112,9 @@ public class BlockGenerator extends Thread { // Make sure we're the only thread modifying the blockchain ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); - if (blockchainLock.tryLock()) + if (blockchainLock.tryLock()) { + boolean newBlockGenerated = false; + generation: try { // Clear repository's "in transaction" state so we don't cause a repository deadlock repository.discardChanges(); @@ -160,7 +166,7 @@ public class BlockGenerator extends Thread { repository.saveChanges(); // Notify controller - Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); + newBlockGenerated = true; } catch (DataException e) { // Unable to process block - report and discard LOGGER.error("Unable to process newly generated block?", e); @@ -169,6 +175,10 @@ public class BlockGenerator extends Thread { } finally { blockchainLock.unlock(); } + + if (newBlockGenerated) + Controller.getInstance().onGeneratedBlock(); + } } } catch (DataException e) { LOGGER.warn("Repository issue while running block generator", e); diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 27fb5dc3..c3c1ef76 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -8,9 +8,11 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +38,7 @@ import org.qora.network.message.GetPeersMessage; import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.GetSignaturesV2Message; import org.qora.network.message.HeightMessage; +import org.qora.network.message.HeightV2Message; import org.qora.network.message.Message; import org.qora.network.message.SignaturesMessage; import org.qora.network.message.TransactionMessage; @@ -136,6 +139,16 @@ public class Controller extends Thread { } } + /** Returns highest block, or null if there's a repository issue */ + public BlockData getChainTip() { + try (final Repository repository = RepositoryManager.getRepository()) { + return repository.getBlockRepository().getLastBlock(); + } catch (DataException e) { + LOGGER.error("Repository issue when fetching blockchain tip", e); + return null; + } + } + public ReentrantLock getBlockchainLock() { return this.blockchainLock; } @@ -222,7 +235,7 @@ public class Controller extends Thread { try { while (!isStopping) { - Thread.sleep(60 * 1000); + Thread.sleep(14 * 1000); potentiallySynchronize(); @@ -235,31 +248,24 @@ public class Controller extends Thread { } private void potentiallySynchronize() { - int ourHeight = getChainHeight(); - if (ourHeight == 0) - return; - - // If we have enough peers, potentially synchronize List peers = Network.getInstance().getUniqueHandshakedPeers(); + + // Check we have enough peers to potentially synchronize if (peers.size() < Settings.getInstance().getMinBlockchainPeers()) return; for(Peer peer : peers) LOGGER.trace(String.format("Peer %s is at height %d", peer, peer.getPeerData().getLastHeight())); - // Remove peers with unknown, or same, height - peers.removeIf(peer -> { - Integer peerHeight = peer.getPeerData().getLastHeight(); - return peerHeight == null; - }); + // Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature) + peers.removeIf(hasShorterBlockchain()); // Remove peers that have "misbehaved" recently - peers.removeIf(peer -> { - Long lastMisbehaved = peer.getPeerData().getLastMisbehaved(); - return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; - }); + peers.removeIf(hasPeerMisbehaved); if (!peers.isEmpty()) { + int ourHeight = getChainHeight(); + // Pick random peer to sync with int index = new SecureRandom().nextInt(peers.size()); Peer peer = peers.get(index); @@ -294,14 +300,15 @@ public class Controller extends Thread { break; case OK: - LOGGER.debug(String.format("Synchronized with peer %s", peer)); + case NOTHING_TO_DO: + LOGGER.debug(String.format("Synchronized with peer %s (%s)", peer, syncResult.name())); break; } // Broadcast our new height (if changed) - int updatedHeight = getChainHeight(); - if (updatedHeight != ourHeight) - Network.getInstance().broadcast(recipientPeer -> new HeightMessage(updatedHeight)); + BlockData latestBlockData = getChainTip(); + if (latestBlockData.getHeight() != ourHeight) + Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, latestBlockData)); } } @@ -365,18 +372,24 @@ public class Controller extends Thread { network.broadcast(peer -> network.buildPeersMessage(peer)); // Send our current height - network.broadcast(peer -> new HeightMessage(this.getChainHeight())); + BlockData latestBlockData = getChainTip(); + network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData)); // Request peers lists network.broadcast(peer -> new GetPeersMessage()); } - public void onGeneratedBlock(BlockData newBlockData) { - // XXX we should really be broadcasting the new block sig, not height - // Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block + public void onGeneratedBlock() { + // Broadcast our new height info + BlockData latestBlockData = getChainTip(); - // Broadcast our new height - Network.getInstance().broadcast(peer -> new HeightMessage(newBlockData.getHeight())); + Network network = Network.getInstance(); + network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData)); + } + + public void onNewTransaction(TransactionData transactionData) { + // Send round to all peers + Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData)); } public void onNetworkMessage(Peer peer, Message message) { @@ -387,15 +400,44 @@ public class Controller extends Thread { HeightMessage heightMessage = (HeightMessage) message; // Update our record of peer's height - peer.getPeerData().setLastHeight(heightMessage.getHeight()); + try (final Repository repository = RepositoryManager.getRepository()) { + PeerData peerData = peer.getPeerData(); + + peer.getPeerData().setLastHeight(heightMessage.getHeight()); + + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e); + } break; - case GET_SIGNATURES: + case HEIGHT_V2: + HeightV2Message heightV2Message = (HeightV2Message) message; + + // Update our record for peer's blockchain info try (final Repository repository = RepositoryManager.getRepository()) { - GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; - byte[] parentSignature = getSignaturesMessage.getParentSignature(); + PeerData peerData = peer.getPeerData(); + + peerData.setLastHeight(heightV2Message.getHeight()); + peerData.setLastBlockSignature(heightV2Message.getSignature()); + peerData.setLastBlockTimestamp(heightV2Message.getTimestamp()); + peerData.setLastBlockGenerator(heightV2Message.getGenerator()); + + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e); + } + + break; + case GET_SIGNATURES: { + GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + + try (final Repository repository = RepositoryManager.getRepository()) { List signatures = new ArrayList<>(); do { @@ -411,17 +453,18 @@ public class Controller extends Thread { Message signaturesMessage = new SignaturesMessage(signatures); signaturesMessage.setId(message.getId()); if (!peer.sendMessage(signaturesMessage)) - peer.disconnect(); + peer.disconnect("failed to send signatures"); } catch (DataException e) { - LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } break; + } - case GET_SIGNATURES_V2: - try (final Repository repository = RepositoryManager.getRepository()) { - GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message; - byte[] parentSignature = getSignaturesMessage.getParentSignature(); + case GET_SIGNATURES_V2: { + GetSignaturesV2Message getSignaturesMessage = (GetSignaturesV2Message) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + try (final Repository repository = RepositoryManager.getRepository()) { List signatures = new ArrayList<>(); do { @@ -437,17 +480,18 @@ public class Controller extends Thread { Message signaturesMessage = new SignaturesMessage(signatures); signaturesMessage.setId(message.getId()); if (!peer.sendMessage(signaturesMessage)) - peer.disconnect(); + peer.disconnect("failed to send signatures (v2)"); } catch (DataException e) { - LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e); } break; + } case GET_BLOCK: - try (final Repository repository = RepositoryManager.getRepository()) { - GetBlockMessage getBlockMessage = (GetBlockMessage) message; - byte[] signature = getBlockMessage.getSignature(); + GetBlockMessage getBlockMessage = (GetBlockMessage) message; + byte[] signature = getBlockMessage.getSignature(); + try (final Repository repository = RepositoryManager.getRepository()) { BlockData blockData = repository.getBlockRepository().fromSignature(signature); if (blockData == null) { LOGGER.debug(String.format("Ignoring GET_BLOCK request from peer %s for unknown block %s", peer, Base58.encode(signature))); @@ -460,17 +504,17 @@ public class Controller extends Thread { Message blockMessage = new BlockMessage(block); blockMessage.setId(message.getId()); if (!peer.sendMessage(blockMessage)) - peer.disconnect(); + peer.disconnect("failed to send block"); } catch (DataException e) { - LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e); } break; case TRANSACTION: - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionMessage transactionMessage = (TransactionMessage) message; + TransactionMessage transactionMessage = (TransactionMessage) message; + TransactionData transactionData = transactionMessage.getTransactionData(); - TransactionData transactionData = transactionMessage.getTransactionData(); + try (final Repository repository = RepositoryManager.getRepository()) { Transaction transaction = Transaction.fromData(repository, transactionData); // Check signature @@ -479,33 +523,40 @@ public class Controller extends Thread { break; } - // Do we have it already? - if (repository.getTransactionRepository().exists(transactionData.getSignature())) { - LOGGER.trace(String.format("Ignoring existing TRANSACTION %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); - break; - } + // Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + blockchainLock.lock(); + try { + // Do we have it already? + if (repository.getTransactionRepository().exists(transactionData.getSignature())) { + LOGGER.trace(String.format("Ignoring existing TRANSACTION %s from peer %s", Base58.encode(transactionData.getSignature()), peer)); + break; + } - // Is it valid? - ValidationResult validationResult = transaction.isValidUnconfirmed(); - if (validationResult != ValidationResult.OK) { - LOGGER.trace(String.format("Ignoring invalid (%s) TRANSACTION %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer)); - break; - } + // Is it valid? + ValidationResult validationResult = transaction.isValidUnconfirmed(); + if (validationResult != ValidationResult.OK) { + LOGGER.trace(String.format("Ignoring invalid (%s) TRANSACTION %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer)); + break; + } - // Seems ok - add to unconfirmed pile - repository.getTransactionRepository().save(transactionData); - repository.getTransactionRepository().unconfirmTransaction(transactionData); - repository.saveChanges(); + // Seems ok - add to unconfirmed pile + repository.getTransactionRepository().save(transactionData); + repository.getTransactionRepository().unconfirmTransaction(transactionData); + repository.saveChanges(); + } finally { + blockchainLock.unlock(); + } } catch (DataException e) { - LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); } break; case GET_BLOCK_SUMMARIES: - try (final Repository repository = RepositoryManager.getRepository()) { - GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; - byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message; + byte[] parentSignature = getBlockSummariesMessage.getParentSignature(); + try (final Repository repository = RepositoryManager.getRepository()) { List blockSummaries = new ArrayList<>(); int numberRequested = Math.min(Network.MAX_BLOCK_SUMMARIES_PER_REPLY, getBlockSummariesMessage.getNumberRequested()); @@ -524,20 +575,67 @@ public class Controller extends Thread { Message blockSummariesMessage = new BlockSummariesMessage(blockSummaries); blockSummariesMessage.setId(message.getId()); if (!peer.sendMessage(blockSummariesMessage)) - peer.disconnect(); + peer.disconnect("failed to send block summaries"); } catch (DataException e) { - LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e); } break; + case GET_PEERS: + // Send our known peers + if (!peer.sendMessage(Network.getInstance().buildPeersMessage(peer))) + peer.disconnect("failed to send peers list"); + break; + default: + LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; } } - public void onNewTransaction(TransactionData transactionData) { - // Send round to all peers - Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData)); + // Utilities + + public static final Predicate hasPeerMisbehaved = peer -> { + Long lastMisbehaved = peer.getPeerData().getLastMisbehaved(); + return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF; + }; + + /** True if peer has unknown height, lower height or same height and same block signature (unless we don't have their block signature). */ + public static Predicate hasShorterBlockchain() { + BlockData highestBlockData = getInstance().getChainTip(); + int ourHeight = highestBlockData.getHeight(); + + return peer -> { + PeerData peerData = peer.getPeerData(); + + Integer peerHeight = peerData.getLastHeight(); + if (peerHeight == null || peerHeight < ourHeight) + return true; + + if (peerHeight > ourHeight || peerData.getLastBlockSignature() == null) + return false; + + // Remove if signatures match + return Arrays.equals(peerData.getLastBlockSignature(), highestBlockData.getSignature()); + }; + } + + /** Returns whether we think our node has up-to-date blockchain based on our height info about other peers. */ + public boolean isUpToDate() { + List peers = Network.getInstance().getUniqueHandshakedPeers(); + + // Check we have enough peers to potentially synchronize/generator + if (peers.size() < Settings.getInstance().getMinBlockchainPeers()) + return false; + + // Remove peers with unknown height, lower height or same height and same block signature (unless we don't have their block signature) + peers.removeIf(hasShorterBlockchain()); + + // Remove peers that have "misbehaved" recently + peers.removeIf(hasPeerMisbehaved); + + // If we have any peers left, then they would be candidates for synchronization therefore we're not up to date. + return peers.isEmpty(); } } diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index 859cc0fd..d9970a76 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qora.block.Block; import org.qora.block.Block.ValidationResult; +import org.qora.block.BlockChain; import org.qora.block.GenesisBlock; import org.qora.data.block.BlockData; import org.qora.data.network.BlockSummaryData; @@ -26,6 +27,7 @@ import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryManager; import org.qora.transaction.Transaction; +import org.qora.utils.NTP; public class Synchronizer { @@ -35,6 +37,8 @@ public class Synchronizer { private static final int MAXIMUM_BLOCK_STEP = 500; private static final int MAXIMUM_HEIGHT_DELTA = 300; // XXX move to blockchain config? private static final int MAXIMUM_COMMON_DELTA = 60; // XXX move to blockchain config? + /** Maximum age for our latest block before we consider ditching our fork. */ + private static final long MAXIMUM_TIP_AGE = BlockChain.getInstance().getMaxBlockTime() * 1000L * 10; // XXX move to blockchain config? private static final int SYNC_BATCH_SIZE = 200; private static Synchronizer instance; @@ -42,7 +46,7 @@ public class Synchronizer { private Repository repository; public enum SynchronizationResult { - OK, GENESIS_ONLY, NO_COMMON_BLOCK, TOO_FAR_BEHIND, TOO_DIVERGENT, NO_REPLY, INFERIOR_CHAIN, INVALID_DATA, NO_BLOCKCHAIN_LOCK, REPOSITORY_ISSUE; + OK, NOTHING_TO_DO, GENESIS_ONLY, NO_COMMON_BLOCK, TOO_FAR_BEHIND, TOO_DIVERGENT, NO_REPLY, INFERIOR_CHAIN, INVALID_DATA, NO_BLOCKCHAIN_LOCK, REPOSITORY_ISSUE; } // Constructors @@ -75,14 +79,16 @@ public class Synchronizer { try (final Repository repository = RepositoryManager.getRepository()) { try { this.repository = repository; - final int ourInitialHeight = this.repository.getBlockRepository().getBlockchainHeight(); + final BlockData ourLatestBlockData = this.repository.getBlockRepository().getLastBlock(); + final int ourInitialHeight = ourLatestBlockData.getHeight(); int ourHeight = ourInitialHeight; - final int peerHeight = peer.getPeerData().getLastHeight(); + int peerHeight = peer.getPeerData().getLastHeight(); // If peer is at genesis block then peer has no blocks so ignore them for a while if (peerHeight == 1) return SynchronizationResult.GENESIS_ONLY; + // XXX this may well be obsolete now // If peer is too far behind us then don't them. int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; if (peerHeight < minHeight) { @@ -90,7 +96,12 @@ public class Synchronizer { return SynchronizationResult.TOO_FAR_BEHIND; } - LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight)); + byte[] peersLastBlockSignature = peer.getPeerData().getLastBlockSignature(); + byte[] ourLastBlockSignature = ourLatestBlockData.getSignature(); + if (peerHeight == ourHeight && (peersLastBlockSignature == null || !Arrays.equals(peersLastBlockSignature, ourLastBlockSignature))) + LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d, signatures differ", peer, peerHeight, ourHeight)); + else + LOGGER.info(String.format("Synchronizing with peer %s at height %d, our height %d", peer, peerHeight, ourHeight)); List signatures = findSignaturesFromCommonBlock(peer, ourHeight); if (signatures == null) { @@ -104,13 +115,22 @@ public class Synchronizer { LOGGER.debug(String.format("Common block with peer %s is at height %d", peer, commonBlockHeight)); signatures.remove(0); + // If common block height is higher than peer's last reported height + // then peer must have a very recent sync. Update our idea of peer's height. + if (commonBlockHeight > peerHeight) { + LOGGER.debug(String.format("Peer height %d was lower than common block height %d - using higher value", peerHeight, commonBlockHeight)); + peerHeight = commonBlockHeight; + } + + // XXX This may well be obsolete now // If common block is peer's latest block then we simply have the same, or longer, chain to peer, so exit now if (commonBlockHeight == peerHeight) { if (peerHeight == ourHeight) LOGGER.info(String.format("We have the same blockchain as peer %s", peer)); else LOGGER.info(String.format("We have the same blockchain as peer %s, but longer", peer)); - return SynchronizationResult.OK; + + return SynchronizationResult.NOTHING_TO_DO; } // If common block is too far behind us then we're on massively different forks so give up. @@ -121,7 +141,15 @@ public class Synchronizer { } // If we have blocks after common block then decide whether we want to sync (lowest block signature wins) - for (int height = commonBlockHeight + 1; height <= peerHeight && height <= ourHeight; ++height) { + int highestMutualHeight = Math.min(peerHeight, ourHeight); + + // If our latest block is very old, we're very behind and should ditch our fork. + if (ourLatestBlockData.getTimestamp() < NTP.getTime() - MAXIMUM_TIP_AGE) { + LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight)); + highestMutualHeight = commonBlockHeight; + } + + for (int height = commonBlockHeight + 1; height <= highestMutualHeight; ++height) { int sigIndex = height - commonBlockHeight - 1; // Do we need more signatures? @@ -268,7 +296,7 @@ public class Synchronizer { testSignature = testBlockData.getSignature(); // Ask for block signatures since test block's signature - LOGGER.trace(String.format("Requesting %d signature%s after our height %d", step, (step != 1 ? "s": ""), testHeight)); + LOGGER.trace(String.format("Requesting %d signature%s after height %d", step, (step != 1 ? "s": ""), testHeight)); blockSignatures = this.getBlockSignatures(peer, testSignature, step); if (blockSignatures == null) diff --git a/src/main/java/org/qora/data/network/PeerData.java b/src/main/java/org/qora/data/network/PeerData.java index e4be748b..98e75874 100644 --- a/src/main/java/org/qora/data/network/PeerData.java +++ b/src/main/java/org/qora/data/network/PeerData.java @@ -22,6 +22,9 @@ public class PeerData { private Long lastAttempted; private Long lastConnected; private Integer lastHeight; + private byte[] lastBlockSignature; + private Long lastBlockTimestamp; + private byte[] lastBlockGenerator; private Long lastMisbehaved; // Constructors @@ -30,16 +33,19 @@ public class PeerData { protected PeerData() { } - public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) { + public PeerData(PeerAddress peerAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, byte[] lastBlockSignature, Long lastBlockTimestamp, byte[] lastBlockGenerator, Long lastMisbehaved) { this.peerAddress = peerAddress; this.lastAttempted = lastAttempted; this.lastConnected = lastConnected; this.lastHeight = lastHeight; + this.lastBlockSignature = lastBlockSignature; + this.lastBlockTimestamp = lastBlockTimestamp; + this.lastBlockGenerator = lastBlockGenerator; this.lastMisbehaved = lastMisbehaved; } public PeerData(PeerAddress peerAddress) { - this(peerAddress, null, null, null, null); + this(peerAddress, null, null, null, null, null, null, null); } // Getters / setters @@ -75,6 +81,30 @@ public class PeerData { this.lastHeight = lastHeight; } + public byte[] getLastBlockSignature() { + return lastBlockSignature; + } + + public void setLastBlockSignature(byte[] lastBlockSignature) { + this.lastBlockSignature = lastBlockSignature; + } + + public Long getLastBlockTimestamp() { + return lastBlockTimestamp; + } + + public void setLastBlockTimestamp(Long lastBlockTimestamp) { + this.lastBlockTimestamp = lastBlockTimestamp; + } + + public byte[] getLastBlockGenerator() { + return lastBlockGenerator; + } + + public void setLastBlockGenerator(byte[] lastBlockGenerator) { + this.lastBlockGenerator = lastBlockGenerator; + } + public Long getLastMisbehaved() { return this.lastMisbehaved; } diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java index a0946156..f5c8f921 100644 --- a/src/main/java/org/qora/network/Handshake.java +++ b/src/main/java/org/qora/network/Handshake.java @@ -152,7 +152,7 @@ public enum Handshake { // Drop other inbound peers with the same ID for (Peer otherPeer : Network.getInstance().getConnectedPeers()) if (!otherPeer.isOutbound() && otherPeer.getPeerId() != null && Arrays.equals(otherPeer.getPeerId(), peer.getPendingPeerId())) - otherPeer.disconnect(); + otherPeer.disconnect("doppelganger"); // Tidy up peer.setVerificationCodes(null, null); @@ -191,13 +191,13 @@ public enum Handshake { Message versionMessage = new VersionMessage(buildTimestamp, versionString); if (!peer.sendMessage(versionMessage)) - peer.disconnect(); + peer.disconnect("failed to send version"); } private static void sendMyId(Peer peer) { Message peerIdMessage = new PeerIdMessage(Network.getInstance().getOurPeerId()); if (!peer.sendMessage(peerIdMessage)) - peer.disconnect(); + peer.disconnect("failed to send peer ID"); } private static void sendProof(Peer peer) { @@ -208,7 +208,7 @@ public enum Handshake { // For incoming connections we only need to send a fake proof message as confirmation Message proofMessage = new ProofMessage(peer.getConnectionTimestamp(), 0, 0); if (!peer.sendMessage(proofMessage)) - peer.disconnect(); + peer.disconnect("failed to send proof"); } } @@ -218,14 +218,14 @@ public enum Handshake { // Send VERIFICATION_CODES to peer Message verificationCodesMessage = new VerificationCodesMessage(peer.getVerificationCodeSent(), peer.getVerificationCodeExpected()); if (!otherOutboundPeer.sendMessage(verificationCodesMessage)) { - peer.disconnect(); // give up with this peer instead + peer.disconnect("failed to send verification codes"); // give up with this peer instead return; } // Send PEER_VERIFY to peer Message peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeSent()); if (!peer.sendMessage(peerVerifyMessage)) - peer.disconnect(); + peer.disconnect("failed to send verification code"); } } diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index 71e428f6..17612564 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -25,10 +25,12 @@ import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qora.controller.Controller; +import org.qora.data.block.BlockData; import org.qora.data.network.PeerData; import org.qora.data.transaction.TransactionData; import org.qora.network.message.GetPeersMessage; import org.qora.network.message.HeightMessage; +import org.qora.network.message.HeightV2Message; import org.qora.network.message.Message; import org.qora.network.message.Message.MessageType; import org.qora.network.message.PeerVerifyMessage; @@ -316,6 +318,7 @@ public class Network extends Thread { newPeer = new Peer(peerData); // Update connection attempt info + repository.discardChanges(); peerData.setLastAttempted(NTP.getTime()); repository.getNetworkRepository().save(peerData); repository.saveChanges(); @@ -360,7 +363,7 @@ public class Network extends Thread { return; LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); - peer.disconnect(); + peer.disconnect("unexpected message"); return; } @@ -369,7 +372,7 @@ public class Network extends Thread { if (newHandshakeStatus == null) { // Handshake failure LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); - peer.disconnect(); + peer.disconnect("handshake failure"); return; } @@ -410,7 +413,7 @@ public class Network extends Thread { case PEER_ID: case PROOF: LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); - peer.disconnect(); + peer.disconnect("unexpected handshaking message"); return; case PING: @@ -421,7 +424,7 @@ public class Network extends Thread { pongMessage.setId(pingMessage.getId()); if (!peer.sendMessage(pongMessage)) - peer.disconnect(); + peer.disconnect("failed to send ping reply"); break; @@ -471,7 +474,7 @@ public class Network extends Thread { PeerVerifyMessage peerVerifyMessage = new PeerVerifyMessage(peer.getVerificationCodeExpected()); if (!peer.sendMessage(peerVerifyMessage)) { - peer.disconnect(); + peer.disconnect("failed to send verification code"); return; } @@ -481,13 +484,15 @@ public class Network extends Thread { } private void onHandshakeCompleted(Peer peer) { - // Do we need extra handshaking because of peer dopplegangers? + // Do we need extra handshaking because of peer doppelgangers? if (peer.getPendingPeerId() != null) { peer.setHandshakeStatus(Handshake.PEER_VERIFY); peer.getHandshakeStatus().action(peer); return; } + LOGGER.debug(String.format("Handshake completed with peer %s", peer)); + // Make a note that we've successfully completed handshake (and when) peer.getPeerData().setLastConnected(NTP.getTime()); @@ -495,16 +500,16 @@ public class Network extends Thread { peer.startPings(); // Send our height - Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); + Message heightMessage = buildHeightMessage(peer, Controller.getInstance().getChainTip()); if (!peer.sendMessage(heightMessage)) { - peer.disconnect(); + peer.disconnect("failed to send height/info"); return; } // Send our peers list Message peersMessage = this.buildPeersMessage(peer); if (!peer.sendMessage(peersMessage)) - peer.disconnect(); + peer.disconnect("failed to send peers list"); // Send our unconfirmed transactions try (final Repository repository = RepositoryManager.getRepository()) { @@ -513,7 +518,7 @@ public class Network extends Thread { for (TransactionData transactionData : transactions) { Message transactionMessage = new TransactionMessage(transactionData); if (!peer.sendMessage(transactionMessage)) { - peer.disconnect(); + peer.disconnect("failed to send unconfirmed transaction"); return; } } @@ -524,7 +529,7 @@ public class Network extends Thread { // Request their peers list Message getPeersMessage = new GetPeersMessage(); if (!peer.sendMessage(getPeersMessage)) - peer.disconnect(); + peer.disconnect("failed to request peers list"); } /** Returns PEERS message made from peers we've connected to recently, and this node's details */ @@ -588,6 +593,16 @@ public class Network extends Thread { } } + public Message buildHeightMessage(Peer peer, BlockData blockData) { + if (peer.getVersion() < 2) { + // Legacy height message + return new HeightMessage(blockData.getHeight()); + } + + // HEIGHT_V2 contains way more useful info + return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getGeneratorPublicKey()); + } + // Network-wide calls /** Returns list of connected peers that have completed handshaking. */ @@ -712,7 +727,7 @@ public class Network extends Thread { public void run() { for (Peer peer : targetPeers) if (!peer.sendMessage(peerMessage.apply(peer))) - peer.disconnect(); + peer.disconnect("failed to broadcast message"); } } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index faa4dc22..81554ac6 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -12,6 +12,7 @@ import java.security.SecureRandom; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -23,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.qora.controller.Controller; import org.qora.data.network.PeerData; import org.qora.network.message.Message; +import org.qora.network.message.Message.MessageException; import org.qora.network.message.Message.MessageType; import org.qora.settings.Settings; import org.qora.network.message.PingMessage; @@ -236,10 +238,14 @@ public class Peer implements Runnable { Network.getInstance().onMessage(this, message); } } + } catch (MessageException e) { + LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); + this.disconnect(e.getMessage()); + } catch (SocketTimeoutException e) { + this.disconnect("timeout"); } catch (IOException e) { - // Fall-through + this.disconnect("I/O error"); } finally { - this.disconnect(); Thread.currentThread().setName("disconnected peer"); } } @@ -262,6 +268,8 @@ public class Peer implements Runnable { this.out.write(message.toBytes()); this.out.flush(); } + } catch (MessageException e) { + LOGGER.warn(String.format("Failed to send %s message with ID %d to peer %s: %s", message.getType().name(), message.getId(), this, e.getMessage())); } catch (IOException e) { // Send failure return false; @@ -329,23 +337,24 @@ public class Peer implements Runnable { long after = System.currentTimeMillis(); if (message == null || message.getType() != MessageType.PING) - peer.disconnect(); + peer.disconnect("no ping received"); peer.setLastPing(after - before); } } - ; - this.executor.scheduleWithFixedDelay(new Pinger(this), 0, PING_INTERVAL, TimeUnit.MILLISECONDS); + Random random = new Random(); + long initialDelay = random.nextInt(PING_INTERVAL); + this.executor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS); } - public void disconnect() { + public void disconnect(String reason) { // Shut down pinger this.executor.shutdownNow(); // Close socket if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Disconnecting peer %s", this)); + LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason)); try { this.socket.close(); diff --git a/src/main/java/org/qora/network/message/HeightV2Message.java b/src/main/java/org/qora/network/message/HeightV2Message.java new file mode 100644 index 00000000..70df83bd --- /dev/null +++ b/src/main/java/org/qora/network/message/HeightV2Message.java @@ -0,0 +1,83 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.transform.Transformer; +import org.qora.transform.block.BlockTransformer; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; + +public class HeightV2Message extends Message { + + private int height; + private byte[] signature; + private long timestamp; + private byte[] generator; + + public HeightV2Message(int height, byte[] signature, long timestamp, byte[] generator) { + this(-1, height, signature, timestamp, generator); + } + + private HeightV2Message(int id, int height, byte[] signature, long timestamp, byte[] generator) { + super(id, MessageType.HEIGHT_V2); + + this.height = height; + this.signature = signature; + this.timestamp = timestamp; + this.generator = generator; + } + + public int getHeight() { + return this.height; + } + + public byte[] getSignature() { + return this.signature; + } + + public long getTimestamp() { + return this.timestamp; + } + + public byte[] getGenerator() { + return this.generator; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int height = bytes.getInt(); + + byte[] signature = new byte[BlockTransformer.BLOCK_SIGNATURE_LENGTH]; + bytes.get(signature); + + long timestamp = bytes.getLong(); + + byte[] generator = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(generator); + + return new HeightV2Message(id, height, signature, timestamp, generator); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.height)); + + bytes.write(this.signature); + + bytes.write(Longs.toByteArray(this.timestamp)); + + bytes.write(this.generator); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java index 14679e93..2c9f50db 100644 --- a/src/main/java/org/qora/network/message/Message.java +++ b/src/main/java/org/qora/network/message/Message.java @@ -27,6 +27,24 @@ public abstract class Message { private static final int MAX_DATA_SIZE = 1024 * 1024; // 1MB + @SuppressWarnings("serial") + public static class MessageException extends Exception { + public MessageException() { + } + + public MessageException(String message) { + super(message); + } + + public MessageException(String message, Throwable cause) { + super(message, cause); + } + + public MessageException(Throwable cause) { + super(cause); + } + } + public enum MessageType { GET_PEERS(1), PEERS(2), @@ -45,7 +63,8 @@ public abstract class Message { BLOCK_SUMMARIES(15), GET_SIGNATURES_V2(16), PEER_VERIFY(17), - VERIFICATION_CODES(18); + VERIFICATION_CODES(18), + HEIGHT_V2(19); public final int value; public final Method fromByteBuffer; @@ -119,7 +138,7 @@ public abstract class Message { return this.type; } - public static Message fromStream(DataInputStream in) throws SocketTimeoutException { + public static Message fromStream(DataInputStream in) throws MessageException, IOException { try { // Read only enough bytes to cover Message "magic" preamble byte[] messageMagic = new byte[MAGIC_LENGTH]; @@ -127,13 +146,13 @@ public abstract class Message { if (!Arrays.equals(messageMagic, Controller.getInstance().getMessageMagic())) // Didn't receive correct Message "magic" - return null; + throw new MessageException("Received incorrect message 'magic'"); int typeValue = in.readInt(); MessageType messageType = MessageType.valueOf(typeValue); if (messageType == null) // Unrecognised message type - return null; + throw new MessageException(String.format("Received unknown message type [%d]", typeValue)); // Find supporting object @@ -144,14 +163,14 @@ public abstract class Message { if (id <= 0) // Invalid ID - return null; + throw new MessageException("Invalid negative ID"); } int dataSize = in.readInt(); if (dataSize > MAX_DATA_SIZE) // Too large - return null; + throw new MessageException(String.format("Declared data length %d larger than max allowed %d", dataSize, MAX_DATA_SIZE)); byte[] data = null; if (dataSize > 0) { @@ -164,14 +183,14 @@ public abstract class Message { // Test checksum byte[] actualChecksum = generateChecksum(data); if (!Arrays.equals(expectedChecksum, actualChecksum)) - return null; + throw new MessageException("Message checksum incorrect"); } return messageType.fromBytes(id, data); } catch (SocketTimeoutException e) { throw e; } catch (IOException e) { - return null; + throw e; } } @@ -179,7 +198,7 @@ public abstract class Message { return Arrays.copyOfRange(Crypto.digest(data), 0, CHECKSUM_LENGTH); } - public byte[] toBytes() { + public byte[] toBytes() throws MessageException { try { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); @@ -198,7 +217,7 @@ public abstract class Message { byte[] data = this.toData(); if (data == null) - return null; + throw new MessageException("Missing data payload"); bytes.write(Ints.toByteArray(data.length)); @@ -207,9 +226,12 @@ public abstract class Message { bytes.write(data); } + if (bytes.size() > MAX_DATA_SIZE) + throw new MessageException(String.format("About to send message with length %d larger than allowed %d", bytes.size(), MAX_DATA_SIZE)); + return bytes.toByteArray(); } catch (IOException e) { - return null; + throw new MessageException("Failed to serialize message", e); } } diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java index a5d82cec..65ed7512 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -730,6 +730,13 @@ public class HSQLDBDatabaseUpdates { stmt.execute("CREATE INDEX TransactionParticipantsAddressIndex on TransactionParticipants (participant)"); break; + case 49: + // Additional peer information + stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_signature BlockSignature BEFORE last_misbehaved"); + stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_timestamp TIMESTAMP WITH TIME ZONE BEFORE last_misbehaved"); + stmt.execute("ALTER TABLE Peers ADD COLUMN last_block_generator QoraPublicKey BEFORE last_misbehaved"); + break; + default: // nothing to do return false; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java index 48738aef..4a75541e 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java @@ -2,9 +2,7 @@ package org.qora.repository.hsqldb; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Calendar; import java.util.List; import org.qora.data.network.PeerData; @@ -22,9 +20,11 @@ public class HSQLDBNetworkRepository implements NetworkRepository { @Override public List getAllPeers() throws DataException { + String sql = "SELECT address, last_connected, last_attempted, last_height, last_block_signature, last_block_timestamp, last_block_generator, last_misbehaved FROM Peers"; + List peers = new ArrayList<>(); - try (ResultSet resultSet = this.repository.checkedExecute("SELECT address, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) { + try (ResultSet resultSet = this.repository.checkedExecute(sql)) { if (resultSet == null) return peers; @@ -33,20 +33,23 @@ public class HSQLDBNetworkRepository implements NetworkRepository { String address = resultSet.getString(1); PeerAddress peerAddress = PeerAddress.fromString(address); - Timestamp lastConnectedTimestamp = resultSet.getTimestamp(2, Calendar.getInstance(HSQLDBRepository.UTC)); - Long lastConnected = resultSet.wasNull() ? null : lastConnectedTimestamp.getTime(); + Long lastConnected = HSQLDBRepository.getZonedTimestampMilli(resultSet, 2); - Timestamp lastAttemptedTimestamp = resultSet.getTimestamp(3, Calendar.getInstance(HSQLDBRepository.UTC)); - Long lastAttempted = resultSet.wasNull() ? null : lastAttemptedTimestamp.getTime(); + Long lastAttempted = HSQLDBRepository.getZonedTimestampMilli(resultSet, 3); Integer lastHeight = resultSet.getInt(4); if (resultSet.wasNull()) lastHeight = null; - Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(5, Calendar.getInstance(HSQLDBRepository.UTC)); - Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime(); + byte[] lastBlockSignature = resultSet.getBytes(5); + + Long lastBlockTimestamp = HSQLDBRepository.getZonedTimestampMilli(resultSet, 6); + + byte[] lastBlockGenerator = resultSet.getBytes(7); - peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastMisbehaved)); + Long lastMisbehaved = HSQLDBRepository.getZonedTimestampMilli(resultSet, 8); + + peers.add(new PeerData(peerAddress, lastAttempted, lastConnected, lastHeight, lastBlockSignature, lastBlockTimestamp, lastBlockGenerator, lastMisbehaved)); } while (resultSet.next()); return peers; @@ -61,12 +64,12 @@ public class HSQLDBNetworkRepository implements NetworkRepository { public void save(PeerData peerData) throws DataException { HSQLDBSaver saveHelper = new HSQLDBSaver("Peers"); - Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); - Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); - Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved()); - - saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", lastConnected).bind("last_attempted", lastAttempted) - .bind("last_height", peerData.getLastHeight()).bind("last_misbehaved", lastMisbehaved); + saveHelper.bind("address", peerData.getAddress().toString()).bind("last_connected", HSQLDBRepository.toOffsetDateTime(peerData.getLastConnected())) + .bind("last_attempted", HSQLDBRepository.toOffsetDateTime(peerData.getLastAttempted())) + .bind("last_height", peerData.getLastHeight()).bind("last_block_signature", peerData.getLastBlockSignature()) + .bind("last_block_timestamp", HSQLDBRepository.toOffsetDateTime(peerData.getLastBlockTimestamp())) + .bind("last_block_generator", peerData.getLastBlockGenerator()) + .bind("last_misbehaved", HSQLDBRepository.toOffsetDateTime(peerData.getLastMisbehaved())); try { saveHelper.execute(this.repository); @@ -92,4 +95,5 @@ public class HSQLDBNetworkRepository implements NetworkRepository { throw new DataException("Unable to delete peers from repository", e); } } + }