Browse Source

Change processing of network TRANSACTION_SIGNATURES message.

Previously Controller would loop through the transaction signatures,
discard those already known, and then requesting the full transaction
via peer.getResponse(). This would tie up a networking thread for some
time and also potentially cause repository deadlocks, although the latter
could have been fixed another way.

However, the code after peer.getResponse() was identical to the code
processing an incoming TRANSACTION message. Now instead of requesting
and waiting for then processing each transaction, Controller simply
sends the peer a GET_TRANSACTION for each unknown transaction signature.

As the peer responds with corresponding TRANSACTION messages, these can
be processed individually with shorter period of locking.
split-DB
catbref 4 years ago
parent
commit
026c904ce4
  1. 56
      src/main/java/org/qortal/controller/Controller.java

56
src/main/java/org/qortal/controller/Controller.java

@ -1028,6 +1028,9 @@ public class Controller extends Thread {
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
} }
// Broadcast transaction signature because it's new to us
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(Arrays.asList(transactionData.getSignature())));
} }
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) { private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {
@ -1149,7 +1152,6 @@ public class Controller extends Thread {
private void onNetworkTransactionSignaturesMessage(Peer peer, Message message) { private void onNetworkTransactionSignaturesMessage(Peer peer, Message message) {
TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message; TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message;
List<byte[]> signatures = transactionSignaturesMessage.getSignatures(); List<byte[]> signatures = transactionSignaturesMessage.getSignatures();
List<byte[]> newSignatures = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) { for (byte[] signature : signatures) {
@ -1165,62 +1167,14 @@ public class Controller extends Thread {
// Fetch actual transaction data from peer // Fetch actual transaction data from peer
Message getTransactionMessage = new GetTransactionMessage(signature); Message getTransactionMessage = new GetTransactionMessage(signature);
Message responseMessage = peer.getResponse(getTransactionMessage); if (!peer.sendMessage(getTransactionMessage)) {
if (!(responseMessage instanceof TransactionMessage)) { peer.disconnect("failed to request transaction");
// Maybe peer no longer has this transaction
LOGGER.trace(() -> String.format("Peer %s didn't send transaction %s", peer, Base58.encode(signature)));
continue;
}
// Check isInterrupted() here and exit fast
if (Thread.currentThread().isInterrupted())
return; return;
TransactionMessage transactionMessage = (TransactionMessage) responseMessage;
TransactionData transactionData = transactionMessage.getTransactionData();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(() -> String.format("Ignoring %s transaction %s with invalid signature from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
continue;
}
ValidationResult validationResult = transaction.importAsUnconfirmed();
if (validationResult == ValidationResult.TRANSACTION_ALREADY_EXISTS) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
continue;
} }
if (validationResult == ValidationResult.NO_BLOCKCHAIN_LOCK) {
LOGGER.trace(() -> String.format("Couldn't lock blockchain to import unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
// Some other thread (e.g. Synchronizer) might have blockchain lock for a while so might as well give up for now
break;
}
if (validationResult != ValidationResult.OK) {
LOGGER.trace(() -> String.format("Ignoring invalid (%s) %s transaction %s from peer %s", validationResult.name(), transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
continue;
}
LOGGER.debug(() -> String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
// We could collate signatures that are new to us and broadcast them to our peers too
newSignatures.add(signature);
} }
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
} catch (InterruptedException e) {
// Shutdown
return;
} }
if (newSignatures.isEmpty())
return;
// Broadcast signatures that are new to us
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(newSignatures));
} }
private void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { private void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {

Loading…
Cancel
Save