Browse Source

Networking and synchronization improvements

Controller now sets (volatile) requestSync flag when a peer sends new height info.
This allows much quicker response to new blocks which might hopefully improve synchronization
compared with the old periodic sync method.

"Unsolicited" network messages are now added to a BlockingQueue,
and a separate unsolicited message processing thread (one per peer)
deals with this messages in turn.
This allows "reply" network messages to propagate up to the
threads that are waiting for them, preventing deadlocks and
peer disconnections due to lost pings.

Controller tries to do as much new transaction processing
outside of the blockchain lock as possible, and only
broadcasts new transaction's signature if we successfully
import transaction to our unconfirmed pile.

Synchronizer.findSignaturesFromCommonBlock now returns null
to indicate some sort of connection issue (no cool-off)
and an empty list to indicate NO COMMON BLOCK.
That method also tries to work back to genesis block
instead of giving up too early if test block height
becomes negative.

Network.createConnection additionally filters out
candidates if their addresses resolve to the same
IP+port as an existing connection. So now it won't
connect to localhost:1234 if it has an existing
connection with 127.0.0.1:1234.

Network.broadcast only considers unique peers,
i.e. prefers outbound connection if a peer has
corresponding inbound connection.

Added Thread.currentThread().setName() where possible.
pull/67/head
catbref 5 years ago
parent
commit
20200b844e
  1. 2
      src/main/java/org/qora/controller/AutoUpdate.java
  2. 79
      src/main/java/org/qora/controller/Controller.java
  3. 23
      src/main/java/org/qora/controller/Synchronizer.java
  4. 25
      src/main/java/org/qora/network/Network.java
  5. 98
      src/main/java/org/qora/network/Peer.java

2
src/main/java/org/qora/controller/AutoUpdate.java

@ -64,6 +64,8 @@ public class AutoUpdate extends Thread {
}
public void run() {
Thread.currentThread().setName("Auto-update");
long buildTimestamp = Controller.getInstance().getBuildTimestamp() * 1000L;
boolean attemptedUpdate = false;

79
src/main/java/org/qora/controller/Controller.java

@ -11,9 +11,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@ -75,8 +72,9 @@ public class Controller extends Thread {
private static final Object shutdownLock = new Object();
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true";
private static boolean isStopping = false;
private static volatile boolean isStopping = false;
private static BlockGenerator blockGenerator = null;
private static volatile boolean requestSync = false;
private static Controller instance;
private final String buildVersion;
private final long buildTimestamp; // seconds
@ -84,9 +82,6 @@ public class Controller extends Thread {
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */
private final ReentrantLock blockchainLock = new ReentrantLock();
/** Executor for processing network messages. */
private final ExecutorService networkMessageExecutor = Executors.newCachedThreadPool();
private Controller() {
Properties properties = new Properties();
try (InputStream in = this.getClass().getResourceAsStream("/build.properties")) {
@ -256,17 +251,17 @@ public class Controller extends Thread {
public void run() {
Thread.currentThread().setName("Controller");
try {
while (!isStopping) {
Thread.sleep(14 * 1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
return;
}
if (requestSync) {
requestSync = false;
potentiallySynchronize();
// Query random connections for unconfirmed transactions?
}
} catch (InterruptedException e) {
// time to exit
return;
}
}
@ -360,7 +355,6 @@ public class Controller extends Thread {
}
LOGGER.info("Shutting down networking");
networkMessageExecutor.shutdown();
Network.getInstance().shutdown();
LOGGER.info("Shutting down API");
@ -457,29 +451,6 @@ public class Controller extends Thread {
}
public void onNetworkMessage(Peer peer, Message message) {
class NetworkMessageProcessor implements Runnable {
private Peer peer;
private Message message;
public NetworkMessageProcessor(Peer peer, Message message) {
this.peer = peer;
this.message = message;
}
@Override
public void run() {
Controller.getInstance().processNetworkMessage(peer, message);
}
}
try {
networkMessageExecutor.execute(new NetworkMessageProcessor(peer, message));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
private void processNetworkMessage(Peer peer, Message message) {
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer));
switch (message.getType()) {
@ -499,6 +470,9 @@ public class Controller extends Thread {
LOGGER.error(String.format("Repository issue while updating height of peer %s", peer), e);
}
// Potentially synchronize
requestSync = true;
break;
}
@ -529,6 +503,9 @@ public class Controller extends Thread {
LOGGER.error(String.format("Repository issue while updating info of peer %s", peer), e);
}
// Potentially synchronize
requestSync = true;
break;
}
@ -556,6 +533,7 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
break;
}
@ -583,6 +561,7 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending V2 signatures after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
break;
}
@ -607,6 +586,7 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send block %s to peer %s", Base58.encode(signature), peer), e);
}
break;
}
@ -629,6 +609,7 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while send transaction %s to peer %s", Base58.encode(signature), peer), e);
}
break;
}
@ -664,12 +645,15 @@ public class Controller extends Thread {
// Seems ok - add to unconfirmed pile
transaction.importAsUnconfirmed();
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
} finally {
blockchainLock.unlock();
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
}
break;
}
@ -699,10 +683,6 @@ public class Controller extends Thread {
break;
}
// Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
// Fetch actual transaction data from peer
Message getTransactionMessage = new GetTransactionMessage(signature);
Message responseMessage = peer.getResponse(getTransactionMessage);
@ -721,7 +701,11 @@ public class Controller extends Thread {
break;
}
// Do we have it already?
// Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
// Do we have it already? Rechecking in case it has appeared since previous check above
if (repository.getTransactionRepository().exists(transactionData.getSignature())) {
LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
@ -739,12 +723,14 @@ public class Controller extends Thread {
// Seems ok - add to unconfirmed pile
transaction.importAsUnconfirmed();
} finally {
blockchainLock.unlock();
}
LOGGER.debug(String.format("Imported %s transaction %s from peer %s", transactionData.getType().name(), Base58.encode(transactionData.getSignature()), peer));
// We could collate signatures that are new to us and broadcast them to our peers too
newSignatures.add(signature);
} finally {
blockchainLock.unlock();
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
@ -786,6 +772,7 @@ public class Controller extends Thread {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
break;
}

23
src/main/java/org/qora/controller/Synchronizer.java

@ -2,6 +2,7 @@ package org.qora.controller;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
@ -10,7 +11,6 @@ import org.apache.logging.log4j.Logger;
import org.qora.block.Block;
import org.qora.block.Block.ValidationResult;
import org.qora.block.BlockChain;
import org.qora.block.GenesisBlock;
import org.qora.data.block.BlockData;
import org.qora.data.network.BlockSummaryData;
import org.qora.data.transaction.TransactionData;
@ -107,6 +107,10 @@ public class Synchronizer {
List<byte[]> signatures = findSignaturesFromCommonBlock(peer, ourHeight);
if (signatures == null) {
LOGGER.info(String.format("Error while trying to find common block with peer %s", peer));
return SynchronizationResult.NO_REPLY;
}
if (signatures.isEmpty()) {
LOGGER.info(String.format("Failure to find common block with peer %s", peer));
return SynchronizationResult.NO_COMMON_BLOCK;
}
@ -290,7 +294,7 @@ public class Synchronizer {
* Returns list of peer's block signatures starting with common block with peer.
*
* @param peer
* @return block signatures
* @return block signatures, or empty list if no common block, or null if there was an issue
* @throws DataException
*/
private List<byte[]> findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException {
@ -299,10 +303,10 @@ public class Synchronizer {
int step = INITIAL_BLOCK_STEP;
List<byte[]> blockSignatures = null;
int testHeight = ourHeight - step;
int testHeight = Math.max(ourHeight - step, 1);
byte[] testSignature = null;
while (testHeight > 1) {
while (testHeight >= 1) {
// Fetch our block signature at this height
BlockData testBlockData = this.repository.getBlockRepository().fromHeight(testHeight);
if (testBlockData == null) {
@ -328,6 +332,11 @@ public class Synchronizer {
// We have entries so we have found a common block
break;
// No blocks after genesis block?
// We don't get called for a peer at genesis height so this means NO blocks in common
if (testHeight == 1)
return Collections.emptyList();
if (peer.getVersion() >= 2) {
step <<= 1;
} else {
@ -336,13 +345,9 @@ public class Synchronizer {
}
step = Math.min(step, MAXIMUM_BLOCK_STEP);
testHeight -= step;
testHeight = Math.max(testHeight - step, 1);
}
if (testHeight <= 1)
// Can't go back any further - return Genesis block
return new ArrayList<byte[]>(Arrays.asList(GenesisBlock.getInstance(this.repository).getBlockData().getSignature()));
// Prepend common block's signature as first block sig
blockSignatures.add(0, testSignature);

25
src/main/java/org/qora/network/Network.java

@ -300,7 +300,7 @@ public class Network extends Thread {
peers.removeIf(isSelfPeer);
}
// Don't consider already connected peers
// Don't consider already connected peers (simple address match)
Predicate<PeerData> isConnectedPeer = peerData -> {
PeerAddress peerAddress = peerData.getAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getPeerData().getAddress().equals(peerAddress));
@ -310,6 +310,21 @@ public class Network extends Thread {
peers.removeIf(isConnectedPeer);
}
// Don't consider already connected peers (resolved address match)
Predicate<PeerData> isResolvedAsConnectedPeer = peerData -> {
try {
InetSocketAddress resolvedSocketAddress = peerData.getAddress().toSocketAddress();
return this.connectedPeers.stream().anyMatch(peer -> peer.getResolvedAddress().equals(resolvedSocketAddress));
} catch (UnknownHostException e) {
// Can't resolve - no point even trying to connect
return true;
}
};
synchronized (this.connectedPeers) {
peers.removeIf(isResolvedAsConnectedPeer);
}
// Any left?
if (peers.isEmpty())
return;
@ -642,7 +657,7 @@ public class Network extends Thread {
return peers;
}
/** Returns list of connected peers that have completed handshaking, with unbound duplicates removed. */
/** Returns list of connected peers that have completed handshaking, with inbound duplicates removed. */
public List<Peer> getUniqueHandshakedPeers() {
final List<Peer> peers;
@ -702,6 +717,8 @@ public class Network extends Thread {
@Override
public void run() {
Thread.currentThread().setName("Merging peers");
// Serialize using lock to prevent repository deadlocks
mergePeersLock.lock();
@ -751,6 +768,8 @@ public class Network extends Thread {
@Override
public void run() {
Thread.currentThread().setName("Network Broadcast");
for (Peer peer : targetPeers) {
Message message = peerMessageBuilder.apply(peer);
@ -764,7 +783,7 @@ public class Network extends Thread {
}
try {
peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder));
peerExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}

98
src/main/java/org/qora/network/Peer.java

@ -15,6 +15,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -44,6 +45,7 @@ public class Peer implements Runnable {
private static final int RESPONSE_TIMEOUT = 5000; // ms
private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed
private static final int INACTIVITY_TIMEOUT = 30000; // ms
private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10;
private final boolean isOutbound;
private Socket socket = null;
@ -52,11 +54,14 @@ public class Peer implements Runnable {
private Long connectionTimestamp = null;
private OutputStream out;
private Handshake handshakeStatus = Handshake.STARTED;
private Map<Integer, BlockingQueue<Message>> messages;
private Map<Integer, BlockingQueue<Message>> replyQueues;
private BlockingQueue<Message> unsolicitedQueue;
private ExecutorService messageExecutor;
private VersionMessage versionMessage = null;
private Integer version;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService pingExecutor;
private Long lastPing = null;
private InetSocketAddress resolvedAddress = null;
private boolean isLocal;
private byte[] peerId;
@ -75,7 +80,8 @@ public class Peer implements Runnable {
this.isOutbound = false;
this.socket = socket;
this.isLocal = isAddressLocal(((InetSocketAddress) socket.getRemoteSocketAddress()).getAddress());
this.resolvedAddress = ((InetSocketAddress) socket.getRemoteSocketAddress());
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
PeerAddress peerAddress = PeerAddress.fromSocket(socket);
this.peerData = new PeerData(peerAddress);
@ -135,6 +141,10 @@ public class Peer implements Runnable {
this.lastPing = lastPing;
}
public InetSocketAddress getResolvedAddress() {
return this.resolvedAddress;
}
public boolean getIsLocal() {
return this.isLocal;
}
@ -190,11 +200,41 @@ public class Peer implements Runnable {
new SecureRandom().nextBytes(verificationCodeExpected);
}
class MessageProcessor implements Runnable {
private Peer peer;
private BlockingQueue<Message> blockingQueue;
public MessageProcessor(Peer peer, BlockingQueue<Message> blockingQueue) {
this.peer = peer;
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
Thread.currentThread().setName("Peer UMP " + this.peer);
while (true) {
try {
Message message = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (message != null)
Network.getInstance().onMessage(peer, message);
} catch (InterruptedException e) {
// Shutdown
return;
}
}
}
}
private void setup() throws IOException {
this.socket.setSoTimeout(INACTIVITY_TIMEOUT);
this.out = this.socket.getOutputStream();
this.connectionTimestamp = NTP.getTime();
this.messages = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.unsolicitedQueue = new ArrayBlockingQueue<>(UNSOLICITED_MESSAGE_QUEUE_CAPACITY);
this.messageExecutor = Executors.newSingleThreadExecutor();
this.messageExecutor.execute(new MessageProcessor(this, this.unsolicitedQueue));
}
public boolean connect() {
@ -202,11 +242,10 @@ public class Peer implements Runnable {
this.socket = new Socket();
try {
InetSocketAddress resolvedSocketAddress = this.peerData.getAddress().toSocketAddress();
this.isLocal = isAddressLocal(resolvedSocketAddress.getAddress());
this.resolvedAddress = this.peerData.getAddress().toSocketAddress();
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
this.socket.connect(resolvedSocketAddress, CONNECT_TIMEOUT);
this.socket.connect(resolvedAddress, CONNECT_TIMEOUT);
LOGGER.debug(String.format("Connected to peer %s", this));
} catch (SocketTimeoutException e) {
LOGGER.trace(String.format("Connection timed out to peer %s", this));
@ -242,13 +281,20 @@ public class Peer implements Runnable {
LOGGER.trace(String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
// Find potential blocking queue for this id (expect null if id is -1)
BlockingQueue<Message> queue = this.messages.get(message.getId());
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
this.messages.get(message.getId()).add(message);
this.replyQueues.get(message.getId()).add(message);
} else {
// Nothing waiting for this message - pass up to network
Network.getInstance().onMessage(this, message);
// Nothing waiting for this message (unsolicited) - queue up for network
// Queue full?
if (unsolicitedQueue.remainingCapacity() == 0) {
LOGGER.debug(String.format("No room for %s message with ID %s from peer %s", message.getType().name(), message.getId(), this));
continue;
}
unsolicitedQueue.add(message);
}
}
} catch (MessageException e) {
@ -313,12 +359,12 @@ public class Peer implements Runnable {
message.setId(id);
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this id is already taken
} while (this.messages.putIfAbsent(id, blockingQueue) != null);
// If putIfAbsent() doesn't return null, then this ID is already taken
} while (this.replyQueues.putIfAbsent(id, blockingQueue) != null);
// Try to send message
if (!this.sendMessage(message)) {
this.messages.remove(id);
this.replyQueues.remove(id);
return null;
}
@ -329,7 +375,7 @@ public class Peer implements Runnable {
// Our thread was interrupted. Probably in shutdown scenario.
return null;
} finally {
this.messages.remove(id);
this.replyQueues.remove(id);
}
}
@ -343,6 +389,8 @@ public class Peer implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Pinger " + this.peer);
PingMessage pingMessage = new PingMessage();
long before = System.currentTimeMillis();
@ -358,16 +406,28 @@ public class Peer implements Runnable {
Random random = new Random();
long initialDelay = random.nextInt(PING_INTERVAL);
this.executor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS);
this.pingExecutor = Executors.newSingleThreadScheduledExecutor();
this.pingExecutor.scheduleWithFixedDelay(new Pinger(this), initialDelay, PING_INTERVAL, TimeUnit.MILLISECONDS);
}
public void disconnect(String reason) {
LOGGER.trace(String.format("Disconnecting peer %s: %s", this, reason));
// Shut down pinger
this.executor.shutdownNow();
if (this.pingExecutor != null) {
this.pingExecutor.shutdownNow();
this.pingExecutor = null;
}
// Shut down unsolicited message processor
if (this.messageExecutor != null) {
this.messageExecutor.shutdownNow();
this.messageExecutor = null;
}
// Close socket
if (!this.socket.isClosed()) {
LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason));
LOGGER.debug(String.format("Closing socket with peer %s: %s", this, reason));
try {
this.socket.close();

Loading…
Cancel
Save