diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index af2f942d..75f0718b 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -1,6 +1,5 @@ package org.qora.controller; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.locks.ReentrantLock; diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index d653f284..35edd20c 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -288,11 +288,6 @@ public class Network extends Thread { protected Task produceTask(boolean canBlock) throws InterruptedException { Task task; - // Only this method can block to reduce CPU spin - task = maybeProduceChannelTask(canBlock); - if (task != null) - return task; - task = maybeProducePeerMessageTask(); if (task != null) return task; @@ -309,6 +304,11 @@ public class Network extends Thread { if (task != null) return task; + // Only this method can block to reduce CPU spin + task = maybeProduceChannelTask(canBlock); + if (task != null) + return task; + // Really nothing to do return null; } @@ -688,6 +688,10 @@ public class Network extends Thread { // Peer callbacks + /* package */ void wakeupChannelSelector() { + this.channelSelector.wakeup(); + } + /** Called when Peer's thread has setup and is ready to process messages */ public void onPeerReady(Peer peer) { this.onMessage(peer, null); @@ -719,41 +723,48 @@ public class Network extends Thread { Handshake handshakeStatus = peer.getHandshakeStatus(); if (handshakeStatus != Handshake.COMPLETED) { - // Still handshaking + try { + // Still handshaking + LOGGER.trace(() -> String.format("Handshake status %s, message %s from peer %s", handshakeStatus.name(), (message != null ? message.getType().name() : "null"), peer)); - // Check message type is as expected - if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { - // v1 nodes are keen on sending PINGs early. Discard as we'll send a PING right after handshake - if (message.getType() == MessageType.PING) + // v1 nodes are keen on sending PINGs early. Send to back of queue so we'll process right after handshake + if (message != null && message.getType() == MessageType.PING) { + peer.queueMessage(message); return; + } + + // Check message type is as expected + if (handshakeStatus.expectedMessageType != null && message.getType() != handshakeStatus.expectedMessageType) { + LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); + peer.disconnect("unexpected message"); + return; + } + + Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); + + if (newHandshakeStatus == null) { + // Handshake failure + LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); + peer.disconnect("handshake failure"); + return; + } + + if (peer.isOutbound()) + // If we made outbound connection then we need to act first + newHandshakeStatus.action(peer); + else + // We have inbound connection so we need to respond in kind with what we just received + handshakeStatus.action(peer); + + peer.setHandshakeStatus(newHandshakeStatus); + + if (newHandshakeStatus == Handshake.COMPLETED) + this.onHandshakeCompleted(peer); - LOGGER.debug(String.format("Unexpected %s message from %s, expected %s", message.getType().name(), peer, handshakeStatus.expectedMessageType)); - peer.disconnect("unexpected message"); return; + } finally { + peer.resetHandshakeMessagePending(); } - - Handshake newHandshakeStatus = handshakeStatus.onMessage(peer, message); - - if (newHandshakeStatus == null) { - // Handshake failure - LOGGER.debug(String.format("Handshake failure with peer %s message %s", peer, message.getType().name())); - peer.disconnect("handshake failure"); - return; - } - - if (peer.isOutbound()) - // If we made outbound connection then we need to act first - newHandshakeStatus.action(peer); - else - // We have inbound connection so we need to respond in kind with what we just received - handshakeStatus.action(peer); - - peer.setHandshakeStatus(newHandshakeStatus); - - if (newHandshakeStatus == Handshake.COMPLETED) - this.onHandshakeCompleted(peer); - - return; } // Should be non-handshaking messages from now on diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index dec55735..b9678071 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -59,50 +59,51 @@ public class Peer { private InetSocketAddress resolvedAddress = null; /** True if remote address is loopback/link-local/site-local, false otherwise. */ private boolean isLocal; - private ByteBuffer byteBuffer; + private volatile ByteBuffer byteBuffer; private Map> replyQueues; private LinkedBlockingQueue pendingMessages; /** True if we created connection to peer, false if we accepted incoming connection from peer. */ private final boolean isOutbound; /** Numeric protocol version, typically 1 or 2. */ - private Integer version; - private byte[] peerId; + private volatile Integer version; + private volatile byte[] peerId; - private Handshake handshakeStatus = Handshake.STARTED; + private volatile Handshake handshakeStatus = Handshake.STARTED; + private volatile boolean handshakeMessagePending = false; - private byte[] pendingPeerId; - private byte[] verificationCodeSent; - private byte[] verificationCodeExpected; + private volatile byte[] pendingPeerId; + private volatile byte[] verificationCodeSent; + private volatile byte[] verificationCodeExpected; - private PeerData peerData = null; + private volatile PeerData peerData = null; private final ReentrantLock peerDataLock = new ReentrantLock(); /** Timestamp of when socket was accepted, or connected. */ - private Long connectionTimestamp = null; + private volatile Long connectionTimestamp = null; /** Peer's value of connectionTimestamp. */ - private Long peersConnectionTimestamp = null; + private volatile Long peersConnectionTimestamp = null; /** Version info as reported by peer. */ - private VersionMessage versionMessage = null; + private volatile VersionMessage versionMessage = null; /** Last PING message round-trip time (ms). */ - private Long lastPing = null; + private volatile Long lastPing = null; /** When last PING message was sent, or null if pings not started yet. */ - private Long lastPingSent; + private volatile Long lastPingSent; /** Latest block height as reported by peer. */ - private Integer lastHeight; + private volatile Integer lastHeight; /** Latest block signature as reported by peer. */ - private byte[] lastBlockSignature; + private volatile byte[] lastBlockSignature; /** Latest block timestamp as reported by peer. */ - private Long lastBlockTimestamp; + private volatile Long lastBlockTimestamp; /** Latest block generator public key as reported by peer. */ - private byte[] lastBlockGenerator; + private volatile byte[] lastBlockGenerator; // Constructors @@ -151,6 +152,10 @@ public class Peer { this.handshakeStatus = handshakeStatus; } + public void resetHandshakeMessagePending() { + this.handshakeMessagePending = false; + } + public VersionMessage getVersionMessage() { return this.versionMessage; } @@ -263,6 +268,11 @@ public class Peer { return this.peerDataLock; } + /* package */ void queueMessage(Message message) { + if (!this.pendingMessages.offer(message)) + LOGGER.info(String.format("No room to queue message from peer %s - discarding", this)); + } + @Override public String toString() { // Easier, and nicer output, than peer.getRemoteSocketAddress() @@ -320,63 +330,80 @@ public class Peer { */ /* package */ void readChannel() throws IOException { synchronized (this.byteBuffer) { - if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) - return; + while(true) { + if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) + return; - int bytesRead = this.socketChannel.read(this.byteBuffer); - if (bytesRead == -1) { - this.disconnect("EOF"); - return; - } - - if (bytesRead == 0) - // No room in buffer, or no more bytes to read - return; - - LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this)); - - while (true) { - final Message message; - - // Can we build a message from buffer now? - try { - message = Message.fromByteBuffer(this.byteBuffer); - } catch (MessageException e) { - LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); - this.disconnect(e.getMessage()); + int bytesRead = this.socketChannel.read(this.byteBuffer); + if (bytesRead == -1) { + this.disconnect("EOF"); return; } - if (message == null) - return; + LOGGER.trace(() -> String.format("Received %d bytes from peer %s", bytesRead, this)); - LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); + while (true) { + final Message message; - BlockingQueue queue = this.replyQueues.get(message.getId()); - if (queue != null) { - // Adding message to queue will unblock thread waiting for response - this.replyQueues.get(message.getId()).add(message); - // Consumed elsewhere - continue; - } + // Can we build a message from buffer now? + try { + message = Message.fromByteBuffer(this.byteBuffer); + } catch (MessageException e) { + LOGGER.debug(String.format("%s, from peer %s", e.getMessage(), this)); + this.disconnect(e.getMessage()); + return; + } - // No thread waiting for message so we need to pass it up to network layer + if (message == null && bytesRead == 0) + // No complete message in buffer and no more bytes to read from socket + return; - // Add message to pending queue - if (!this.pendingMessages.offer(message)) { - LOGGER.info(String.format("No room to queue message from peer %s - discarding", this)); - return; + if (message == null) + // No complete message in buffer, but maybe more bytes to read from socket + break; + + LOGGER.trace(() -> String.format("Received %s message with ID %d from peer %s", message.getType().name(), message.getId(), this)); + + BlockingQueue queue = this.replyQueues.get(message.getId()); + if (queue != null) { + // Adding message to queue will unblock thread waiting for response + this.replyQueues.get(message.getId()).add(message); + // Consumed elsewhere + continue; + } + + // No thread waiting for message so we need to pass it up to network layer + + // Add message to pending queue + if (!this.pendingMessages.offer(message)) { + LOGGER.info(String.format("No room to queue message from peer %s - discarding", this)); + return; + } + + // Prematurely end any blocking channel select so that new messages can be processed + Network.getInstance().wakeupChannelSelector(); } } } } /* package */ ExecuteProduceConsume.Task getMessageTask() { + // If we are still handshaking and there is a message yet to be processed + // then don't produce another message task. + // This allows us to process handshake messages sequentially. + if (this.handshakeMessagePending) + return null; + final Message nextMessage = this.pendingMessages.poll(); if (nextMessage == null) return null; + LOGGER.trace(() -> String.format("Produced %s message task from peer %s", nextMessage.getType().name(), this)); + + if (this.handshakeStatus != Handshake.COMPLETED) + this.handshakeMessagePending = true; + // Return a task to process message in queue return () -> Network.getInstance().onMessage(this, nextMessage); }