Browse Source

Networking improvements

Fix issue where sometimes the channelSelector.select(1000) would
block processing of queued messages.

Improve support with older v1 peers.
split-DB
catbref 5 years ago
parent
commit
6abc3f4d39
  1. 1
      src/main/java/org/qora/controller/Synchronizer.java
  2. 73
      src/main/java/org/qora/network/Network.java
  3. 133
      src/main/java/org/qora/network/Peer.java

1
src/main/java/org/qora/controller/Synchronizer.java

@ -1,6 +1,5 @@
package org.qora.controller; package org.qora.controller;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;

73
src/main/java/org/qora/network/Network.java

@ -288,11 +288,6 @@ public class Network extends Thread {
protected Task produceTask(boolean canBlock) throws InterruptedException { protected Task produceTask(boolean canBlock) throws InterruptedException {
Task task; Task task;
// Only this method can block to reduce CPU spin
task = maybeProduceChannelTask(canBlock);
if (task != null)
return task;
task = maybeProducePeerMessageTask(); task = maybeProducePeerMessageTask();
if (task != null) if (task != null)
return task; return task;
@ -309,6 +304,11 @@ public class Network extends Thread {
if (task != null) if (task != null)
return task; return task;
// Only this method can block to reduce CPU spin
task = maybeProduceChannelTask(canBlock);
if (task != null)
return task;
// Really nothing to do // Really nothing to do
return null; return null;
} }
@ -688,6 +688,10 @@ public class Network extends Thread {
// Peer callbacks // Peer callbacks
/* package */ void wakeupChannelSelector() {
this.channelSelector.wakeup();
}
/** Called when Peer's thread has setup and is ready to process messages */ /** Called when Peer's thread has setup and is ready to process messages */
public void onPeerReady(Peer peer) { public void onPeerReady(Peer peer) {
this.onMessage(peer, null); this.onMessage(peer, null);
@ -719,41 +723,48 @@ public class Network extends Thread {
Handshake handshakeStatus = peer.getHandshakeStatus(); Handshake handshakeStatus = peer.getHandshakeStatus();
if (handshakeStatus != Handshake.COMPLETED) { if (handshakeStatus != Handshake.COMPLETED) {
// Still handshaking try {
// Still handshaking
LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer));
// Check message type is as expected // v1 nodes are keen on sending PINGs early. Send to back of queue so we'll process right after handshake
if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { if (message != null && message.getType() == MessageType.PING) {
// v1 nodes are keen on sending PINGs early. Discard as we'll send a PING right after handshake peer.queueMessage(message);
if (message.getType() == MessageType.PING)
return; return;
}
LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); // Check message type is as expected
peer.disconnect("unexpected message"); if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) {
return; LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType));
} peer.disconnect("unexpected message");
return;
}
Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message);
if (newHandshakeStatus == null) { if (newHandshakeStatus == null) {
// Handshake failure // Handshake failure
LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name()));
peer.disconnect("handshake failure"); peer.disconnect("handshake failure");
return; return;
} }
if (peer.isOutbound()) if (peer.isOutbound())
// If we made outbound connection then we need to act first // If we made outbound connection then we need to act first
newHandshakeStatus.action(peer); newHandshakeStatus.action(peer);
else else
// We have inbound connection so we need to respond in kind with what we just received // We have inbound connection so we need to respond in kind with what we just received
handshakeStatus.action(peer); handshakeStatus.action(peer);
peer.setHandshakeStatus(newHandshakeStatus); peer.setHandshakeStatus(newHandshakeStatus);
if (newHandshakeStatus == Handshake.COMPLETED) if (newHandshakeStatus == Handshake.COMPLETED)
this.onHandshakeCompleted(peer); this.onHandshakeCompleted(peer);
return; return;
} finally {
peer.resetHandshakeMessagePending();
}
} }
// Should be non-handshaking messages from now on // Should be non-handshaking messages from now on

133
src/main/java/org/qora/network/Peer.java

@ -59,50 +59,51 @@ public class Peer {
private InetSocketAddress resolvedAddress = null; private InetSocketAddress resolvedAddress = null;
/** True if remote address is loopback/link-local/site-local, false otherwise. */ /** True if remote address is loopback/link-local/site-local, false otherwise. */
private boolean isLocal; private boolean isLocal;
private ByteBuffer byteBuffer; private volatile ByteBuffer byteBuffer;
private Map<Integer, BlockingQueue<Message>> replyQueues; private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages; private LinkedBlockingQueue<Message> pendingMessages;
/** True if we created connection to peer, false if we accepted incoming connection from peer. */ /** True if we created connection to peer, false if we accepted incoming connection from peer. */
private final boolean isOutbound; private final boolean isOutbound;
/** Numeric protocol version, typically 1 or 2. */ /** Numeric protocol version, typically 1 or 2. */
private Integer version; private volatile Integer version;
private byte[] peerId; private volatile byte[] peerId;
private Handshake handshakeStatus = Handshake.STARTED; private volatile Handshake handshakeStatus = Handshake.STARTED;
private volatile boolean handshakeMessagePending = false;
private byte[] pendingPeerId; private volatile byte[] pendingPeerId;
private byte[] verificationCodeSent; private volatile byte[] verificationCodeSent;
private byte[] verificationCodeExpected; private volatile byte[] verificationCodeExpected;
private PeerData peerData = null; private volatile PeerData peerData = null;
private final ReentrantLock peerDataLock = new ReentrantLock(); private final ReentrantLock peerDataLock = new ReentrantLock();
/** Timestamp of when socket was accepted, or connected. */ /** Timestamp of when socket was accepted, or connected. */
private Long connectionTimestamp = null; private volatile Long connectionTimestamp = null;
/** Peer's value of connectionTimestamp. */ /** Peer's value of connectionTimestamp. */
private Long peersConnectionTimestamp = null; private volatile Long peersConnectionTimestamp = null;
/** Version info as reported by peer. */ /** Version info as reported by peer. */
private VersionMessage versionMessage = null; private volatile VersionMessage versionMessage = null;
/** Last PING message round-trip time (ms). */ /** Last PING message round-trip time (ms). */
private Long lastPing = null; private volatile Long lastPing = null;
/** When last PING message was sent, or null if pings not started yet. */ /** When last PING message was sent, or null if pings not started yet. */
private Long lastPingSent; private volatile Long lastPingSent;
/** Latest block height as reported by peer. */ /** Latest block height as reported by peer. */
private Integer lastHeight; private volatile Integer lastHeight;
/** Latest block signature as reported by peer. */ /** Latest block signature as reported by peer. */
private byte[] lastBlockSignature; private volatile byte[] lastBlockSignature;
/** Latest block timestamp as reported by peer. */ /** Latest block timestamp as reported by peer. */
private Long lastBlockTimestamp; private volatile Long lastBlockTimestamp;
/** Latest block generator public key as reported by peer. */ /** Latest block generator public key as reported by peer. */
private byte[] lastBlockGenerator; private volatile byte[] lastBlockGenerator;
// Constructors // Constructors
@ -151,6 +152,10 @@ public class Peer {
this.handshakeStatus = handshakeStatus; this.handshakeStatus = handshakeStatus;
} }
public void resetHandshakeMessagePending() {
this.handshakeMessagePending = false;
}
public VersionMessage getVersionMessage() { public VersionMessage getVersionMessage() {
return this.versionMessage; return this.versionMessage;
} }
@ -263,6 +268,11 @@ public class Peer {
return this.peerDataLock; return this.peerDataLock;
} }
/* package */ void queueMessage(Message message) {
if (!this.pendingMessages.offer(message))
LOGGER.info(String.format("No room to queue message from peer %s - discarding", this));
}
@Override @Override
public String toString() { public String toString() {
// Easier, and nicer output, than peer.getRemoteSocketAddress() // Easier, and nicer output, than peer.getRemoteSocketAddress()
@ -320,63 +330,80 @@ public class Peer {
*/ */
/* package */ void readChannel() throws IOException { /* package */ void readChannel() throws IOException {
synchronized (this.byteBuffer) { synchronized (this.byteBuffer) {
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) while(true) {
return; if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed())
return;
int bytesRead = this.socketChannel.read(this.byteBuffer); int bytesRead = this.socketChannel.read(this.byteBuffer);
if (bytesRead == -1) { if (bytesRead == -1) {
this.disconnect("EOF"); this.disconnect("EOF");
return; return;
} }
if (bytesRead == 0) LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this));
// No room in buffer, or no more bytes to read
return;
LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this)); while (true) {
final Message message;
while (true) { // Can we build a message from buffer now?
final Message message; try {
message = Message.fromByteBuffer(this.byteBuffer);
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
return;
}
// Can we build a message from buffer now? if (message == null && bytesRead == 0)
try { // No complete message in buffer and no more bytes to read from socket
message = Message.fromByteBuffer(this.byteBuffer); return;
} catch (MessageException e) {
LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this));
this.disconnect(e.getMessage());
return;
}
if (message == null) if (message == null)
return; // No complete message in buffer, but maybe more bytes to read from socket
break;
LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this));
BlockingQueue<Message> queue = this.replyQueues.get(message.getId()); BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) { if (queue != null) {
// Adding message to queue will unblock thread waiting for response // Adding message to queue will unblock thread waiting for response
this.replyQueues.get(message.getId()).add(message); this.replyQueues.get(message.getId()).add(message);
// Consumed elsewhere // Consumed elsewhere
continue; continue;
} }
// No thread waiting for message so we need to pass it up to network layer // No thread waiting for message so we need to pass it up to network layer
// Add message to pending queue // Add message to pending queue
if (!this.pendingMessages.offer(message)) { if (!this.pendingMessages.offer(message)) {
LOGGER.info(String.format("No room to queue message from peer %s - discarding", this)); LOGGER.info(String.format("No room to queue message from peer %s - discarding", this));
return; return;
}
// Prematurely end any blocking channel select so that new messages can be processed
Network.getInstance().wakeupChannelSelector();
} }
} }
} }
} }
/* package */ ExecuteProduceConsume.Task getMessageTask() { /* package */ ExecuteProduceConsume.Task getMessageTask() {
// If we are still handshaking and there is a message yet to be processed
// then don't produce another message task.
// This allows us to process handshake messages sequentially.
if (this.handshakeMessagePending)
return null;
final Message nextMessage = this.pendingMessages.poll(); final Message nextMessage = this.pendingMessages.poll();
if (nextMessage == null) if (nextMessage == null)
return null; return null;
LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this));
if (this.handshakeStatus != Handshake.COMPLETED)
this.handshakeMessagePending = true;
// Return a task to process message in queue // Return a task to process message in queue
return () -> Network.getInstance().onMessage(this, nextMessage); return () -> Network.getInstance().onMessage(this, nextMessage);
} }

Loading…
Cancel
Save