diff --git a/log4j2.properties b/log4j2.properties index 9e9af414..3188c2ff 100644 --- a/log4j2.properties +++ b/log4j2.properties @@ -26,6 +26,10 @@ logger.txSearch.level = trace logger.blockgen.name = org.qora.block.BlockGenerator logger.blockgen.level = trace +# Debug synchronization +logger.sync.name = org.qora.controller.Synchronizer +logger.sync.level = trace + # Debug networking logger.network.name = org.qora.network.Network logger.network.level = trace diff --git a/pom.xml b/pom.xml index ef052c39..8dd67a2f 100644 --- a/pom.xml +++ b/pom.xml @@ -385,6 +385,12 @@ org.glassfish.jersey.inject jersey-hk2 ${jersey.version} + + + javax.inject + javax.inject + + org.glassfish.jersey.media @@ -406,6 +412,12 @@ io.swagger.core.v3 swagger-jaxrs2-servlet-initializer ${swagger-api.version} + + + io.swagger.core.v3 + swagger-integration + + org.webjars diff --git a/src/main/java/org/qora/at/BlockchainAPI.java b/src/main/java/org/qora/at/BlockchainAPI.java index ec58b900..f1c2a0dc 100644 --- a/src/main/java/org/qora/at/BlockchainAPI.java +++ b/src/main/java/org/qora/at/BlockchainAPI.java @@ -102,12 +102,12 @@ public enum BlockchainAPI { BTC(1) { @Override public void putTransactionFromRecipientAfterTimestampInA(String recipient, Timestamp timestamp, MachineState state) { - // TODO + // TODO BTC transaction support for ATv2 } @Override public long getAmountFromTransactionInA(Timestamp timestamp, MachineState state) { - // TODO + // TODO BTC transaction support for ATv2 return 0; } }; diff --git a/src/main/java/org/qora/block/Block.java b/src/main/java/org/qora/block/Block.java index f08d67d6..b5f8ecc4 100644 --- a/src/main/java/org/qora/block/Block.java +++ b/src/main/java/org/qora/block/Block.java @@ -82,6 +82,7 @@ public class Block { TRANSACTION_TIMESTAMP_INVALID(51), TRANSACTION_INVALID(52), TRANSACTION_PROCESSING_FAILED(53), + TRANSACTION_ALREADY_PROCESSED(54), AT_STATES_MISMATCH(61); public final int value; @@ -123,6 +124,7 @@ public class Block { // Other useful constants /** Maximum size of block in bytes */ + // TODO push this out to blockchain config file public static final int MAX_BLOCK_BYTES = 1048576; // Constructors @@ -737,7 +739,7 @@ public class Block { return ValidationResult.TIMESTAMP_MS_INCORRECT; // Too early to forge block? - // XXX DISABLED + // XXX DISABLED as it doesn't work - but why? // if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime()) // return ValidationResult.TIMESTAMP_TOO_SOON; @@ -751,6 +753,7 @@ public class Block { if (this.blockData.getGeneratingBalance().compareTo(parentBlock.calcNextBlockGeneratingBalance()) != 0) return ValidationResult.GENERATING_BALANCE_INCORRECT; + // XXX Block.isValid generator check relaxation?? blockchain config option? // After maximum block period, then generator checks are relaxed if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMaxBlockTime()) { // Check generator is allowed to forge this block @@ -814,6 +817,9 @@ public class Block { // Check transactions try { + // Create repository savepoint here so we can rollback to it after testing transactions + repository.setSavepoint(); + for (Transaction transaction : this.getTransactions()) { // GenesisTransactions are not allowed (GenesisBlock overrides isValid() to allow them) if (transaction instanceof GenesisTransaction) @@ -824,6 +830,10 @@ public class Block { || transaction.getDeadline() <= this.blockData.getTimestamp()) return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; + // Check transaction isn't already included in a block + if (this.repository.getTransactionRepository().isConfirmed(transaction.getTransactionData().getSignature())) + return ValidationResult.TRANSACTION_ALREADY_PROCESSED; + // Check transaction is even valid // NOTE: in Gen1 there was an extra block height passed to DeployATTransaction.isValid Transaction.ValidationResult validationResult = transaction.isValid(); @@ -843,15 +853,15 @@ public class Block { } } } catch (DataException e) { - return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; + // XXX why was this TRANSACTION_TIMESTAMP_INVALID? + return ValidationResult.TRANSACTION_INVALID; } finally { - // Discard changes to repository made by test-processing transactions above + // Rollback repository changes made by test-processing transactions above try { - this.repository.discardChanges(); + this.repository.rollbackToSavepoint(); } catch (DataException e) { /* - * discardChanges failure most likely due to prior DataException, so catch discardChanges' DataException and ignore. Prior DataException - * propagates to caller. + * Rollback failure most likely due to prior DataException, so discard this DataException. Prior DataException propagates to caller. */ } } @@ -916,7 +926,8 @@ public class Block { this.blockData.setTransactionCount(this.blockData.getTransactionCount() + 1); // We've added transactions, so recalculate transactions signature - calcTransactionsSignature(); + // XXX surely this breaks Block.isSignatureValid which is called before we are? + // calcTransactionsSignature(); } /** @@ -976,9 +987,7 @@ public class Block { } /** - * Removes block from blockchain undoing transactions. - *

- * Note: it is up to the caller to re-add any of the block's transactions back to the unconfirmed transactions pile. + * Removes block from blockchain undoing transactions and adding them to unconfirmed pile. * * @throws DataException */ @@ -990,10 +999,14 @@ public class Block { Transaction transaction = transactions.get(sequence); transaction.orphan(); + // Unlink transaction from this block BlockTransactionData blockTransactionData = new BlockTransactionData(this.getSignature(), sequence, transaction.getTransactionData().getSignature()); this.repository.getBlockRepository().delete(blockTransactionData); + // Add to unconfirmed pile + this.repository.getTransactionRepository().unconfirmTransaction(transaction.getTransactionData()); + this.repository.getTransactionRepository().deleteParticipants(transaction.getTransactionData()); } diff --git a/src/main/java/org/qora/block/BlockGenerator.java b/src/main/java/org/qora/block/BlockGenerator.java index 88dac306..00871c35 100644 --- a/src/main/java/org/qora/block/BlockGenerator.java +++ b/src/main/java/org/qora/block/BlockGenerator.java @@ -2,6 +2,7 @@ package org.qora.block; import java.util.Arrays; import java.util.List; +import java.util.concurrent.locks.Lock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -78,40 +79,47 @@ public class BlockGenerator extends Thread { if (newBlock == null) newBlock = new Block(repository, previousBlock.getBlockData(), generator); - // Is new block valid yet? (Before adding unconfirmed transactions) - if (newBlock.isValid() == ValidationResult.OK) { - // Add unconfirmed transactions - addUnconfirmedTransactions(repository, newBlock); + // Make sure we're the only thread modifying the blockchain + Lock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (blockchainLock.tryLock()) + try { + // Is new block valid yet? (Before adding unconfirmed transactions) + if (newBlock.isValid() == ValidationResult.OK) { + // Add unconfirmed transactions + addUnconfirmedTransactions(repository, newBlock); - // Sign to create block's signature - newBlock.sign(); + // Sign to create block's signature + newBlock.sign(); - // If newBlock is still valid then we can use it - ValidationResult validationResult = newBlock.isValid(); - if (validationResult == ValidationResult.OK) { - // Add to blockchain - something else will notice and broadcast new block to network - try { - newBlock.process(); - LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); - repository.saveChanges(); + // If newBlock is still valid then we can use it + ValidationResult validationResult = newBlock.isValid(); + if (validationResult == ValidationResult.OK) { + // Add to blockchain - something else will notice and broadcast new block to network + try { + newBlock.process(); + LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); + repository.saveChanges(); - // Notify controller - Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); - } catch (DataException e) { - // Unable to process block - report and discard - LOGGER.error("Unable to process newly generated block?", e); - newBlock = null; + // Notify controller + Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); + } catch (DataException e) { + // Unable to process block - report and discard + LOGGER.error("Unable to process newly generated block?", e); + newBlock = null; + } + } else { + // No longer valid? Report and discard + LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?"); + newBlock = null; + } } - } else { - // No longer valid? Report and discard - LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?"); - newBlock = null; + } finally { + blockchainLock.unlock(); } - } // Sleep for a while try { - repository.discardChanges(); // Free transactional locks, if any + repository.discardChanges(); // Free repository locks, if any Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same } catch (InterruptedException e) { // We've been interrupted - time to exit diff --git a/src/main/java/org/qora/blockgenerator.java b/src/main/java/org/qora/blockgenerator.java index dba5ac3f..0e8d8edb 100644 --- a/src/main/java/org/qora/blockgenerator.java +++ b/src/main/java/org/qora/blockgenerator.java @@ -58,14 +58,12 @@ public class blockgenerator { try { blockGenerator.join(); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); } try { RepositoryManager.closeRepositoryFactory(); } catch (DataException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 61801cf2..7e4aa7b8 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -2,24 +2,35 @@ package org.qora.controller; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; import java.security.Security; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; import org.qora.api.ApiService; +import org.qora.block.Block; import org.qora.block.BlockChain; import org.qora.block.BlockGenerator; import org.qora.data.block.BlockData; +import org.qora.data.network.PeerData; import org.qora.network.Network; import org.qora.network.Peer; +import org.qora.network.message.BlockMessage; +import org.qora.network.message.GetBlockMessage; +import org.qora.network.message.GetSignaturesMessage; import org.qora.network.message.HeightMessage; import org.qora.network.message.Message; +import org.qora.network.message.SignaturesMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryFactory; @@ -27,6 +38,7 @@ import org.qora.repository.RepositoryManager; import org.qora.repository.hsqldb.HSQLDBRepositoryFactory; import org.qora.settings.Settings; import org.qora.utils.Base58; +import org.qora.utils.NTP; public class Controller extends Thread { @@ -47,6 +59,9 @@ public class Controller extends Thread { private final String buildVersion; private final long buildTimestamp; + /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */ + private final Lock blockchainLock; + private Controller() { Properties properties = new Properties(); try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) { @@ -66,6 +81,8 @@ public class Controller extends Thread { throw new RuntimeException("Can't read build.version from build.properties resource"); this.buildVersion = VERSION_PREFIX + buildVersion; + + blockchainLock = new ReentrantLock(); } public static Controller getInstance() { @@ -75,6 +92,38 @@ public class Controller extends Thread { return instance; } + // Getters / setters + + public byte[] getMessageMagic() { + return new byte[] { + 0x12, 0x34, 0x56, 0x78 + }; + } + + public long getBuildTimestamp() { + return this.buildTimestamp; + } + + public String getVersionString() { + return this.buildVersion; + } + + /** Returns current blockchain height, or 0 if there's a repository issue */ + public int getChainHeight() { + try (final Repository repository = RepositoryManager.getRepository()) { + return repository.getBlockRepository().getBlockchainHeight(); + } catch (DataException e) { + LOGGER.error("Repository issue when fetching blockchain height", e); + return 0; + } + } + + public Lock getBlockchainLock() { + return this.blockchainLock; + } + + // Entry point + public static void main(String args[]) { LOGGER.info("Starting up..."); @@ -101,10 +150,10 @@ public class Controller extends Thread { System.exit(2); } - // XXX work to be done here! - if (args.length == 0) { + // XXX extract private key needed for block gen + if (args.length == 0 || !args[0].equals("NO-BLOCK-GEN")) { LOGGER.info("Starting block generator"); - byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); + byte[] privateKey = Base58.decode(args.length > 0 ? args[0] : "A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); blockGenerator = new BlockGenerator(privateKey); blockGenerator.start(); } @@ -130,6 +179,8 @@ public class Controller extends Thread { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { + Thread.currentThread().setName("Shutdown hook"); + Controller.getInstance().shutdown(); } }); @@ -138,16 +189,17 @@ public class Controller extends Thread { Controller.getInstance().start(); } + // Main thread + @Override public void run() { Thread.currentThread().setName("Controller"); try { - while (true) { + while (!isStopping) { Thread.sleep(1000); - // Query random connections for their blockchain status - // If height > ours then potentially synchronize + potentiallySynchronize(); // Query random connections for unconfirmed transactions } @@ -157,6 +209,38 @@ public class Controller extends Thread { } } + private void potentiallySynchronize() { + int ourHeight = getChainHeight(); + if (ourHeight == 0) + return; + + // If we have enough peers, potentially synchronize + List peers = Network.getInstance().getHandshakeCompletedPeers(); + if (peers.size() >= Settings.getInstance().getMinPeers()) { + peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= ourHeight); + + if (!peers.isEmpty()) { + // Pick random peer to sync with + int index = new SecureRandom().nextInt(peers.size()); + Peer peer = peers.get(index); + + if (!Synchronizer.getInstance().synchronize(peer)) { + // Failure so don't use this peer again for a while + try (final Repository repository = RepositoryManager.getRepository()) { + PeerData peerData = peer.getPeerData(); + peerData.setLastMisbehaved(NTP.getTime()); + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.warn("Repository issue while updating peer synchronization info", e); + } + } + } + } + } + + // Shutdown + public void shutdown() { synchronized (shutdownLock) { if (!isStopping) { @@ -164,6 +248,11 @@ public class Controller extends Thread { LOGGER.info("Shutting down controller"); this.interrupt(); + try { + this.join(); + } catch (InterruptedException e) { + // We were interrupted while waiting for thread to join + } LOGGER.info("Shutting down networking"); Network.getInstance().shutdown(); @@ -177,7 +266,7 @@ public class Controller extends Thread { try { blockGenerator.join(); } catch (InterruptedException e) { - // We were interrupted while waiting for thread to 'join' + // We were interrupted while waiting for thread to join } } @@ -198,39 +287,16 @@ public class Controller extends Thread { System.exit(0); } - public byte[] getMessageMagic() { - return new byte[] { - 0x12, 0x34, 0x56, 0x78 - }; - } - - public long getBuildTimestamp() { - return this.buildTimestamp; - } - - public String getVersionString() { - return this.buildVersion; - } - - public int getChainHeight() { - try (final Repository repository = RepositoryManager.getRepository()) { - return repository.getBlockRepository().getBlockchainHeight(); - } catch (DataException e) { - LOGGER.error("Repository issue when fetching blockchain height", e); - return 0; - } - } - // Callbacks for/from network public void doNetworkBroadcast() { Network network = Network.getInstance(); // Send our known peers - network.broadcast(network.buildPeersMessage()); + network.broadcast(peer -> network.buildPeersMessage(peer)); // Send our current height - network.broadcast(new HeightMessage(this.getChainHeight())); + network.broadcast(peer -> new HeightMessage(this.getChainHeight())); } public void onGeneratedBlock(BlockData newBlockData) { @@ -238,24 +304,66 @@ public class Controller extends Thread { // Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block // Broadcast our new height - Network.getInstance().broadcast(new HeightMessage(newBlockData.getHeight())); + Network.getInstance().broadcast(peer -> new HeightMessage(newBlockData.getHeight())); } public void onNetworkMessage(Peer peer, Message message) { - LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer)); switch (message.getType()) { case HEIGHT: HeightMessage heightMessage = (HeightMessage) message; - // If we connected to peer, then update our record of peer's height - if (peer.isOutbound()) - peer.getPeerData().setLastHeight(heightMessage.getHeight()); + // Update our record of peer's height + peer.getPeerData().setLastHeight(heightMessage.getHeight()); - // XXX we should instead test incoming block sigs to see if we have them, and if not do sync - // Is peer's blockchain longer than ours? - if (heightMessage.getHeight() > getChainHeight()) - Synchronizer.getInstance().synchronize(peer); + break; + + case GET_SIGNATURES: + try (final Repository repository = RepositoryManager.getRepository()) { + GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message; + byte[] parentSignature = getSignaturesMessage.getParentSignature(); + + List signatures = new ArrayList<>(); + + do { + BlockData blockData = repository.getBlockRepository().fromReference(parentSignature); + + if (blockData == null) + break; + + parentSignature = blockData.getSignature(); + signatures.add(parentSignature); + } while (signatures.size() < 500); + + Message signaturesMessage = new SignaturesMessage(signatures); + signaturesMessage.setId(message.getId()); + if (!peer.sendMessage(signaturesMessage)) + peer.disconnect(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + } + break; + + case GET_BLOCK: + try (final Repository repository = RepositoryManager.getRepository()) { + GetBlockMessage getBlockMessage = (GetBlockMessage) message; + byte[] signature = getBlockMessage.getSignature(); + + BlockData blockData = repository.getBlockRepository().fromSignature(signature); + if (blockData == null) + // No response at all??? + break; + + Block block = new Block(repository, blockData); + + Message blockMessage = new BlockMessage(block); + blockMessage.setId(message.getId()); + if (!peer.sendMessage(blockMessage)) + peer.disconnect(); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e); + } break; default: diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index b42e0f56..b82c48df 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -1,15 +1,40 @@ package org.qora.controller; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.locks.Lock; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qora.block.Block; +import org.qora.block.Block.ValidationResult; +import org.qora.block.GenesisBlock; +import org.qora.data.block.BlockData; import org.qora.network.Peer; +import org.qora.network.message.BlockMessage; +import org.qora.network.message.GetBlockMessage; +import org.qora.network.message.GetSignaturesMessage; +import org.qora.network.message.Message; +import org.qora.network.message.Message.MessageType; +import org.qora.network.message.SignaturesMessage; +import org.qora.repository.DataException; +import org.qora.repository.Repository; +import org.qora.repository.RepositoryManager; public class Synchronizer { private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class); + private static final int INITIAL_BLOCK_STEP = 8; + private static final int MAXIMUM_BLOCK_STEP = 500; + private static final int MAXIMUM_HEIGHT_DELTA = 2000; // XXX move to blockchain config? + private static Synchronizer instance; + private Repository repository; + private int ourHeight; + private Synchronizer() { } @@ -20,35 +45,211 @@ public class Synchronizer { return instance; } - public void synchronize(Peer peer) { - // If we're already synchronizing with another peer then return + public boolean synchronize(Peer peer) { + // Make sure we're the only thread modifying the blockchain + // If we're already synchronizing with another peer then this will also return fast + Lock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (blockchainLock.tryLock()) + try { + try (final Repository repository = RepositoryManager.getRepository()) { + try { + this.repository = repository; + this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight(); + int peerHeight = peer.getPeerData().getLastHeight(); - LOGGER.info(String.format("Synchronizing with peer %s", peer.getRemoteSocketAddress())); + LOGGER.info(String.format("Synchronizing with peer %s from height %d to height %d", peer, this.ourHeight, peerHeight)); - // Peer has different latest block sig to us + List signatures = findSignaturesFromCommonBlock(peer); + if (signatures == null) { + LOGGER.info(String.format("Failure to find common block with peer %s", peer)); + return false; + } - // find common block? + // First signature is common block + BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0)); + signatures.remove(0); - // if common block is too far behind us then we're on massively different forks so give up, maybe human invention required to download desired fork + // If common block is too far behind us then we're on massively different forks so give up. + int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA; + if (commonBlockData.getHeight() < minHeight) { + LOGGER.info(String.format("Blockchain too divergent with peer %s", peer)); + return false; + } - // unwind to common block (unless common block is our latest block) + if (this.ourHeight > commonBlockData.getHeight()) { + // Unwind to common block (unless common block is our latest block) + LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockData.getHeight())); - // apply some newer blocks from peer + while (this.ourHeight > commonBlockData.getHeight()) { + BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight); + Block block = new Block(repository, blockData); + block.orphan(); - // commit + --this.ourHeight; + } - // If our block gen creates a block while we do this - what happens? - // does repository serialization prevent issues? + LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockData.getHeight(), peer)); + } else { + LOGGER.debug(String.format("Fetching new blocks from peer %s", peer)); + } - // blockgen: block 123: pay X from A to B, commit - // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit + // Fetch, and apply, blocks from peer + byte[] signature = commonBlockData.getSignature(); + while (this.ourHeight < peerHeight) { + // Do we need more signatures? + if (signatures.isEmpty()) { + signatures = this.getBlockSignatures(peer, signature, MAXIMUM_BLOCK_STEP); + if (signatures == null || signatures.isEmpty()) { + LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, this.ourHeight)); + return false; + } + } - // and vice versa? + signature = signatures.get(0); + signatures.remove(0); + ++this.ourHeight; - // sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit - // blockgen: block 123: pay X from A to B, commit + BlockData newBlockData = this.fetchBlockData(peer, signature); - // simply block syncing when generating and vice versa by grabbing a Controller-owned non-blocking mutex? + if (newBlockData == null) { + LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, this.ourHeight)); + return false; + } + + Block newBlock = new Block(repository, newBlockData); + + if (!newBlock.isSignatureValid()) { + LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, this.ourHeight)); + return false; + } + + ValidationResult blockResult = newBlock.isValid(); + if (blockResult != ValidationResult.OK) { + LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, this.ourHeight, blockResult.name())); + return false; + } + + newBlock.process(); + } + + // Commit + repository.saveChanges(); + LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, this.ourHeight)); + + return true; + } finally { + repository.discardChanges(); + this.repository = null; + } + } + } catch (DataException e) { + LOGGER.error("Repository issue during synchronization with peer", e); + return false; + } finally { + blockchainLock.unlock(); + } + + // Wasn't peer's fault we couldn't sync + return true; + } + + /** + * Returns list of block signatures start with common block with peer. + * + * @param peer + * @return block signatures + * @throws DataException + */ + private List findSignaturesFromCommonBlock(Peer peer) throws DataException { + // Start by asking for a few recent block hashes as this will cover a majority of reorgs + // Failing that, back off exponentially + int step = INITIAL_BLOCK_STEP; + + List blockSignatures = null; + int testHeight = ourHeight - step; + byte[] testSignature = null; + + while (testHeight > 1) { + // Fetch our block signature at this height + BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight); + if (testBlockData == null) { + // Not found? But we've locked the blockchain and height is below blockchain's tip! + LOGGER.error("Failed to get block at height lower than blockchain tip during synchronization?"); + return null; + } + + testSignature = testBlockData.getSignature(); + + // Ask for block signatures since test block's signature + LOGGER.trace(String.format("Requesting %d signature%s after our height %d", step, (step != 1 ? "s": ""), testHeight)); + blockSignatures = this.getBlockSignatures(peer, testSignature, step); + + if (blockSignatures == null) + // No response - give up this time + return null; + + LOGGER.trace(String.format("Received %s signature%s", blockSignatures.size(), (blockSignatures.size() != 1 ? "s" : ""))); + + // Empty list means remote peer is unaware of test signature OR has no new blocks after test signature + if (!blockSignatures.isEmpty()) + // We have entries so we have found a common block + break; + + if (peer.getVersion() >= 2) { + step <<= 1; + } else { + // Old v1 peers are hard-coded to return 500 signatures so we might as well go backward by 500 too + step = 500; + } + step = Math.min(step, MAXIMUM_BLOCK_STEP); + + testHeight -= step; + } + + if (testHeight <= 1) + // Can't go back any further - return Genesis block + return new ArrayList(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature())); + + // Prepend common block's signature as first block sig + blockSignatures.add(0, testSignature); + + // Work through returned signatures to get closer common block + // Do this by trimming all-but-one leading known signatures + for (int i = blockSignatures.size() - 1; i > 0; --i) { + BlockData blockData = this.repository.getBlockRepository().fromSignature(blockSignatures.get(i)); + + if (blockData != null) { + blockSignatures.subList(0, i).clear(); + break; + } + } + + return blockSignatures; + } + + private List getBlockSignatures(Peer peer, byte[] parentSignature, int countRequested) { + // TODO countRequested is v2+ feature + Message getSignaturesMessage = new GetSignaturesMessage(parentSignature); + + Message message = peer.getResponse(getSignaturesMessage); + if (message == null || message.getType() != MessageType.SIGNATURES) + return null; + + SignaturesMessage signaturesMessage = (SignaturesMessage) message; + + return signaturesMessage.getSignatures(); + } + + private BlockData fetchBlockData(Peer peer, byte[] signature) { + Message getBlockMessage = new GetBlockMessage(signature); + + Message message = peer.getResponse(getBlockMessage); + if (message == null || message.getType() != MessageType.BLOCK) + return null; + + BlockMessage blockMessage = (BlockMessage) message; + + return blockMessage.getBlockData(); } } diff --git a/src/main/java/org/qora/data/network/PeerData.java b/src/main/java/org/qora/data/network/PeerData.java index 48b5288b..f120eacd 100644 --- a/src/main/java/org/qora/data/network/PeerData.java +++ b/src/main/java/org/qora/data/network/PeerData.java @@ -14,6 +14,7 @@ public class PeerData { private Long lastAttempted; private Long lastConnected; private Integer lastHeight; + private Long lastMisbehaved; // Constructors @@ -21,15 +22,16 @@ public class PeerData { protected PeerData() { } - public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight) { + public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) { this.socketAddress = socketAddress; this.lastAttempted = lastAttempted; this.lastConnected = lastConnected; this.lastHeight = lastHeight; + this.lastMisbehaved = lastMisbehaved; } public PeerData(InetSocketAddress socketAddress) { - this(socketAddress, null, null, null); + this(socketAddress, null, null, null, null); } // Getters / setters @@ -62,4 +64,12 @@ public class PeerData { this.lastHeight = lastHeight; } + public Long getLastMisbehaved() { + return this.lastMisbehaved; + } + + public void setLastMisbehaved(Long lastMisbehaved) { + this.lastMisbehaved = lastMisbehaved; + } + } diff --git a/src/main/java/org/qora/network/Handshake.java b/src/main/java/org/qora/network/Handshake.java index 87e7ae61..1b1368f4 100644 --- a/src/main/java/org/qora/network/Handshake.java +++ b/src/main/java/org/qora/network/Handshake.java @@ -5,7 +5,6 @@ import java.util.Arrays; import org.qora.controller.Controller; import org.qora.network.message.Message; import org.qora.network.message.Message.MessageType; -import org.qora.utils.NTP; import org.qora.network.message.PeerIdMessage; import org.qora.network.message.ProofMessage; import org.qora.network.message.VersionMessage; @@ -77,7 +76,7 @@ public enum Handshake { if (peer.isOutbound()) return COMPLETED; - // Check salt hasn't been seen before - this stops multiple peers reusing salt nonce in a Sybil-like attack + // Check salt hasn't been seen before - this stops multiple peers reusing same nonce in a Sybil-like attack if (Proof.seenSalt(proofMessage.getSalt())) return null; @@ -103,9 +102,6 @@ public enum Handshake { @Override public void action(Peer peer) { // Note: this is only called when we've made outbound connection - - // Make a note that we've successfully completed handshake (and when) - peer.getPeerData().setLastConnected(NTP.getTime()); } }; diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index cf034f3c..2372cd8c 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -9,6 +9,7 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -23,6 +24,7 @@ import org.qora.data.network.PeerData; import org.qora.network.message.HeightMessage; import org.qora.network.message.Message; import org.qora.network.message.PeersMessage; +import org.qora.network.message.PeersV2Message; import org.qora.network.message.PingMessage; import org.qora.repository.DataException; import org.qora.repository.Repository; @@ -34,11 +36,16 @@ import org.qora.utils.NTP; public class Network extends Thread { private static final Logger LOGGER = LogManager.getLogger(Network.class); - private static final int LISTEN_BACKLOG = 10; - private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms - private static final int BROADCAST_INTERVAL = 60 * 1000; // ms private static Network instance; + private static final int LISTEN_BACKLOG = 10; + /** How long before retrying after a connection failure, in milliseconds. */ + private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms + /** How long between informational broadcasts to all connected peers, in milliseconds. */ + private static final int BROADCAST_INTERVAL = 60 * 1000; // ms + /** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */ + private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms + public static final int PEER_ID_LENGTH = 128; private final byte[] ourPeerId; @@ -113,7 +120,7 @@ public class Network extends Thread { } public void noteToSelf(Peer peer) { - LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer.getRemoteSocketAddress())); + LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer)); synchronized (this.selfPeers) { this.selfPeers.add(peer.getPeerData()); @@ -129,7 +136,7 @@ public class Network extends Thread { // Maintain long-term connections to various peers' API applications try { while (true) { - acceptConnection(); + acceptConnections(); createConnection(); @@ -160,38 +167,40 @@ public class Network extends Thread { } @SuppressWarnings("resource") - private void acceptConnection() throws InterruptedException { + private void acceptConnections() throws InterruptedException { Socket socket; - try { - socket = this.listenSocket.accept(); - } catch (SocketTimeoutException e) { - // No connections to accept - return; - } catch (IOException e) { - // Something went wrong or listen socket was closed due to shutdown - return; - } - - synchronized (this.connectedPeers) { - if (connectedPeers.size() >= maxPeers) { - // We have enough peers - LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); - - try { - socket.close(); - } catch (IOException e) { - // Not important - } - + do { + try { + socket = this.listenSocket.accept(); + } catch (SocketTimeoutException e) { + // No connections to accept + return; + } catch (IOException e) { + // Something went wrong or listen socket was closed due to shutdown return; } - LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); - Peer newPeer = new Peer(socket); - this.connectedPeers.add(newPeer); - peerExecutor.execute(newPeer); - } + synchronized (this.connectedPeers) { + if (connectedPeers.size() >= maxPeers) { + // We have enough peers + LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress())); + + try { + socket.close(); + } catch (IOException e) { + // Not important + } + + return; + } + + LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); + Peer newPeer = new Peer(socket); + this.connectedPeers.add(newPeer); + peerExecutor.execute(newPeer); + } + } while (true); } private void createConnection() throws InterruptedException, DataException { @@ -220,7 +229,7 @@ public class Network extends Thread { // Don't consider already connected peers Predicate isConnectedPeer = peerData -> this.connectedPeers.stream() - .anyMatch(peer -> peer.getPeerData() != null && peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); + .anyMatch(peer -> peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); synchronized (this.connectedPeers) { peers.removeIf(isConnectedPeer); @@ -269,7 +278,7 @@ public class Network extends Thread { /** Called when a new message arrives for a peer. message can be null if called after connection */ public void onMessage(Peer peer, Message message) { if (message != null) - LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer)); Handshake handshakeStatus = peer.getHandshakeStatus(); if (handshakeStatus != Handshake.COMPLETED) { @@ -277,8 +286,7 @@ public class Network extends Thread { // Check message type is as expected if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { - LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer.getRemoteSocketAddress(), - handshakeStatus.expectedMessageType)); + LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); peer.disconnect(); return; } @@ -287,7 +295,7 @@ public class Network extends Thread { if (newHandshakeStatus == null) { // Handshake failure - LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer.getRemoteSocketAddress(), message.getType().name())); + LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); peer.disconnect(); return; } @@ -296,7 +304,7 @@ public class Network extends Thread { // If we made outbound connection then we need to act first newHandshakeStatus.action(peer); else - // We have inbound connection so we need to respond inline with what we just received + // We have inbound connection so we need to respond in kind with what we just received handshakeStatus.action(peer); peer.setHandshakeStatus(newHandshakeStatus); @@ -313,7 +321,7 @@ public class Network extends Thread { case VERSION: case PEER_ID: case PROOF: - LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer.getRemoteSocketAddress())); + LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer)); peer.disconnect(); return; @@ -334,16 +342,28 @@ public class Network extends Thread { List peerAddresses = new ArrayList<>(); + // v1 PEERS message doesn't support port numbers so we have to add default port for (InetAddress peerAddress : peersMessage.getPeerAddresses()) peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT)); - try { - mergePeers(peerAddresses); - } catch (DataException e) { - // Not good - peer.disconnect(); - return; - } + // Also add peer's details + peerAddresses.add(new InetSocketAddress(peer.getRemoteSocketAddress().getHostString(), Settings.DEFAULT_LISTEN_PORT)); + + mergePeers(peerAddresses); + break; + + case PEERS_V2: + PeersV2Message peersV2Message = (PeersV2Message) message; + + List peerV2Addresses = peersV2Message.getPeerAddresses(); + + // First entry contains remote peer's listen port but empty address. + // Overwrite address with one obtained from socket. + int peerPort = peerV2Addresses.get(0).getPort(); + peerV2Addresses.remove(0); + peerV2Addresses.add(0, InetSocketAddress.createUnresolved(peer.getRemoteSocketAddress().getHostString(), peerPort)); + + mergePeers(peerV2Addresses); break; default: @@ -354,6 +374,9 @@ public class Network extends Thread { } private void onHandshakeCompleted(Peer peer) { + // Make a note that we've successfully completed handshake (and when) + peer.getPeerData().setLastConnected(NTP.getTime()); + peer.startPings(); Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); @@ -363,36 +386,61 @@ public class Network extends Thread { return; } - Message peersMessage = this.buildPeersMessage(); + Message peersMessage = this.buildPeersMessage(peer); if (!peer.sendMessage(peersMessage)) peer.disconnect(); } - public Message buildPeersMessage() { - List peers = new ArrayList<>(); + /** Returns PEERS message made from peers we've connected to recently, and this node's details */ + public Message buildPeersMessage(Peer peer) { + try (final Repository repository = RepositoryManager.getRepository()) { + List knownPeers = repository.getNetworkRepository().getAllPeers(); - synchronized (this.connectedPeers) { - // Only outbound peer connections that have completed handshake - peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) - .collect(Collectors.toList()); + // Filter out peers that we've not connected to ever or within X milliseconds + long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; + knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold); + + // Map to socket addresses + List peerSocketAddresses = knownPeers.stream().map(peerData -> peerData.getSocketAddress()).collect(Collectors.toList()); + + if (peer.getVersion() >= 2) + // New format PEERS_V2 message that supports hostnames, IPv6 and ports + return new PeersV2Message(peerSocketAddresses); + else + // Legacy PEERS message that only sends IPv4 addresses + return new PeersMessage(peerSocketAddresses); + } catch (DataException e) { + LOGGER.error("Repository issue while building PEERS message", e); + return new PeersMessage(Collections.emptyList()); } - - return new PeersMessage(peers); } // Network-wide calls - private List getCompletedPeers() { - List completedPeers = new ArrayList<>(); + /** Returns list of connected peers that have completed handshaking. */ + public List getHandshakeCompletedPeers() { + List peers = new ArrayList<>(); synchronized (this.connectedPeers) { - completedPeers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); + peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); } - return completedPeers; + return peers; } - private void mergePeers(List peerAddresses) throws DataException { + /** Returns list of peers we connected to that have completed handshaking. */ + public List getOutboundHandshakeCompletedPeers() { + List peers = new ArrayList<>(); + + synchronized (this.connectedPeers) { + peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) + .collect(Collectors.toList()); + } + + return peers; + } + + private void mergePeers(List peerAddresses) { try (final Repository repository = RepositoryManager.getRepository()) { List knownPeers = repository.getNetworkRepository().getAllPeers(); @@ -412,28 +460,30 @@ public class Network extends Thread { } repository.saveChanges(); + } catch (DataException e) { + LOGGER.error("Repository issue while merging peers list from remote node", e); } } - public void broadcast(Message message) { + public void broadcast(Function peerMessage) { class Broadcaster implements Runnable { private List targetPeers; - private Message message; + private Function peerMessage; - public Broadcaster(List targetPeers, Message message) { + public Broadcaster(List targetPeers, Function peerMessage) { this.targetPeers = targetPeers; - this.message = message; + this.peerMessage = peerMessage; } @Override public void run() { for (Peer peer : targetPeers) - if (!peer.sendMessage(message)) + if (!peer.sendMessage(peerMessage.apply(peer))) peer.disconnect(); } } - peerExecutor.execute(new Broadcaster(this.getCompletedPeers(), message)); + peerExecutor.execute(new Broadcaster(this.getHandshakeCompletedPeers(), peerMessage)); } public void shutdown() { diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index b004fcd8..32acd76c 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -62,6 +62,7 @@ public class Peer implements Runnable { this.isOutbound = false; this.socket = socket; this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress(); + this.peerData = new PeerData(this.remoteSocketAddress); } // Getters / setters @@ -121,6 +122,15 @@ public class Peer implements Runnable { this.lastPing = lastPing; } + // Easier, and nicer output, than peer.getRemoteSocketAddress() + + @Override + public String toString() { + InetSocketAddress socketAddress = this.getRemoteSocketAddress(); + + return socketAddress.getHostString() + ":" + socketAddress.getPort(); + } + // Processing private void setup() throws IOException { @@ -131,22 +141,22 @@ public class Peer implements Runnable { } public boolean connect() { - LOGGER.trace(String.format("Connecting to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connecting to peer %s", this)); this.socket = new Socket(); try { InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort()); this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); - LOGGER.debug(String.format("Connected to peer %s", this.remoteSocketAddress)); + LOGGER.debug(String.format("Connected to peer %s", this)); } catch (SocketTimeoutException e) { - LOGGER.trace(String.format("Connection timed out to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection timed out to peer %s", this)); return false; } catch (UnknownHostException e) { - LOGGER.trace(String.format("Connection failed to unresolved peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection failed to unresolved peer %s", this)); return false; } catch (IOException e) { - LOGGER.trace(String.format("Connection failed to peer %s", this.remoteSocketAddress)); + LOGGER.trace(String.format("Connection failed to peer %s", this)); return false; } @@ -157,7 +167,7 @@ public class Peer implements Runnable { @Override public void run() { - Thread.currentThread().setName("Peer " + this.socket.getRemoteSocketAddress()); + Thread.currentThread().setName("Peer " + this); try (DataInputStream in = new DataInputStream(socket.getInputStream())) { setup(); @@ -199,7 +209,7 @@ public class Peer implements Runnable { try { // Send message - LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this.getRemoteSocketAddress())); + LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this)); synchronized (this.out) { this.out.write(message.toBytes()); @@ -288,7 +298,7 @@ public class Peer implements Runnable { // Close socket if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Disconnected peer %s", this.getRemoteSocketAddress())); + LOGGER.debug(String.format("Disconnected peer %s", this)); try { this.socket.close(); diff --git a/src/main/java/org/qora/network/message/BlockMessage.java b/src/main/java/org/qora/network/message/BlockMessage.java new file mode 100644 index 00000000..649915ab --- /dev/null +++ b/src/main/java/org/qora/network/message/BlockMessage.java @@ -0,0 +1,91 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.qora.block.Block; +import org.qora.data.at.ATStateData; +import org.qora.data.block.BlockData; +import org.qora.data.transaction.TransactionData; +import org.qora.transform.TransformationException; +import org.qora.transform.block.BlockTransformer; +import org.qora.utils.Triple; + +import com.google.common.primitives.Ints; + +public class BlockMessage extends Message { + + private Block block = null; + + private BlockData blockData = null; + private List transactions = null; + private List atStates = null; + + private int height; + + public BlockMessage(Block block) { + super(MessageType.BLOCK); + + this.block = block; + this.height = block.getBlockData().getHeight(); + } + + private BlockMessage(int id, BlockData blockData, List transactions, List atStates) { + super(id, MessageType.BLOCK); + + this.blockData = blockData; + this.transactions = transactions; + this.atStates = atStates; + + this.height = blockData.getHeight(); + } + + public BlockData getBlockData() { + return this.blockData; + } + + public List getTransactions() { + return this.transactions; + } + + public List getAtStates() { + return this.atStates; + } + + public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException { + try { + int height = byteBuffer.getInt(); + + Triple, List> blockInfo = BlockTransformer.fromByteBuffer(byteBuffer); + + BlockData blockData = blockInfo.getA(); + blockData.setHeight(height); + + return new BlockMessage(id, blockData, blockInfo.getB(), blockInfo.getC()); + } catch (TransformationException e) { + return null; + } + } + + @Override + protected byte[] toData() { + if (this.block == null) + return null; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.height)); + + bytes.write(BlockTransformer.toBytes(this.block)); + + return bytes.toByteArray(); + } catch (TransformationException | IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/GetBlockMessage.java b/src/main/java/org/qora/network/message/GetBlockMessage.java new file mode 100644 index 00000000..3813d5ee --- /dev/null +++ b/src/main/java/org/qora/network/message/GetBlockMessage.java @@ -0,0 +1,54 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.transform.block.BlockTransformer; + +public class GetBlockMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private byte[] signature; + + public GetBlockMessage(byte[] signature) { + this(-1, signature); + } + + private GetBlockMessage(int id, byte[] signature) { + super(id, MessageType.GET_BLOCK); + + this.signature = signature; + } + + public byte[] getSignature() { + return this.signature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH) + return null; + + byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH]; + + bytes.get(signature); + + return new GetBlockMessage(id, signature); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/GetSignaturesMessage.java b/src/main/java/org/qora/network/message/GetSignaturesMessage.java new file mode 100644 index 00000000..5379abcd --- /dev/null +++ b/src/main/java/org/qora/network/message/GetSignaturesMessage.java @@ -0,0 +1,54 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.qora.transform.block.BlockTransformer; + +public class GetSignaturesMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private byte[] parentSignature; + + public GetSignaturesMessage(byte[] parentSignature) { + this(-1, parentSignature); + } + + private GetSignaturesMessage(int id, byte[] parentSignature) { + super(id, MessageType.GET_SIGNATURES); + + this.parentSignature = parentSignature; + } + + public byte[] getParentSignature() { + return this.parentSignature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH) + return null; + + byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH]; + + bytes.get(parentSignature); + + return new GetSignaturesMessage(id, parentSignature); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.parentSignature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/Message.java b/src/main/java/org/qora/network/message/Message.java index 03755e8a..ab8eacf1 100644 --- a/src/main/java/org/qora/network/message/Message.java +++ b/src/main/java/org/qora/network/message/Message.java @@ -39,7 +39,8 @@ public abstract class Message { PING(9), VERSION(10), PEER_ID(11), - PROOF(12); + PROOF(12), + PEERS_V2(13); public final int value; public final Method fromByteBuffer; diff --git a/src/main/java/org/qora/network/message/PeersMessage.java b/src/main/java/org/qora/network/message/PeersMessage.java index 9c6aa448..a576c403 100644 --- a/src/main/java/org/qora/network/message/PeersMessage.java +++ b/src/main/java/org/qora/network/message/PeersMessage.java @@ -4,13 +4,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.qora.network.Peer; - import com.google.common.primitives.Ints; // NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either @@ -20,15 +19,15 @@ public class PeersMessage extends Message { private List peerAddresses; - public PeersMessage(List peers) { - super(-1, MessageType.PEERS); + public PeersMessage(List peerSocketAddresses) { + super(MessageType.PEERS); // We have to forcibly resolve into IP addresses as we can't send hostnames this.peerAddresses = new ArrayList<>(); - for (Peer peer : peers) { + for (InetSocketAddress peerSocketAddress : peerSocketAddresses) { try { - InetAddress resolvedAddress = InetAddress.getByName(peer.getRemoteSocketAddress().getHostString()); + InetAddress resolvedAddress = InetAddress.getByName(peerSocketAddress.getHostString()); // Filter out unsupported address types if (resolvedAddress.getAddress().length != ADDRESS_LENGTH) diff --git a/src/main/java/org/qora/network/message/PeersV2Message.java b/src/main/java/org/qora/network/message/PeersV2Message.java new file mode 100644 index 00000000..528dfcbb --- /dev/null +++ b/src/main/java/org/qora/network/message/PeersV2Message.java @@ -0,0 +1,134 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.settings.Settings; + +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Ints; + +// NOTE: this message supports hostnames, IPv6, port numbers and IPv4 addresses (in IPv6 form) +public class PeersV2Message extends Message { + + private static final byte[] IPV6_V4_PREFIX = new byte[] { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff + }; + + private List peerSocketAddresses; + + public PeersV2Message(List peerSocketAddresses) { + this(-1, peerSocketAddresses); + } + + private PeersV2Message(int id, List peerSocketAddresses) { + super(id, MessageType.PEERS_V2); + + this.peerSocketAddresses = peerSocketAddresses; + } + + public List getPeerAddresses() { + return this.peerSocketAddresses; + } + + public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException { + // Read entry count + int count = byteBuffer.getInt(); + + List peerSocketAddresses = new ArrayList<>(); + + byte[] ipAddressBytes = new byte[16]; + int port; + + for (int i = 0; i < count; ++i) { + byte addressSize = byteBuffer.get(); + + if (addressSize == 0) { + // Address size of 0 indicates IP address (always in IPv6 form) + byteBuffer.get(ipAddressBytes); + + port = byteBuffer.getInt(); + + try { + InetAddress address = InetAddress.getByAddress(ipAddressBytes); + + peerSocketAddresses.add(new InetSocketAddress(address, port)); + } catch (UnknownHostException e) { + // Ignore and continue + } + } else { + byte[] hostnameBytes = new byte[addressSize & 0xff]; + byteBuffer.get(hostnameBytes); + String hostname = new String(hostnameBytes, "UTF-8"); + + port = byteBuffer.getInt(); + + peerSocketAddresses.add(InetSocketAddress.createUnresolved(hostname, port)); + } + } + + return new PeersV2Message(id, peerSocketAddresses); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + // First entry represents sending node but contains only port number with empty address. + List socketAddresses = new ArrayList<>(this.peerSocketAddresses); + socketAddresses.add(0, new InetSocketAddress(Settings.getInstance().getListenPort())); + + // Number of entries we are sending. + int count = socketAddresses.size(); + + for (InetSocketAddress socketAddress : socketAddresses) { + // Hostname preferred, failing that IP address + if (socketAddress.isUnresolved()) { + String hostname = socketAddress.getHostString(); + + byte[] hostnameBytes = hostname.getBytes("UTF-8"); + + // We don't support hostnames that are longer than 256 bytes + if (hostnameBytes.length > 256) { + --count; + continue; + } + + bytes.write(hostnameBytes.length); + + bytes.write(hostnameBytes); + } else { + // IP address + byte[] ipAddressBytes = socketAddress.getAddress().getAddress(); + + // IPv4? Convert to IPv6 form + if (ipAddressBytes.length == 4) + ipAddressBytes = Bytes.concat(IPV6_V4_PREFIX, ipAddressBytes); + + // Write zero length to indicate IP address follows + bytes.write(0); + + bytes.write(ipAddressBytes); + } + + // Port + bytes.write(Ints.toByteArray(socketAddress.getPort())); + } + + // Prepend updated entry count + byte[] countBytes = Ints.toByteArray(count); + return Bytes.concat(countBytes, bytes.toByteArray()); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/network/message/SignaturesMessage.java b/src/main/java/org/qora/network/message/SignaturesMessage.java new file mode 100644 index 00000000..efe1cdad --- /dev/null +++ b/src/main/java/org/qora/network/message/SignaturesMessage.java @@ -0,0 +1,66 @@ +package org.qora.network.message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.qora.transform.block.BlockTransformer; + +import com.google.common.primitives.Ints; + +public class SignaturesMessage extends Message { + + private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH; + + private List signatures; + + public SignaturesMessage(List signatures) { + this(-1, signatures); + } + + private SignaturesMessage(int id, List signatures) { + super(id, MessageType.SIGNATURES); + + this.signatures = signatures; + } + + public List getSignatures() { + return this.signatures; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int count = bytes.getInt(); + + if (bytes.remaining() != count * BLOCK_SIGNATURE_LENGTH) + return null; + + List signatures = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH]; + bytes.get(signature); + signatures.add(signature); + } + + return new SignaturesMessage(id, signatures); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(Ints.toByteArray(this.signatures.size())); + + for (byte[] signature : this.signatures) + bytes.write(signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qora/repository/Repository.java b/src/main/java/org/qora/repository/Repository.java index bbe1a06a..efab17e5 100644 --- a/src/main/java/org/qora/repository/Repository.java +++ b/src/main/java/org/qora/repository/Repository.java @@ -24,6 +24,10 @@ public interface Repository extends AutoCloseable { public void discardChanges() throws DataException; + void setSavepoint() throws DataException; + + void rollbackToSavepoint() throws DataException; + @Override public void close() throws DataException; diff --git a/src/main/java/org/qora/repository/TransactionRepository.java b/src/main/java/org/qora/repository/TransactionRepository.java index 25dbed5c..97a6aa34 100644 --- a/src/main/java/org/qora/repository/TransactionRepository.java +++ b/src/main/java/org/qora/repository/TransactionRepository.java @@ -44,6 +44,14 @@ public interface TransactionRepository { public List getAssetTransactions(int assetId, ConfirmationStatus confirmationStatus, Integer limit, Integer offset, Boolean reverse) throws DataException; + /** + * Returns whether transaction is confirmed or not. + * + * @param signature + * @return true if confirmed, false if not. + */ + public boolean isConfirmed(byte[] signature) throws DataException; + /** * Returns list of unconfirmed transactions in timestamp-else-signature order. *

@@ -75,7 +83,13 @@ public interface TransactionRepository { */ public void confirmTransaction(byte[] signature) throws DataException; - void unconfirmTransaction(TransactionData transactionData) throws DataException; + /** + * Add transaction to unconfirmed transactions pile. + * + * @param transactionData + * @throws DataException + */ + public void unconfirmTransaction(TransactionData transactionData) throws DataException; public void save(TransactionData transactionData) throws DataException; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java index 54ea6175..e41e2c31 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBDatabaseUpdates.java @@ -503,7 +503,7 @@ public class HSQLDBDatabaseUpdates { case 30: // Networking stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, " - + "last_height INTEGER, PRIMARY KEY (hostname, port))"); + + "last_height INTEGER, last_misbehaved TIMESTAMP WITH TIME ZONE, PRIMARY KEY (hostname, port))"); break; default: diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java index 4d8b0992..f8292680 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBNetworkRepository.java @@ -24,7 +24,8 @@ public class HSQLDBNetworkRepository implements NetworkRepository { public List getAllPeers() throws DataException { List peers = new ArrayList<>(); - try (ResultSet resultSet = this.repository.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height FROM Peers")) { + try (ResultSet resultSet = this.repository + .checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) { if (resultSet == null) return peers; @@ -44,7 +45,10 @@ public class HSQLDBNetworkRepository implements NetworkRepository { if (resultSet.wasNull()) lastHeight = null; - peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight)); + Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(6, Calendar.getInstance(HSQLDBRepository.UTC)); + Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime(); + + peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight, lastMisbehaved)); } while (resultSet.next()); return peers; @@ -59,9 +63,11 @@ public class HSQLDBNetworkRepository implements NetworkRepository { Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); + Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved()); saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort()) - .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()); + .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()) + .bind("last_misbehaved", lastMisbehaved); try { saveHelper.execute(this.repository); diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java index 16db33ad..09481e09 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepository.java @@ -5,7 +5,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Savepoint; import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.TimeZone; import org.apache.logging.log4j.LogManager; @@ -33,11 +36,13 @@ public class HSQLDBRepository implements Repository { public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); protected Connection connection; + protected List savepoints; protected boolean debugState = false; // NB: no visibility modifier so only callable from within same package HSQLDBRepository(Connection connection) { this.connection = connection; + this.savepoints = new ArrayList<>(); } @Override @@ -91,6 +96,8 @@ public class HSQLDBRepository implements Repository { this.connection.commit(); } catch (SQLException e) { throw new DataException("commit error", e); + } finally { + this.savepoints.clear(); } } @@ -100,6 +107,33 @@ public class HSQLDBRepository implements Repository { this.connection.rollback(); } catch (SQLException e) { throw new DataException("rollback error", e); + } finally { + this.savepoints.clear(); + } + } + + @Override + public void setSavepoint() throws DataException { + try { + Savepoint savepoint = this.connection.setSavepoint(); + this.savepoints.add(savepoint); + } catch (SQLException e) { + throw new DataException("savepoint error", e); + } + } + + @Override + public void rollbackToSavepoint() throws DataException { + if (this.savepoints.isEmpty()) + throw new DataException("no savepoint to rollback"); + + Savepoint savepoint = this.savepoints.get(0); + this.savepoints.remove(0); + + try { + this.connection.rollback(savepoint); + } catch (SQLException e) { + throw new DataException("savepoint rollback error", e); } } diff --git a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java index dc4b710d..fb5054cf 100644 --- a/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java +++ b/src/main/java/org/qora/repository/hsqldb/transaction/HSQLDBTransactionRepository.java @@ -475,6 +475,15 @@ public class HSQLDBTransactionRepository implements TransactionRepository { } } + @Override + public boolean isConfirmed(byte[] signature) throws DataException { + try { + return this.repository.exists("BlockTransactions", "transaction_signature = ?", signature); + } catch (SQLException e) { + throw new DataException("Unable to check whether transaction is confirmed in repository", e); + } + } + @Override public List getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException { String sql = "SELECT signature FROM UnconfirmedTransactions ORDER BY creation"; diff --git a/src/main/java/org/qora/transform/block/BlockTransformer.java b/src/main/java/org/qora/transform/block/BlockTransformer.java index b1b39685..748f9368 100644 --- a/src/main/java/org/qora/transform/block/BlockTransformer.java +++ b/src/main/java/org/qora/transform/block/BlockTransformer.java @@ -44,10 +44,10 @@ public class BlockTransformer extends Transformer { private static final int GENERATOR_LENGTH = PUBLIC_KEY_LENGTH; private static final int TRANSACTION_COUNT_LENGTH = INT_LENGTH; - private static final int BASE_LENGTH = VERSION_LENGTH + BLOCK_REFERENCE_LENGTH + TIMESTAMP_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH + private static final int BASE_LENGTH = VERSION_LENGTH + TIMESTAMP_LENGTH + BLOCK_REFERENCE_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH + GENERATOR_SIGNATURE_LENGTH + TRANSACTION_COUNT_LENGTH; - protected static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH; + public static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH; protected static final int TRANSACTION_SIZE_LENGTH = INT_LENGTH; // per transaction protected static final int AT_BYTES_LENGTH = INT_LENGTH; protected static final int AT_FEES_LENGTH = LONG_LENGTH; @@ -72,9 +72,20 @@ public class BlockTransformer extends Transformer { ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + return fromByteBuffer(byteBuffer); + } + + /** + * Extract block data and transaction data from serialized bytes. + * + * @param bytes + * @return BlockData and a List of transactions. + * @throws TransformationException + */ + public static Triple, List> fromByteBuffer(ByteBuffer byteBuffer) throws TransformationException { int version = byteBuffer.getInt(); - if (version >= 2 && bytes.length < BASE_LENGTH + AT_LENGTH) + if (version >= 2 && byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH) throw new TransformationException("Byte data too short for V2+ Block"); long timestamp = byteBuffer.getLong(); diff --git a/src/main/java/org/qora/utils/NTP.java b/src/main/java/org/qora/utils/NTP.java index 9fd6f20e..c0a9f07e 100644 --- a/src/main/java/org/qora/utils/NTP.java +++ b/src/main/java/org/qora/utils/NTP.java @@ -24,7 +24,6 @@ public final class NTP { lastUpdate = System.currentTimeMillis(); // Log new value of offset - // TODO: LOGGER.info(Lang.getInstance().translate("Adjusting time with %offset% milliseconds.").replace("%offset%", String.valueOf(offset))); LOGGER.info("Adjusting time with %offset% milliseconds.".replace("%offset%", String.valueOf(offset))); }