Networking and repository

Some pom.xml changes to reduce maven-shade-plugin conflicting classes warnings.

Repository now supports SAVEPOINT and ROLLBACK TO SAVEPOINT.
HSQLDB concurrency/transaction model changed from LOCKS to MVCC to help with
 transaction deadlocks/rollbacks.

More XXXs and TODOs added to Block.java for investigation/fix/improvements.
Also used new repository SAVEPOINT feature when validating transactions
 instead of rolling back entire transaction. This fixes problem during
 synchronization where the rollback would undo previously synchronized,
 but not yet committed, blocks!

Transactions orphaned by Block.orphan ARE now added to unconfirmed pile,
 unlike before.

Concurrent lock now prevents simultaneous block generation and synchronization,
 including synchronization via multiple peers.

Lots of new networking code: peer lists, block signatures, blocks,
 blockchain synchronization.

PEERS_V2 message now supports hostnames, IPv6 and port numbers.

Fixed bug with block serialization for transport over network.
This commit is contained in:
catbref 2019-02-01 14:03:06 +00:00
parent 0db43451d4
commit 7f4511cb7b
27 changed files with 1081 additions and 195 deletions

View File

@ -26,6 +26,10 @@ logger.txSearch.level = trace
logger.blockgen.name = org.qora.block.BlockGenerator logger.blockgen.name = org.qora.block.BlockGenerator
logger.blockgen.level = trace logger.blockgen.level = trace
# Debug synchronization
logger.sync.name = org.qora.controller.Synchronizer
logger.sync.level = trace
# Debug networking # Debug networking
logger.network.name = org.qora.network.Network logger.network.name = org.qora.network.Network
logger.network.level = trace logger.network.level = trace

12
pom.xml
View File

@ -385,6 +385,12 @@
<groupId>org.glassfish.jersey.inject</groupId> <groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId> <artifactId>jersey-hk2</artifactId>
<version>${jersey.version}</version> <version>${jersey.version}</version>
<exclusions>
<exclusion><!-- exclude javax.inject-1.jar because other jersey modules include javax.inject v2+ -->
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.glassfish.jersey.media</groupId> <groupId>org.glassfish.jersey.media</groupId>
@ -406,6 +412,12 @@
<groupId>io.swagger.core.v3</groupId> <groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-jaxrs2-servlet-initializer</artifactId> <artifactId>swagger-jaxrs2-servlet-initializer</artifactId>
<version>${swagger-api.version}</version> <version>${swagger-api.version}</version>
<exclusions>
<exclusion><!-- excluded because included in swagger-jaxrs2-servlet-initializer -->
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-integration</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.webjars</groupId> <groupId>org.webjars</groupId>

View File

@ -102,12 +102,12 @@ public enum BlockchainAPI {
BTC(1) { BTC(1) {
@Override @Override
public void putTransactionFromRecipientAfterTimestampInA(String recipient, Timestamp timestamp, MachineState state) { public void putTransactionFromRecipientAfterTimestampInA(String recipient, Timestamp timestamp, MachineState state) {
// TODO // TODO BTC transaction support for ATv2
} }
@Override @Override
public long getAmountFromTransactionInA(Timestamp timestamp, MachineState state) { public long getAmountFromTransactionInA(Timestamp timestamp, MachineState state) {
// TODO // TODO BTC transaction support for ATv2
return 0; return 0;
} }
}; };

View File

@ -82,6 +82,7 @@ public class Block {
TRANSACTION_TIMESTAMP_INVALID(51), TRANSACTION_TIMESTAMP_INVALID(51),
TRANSACTION_INVALID(52), TRANSACTION_INVALID(52),
TRANSACTION_PROCESSING_FAILED(53), TRANSACTION_PROCESSING_FAILED(53),
TRANSACTION_ALREADY_PROCESSED(54),
AT_STATES_MISMATCH(61); AT_STATES_MISMATCH(61);
public final int value; public final int value;
@ -123,6 +124,7 @@ public class Block {
// Other useful constants // Other useful constants
/** Maximum size of block in bytes */ /** Maximum size of block in bytes */
// TODO push this out to blockchain config file
public static final int MAX_BLOCK_BYTES = 1048576; public static final int MAX_BLOCK_BYTES = 1048576;
// Constructors // Constructors
@ -737,7 +739,7 @@ public class Block {
return ValidationResult.TIMESTAMP_MS_INCORRECT; return ValidationResult.TIMESTAMP_MS_INCORRECT;
// Too early to forge block? // Too early to forge block?
// XXX DISABLED // XXX DISABLED as it doesn't work - but why?
// if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime()) // if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime())
// return ValidationResult.TIMESTAMP_TOO_SOON; // return ValidationResult.TIMESTAMP_TOO_SOON;
@ -751,6 +753,7 @@ public class Block {
if (this.blockData.getGeneratingBalance().compareTo(parentBlock.calcNextBlockGeneratingBalance()) != 0) if (this.blockData.getGeneratingBalance().compareTo(parentBlock.calcNextBlockGeneratingBalance()) != 0)
return ValidationResult.GENERATING_BALANCE_INCORRECT; return ValidationResult.GENERATING_BALANCE_INCORRECT;
// XXX Block.isValid generator check relaxation?? blockchain config option?
// After maximum block period, then generator checks are relaxed // After maximum block period, then generator checks are relaxed
if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMaxBlockTime()) { if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMaxBlockTime()) {
// Check generator is allowed to forge this block // Check generator is allowed to forge this block
@ -814,6 +817,9 @@ public class Block {
// Check transactions // Check transactions
try { try {
// Create repository savepoint here so we can rollback to it after testing transactions
repository.setSavepoint();
for (Transaction transaction : this.getTransactions()) { for (Transaction transaction : this.getTransactions()) {
// GenesisTransactions are not allowed (GenesisBlock overrides isValid() to allow them) // GenesisTransactions are not allowed (GenesisBlock overrides isValid() to allow them)
if (transaction instanceof GenesisTransaction) if (transaction instanceof GenesisTransaction)
@ -824,6 +830,10 @@ public class Block {
|| transaction.getDeadline() <= this.blockData.getTimestamp()) || transaction.getDeadline() <= this.blockData.getTimestamp())
return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; return ValidationResult.TRANSACTION_TIMESTAMP_INVALID;
// Check transaction isn't already included in a block
if (this.repository.getTransactionRepository().isConfirmed(transaction.getTransactionData().getSignature()))
return ValidationResult.TRANSACTION_ALREADY_PROCESSED;
// Check transaction is even valid // Check transaction is even valid
// NOTE: in Gen1 there was an extra block height passed to DeployATTransaction.isValid // NOTE: in Gen1 there was an extra block height passed to DeployATTransaction.isValid
Transaction.ValidationResult validationResult = transaction.isValid(); Transaction.ValidationResult validationResult = transaction.isValid();
@ -843,15 +853,15 @@ public class Block {
} }
} }
} catch (DataException e) { } catch (DataException e) {
return ValidationResult.TRANSACTION_TIMESTAMP_INVALID; // XXX why was this TRANSACTION_TIMESTAMP_INVALID?
return ValidationResult.TRANSACTION_INVALID;
} finally { } finally {
// Discard changes to repository made by test-processing transactions above // Rollback repository changes made by test-processing transactions above
try { try {
this.repository.discardChanges(); this.repository.rollbackToSavepoint();
} catch (DataException e) { } catch (DataException e) {
/* /*
* discardChanges failure most likely due to prior DataException, so catch discardChanges' DataException and ignore. Prior DataException * Rollback failure most likely due to prior DataException, so discard this DataException. Prior DataException propagates to caller.
* propagates to caller.
*/ */
} }
} }
@ -916,7 +926,8 @@ public class Block {
this.blockData.setTransactionCount(this.blockData.getTransactionCount() + 1); this.blockData.setTransactionCount(this.blockData.getTransactionCount() + 1);
// We've added transactions, so recalculate transactions signature // We've added transactions, so recalculate transactions signature
calcTransactionsSignature(); // XXX surely this breaks Block.isSignatureValid which is called before we are?
// calcTransactionsSignature();
} }
/** /**
@ -976,9 +987,7 @@ public class Block {
} }
/** /**
* Removes block from blockchain undoing transactions. * Removes block from blockchain undoing transactions and adding them to unconfirmed pile.
* <p>
* Note: it is up to the caller to re-add any of the block's transactions back to the unconfirmed transactions pile.
* *
* @throws DataException * @throws DataException
*/ */
@ -990,10 +999,14 @@ public class Block {
Transaction transaction = transactions.get(sequence); Transaction transaction = transactions.get(sequence);
transaction.orphan(); transaction.orphan();
// Unlink transaction from this block
BlockTransactionData blockTransactionData = new BlockTransactionData(this.getSignature(), sequence, BlockTransactionData blockTransactionData = new BlockTransactionData(this.getSignature(), sequence,
transaction.getTransactionData().getSignature()); transaction.getTransactionData().getSignature());
this.repository.getBlockRepository().delete(blockTransactionData); this.repository.getBlockRepository().delete(blockTransactionData);
// Add to unconfirmed pile
this.repository.getTransactionRepository().unconfirmTransaction(transaction.getTransactionData());
this.repository.getTransactionRepository().deleteParticipants(transaction.getTransactionData()); this.repository.getTransactionRepository().deleteParticipants(transaction.getTransactionData());
} }

View File

@ -2,6 +2,7 @@ package org.qora.block;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -78,40 +79,47 @@ public class BlockGenerator extends Thread {
if (newBlock == null) if (newBlock == null)
newBlock = new Block(repository, previousBlock.getBlockData(), generator); newBlock = new Block(repository, previousBlock.getBlockData(), generator);
// Is new block valid yet? (Before adding unconfirmed transactions) // Make sure we're the only thread modifying the blockchain
if (newBlock.isValid() == ValidationResult.OK) { Lock blockchainLock = Controller.getInstance().getBlockchainLock();
// Add unconfirmed transactions if (blockchainLock.tryLock())
addUnconfirmedTransactions(repository, newBlock); try {
// Is new block valid yet? (Before adding unconfirmed transactions)
if (newBlock.isValid() == ValidationResult.OK) {
// Add unconfirmed transactions
addUnconfirmedTransactions(repository, newBlock);
// Sign to create block's signature // Sign to create block's signature
newBlock.sign(); newBlock.sign();
// If newBlock is still valid then we can use it // If newBlock is still valid then we can use it
ValidationResult validationResult = newBlock.isValid(); ValidationResult validationResult = newBlock.isValid();
if (validationResult == ValidationResult.OK) { if (validationResult == ValidationResult.OK) {
// Add to blockchain - something else will notice and broadcast new block to network // Add to blockchain - something else will notice and broadcast new block to network
try { try {
newBlock.process(); newBlock.process();
LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight()); LOGGER.info("Generated new block: " + newBlock.getBlockData().getHeight());
repository.saveChanges(); repository.saveChanges();
// Notify controller // Notify controller
Controller.getInstance().onGeneratedBlock(newBlock.getBlockData()); Controller.getInstance().onGeneratedBlock(newBlock.getBlockData());
} catch (DataException e) { } catch (DataException e) {
// Unable to process block - report and discard // Unable to process block - report and discard
LOGGER.error("Unable to process newly generated block?", e); LOGGER.error("Unable to process newly generated block?", e);
newBlock = null; newBlock = null;
}
} else {
// No longer valid? Report and discard
LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?");
newBlock = null;
}
} }
} else { } finally {
// No longer valid? Report and discard blockchainLock.unlock();
LOGGER.error("Valid, generated block now invalid '" + validationResult.name() + "' after adding unconfirmed transactions?");
newBlock = null;
} }
}
// Sleep for a while // Sleep for a while
try { try {
repository.discardChanges(); // Free transactional locks, if any repository.discardChanges(); // Free repository locks, if any
Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same Thread.sleep(1000); // No point sleeping less than this as block timestamp millisecond values must be the same
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We've been interrupted - time to exit // We've been interrupted - time to exit

View File

@ -58,14 +58,12 @@ public class blockgenerator {
try { try {
blockGenerator.join(); blockGenerator.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
try { try {
RepositoryManager.closeRepositoryFactory(); RepositoryManager.closeRepositoryFactory();
} catch (DataException e) { } catch (DataException e) {
// TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
} }

View File

@ -2,24 +2,35 @@ package org.qora.controller;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.security.SecureRandom;
import java.security.Security; import java.security.Security;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider;
import org.qora.api.ApiService; import org.qora.api.ApiService;
import org.qora.block.Block;
import org.qora.block.BlockChain; import org.qora.block.BlockChain;
import org.qora.block.BlockGenerator; import org.qora.block.BlockGenerator;
import org.qora.data.block.BlockData; import org.qora.data.block.BlockData;
import org.qora.data.network.PeerData;
import org.qora.network.Network; import org.qora.network.Network;
import org.qora.network.Peer; import org.qora.network.Peer;
import org.qora.network.message.BlockMessage;
import org.qora.network.message.GetBlockMessage;
import org.qora.network.message.GetSignaturesMessage;
import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightMessage;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.SignaturesMessage;
import org.qora.repository.DataException; import org.qora.repository.DataException;
import org.qora.repository.Repository; import org.qora.repository.Repository;
import org.qora.repository.RepositoryFactory; import org.qora.repository.RepositoryFactory;
@ -27,6 +38,7 @@ import org.qora.repository.RepositoryManager;
import org.qora.repository.hsqldb.HSQLDBRepositoryFactory; import org.qora.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qora.settings.Settings; import org.qora.settings.Settings;
import org.qora.utils.Base58; import org.qora.utils.Base58;
import org.qora.utils.NTP;
public class Controller extends Thread { public class Controller extends Thread {
@ -47,6 +59,9 @@ public class Controller extends Thread {
private final String buildVersion; private final String buildVersion;
private final long buildTimestamp; private final long buildTimestamp;
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */
private final Lock blockchainLock;
private Controller() { private Controller() {
Properties properties = new Properties(); Properties properties = new Properties();
try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) { try (InputStream in = ClassLoader.getSystemResourceAsStream("build.properties")) {
@ -66,6 +81,8 @@ public class Controller extends Thread {
throw new RuntimeException("Can't read build.version from build.properties resource"); throw new RuntimeException("Can't read build.version from build.properties resource");
this.buildVersion = VERSION_PREFIX + buildVersion; this.buildVersion = VERSION_PREFIX + buildVersion;
blockchainLock = new ReentrantLock();
} }
public static Controller getInstance() { public static Controller getInstance() {
@ -75,6 +92,38 @@ public class Controller extends Thread {
return instance; return instance;
} }
// Getters / setters
public byte[] getMessageMagic() {
return new byte[] {
0x12, 0x34, 0x56, 0x78
};
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
public String getVersionString() {
return this.buildVersion;
}
/** Returns current blockchain height, or 0 if there's a repository issue */
public int getChainHeight() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getBlockchainHeight();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain height", e);
return 0;
}
}
public Lock getBlockchainLock() {
return this.blockchainLock;
}
// Entry point
public static void main(String args[]) { public static void main(String args[]) {
LOGGER.info("Starting up..."); LOGGER.info("Starting up...");
@ -101,10 +150,10 @@ public class Controller extends Thread {
System.exit(2); System.exit(2);
} }
// XXX work to be done here! // XXX extract private key needed for block gen
if (args.length == 0) { if (args.length == 0 || !args[0].equals("NO-BLOCK-GEN")) {
LOGGER.info("Starting block generator"); LOGGER.info("Starting block generator");
byte[] privateKey = Base58.decode("A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6"); byte[] privateKey = Base58.decode(args.length > 0 ? args[0] : "A9MNsATgQgruBUjxy2rjWY36Yf19uRioKZbiLFT2P7c6");
blockGenerator = new BlockGenerator(privateKey); blockGenerator = new BlockGenerator(privateKey);
blockGenerator.start(); blockGenerator.start();
} }
@ -130,6 +179,8 @@ public class Controller extends Thread {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("Shutdown hook");
Controller.getInstance().shutdown(); Controller.getInstance().shutdown();
} }
}); });
@ -138,16 +189,17 @@ public class Controller extends Thread {
Controller.getInstance().start(); Controller.getInstance().start();
} }
// Main thread
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("Controller"); Thread.currentThread().setName("Controller");
try { try {
while (true) { while (!isStopping) {
Thread.sleep(1000); Thread.sleep(1000);
// Query random connections for their blockchain status potentiallySynchronize();
// If height > ours then potentially synchronize
// Query random connections for unconfirmed transactions // Query random connections for unconfirmed transactions
} }
@ -157,6 +209,38 @@ public class Controller extends Thread {
} }
} }
private void potentiallySynchronize() {
int ourHeight = getChainHeight();
if (ourHeight == 0)
return;
// If we have enough peers, potentially synchronize
List<Peer> peers = Network.getInstance().getHandshakeCompletedPeers();
if (peers.size() >= Settings.getInstance().getMinPeers()) {
peers.removeIf(peer -> peer.getPeerData().getLastHeight() <= ourHeight);
if (!peers.isEmpty()) {
// Pick random peer to sync with
int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index);
if (!Synchronizer.getInstance().synchronize(peer)) {
// Failure so don't use this peer again for a while
try (final Repository repository = RepositoryManager.getRepository()) {
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.warn("Repository issue while updating peer synchronization info", e);
}
}
}
}
}
// Shutdown
public void shutdown() { public void shutdown() {
synchronized (shutdownLock) { synchronized (shutdownLock) {
if (!isStopping) { if (!isStopping) {
@ -164,6 +248,11 @@ public class Controller extends Thread {
LOGGER.info("Shutting down controller"); LOGGER.info("Shutting down controller");
this.interrupt(); this.interrupt();
try {
this.join();
} catch (InterruptedException e) {
// We were interrupted while waiting for thread to join
}
LOGGER.info("Shutting down networking"); LOGGER.info("Shutting down networking");
Network.getInstance().shutdown(); Network.getInstance().shutdown();
@ -177,7 +266,7 @@ public class Controller extends Thread {
try { try {
blockGenerator.join(); blockGenerator.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We were interrupted while waiting for thread to 'join' // We were interrupted while waiting for thread to join
} }
} }
@ -198,39 +287,16 @@ public class Controller extends Thread {
System.exit(0); System.exit(0);
} }
public byte[] getMessageMagic() {
return new byte[] {
0x12, 0x34, 0x56, 0x78
};
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
public String getVersionString() {
return this.buildVersion;
}
public int getChainHeight() {
try (final Repository repository = RepositoryManager.getRepository()) {
return repository.getBlockRepository().getBlockchainHeight();
} catch (DataException e) {
LOGGER.error("Repository issue when fetching blockchain height", e);
return 0;
}
}
// Callbacks for/from network // Callbacks for/from network
public void doNetworkBroadcast() { public void doNetworkBroadcast() {
Network network = Network.getInstance(); Network network = Network.getInstance();
// Send our known peers // Send our known peers
network.broadcast(network.buildPeersMessage()); network.broadcast(peer -> network.buildPeersMessage(peer));
// Send our current height // Send our current height
network.broadcast(new HeightMessage(this.getChainHeight())); network.broadcast(peer -> new HeightMessage(this.getChainHeight()));
} }
public void onGeneratedBlock(BlockData newBlockData) { public void onGeneratedBlock(BlockData newBlockData) {
@ -238,24 +304,66 @@ public class Controller extends Thread {
// Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block // Could even broadcast top two block sigs so that remote peers can see new block references current network-wide last block
// Broadcast our new height // Broadcast our new height
Network.getInstance().broadcast(new HeightMessage(newBlockData.getHeight())); Network.getInstance().broadcast(peer -> new HeightMessage(newBlockData.getHeight()));
} }
public void onNetworkMessage(Peer peer, Message message) { public void onNetworkMessage(Peer peer, Message message) {
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer));
switch (message.getType()) { switch (message.getType()) {
case HEIGHT: case HEIGHT:
HeightMessage heightMessage = (HeightMessage) message; HeightMessage heightMessage = (HeightMessage) message;
// If we connected to peer, then update our record of peer's height // Update our record of peer's height
if (peer.isOutbound()) peer.getPeerData().setLastHeight(heightMessage.getHeight());
peer.getPeerData().setLastHeight(heightMessage.getHeight());
// XXX we should instead test incoming block sigs to see if we have them, and if not do sync break;
// Is peer's blockchain longer than ours?
if (heightMessage.getHeight() > getChainHeight()) case GET_SIGNATURES:
Synchronizer.getInstance().synchronize(peer); try (final Repository repository = RepositoryManager.getRepository()) {
GetSignaturesMessage getSignaturesMessage = (GetSignaturesMessage) message;
byte[] parentSignature = getSignaturesMessage.getParentSignature();
List<byte[]> signatures = new ArrayList<>();
do {
BlockData blockData = repository.getBlockRepository().fromReference(parentSignature);
if (blockData == null)
break;
parentSignature = blockData.getSignature();
signatures.add(parentSignature);
} while (signatures.size() < 500);
Message signaturesMessage = new SignaturesMessage(signatures);
signaturesMessage.setId(message.getId());
if (!peer.sendMessage(signaturesMessage))
peer.disconnect();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e);
}
break;
case GET_BLOCK:
try (final Repository repository = RepositoryManager.getRepository()) {
GetBlockMessage getBlockMessage = (GetBlockMessage) message;
byte[] signature = getBlockMessage.getSignature();
BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData == null)
// No response at all???
break;
Block block = new Block(repository, blockData);
Message blockMessage = new BlockMessage(block);
blockMessage.setId(message.getId());
if (!peer.sendMessage(blockMessage))
peer.disconnect();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while responding to %s from peer %s", message.getType().name(), peer), e);
}
break; break;
default: default:

View File

@ -1,15 +1,40 @@
package org.qora.controller; package org.qora.controller;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qora.block.Block;
import org.qora.block.Block.ValidationResult;
import org.qora.block.GenesisBlock;
import org.qora.data.block.BlockData;
import org.qora.network.Peer; import org.qora.network.Peer;
import org.qora.network.message.BlockMessage;
import org.qora.network.message.GetBlockMessage;
import org.qora.network.message.GetSignaturesMessage;
import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType;
import org.qora.network.message.SignaturesMessage;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
public class Synchronizer { public class Synchronizer {
private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class); private static final Logger LOGGER = LogManager.getLogger(Synchronizer.class);
private static final int INITIAL_BLOCK_STEP = 8;
private static final int MAXIMUM_BLOCK_STEP = 500;
private static final int MAXIMUM_HEIGHT_DELTA = 2000; // XXX move to blockchain config?
private static Synchronizer instance; private static Synchronizer instance;
private Repository repository;
private int ourHeight;
private Synchronizer() { private Synchronizer() {
} }
@ -20,35 +45,211 @@ public class Synchronizer {
return instance; return instance;
} }
public void synchronize(Peer peer) { public boolean synchronize(Peer peer) {
// If we're already synchronizing with another peer then return // Make sure we're the only thread modifying the blockchain
// If we're already synchronizing with another peer then this will also return fast
Lock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
try (final Repository repository = RepositoryManager.getRepository()) {
try {
this.repository = repository;
this.ourHeight = this.repository.getBlockRepository().getBlockchainHeight();
int peerHeight = peer.getPeerData().getLastHeight();
LOGGER.info(String.format("Synchronizing with peer %s", peer.getRemoteSocketAddress())); LOGGER.info(String.format("Synchronizing with peer %s from height %d to height %d", peer, this.ourHeight, peerHeight));
// Peer has different latest block sig to us List<byte[]> signatures = findSignaturesFromCommonBlock(peer);
if (signatures == null) {
LOGGER.info(String.format("Failure to find common block with peer %s", peer));
return false;
}
// find common block? // First signature is common block
BlockData commonBlockData = this.repository.getBlockRepository().fromSignature(signatures.get(0));
signatures.remove(0);
// if common block is too far behind us then we're on massively different forks so give up, maybe human invention required to download desired fork // If common block is too far behind us then we're on massively different forks so give up.
int minHeight = ourHeight - MAXIMUM_HEIGHT_DELTA;
if (commonBlockData.getHeight() < minHeight) {
LOGGER.info(String.format("Blockchain too divergent with peer %s", peer));
return false;
}
// unwind to common block (unless common block is our latest block) if (this.ourHeight > commonBlockData.getHeight()) {
// Unwind to common block (unless common block is our latest block)
LOGGER.debug(String.format("Orphaning blocks back to height %d", commonBlockData.getHeight()));
// apply some newer blocks from peer while (this.ourHeight > commonBlockData.getHeight()) {
BlockData blockData = repository.getBlockRepository().fromHeight(this.ourHeight);
Block block = new Block(repository, blockData);
block.orphan();
// commit --this.ourHeight;
}
// If our block gen creates a block while we do this - what happens? LOGGER.debug(String.format("Orphaned blocks back to height %d - fetching blocks from peer", commonBlockData.getHeight(), peer));
// does repository serialization prevent issues? } else {
LOGGER.debug(String.format("Fetching new blocks from peer %s", peer));
}
// blockgen: block 123: pay X from A to B, commit // Fetch, and apply, blocks from peer
// sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit byte[] signature = commonBlockData.getSignature();
while (this.ourHeight < peerHeight) {
// Do we need more signatures?
if (signatures.isEmpty()) {
signatures = this.getBlockSignatures(peer, signature, MAXIMUM_BLOCK_STEP);
if (signatures == null || signatures.isEmpty()) {
LOGGER.info(String.format("Peer %s failed to respond with more block signatures after height %d", peer, this.ourHeight));
return false;
}
}
// and vice versa? signature = signatures.get(0);
signatures.remove(0);
++this.ourHeight;
// sync: block 122 orphaned, replacement blocks 122 through 129 applied, commit BlockData newBlockData = this.fetchBlockData(peer, signature);
// blockgen: block 123: pay X from A to B, commit
// simply block syncing when generating and vice versa by grabbing a Controller-owned non-blocking mutex? if (newBlockData == null) {
LOGGER.info(String.format("Peer %s failed to respond with block for height %d", peer, this.ourHeight));
return false;
}
Block newBlock = new Block(repository, newBlockData);
if (!newBlock.isSignatureValid()) {
LOGGER.info(String.format("Peer %s sent block with invalid signature for height %d", peer, this.ourHeight));
return false;
}
ValidationResult blockResult = newBlock.isValid();
if (blockResult != ValidationResult.OK) {
LOGGER.info(String.format("Peer %s sent invalid block for height %d: %s", peer, this.ourHeight, blockResult.name()));
return false;
}
newBlock.process();
}
// Commit
repository.saveChanges();
LOGGER.info(String.format("Synchronized with peer %s to height %d", peer, this.ourHeight));
return true;
} finally {
repository.discardChanges();
this.repository = null;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue during synchronization with peer", e);
return false;
} finally {
blockchainLock.unlock();
}
// Wasn't peer's fault we couldn't sync
return true;
}
/**
* Returns list of block signatures start with common block with peer.
*
* @param peer
* @return block signatures
* @throws DataException
*/
private List<byte[]> findSignaturesFromCommonBlock(Peer peer) throws DataException {
// Start by asking for a few recent block hashes as this will cover a majority of reorgs
// Failing that, back off exponentially
int step = INITIAL_BLOCK_STEP;
List<byte[]> blockSignatures = null;
int testHeight = ourHeight - step;
byte[] testSignature = null;
while (testHeight > 1) {
// Fetch our block signature at this height
BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight);
if (testBlockData == null) {
// Not found? But we've locked the blockchain and height is below blockchain's tip!
LOGGER.error("Failed to get block at height lower than blockchain tip during synchronization?");
return null;
}
testSignature = testBlockData.getSignature();
// Ask for block signatures since test block's signature
LOGGER.trace(String.format("Requesting %d signature%s after our height %d", step, (step != 1 ? "s": ""), testHeight));
blockSignatures = this.getBlockSignatures(peer, testSignature, step);
if (blockSignatures == null)
// No response - give up this time
return null;
LOGGER.trace(String.format("Received %s signature%s", blockSignatures.size(), (blockSignatures.size() != 1 ? "s" : "")));
// Empty list means remote peer is unaware of test signature OR has no new blocks after test signature
if (!blockSignatures.isEmpty())
// We have entries so we have found a common block
break;
if (peer.getVersion() >= 2) {
step <<= 1;
} else {
// Old v1 peers are hard-coded to return 500 signatures so we might as well go backward by 500 too
step = 500;
}
step = Math.min(step, MAXIMUM_BLOCK_STEP);
testHeight -= step;
}
if (testHeight <= 1)
// Can't go back any further - return Genesis block
return new ArrayList<byte[]>(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature()));
// Prepend common block's signature as first block sig
blockSignatures.add(0, testSignature);
// Work through returned signatures to get closer common block
// Do this by trimming all-but-one leading known signatures
for (int i = blockSignatures.size() - 1; i > 0; --i) {
BlockData blockData = this.repository.getBlockRepository().fromSignature(blockSignatures.get(i));
if (blockData != null) {
blockSignatures.subList(0, i).clear();
break;
}
}
return blockSignatures;
}
private List<byte[]> getBlockSignatures(Peer peer, byte[] parentSignature, int countRequested) {
// TODO countRequested is v2+ feature
Message getSignaturesMessage = new GetSignaturesMessage(parentSignature);
Message message = peer.getResponse(getSignaturesMessage);
if (message == null || message.getType() != MessageType.SIGNATURES)
return null;
SignaturesMessage signaturesMessage = (SignaturesMessage) message;
return signaturesMessage.getSignatures();
}
private BlockData fetchBlockData(Peer peer, byte[] signature) {
Message getBlockMessage = new GetBlockMessage(signature);
Message message = peer.getResponse(getBlockMessage);
if (message == null || message.getType() != MessageType.BLOCK)
return null;
BlockMessage blockMessage = (BlockMessage) message;
return blockMessage.getBlockData();
} }
} }

View File

@ -14,6 +14,7 @@ public class PeerData {
private Long lastAttempted; private Long lastAttempted;
private Long lastConnected; private Long lastConnected;
private Integer lastHeight; private Integer lastHeight;
private Long lastMisbehaved;
// Constructors // Constructors
@ -21,15 +22,16 @@ public class PeerData {
protected PeerData() { protected PeerData() {
} }
public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight) { public PeerData(InetSocketAddress socketAddress, Long lastAttempted, Long lastConnected, Integer lastHeight, Long lastMisbehaved) {
this.socketAddress = socketAddress; this.socketAddress = socketAddress;
this.lastAttempted = lastAttempted; this.lastAttempted = lastAttempted;
this.lastConnected = lastConnected; this.lastConnected = lastConnected;
this.lastHeight = lastHeight; this.lastHeight = lastHeight;
this.lastMisbehaved = lastMisbehaved;
} }
public PeerData(InetSocketAddress socketAddress) { public PeerData(InetSocketAddress socketAddress) {
this(socketAddress, null, null, null); this(socketAddress, null, null, null, null);
} }
// Getters / setters // Getters / setters
@ -62,4 +64,12 @@ public class PeerData {
this.lastHeight = lastHeight; this.lastHeight = lastHeight;
} }
public Long getLastMisbehaved() {
return this.lastMisbehaved;
}
public void setLastMisbehaved(Long lastMisbehaved) {
this.lastMisbehaved = lastMisbehaved;
}
} }

View File

@ -5,7 +5,6 @@ import java.util.Arrays;
import org.qora.controller.Controller; import org.qora.controller.Controller;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.Message.MessageType; import org.qora.network.message.Message.MessageType;
import org.qora.utils.NTP;
import org.qora.network.message.PeerIdMessage; import org.qora.network.message.PeerIdMessage;
import org.qora.network.message.ProofMessage; import org.qora.network.message.ProofMessage;
import org.qora.network.message.VersionMessage; import org.qora.network.message.VersionMessage;
@ -77,7 +76,7 @@ public enum Handshake {
if (peer.isOutbound()) if (peer.isOutbound())
return COMPLETED; return COMPLETED;
// Check salt hasn't been seen before - this stops multiple peers reusing salt nonce in a Sybil-like attack // Check salt hasn't been seen before - this stops multiple peers reusing same nonce in a Sybil-like attack
if (Proof.seenSalt(proofMessage.getSalt())) if (Proof.seenSalt(proofMessage.getSalt()))
return null; return null;
@ -103,9 +102,6 @@ public enum Handshake {
@Override @Override
public void action(Peer peer) { public void action(Peer peer) {
// Note: this is only called when we've made outbound connection // Note: this is only called when we've made outbound connection
// Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime());
} }
}; };

View File

@ -9,6 +9,7 @@ import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -23,6 +24,7 @@ import org.qora.data.network.PeerData;
import org.qora.network.message.HeightMessage; import org.qora.network.message.HeightMessage;
import org.qora.network.message.Message; import org.qora.network.message.Message;
import org.qora.network.message.PeersMessage; import org.qora.network.message.PeersMessage;
import org.qora.network.message.PeersV2Message;
import org.qora.network.message.PingMessage; import org.qora.network.message.PingMessage;
import org.qora.repository.DataException; import org.qora.repository.DataException;
import org.qora.repository.Repository; import org.qora.repository.Repository;
@ -34,11 +36,16 @@ import org.qora.utils.NTP;
public class Network extends Thread { public class Network extends Thread {
private static final Logger LOGGER = LogManager.getLogger(Network.class); private static final Logger LOGGER = LogManager.getLogger(Network.class);
private static final int LISTEN_BACKLOG = 10;
private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms
private static final int BROADCAST_INTERVAL = 60 * 1000; // ms
private static Network instance; private static Network instance;
private static final int LISTEN_BACKLOG = 10;
/** How long before retrying after a connection failure, in milliseconds. */
private static final int CONNECT_FAILURE_BACKOFF = 60 * 1000; // ms
/** How long between informational broadcasts to all connected peers, in milliseconds. */
private static final int BROADCAST_INTERVAL = 60 * 1000; // ms
/** Maximum time since last successful connection for peer info to be propagated, in milliseconds. */
private static final long RECENT_CONNECTION_THRESHOLD = 24 * 60 * 60 * 1000; // ms
public static final int PEER_ID_LENGTH = 128; public static final int PEER_ID_LENGTH = 128;
private final byte[] ourPeerId; private final byte[] ourPeerId;
@ -113,7 +120,7 @@ public class Network extends Thread {
} }
public void noteToSelf(Peer peer) { public void noteToSelf(Peer peer) {
LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer.getRemoteSocketAddress())); LOGGER.info(String.format("No longer considering peer address %s as it connects to self", peer));
synchronized (this.selfPeers) { synchronized (this.selfPeers) {
this.selfPeers.add(peer.getPeerData()); this.selfPeers.add(peer.getPeerData());
@ -129,7 +136,7 @@ public class Network extends Thread {
// Maintain long-term connections to various peers' API applications // Maintain long-term connections to various peers' API applications
try { try {
while (true) { while (true) {
acceptConnection(); acceptConnections();
createConnection(); createConnection();
@ -160,38 +167,40 @@ public class Network extends Thread {
} }
@SuppressWarnings("resource") @SuppressWarnings("resource")
private void acceptConnection() throws InterruptedException { private void acceptConnections() throws InterruptedException {
Socket socket; Socket socket;
try { do {
socket = this.listenSocket.accept(); try {
} catch (SocketTimeoutException e) { socket = this.listenSocket.accept();
// No connections to accept } catch (SocketTimeoutException e) {
return; // No connections to accept
} catch (IOException e) { return;
// Something went wrong or listen socket was closed due to shutdown } catch (IOException e) {
return; // Something went wrong or listen socket was closed due to shutdown
}
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress()));
try {
socket.close();
} catch (IOException e) {
// Not important
}
return; return;
} }
LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); synchronized (this.connectedPeers) {
Peer newPeer = new Peer(socket); if (connectedPeers.size() >= maxPeers) {
this.connectedPeers.add(newPeer); // We have enough peers
peerExecutor.execute(newPeer); LOGGER.trace(String.format("Connection discarded from peer %s", socket.getRemoteSocketAddress()));
}
try {
socket.close();
} catch (IOException e) {
// Not important
}
return;
}
LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress()));
Peer newPeer = new Peer(socket);
this.connectedPeers.add(newPeer);
peerExecutor.execute(newPeer);
}
} while (true);
} }
private void createConnection() throws InterruptedException, DataException { private void createConnection() throws InterruptedException, DataException {
@ -220,7 +229,7 @@ public class Network extends Thread {
// Don't consider already connected peers // Don't consider already connected peers
Predicate<PeerData> isConnectedPeer = peerData -> this.connectedPeers.stream() Predicate<PeerData> isConnectedPeer = peerData -> this.connectedPeers.stream()
.anyMatch(peer -> peer.getPeerData() != null && peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress())); .anyMatch(peer -> peer.getPeerData().getSocketAddress().equals(peerData.getSocketAddress()));
synchronized (this.connectedPeers) { synchronized (this.connectedPeers) {
peers.removeIf(isConnectedPeer); peers.removeIf(isConnectedPeer);
@ -269,7 +278,7 @@ public class Network extends Thread {
/** Called when a new message arrives for a peer. message can be null if called after connection */ /** Called when a new message arrives for a peer. message can be null if called after connection */
public void onMessage(Peer peer, Message message) { public void onMessage(Peer peer, Message message) {
if (message != null) if (message != null)
LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer.getRemoteSocketAddress())); LOGGER.trace(String.format("Received %s message from %s", message.getType().name(), peer));
Handshake handshakeStatus = peer.getHandshakeStatus(); Handshake handshakeStatus = peer.getHandshakeStatus();
if (handshakeStatus != Handshake.COMPLETED) { if (handshakeStatus != Handshake.COMPLETED) {
@ -277,8 +286,7 @@ public class Network extends Thread {
// Check message type is as expected // Check message type is as expected
if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) {
LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer.getRemoteSocketAddress(), LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType));
handshakeStatus.expectedMessageType));
peer.disconnect(); peer.disconnect();
return; return;
} }
@ -287,7 +295,7 @@ public class Network extends Thread {
if (newHandshakeStatus == null) { if (newHandshakeStatus == null) {
// Handshake failure // Handshake failure
LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer.getRemoteSocketAddress(), message.getType().name())); LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name()));
peer.disconnect(); peer.disconnect();
return; return;
} }
@ -296,7 +304,7 @@ public class Network extends Thread {
// If we made outbound connection then we need to act first // If we made outbound connection then we need to act first
newHandshakeStatus.action(peer); newHandshakeStatus.action(peer);
else else
// We have inbound connection so we need to respond inline with what we just received // We have inbound connection so we need to respond in kind with what we just received
handshakeStatus.action(peer); handshakeStatus.action(peer);
peer.setHandshakeStatus(newHandshakeStatus); peer.setHandshakeStatus(newHandshakeStatus);
@ -313,7 +321,7 @@ public class Network extends Thread {
case VERSION: case VERSION:
case PEER_ID: case PEER_ID:
case PROOF: case PROOF:
LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer.getRemoteSocketAddress())); LOGGER.debug(String.format("Unexpected handshaking message %s from peer %s", message.getType().name(), peer));
peer.disconnect(); peer.disconnect();
return; return;
@ -334,16 +342,28 @@ public class Network extends Thread {
List<InetSocketAddress> peerAddresses = new ArrayList<>(); List<InetSocketAddress> peerAddresses = new ArrayList<>();
// v1 PEERS message doesn't support port numbers so we have to add default port
for (InetAddress peerAddress : peersMessage.getPeerAddresses()) for (InetAddress peerAddress : peersMessage.getPeerAddresses())
peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT)); peerAddresses.add(new InetSocketAddress(peerAddress, Settings.DEFAULT_LISTEN_PORT));
try { // Also add peer's details
mergePeers(peerAddresses); peerAddresses.add(new InetSocketAddress(peer.getRemoteSocketAddress().getHostString(), Settings.DEFAULT_LISTEN_PORT));
} catch (DataException e) {
// Not good mergePeers(peerAddresses);
peer.disconnect(); break;
return;
} case PEERS_V2:
PeersV2Message peersV2Message = (PeersV2Message) message;
List<InetSocketAddress> peerV2Addresses = peersV2Message.getPeerAddresses();
// First entry contains remote peer's listen port but empty address.
// Overwrite address with one obtained from socket.
int peerPort = peerV2Addresses.get(0).getPort();
peerV2Addresses.remove(0);
peerV2Addresses.add(0, InetSocketAddress.createUnresolved(peer.getRemoteSocketAddress().getHostString(), peerPort));
mergePeers(peerV2Addresses);
break; break;
default: default:
@ -354,6 +374,9 @@ public class Network extends Thread {
} }
private void onHandshakeCompleted(Peer peer) { private void onHandshakeCompleted(Peer peer) {
// Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(NTP.getTime());
peer.startPings(); peer.startPings();
Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight()); Message heightMessage = new HeightMessage(Controller.getInstance().getChainHeight());
@ -363,36 +386,61 @@ public class Network extends Thread {
return; return;
} }
Message peersMessage = this.buildPeersMessage(); Message peersMessage = this.buildPeersMessage(peer);
if (!peer.sendMessage(peersMessage)) if (!peer.sendMessage(peersMessage))
peer.disconnect(); peer.disconnect();
} }
public Message buildPeersMessage() { /** Returns PEERS message made from peers we've connected to recently, and this node's details */
List<Peer> peers = new ArrayList<>(); public Message buildPeersMessage(Peer peer) {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
synchronized (this.connectedPeers) { // Filter out peers that we've not connected to ever or within X milliseconds
// Only outbound peer connections that have completed handshake long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED) knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold);
.collect(Collectors.toList());
// Map to socket addresses
List<InetSocketAddress> peerSocketAddresses = knownPeers.stream().map(peerData -> peerData.getSocketAddress()).collect(Collectors.toList());
if (peer.getVersion() >= 2)
// New format PEERS_V2 message that supports hostnames, IPv6 and ports
return new PeersV2Message(peerSocketAddresses);
else
// Legacy PEERS message that only sends IPv4 addresses
return new PeersMessage(peerSocketAddresses);
} catch (DataException e) {
LOGGER.error("Repository issue while building PEERS message", e);
return new PeersMessage(Collections.emptyList());
} }
return new PeersMessage(peers);
} }
// Network-wide calls // Network-wide calls
private List<Peer> getCompletedPeers() { /** Returns list of connected peers that have completed handshaking. */
List<Peer> completedPeers = new ArrayList<>(); public List<Peer> getHandshakeCompletedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) { synchronized (this.connectedPeers) {
completedPeers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList()); peers = this.connectedPeers.stream().filter(peer -> peer.getHandshakeStatus() == Handshake.COMPLETED).collect(Collectors.toList());
} }
return completedPeers; return peers;
} }
private void mergePeers(List<InetSocketAddress> peerAddresses) throws DataException { /** Returns list of peers we connected to that have completed handshaking. */
public List<Peer> getOutboundHandshakeCompletedPeers() {
List<Peer> peers = new ArrayList<>();
synchronized (this.connectedPeers) {
peers = this.connectedPeers.stream().filter(peer -> peer.isOutbound() && peer.getHandshakeStatus() == Handshake.COMPLETED)
.collect(Collectors.toList());
}
return peers;
}
private void mergePeers(List<InetSocketAddress> peerAddresses) {
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers(); List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
@ -412,28 +460,30 @@ public class Network extends Thread {
} }
repository.saveChanges(); repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while merging peers list from remote node", e);
} }
} }
public void broadcast(Message message) { public void broadcast(Function<Peer, Message> peerMessage) {
class Broadcaster implements Runnable { class Broadcaster implements Runnable {
private List<Peer> targetPeers; private List<Peer> targetPeers;
private Message message; private Function<Peer, Message> peerMessage;
public Broadcaster(List<Peer> targetPeers, Message message) { public Broadcaster(List<Peer> targetPeers, Function<Peer, Message> peerMessage) {
this.targetPeers = targetPeers; this.targetPeers = targetPeers;
this.message = message; this.peerMessage = peerMessage;
} }
@Override @Override
public void run() { public void run() {
for (Peer peer : targetPeers) for (Peer peer : targetPeers)
if (!peer.sendMessage(message)) if (!peer.sendMessage(peerMessage.apply(peer)))
peer.disconnect(); peer.disconnect();
} }
} }
peerExecutor.execute(new Broadcaster(this.getCompletedPeers(), message)); peerExecutor.execute(new Broadcaster(this.getHandshakeCompletedPeers(), peerMessage));
} }
public void shutdown() { public void shutdown() {

View File

@ -62,6 +62,7 @@ public class Peer implements Runnable {
this.isOutbound = false; this.isOutbound = false;
this.socket = socket; this.socket = socket;
this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress(); this.remoteSocketAddress = (InetSocketAddress) this.socket.getRemoteSocketAddress();
this.peerData = new PeerData(this.remoteSocketAddress);
} }
// Getters / setters // Getters / setters
@ -121,6 +122,15 @@ public class Peer implements Runnable {
this.lastPing = lastPing; this.lastPing = lastPing;
} }
// Easier, and nicer output, than peer.getRemoteSocketAddress()
@Override
public String toString() {
InetSocketAddress socketAddress = this.getRemoteSocketAddress();
return socketAddress.getHostString() + ":" + socketAddress.getPort();
}
// Processing // Processing
private void setup() throws IOException { private void setup() throws IOException {
@ -131,22 +141,22 @@ public class Peer implements Runnable {
} }
public boolean connect() { public boolean connect() {
LOGGER.trace(String.format("Connecting to peer %s", this.remoteSocketAddress)); LOGGER.trace(String.format("Connecting to peer %s", this));
this.socket = new Socket(); this.socket = new Socket();
try { try {
InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort()); InetSocketAddress resolvedSocketAddress = new InetSocketAddress(this.remoteSocketAddress.getHostString(), this.remoteSocketAddress.getPort());
this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT); this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this.remoteSocketAddress)); LOGGER.debug(String.format("Connected to peer %s", this));
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this.remoteSocketAddress)); LOGGER.trace(String.format("Connection timed out to peer %s", this));
return false; return false;
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
LOGGER.trace(String.format("Connection failed to unresolved peer %s", this.remoteSocketAddress)); LOGGER.trace(String.format("Connection failed to unresolved peer %s", this));
return false; return false;
} catch (IOException e) { } catch (IOException e) {
LOGGER.trace(String.format("Connection failed to peer %s", this.remoteSocketAddress)); LOGGER.trace(String.format("Connection failed to peer %s", this));
return false; return false;
} }
@ -157,7 +167,7 @@ public class Peer implements Runnable {
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("Peer " + this.socket.getRemoteSocketAddress()); Thread.currentThread().setName("Peer " + this);
try (DataInputStream in = new DataInputStream(socket.getInputStream())) { try (DataInputStream in = new DataInputStream(socket.getInputStream())) {
setup(); setup();
@ -199,7 +209,7 @@ public class Peer implements Runnable {
try { try {
// Send message // Send message
LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this.getRemoteSocketAddress())); LOGGER.trace(String.format("Sending %s message to peer %s", message.getType().name(), this));
synchronized (this.out) { synchronized (this.out) {
this.out.write(message.toBytes()); this.out.write(message.toBytes());
@ -288,7 +298,7 @@ public class Peer implements Runnable {
// Close socket // Close socket
if (!this.socket.isClosed()) { if (!this.socket.isClosed()) {
LOGGER.debug(String.format("Disconnected peer %s", this.getRemoteSocketAddress())); LOGGER.debug(String.format("Disconnected peer %s", this));
try { try {
this.socket.close(); this.socket.close();

View File

@ -0,0 +1,91 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;
import org.qora.block.Block;
import org.qora.data.at.ATStateData;
import org.qora.data.block.BlockData;
import org.qora.data.transaction.TransactionData;
import org.qora.transform.TransformationException;
import org.qora.transform.block.BlockTransformer;
import org.qora.utils.Triple;
import com.google.common.primitives.Ints;
public class BlockMessage extends Message {
private Block block = null;
private BlockData blockData = null;
private List<TransactionData> transactions = null;
private List<ATStateData> atStates = null;
private int height;
public BlockMessage(Block block) {
super(MessageType.BLOCK);
this.block = block;
this.height = block.getBlockData().getHeight();
}
private BlockMessage(int id, BlockData blockData, List<TransactionData> transactions, List<ATStateData> atStates) {
super(id, MessageType.BLOCK);
this.blockData = blockData;
this.transactions = transactions;
this.atStates = atStates;
this.height = blockData.getHeight();
}
public BlockData getBlockData() {
return this.blockData;
}
public List<TransactionData> getTransactions() {
return this.transactions;
}
public List<ATStateData> getAtStates() {
return this.atStates;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException {
try {
int height = byteBuffer.getInt();
Triple<BlockData, List<TransactionData>, List<ATStateData>> blockInfo = BlockTransformer.fromByteBuffer(byteBuffer);
BlockData blockData = blockInfo.getA();
blockData.setHeight(height);
return new BlockMessage(id, blockData, blockInfo.getB(), blockInfo.getC());
} catch (TransformationException e) {
return null;
}
}
@Override
protected byte[] toData() {
if (this.block == null)
return null;
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.height));
bytes.write(BlockTransformer.toBytes(this.block));
return bytes.toByteArray();
} catch (TransformationException | IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,54 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qora.transform.block.BlockTransformer;
public class GetBlockMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private byte[] signature;
public GetBlockMessage(byte[] signature) {
this(-1, signature);
}
private GetBlockMessage(int id, byte[] signature) {
super(id, MessageType.GET_BLOCK);
this.signature = signature;
}
public byte[] getSignature() {
return this.signature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH)
return null;
byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH];
bytes.get(signature);
return new GetBlockMessage(id, signature);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,54 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.qora.transform.block.BlockTransformer;
public class GetSignaturesMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private byte[] parentSignature;
public GetSignaturesMessage(byte[] parentSignature) {
this(-1, parentSignature);
}
private GetSignaturesMessage(int id, byte[] parentSignature) {
super(id, MessageType.GET_SIGNATURES);
this.parentSignature = parentSignature;
}
public byte[] getParentSignature() {
return this.parentSignature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != BLOCK_SIGNATURE_LENGTH)
return null;
byte[] parentSignature = new byte[BLOCK_SIGNATURE_LENGTH];
bytes.get(parentSignature);
return new GetSignaturesMessage(id, parentSignature);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.parentSignature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -39,7 +39,8 @@ public abstract class Message {
PING(9), PING(9),
VERSION(10), VERSION(10),
PEER_ID(11), PEER_ID(11),
PROOF(12); PROOF(12),
PEERS_V2(13);
public final int value; public final int value;
public final Method fromByteBuffer; public final Method fromByteBuffer;

View File

@ -4,13 +4,12 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.qora.network.Peer;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
// NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either // NOTE: this legacy message only supports 4-byte IPv4 addresses and doesn't send port number either
@ -20,15 +19,15 @@ public class PeersMessage extends Message {
private List<InetAddress> peerAddresses; private List<InetAddress> peerAddresses;
public PeersMessage(List<Peer> peers) { public PeersMessage(List<InetSocketAddress> peerSocketAddresses) {
super(-1, MessageType.PEERS); super(MessageType.PEERS);
// We have to forcibly resolve into IP addresses as we can't send hostnames // We have to forcibly resolve into IP addresses as we can't send hostnames
this.peerAddresses = new ArrayList<>(); this.peerAddresses = new ArrayList<>();
for (Peer peer : peers) { for (InetSocketAddress peerSocketAddress : peerSocketAddresses) {
try { try {
InetAddress resolvedAddress = InetAddress.getByName(peer.getRemoteSocketAddress().getHostString()); InetAddress resolvedAddress = InetAddress.getByName(peerSocketAddress.getHostString());
// Filter out unsupported address types // Filter out unsupported address types
if (resolvedAddress.getAddress().length != ADDRESS_LENGTH) if (resolvedAddress.getAddress().length != ADDRESS_LENGTH)

View File

@ -0,0 +1,134 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.qora.settings.Settings;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
// NOTE: this message supports hostnames, IPv6, port numbers and IPv4 addresses (in IPv6 form)
public class PeersV2Message extends Message {
private static final byte[] IPV6_V4_PREFIX = new byte[] {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff
};
private List<InetSocketAddress> peerSocketAddresses;
public PeersV2Message(List<InetSocketAddress> peerSocketAddresses) {
this(-1, peerSocketAddresses);
}
private PeersV2Message(int id, List<InetSocketAddress> peerSocketAddresses) {
super(id, MessageType.PEERS_V2);
this.peerSocketAddresses = peerSocketAddresses;
}
public List<InetSocketAddress> getPeerAddresses() {
return this.peerSocketAddresses;
}
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException {
// Read entry count
int count = byteBuffer.getInt();
List<InetSocketAddress> peerSocketAddresses = new ArrayList<>();
byte[] ipAddressBytes = new byte[16];
int port;
for (int i = 0; i < count; ++i) {
byte addressSize = byteBuffer.get();
if (addressSize == 0) {
// Address size of 0 indicates IP address (always in IPv6 form)
byteBuffer.get(ipAddressBytes);
port = byteBuffer.getInt();
try {
InetAddress address = InetAddress.getByAddress(ipAddressBytes);
peerSocketAddresses.add(new InetSocketAddress(address, port));
} catch (UnknownHostException e) {
// Ignore and continue
}
} else {
byte[] hostnameBytes = new byte[addressSize & 0xff];
byteBuffer.get(hostnameBytes);
String hostname = new String(hostnameBytes, "UTF-8");
port = byteBuffer.getInt();
peerSocketAddresses.add(InetSocketAddress.createUnresolved(hostname, port));
}
}
return new PeersV2Message(id, peerSocketAddresses);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
// First entry represents sending node but contains only port number with empty address.
List<InetSocketAddress> socketAddresses = new ArrayList<>(this.peerSocketAddresses);
socketAddresses.add(0, new InetSocketAddress(Settings.getInstance().getListenPort()));
// Number of entries we are sending.
int count = socketAddresses.size();
for (InetSocketAddress socketAddress : socketAddresses) {
// Hostname preferred, failing that IP address
if (socketAddress.isUnresolved()) {
String hostname = socketAddress.getHostString();
byte[] hostnameBytes = hostname.getBytes("UTF-8");
// We don't support hostnames that are longer than 256 bytes
if (hostnameBytes.length > 256) {
--count;
continue;
}
bytes.write(hostnameBytes.length);
bytes.write(hostnameBytes);
} else {
// IP address
byte[] ipAddressBytes = socketAddress.getAddress().getAddress();
// IPv4? Convert to IPv6 form
if (ipAddressBytes.length == 4)
ipAddressBytes = Bytes.concat(IPV6_V4_PREFIX, ipAddressBytes);
// Write zero length to indicate IP address follows
bytes.write(0);
bytes.write(ipAddressBytes);
}
// Port
bytes.write(Ints.toByteArray(socketAddress.getPort()));
}
// Prepend updated entry count
byte[] countBytes = Ints.toByteArray(count);
return Bytes.concat(countBytes, bytes.toByteArray());
} catch (IOException e) {
return null;
}
}
}

View File

@ -0,0 +1,66 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.qora.transform.block.BlockTransformer;
import com.google.common.primitives.Ints;
public class SignaturesMessage extends Message {
private static final int BLOCK_SIGNATURE_LENGTH = BlockTransformer.BLOCK_SIGNATURE_LENGTH;
private List<byte[]> signatures;
public SignaturesMessage(List<byte[]> signatures) {
this(-1, signatures);
}
private SignaturesMessage(int id, List<byte[]> signatures) {
super(id, MessageType.SIGNATURES);
this.signatures = signatures;
}
public List<byte[]> getSignatures() {
return this.signatures;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
if (bytes.remaining() != count * BLOCK_SIGNATURE_LENGTH)
return null;
List<byte[]> signatures = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] signature = new byte[BLOCK_SIGNATURE_LENGTH];
bytes.get(signature);
signatures.add(signature);
}
return new SignaturesMessage(id, signatures);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -24,6 +24,10 @@ public interface Repository extends AutoCloseable {
public void discardChanges() throws DataException; public void discardChanges() throws DataException;
void setSavepoint() throws DataException;
void rollbackToSavepoint() throws DataException;
@Override @Override
public void close() throws DataException; public void close() throws DataException;

View File

@ -44,6 +44,14 @@ public interface TransactionRepository {
public List<TransactionData> getAssetTransactions(int assetId, ConfirmationStatus confirmationStatus, Integer limit, Integer offset, Boolean reverse) public List<TransactionData> getAssetTransactions(int assetId, ConfirmationStatus confirmationStatus, Integer limit, Integer offset, Boolean reverse)
throws DataException; throws DataException;
/**
* Returns whether transaction is confirmed or not.
*
* @param signature
* @return true if confirmed, false if not.
*/
public boolean isConfirmed(byte[] signature) throws DataException;
/** /**
* Returns list of unconfirmed transactions in timestamp-else-signature order. * Returns list of unconfirmed transactions in timestamp-else-signature order.
* <p> * <p>
@ -75,7 +83,13 @@ public interface TransactionRepository {
*/ */
public void confirmTransaction(byte[] signature) throws DataException; public void confirmTransaction(byte[] signature) throws DataException;
void unconfirmTransaction(TransactionData transactionData) throws DataException; /**
* Add transaction to unconfirmed transactions pile.
*
* @param transactionData
* @throws DataException
*/
public void unconfirmTransaction(TransactionData transactionData) throws DataException;
public void save(TransactionData transactionData) throws DataException; public void save(TransactionData transactionData) throws DataException;

View File

@ -503,7 +503,7 @@ public class HSQLDBDatabaseUpdates {
case 30: case 30:
// Networking // Networking
stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, " stmt.execute("CREATE TABLE Peers (hostname VARCHAR(255), port INTEGER, last_connected TIMESTAMP WITH TIME ZONE, last_attempted TIMESTAMP WITH TIME ZONE, "
+ "last_height INTEGER, PRIMARY KEY (hostname, port))"); + "last_height INTEGER, last_misbehaved TIMESTAMP WITH TIME ZONE, PRIMARY KEY (hostname, port))");
break; break;
default: default:

View File

@ -24,7 +24,8 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
public List<PeerData> getAllPeers() throws DataException { public List<PeerData> getAllPeers() throws DataException {
List<PeerData> peers = new ArrayList<>(); List<PeerData> peers = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height FROM Peers")) { try (ResultSet resultSet = this.repository
.checkedExecute("SELECT hostname, port, last_connected, last_attempted, last_height, last_misbehaved FROM Peers")) {
if (resultSet == null) if (resultSet == null)
return peers; return peers;
@ -44,7 +45,10 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
if (resultSet.wasNull()) if (resultSet.wasNull())
lastHeight = null; lastHeight = null;
peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight)); Timestamp lastMisbehavedTimestamp = resultSet.getTimestamp(6, Calendar.getInstance(HSQLDBRepository.UTC));
Long lastMisbehaved = resultSet.wasNull() ? null : lastMisbehavedTimestamp.getTime();
peers.add(new PeerData(socketAddress, lastConnected, lastAttempted, lastHeight, lastMisbehaved));
} while (resultSet.next()); } while (resultSet.next());
return peers; return peers;
@ -59,9 +63,11 @@ public class HSQLDBNetworkRepository implements NetworkRepository {
Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected()); Timestamp lastConnected = peerData.getLastConnected() == null ? null : new Timestamp(peerData.getLastConnected());
Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted()); Timestamp lastAttempted = peerData.getLastAttempted() == null ? null : new Timestamp(peerData.getLastAttempted());
Timestamp lastMisbehaved = peerData.getLastMisbehaved() == null ? null : new Timestamp(peerData.getLastMisbehaved());
saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort()) saveHelper.bind("hostname", peerData.getSocketAddress().getHostString()).bind("port", peerData.getSocketAddress().getPort())
.bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight()); .bind("last_connected", lastConnected).bind("last_attempted", lastAttempted).bind("last_height", peerData.getLastHeight())
.bind("last_misbehaved", lastMisbehaved);
try { try {
saveHelper.execute(this.repository); saveHelper.execute(this.repository);

View File

@ -5,7 +5,10 @@ import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone; import java.util.TimeZone;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -33,11 +36,13 @@ public class HSQLDBRepository implements Repository {
public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
protected Connection connection; protected Connection connection;
protected List<Savepoint> savepoints;
protected boolean debugState = false; protected boolean debugState = false;
// NB: no visibility modifier so only callable from within same package // NB: no visibility modifier so only callable from within same package
HSQLDBRepository(Connection connection) { HSQLDBRepository(Connection connection) {
this.connection = connection; this.connection = connection;
this.savepoints = new ArrayList<>();
} }
@Override @Override
@ -91,6 +96,8 @@ public class HSQLDBRepository implements Repository {
this.connection.commit(); this.connection.commit();
} catch (SQLException e) { } catch (SQLException e) {
throw new DataException("commit error", e); throw new DataException("commit error", e);
} finally {
this.savepoints.clear();
} }
} }
@ -100,6 +107,33 @@ public class HSQLDBRepository implements Repository {
this.connection.rollback(); this.connection.rollback();
} catch (SQLException e) { } catch (SQLException e) {
throw new DataException("rollback error", e); throw new DataException("rollback error", e);
} finally {
this.savepoints.clear();
}
}
@Override
public void setSavepoint() throws DataException {
try {
Savepoint savepoint = this.connection.setSavepoint();
this.savepoints.add(savepoint);
} catch (SQLException e) {
throw new DataException("savepoint error", e);
}
}
@Override
public void rollbackToSavepoint() throws DataException {
if (this.savepoints.isEmpty())
throw new DataException("no savepoint to rollback");
Savepoint savepoint = this.savepoints.get(0);
this.savepoints.remove(0);
try {
this.connection.rollback(savepoint);
} catch (SQLException e) {
throw new DataException("savepoint rollback error", e);
} }
} }

View File

@ -475,6 +475,15 @@ public class HSQLDBTransactionRepository implements TransactionRepository {
} }
} }
@Override
public boolean isConfirmed(byte[] signature) throws DataException {
try {
return this.repository.exists("BlockTransactions", "transaction_signature = ?", signature);
} catch (SQLException e) {
throw new DataException("Unable to check whether transaction is confirmed in repository", e);
}
}
@Override @Override
public List<TransactionData> getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException { public List<TransactionData> getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException {
String sql = "SELECT signature FROM UnconfirmedTransactions ORDER BY creation"; String sql = "SELECT signature FROM UnconfirmedTransactions ORDER BY creation";

View File

@ -44,10 +44,10 @@ public class BlockTransformer extends Transformer {
private static final int GENERATOR_LENGTH = PUBLIC_KEY_LENGTH; private static final int GENERATOR_LENGTH = PUBLIC_KEY_LENGTH;
private static final int TRANSACTION_COUNT_LENGTH = INT_LENGTH; private static final int TRANSACTION_COUNT_LENGTH = INT_LENGTH;
private static final int BASE_LENGTH = VERSION_LENGTH + BLOCK_REFERENCE_LENGTH + TIMESTAMP_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH private static final int BASE_LENGTH = VERSION_LENGTH + TIMESTAMP_LENGTH + BLOCK_REFERENCE_LENGTH + GENERATING_BALANCE_LENGTH + GENERATOR_LENGTH
+ TRANSACTIONS_SIGNATURE_LENGTH + GENERATOR_SIGNATURE_LENGTH + TRANSACTION_COUNT_LENGTH; + TRANSACTIONS_SIGNATURE_LENGTH + GENERATOR_SIGNATURE_LENGTH + TRANSACTION_COUNT_LENGTH;
protected static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH; public static final int BLOCK_SIGNATURE_LENGTH = GENERATOR_SIGNATURE_LENGTH + TRANSACTIONS_SIGNATURE_LENGTH;
protected static final int TRANSACTION_SIZE_LENGTH = INT_LENGTH; // per transaction protected static final int TRANSACTION_SIZE_LENGTH = INT_LENGTH; // per transaction
protected static final int AT_BYTES_LENGTH = INT_LENGTH; protected static final int AT_BYTES_LENGTH = INT_LENGTH;
protected static final int AT_FEES_LENGTH = LONG_LENGTH; protected static final int AT_FEES_LENGTH = LONG_LENGTH;
@ -72,9 +72,20 @@ public class BlockTransformer extends Transformer {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
return fromByteBuffer(byteBuffer);
}
/**
* Extract block data and transaction data from serialized bytes.
*
* @param bytes
* @return BlockData and a List of transactions.
* @throws TransformationException
*/
public static Triple<BlockData, List<TransactionData>, List<ATStateData>> fromByteBuffer(ByteBuffer byteBuffer) throws TransformationException {
int version = byteBuffer.getInt(); int version = byteBuffer.getInt();
if (version >= 2 && bytes.length < BASE_LENGTH + AT_LENGTH) if (version >= 2 && byteBuffer.remaining() < BASE_LENGTH + AT_BYTES_LENGTH - VERSION_LENGTH)
throw new TransformationException("Byte data too short for V2+ Block"); throw new TransformationException("Byte data too short for V2+ Block");
long timestamp = byteBuffer.getLong(); long timestamp = byteBuffer.getLong();

View File

@ -24,7 +24,6 @@ public final class NTP {
lastUpdate = System.currentTimeMillis(); lastUpdate = System.currentTimeMillis();
// Log new value of offset // Log new value of offset
// TODO: LOGGER.info(Lang.getInstance().translate("Adjusting time with %offset% milliseconds.").replace("%offset%", String.valueOf(offset)));
LOGGER.info("Adjusting time with %offset% milliseconds.".replace("%offset%", String.valueOf(offset))); LOGGER.info("Adjusting time with %offset% milliseconds.".replace("%offset%", String.valueOf(offset)));
} }