diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index 4b72e60e..3ee7af75 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -13,6 +13,7 @@ import org.qortal.data.block.BlockData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.TransactionData; import org.qortal.network.message.*; +import org.qortal.network.task.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -216,12 +217,16 @@ public class Network { // Getters / setters - public static synchronized Network getInstance() { - if (instance == null) { - instance = new Network(); - } + private static class SingletonContainer { + private static final Network INSTANCE = new Network(); + } - return instance; + public static Network getInstance() { + return SingletonContainer.INSTANCE; + } + + public int getMaxPeers() { + return this.maxPeers; } public byte[] getMessageMagic() { @@ -496,39 +501,19 @@ public class Network { } private Task maybeProducePeerMessageTask() { - for (Peer peer : getImmutableConnectedPeers()) { - Task peerTask = peer.getMessageTask(); - if (peerTask != null) { - return peerTask; - } - } - - return null; + return getImmutableConnectedPeers().stream() + .map(Peer::getMessageTask) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); } private Task maybeProducePeerPingTask(Long now) { - // Ask connected peers whether they need a ping - for (Peer peer : getImmutableHandshakedPeers()) { - Task peerTask = peer.getPingTask(now); - if (peerTask != null) { - return peerTask; - } - } - - return null; - } - - class PeerConnectTask implements ExecuteProduceConsume.Task { - private final Peer peer; - - PeerConnectTask(Peer peer) { - this.peer = peer; - } - - @Override - public void perform() throws InterruptedException { - connectPeer(peer); - } + return getImmutableHandshakedPeers().stream() + .map(peer -> peer.getPingTask(now)) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); } private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException { @@ -557,65 +542,10 @@ public class Network { } nextBroadcastTimestamp = now + BROADCAST_INTERVAL; - return () -> Controller.getInstance().doNetworkBroadcast(); - } - - class ChannelTask implements ExecuteProduceConsume.Task { - private final SelectionKey selectionKey; - - ChannelTask(SelectionKey selectionKey) { - this.selectionKey = selectionKey; - } - - @Override - public void perform() throws InterruptedException { - try { - LOGGER.trace("Thread {} has pending channel: {}, with ops {}", - Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps()); - - // process pending channel task - if (selectionKey.isReadable()) { - connectionRead((SocketChannel) selectionKey.channel()); - } else if (selectionKey.isAcceptable()) { - acceptConnection((ServerSocketChannel) selectionKey.channel()); - } - - LOGGER.trace("Thread {} processed channel: {}", - Thread.currentThread().getId(), selectionKey.channel()); - } catch (CancelledKeyException e) { - LOGGER.trace("Thread {} encountered cancelled channel: {}", - Thread.currentThread().getId(), selectionKey.channel()); - } - } - - private void connectionRead(SocketChannel socketChannel) { - Peer peer = getPeerFromChannel(socketChannel); - if (peer == null) { - return; - } - - try { - peer.readChannel(); - - LOGGER.trace("Thread {} re-registering OP_READ interestOps on channel: {}", - Thread.currentThread().getId(), socketChannel); - socketChannel.register(channelSelector, SelectionKey.OP_READ); - } catch (IOException e) { - if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) { - peer.disconnect("Connection reset"); - return; - } - - LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(), - Thread.currentThread().getId(), e.getMessage(), e); - peer.disconnect("I/O error"); - } - } + return new BroadcastTask(); } private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { - final SelectionKey nextSelectionKey; - // Synchronization here to enforce thread-safety on channelIterator synchronized (channelSelector) { // anything to do? @@ -636,99 +566,45 @@ public class Network { } channelIterator = channelSelector.selectedKeys().iterator(); + LOGGER.trace("Thread {}, after {} select, channelIterator now {}", + Thread.currentThread().getId(), + canBlock ? "blocking": "non-blocking", + channelIterator); } - if (channelIterator.hasNext()) { - nextSelectionKey = channelIterator.next(); - channelIterator.remove(); - - if (nextSelectionKey.isReadable()) { - LOGGER.trace("Thread {} clearing all interestOps on channel: {}", - Thread.currentThread().getId(), nextSelectionKey.channel()); - nextSelectionKey.interestOps(0); - } - } else { - nextSelectionKey = null; + if (!channelIterator.hasNext()) { channelIterator = null; // Nothing to do so reset iterator to cause new select + + LOGGER.trace("Thread {}, channelIterator now null", Thread.currentThread().getId()); + return null; } - LOGGER.trace("Thread {}, nextSelectionKey {}, channelIterator now {}", - Thread.currentThread().getId(), nextSelectionKey, channelIterator); + final SelectionKey nextSelectionKey = channelIterator.next(); + channelIterator.remove(); + + LOGGER.trace("Thread {}, nextSelectionKey {}", Thread.currentThread().getId(), nextSelectionKey); + + if (nextSelectionKey.isReadable()) { + clearInterestOps(nextSelectionKey, SelectionKey.OP_READ); + return new ChannelReadTask(nextSelectionKey); + } + + if (nextSelectionKey.isWritable()) { + clearInterestOps(nextSelectionKey, SelectionKey.OP_WRITE); + return new ChannelWriteTask(nextSelectionKey); + } + + if (nextSelectionKey.isAcceptable()) { + clearInterestOps(nextSelectionKey, SelectionKey.OP_ACCEPT); + return new ChannelAcceptTask(nextSelectionKey); + } } - if (nextSelectionKey == null) { - return null; - } - - return new ChannelTask(nextSelectionKey); + return null; } } - private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException { - SocketChannel socketChannel; - - try { - if (getImmutableConnectedPeers().size() >= maxPeers) { - // We have enough peers - if (serverSelectionKey.interestOps() != 0) { - LOGGER.debug("Ignoring pending incoming connections because the server is full"); - serverSelectionKey.interestOps(0); - } - return; - } - - socketChannel = serverSocketChannel.accept(); - } catch (IOException e) { - return; - } - - // No connection actually accepted? - if (socketChannel == null) { - return; - } - PeerAddress address = PeerAddress.fromSocket(socketChannel.socket()); - List fixedNetwork = Settings.getInstance().getFixedNetwork(); - if (fixedNetwork != null && !fixedNetwork.isEmpty() && ipNotInFixedList(address, fixedNetwork)) { - try { - LOGGER.debug("Connection discarded from peer {} as not in the fixed network list", address); - socketChannel.close(); - } catch (IOException e) { - // IGNORE - } - return; - } - - final Long now = NTP.getTime(); - Peer newPeer; - - try { - if (now == null) { - LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", address); - socketChannel.close(); - return; - } - - LOGGER.debug("Connection accepted from peer {}", address); - - newPeer = new Peer(socketChannel, channelSelector); - this.addConnectedPeer(newPeer); - - } catch (IOException e) { - if (socketChannel.isOpen()) { - try { - LOGGER.debug("Connection failed from peer {} while connecting/closing", address); - socketChannel.close(); - } catch (IOException ce) { - // Couldn't close? - } - } - return; - } - - this.onPeerReady(newPeer); - } - - private boolean ipNotInFixedList(PeerAddress address, List fixedNetwork) { + public boolean ipNotInFixedList(PeerAddress address, List fixedNetwork) { for (String ipAddress : fixedNetwork) { String[] bits = ipAddress.split(":"); if (bits.length >= 1 && bits.length <= 2 && address.getHost().equals(bits[0])) { @@ -764,8 +640,9 @@ public class Network { peers.removeIf(isConnectedPeer); // Don't consider already connected peers (resolved address match) - // XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS - peers.removeIf(isResolvedAsConnectedPeer); + // Disabled because this might be too slow if we end up waiting a long time for hostnames to resolve via DNS + // Which is ok because duplicate connections to the same peer are handled during handshaking + // peers.removeIf(isResolvedAsConnectedPeer); this.checkLongestConnection(now); @@ -795,12 +672,12 @@ public class Network { } } - private boolean connectPeer(Peer newPeer) throws InterruptedException { + public boolean connectPeer(Peer newPeer) throws InterruptedException { // NOT CORRECT: if (getImmutableConnectedPeers().size() >= minOutboundPeers) return false; - SocketChannel socketChannel = newPeer.connect(this.channelSelector); + SocketChannel socketChannel = newPeer.connect(); if (socketChannel == null) { return false; } @@ -815,7 +692,7 @@ public class Network { return true; } - private Peer getPeerFromChannel(SocketChannel socketChannel) { + public Peer getPeerFromChannel(SocketChannel socketChannel) { for (Peer peer : this.getImmutableConnectedPeers()) { if (peer.getSocketChannel() == socketChannel) { return peer; @@ -848,6 +725,69 @@ public class Network { nextDisconnectionCheck = now + DISCONNECTION_CHECK_INTERVAL; } + // SocketChannel interest-ops manipulations + + private static final String[] OP_NAMES = new String[SelectionKey.OP_ACCEPT * 2]; + static { + for (int i = 0; i < OP_NAMES.length; i++) { + StringJoiner joiner = new StringJoiner(","); + + if ((i & SelectionKey.OP_READ) != 0) joiner.add("OP_READ"); + if ((i & SelectionKey.OP_WRITE) != 0) joiner.add("OP_WRITE"); + if ((i & SelectionKey.OP_CONNECT) != 0) joiner.add("OP_CONNECT"); + if ((i & SelectionKey.OP_ACCEPT) != 0) joiner.add("OP_ACCEPT"); + + OP_NAMES[i] = joiner.toString(); + } + } + + public void clearInterestOps(SelectableChannel socketChannel, int interestOps) { + SelectionKey selectionKey = socketChannel.keyFor(channelSelector); + if (selectionKey == null) + return; + + clearInterestOps(selectionKey, interestOps); + } + + private void clearInterestOps(SelectionKey selectionKey, int interestOps) { + if (!selectionKey.channel().isOpen()) + return; + + LOGGER.trace("Thread {} clearing {} interest-ops on channel: {}", + Thread.currentThread().getId(), + OP_NAMES[interestOps], + selectionKey.channel()); + + selectionKey.interestOpsAnd(~interestOps); + } + + public void setInterestOps(SelectableChannel socketChannel, int interestOps) { + SelectionKey selectionKey = socketChannel.keyFor(channelSelector); + if (selectionKey == null) { + try { + selectionKey = socketChannel.register(this.channelSelector, interestOps); + } catch (ClosedChannelException e) { + // Channel already closed so ignore + return; + } + // Fall-through to allow logging + } + + setInterestOps(selectionKey, interestOps); + } + + private void setInterestOps(SelectionKey selectionKey, int interestOps) { + if (!selectionKey.channel().isOpen()) + return; + + LOGGER.trace("Thread {} setting {} interest-ops on channel: {}", + Thread.currentThread().getId(), + OP_NAMES[interestOps], + selectionKey.channel()); + + selectionKey.interestOpsOr(interestOps); + } + // Peer callbacks protected void wakeupChannelSelector() { @@ -887,7 +827,7 @@ public class Network { if (getImmutableConnectedPeers().size() < maxPeers - 1 && (serverSelectionKey.interestOps() & SelectionKey.OP_ACCEPT) == 0) { try { LOGGER.debug("Re-enabling accepting incoming connections because the server is not longer full"); - serverSelectionKey.interestOps(SelectionKey.OP_ACCEPT); + setInterestOps(serverSelectionKey, SelectionKey.OP_ACCEPT); } catch (CancelledKeyException e) { LOGGER.error("Failed to re-enable accepting of incoming connections: {}", e.getMessage()); } diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index b1dd0ef5..a755632d 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -12,24 +12,20 @@ import org.qortal.data.network.PeerData; import org.qortal.network.message.ChallengeMessage; import org.qortal.network.message.Message; import org.qortal.network.message.MessageException; -import org.qortal.network.message.MessageType; -import org.qortal.network.message.PingMessage; +import org.qortal.network.task.MessageTask; +import org.qortal.network.task.PingTask; import org.qortal.settings.Settings; -import org.qortal.utils.ExecuteProduceConsume; +import org.qortal.utils.ExecuteProduceConsume.Task; import org.qortal.utils.NTP; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.security.SecureRandom; import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -98,7 +94,7 @@ public class Peer { /** * When last PING message was sent, or null if pings not started yet. */ - private Long lastPingSent; + private Long lastPingSent = null; byte[] ourChallenge; @@ -160,10 +156,10 @@ public class Peer { /** * Construct Peer using existing, connected socket */ - public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException { + public Peer(SocketChannel socketChannel) throws IOException { this.isOutbound = false; this.socketChannel = socketChannel; - sharedSetup(channelSelector); + sharedSetup(); this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress()); this.isLocal = isAddressLocal(this.resolvedAddress.getAddress()); @@ -276,7 +272,7 @@ public class Peer { } } - protected void setLastPing(long lastPing) { + public void setLastPing(long lastPing) { synchronized (this.peerInfoLock) { this.lastPing = lastPing; } @@ -396,13 +392,13 @@ public class Peer { // Processing - private void sharedSetup(Selector channelSelector) throws IOException { + private void sharedSetup() throws IOException { this.connectionTimestamp = NTP.getTime(); this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); this.socketChannel.configureBlocking(false); - this.socketChannel.register(channelSelector, SelectionKey.OP_READ); + Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_READ); this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC! - this.replyQueues = Collections.synchronizedMap(new HashMap>()); + this.replyQueues = new ConcurrentHashMap<>(); this.pendingMessages = new LinkedBlockingQueue<>(); Random random = new SecureRandom(); @@ -410,7 +406,7 @@ public class Peer { random.nextBytes(this.ourChallenge); } - public SocketChannel connect(Selector channelSelector) { + public SocketChannel connect() { LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this); try { @@ -432,7 +428,7 @@ public class Peer { try { LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this); - sharedSetup(channelSelector); + sharedSetup(); return socketChannel; } catch (IOException e) { LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this); @@ -450,7 +446,7 @@ public class Peer { * * @throws IOException If this channel is not yet connected */ - protected void readChannel() throws IOException { + public void readChannel() throws IOException { synchronized (this.byteBufferLock) { while (true) { if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) { @@ -556,7 +552,16 @@ public class Peer { } } - protected ExecuteProduceConsume.Task getMessageTask() { + /** Maybe send some pending outgoing messages. + * + * @return true if more data is pending to be sent + */ + public boolean writeChannel() throws IOException { + // TODO + return false; + } + + protected Task getMessageTask() { /* * If we are still handshaking and there is a message yet to be processed then * don't produce another message task. This allows us to process handshake @@ -580,7 +585,7 @@ public class Peer { } // Return a task to process message in queue - return () -> Network.getInstance().onMessage(this, nextMessage); + return new MessageTask(this, nextMessage); } /** @@ -720,7 +725,7 @@ public class Peer { this.lastPingSent = NTP.getTime(); } - protected ExecuteProduceConsume.Task getPingTask(Long now) { + protected Task getPingTask(Long now) { // Pings not enabled yet? if (now == null || this.lastPingSent == null) { return null; @@ -734,19 +739,7 @@ public class Peer { // Not strictly true, but prevents this peer from being immediately chosen again this.lastPingSent = now; - return () -> { - PingMessage pingMessage = new PingMessage(); - Message message = this.getResponse(pingMessage); - - if (message == null || message.getType() != MessageType.PING) { - LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this, - pingMessage.getId()); - this.disconnect("no ping received"); - return; - } - - this.setLastPing(NTP.getTime() - now); - }; + return new PingTask(this, now); } public void disconnect(String reason) { diff --git a/src/main/java/org/qortal/network/task/BroadcastTask.java b/src/main/java/org/qortal/network/task/BroadcastTask.java new file mode 100644 index 00000000..5714ebf6 --- /dev/null +++ b/src/main/java/org/qortal/network/task/BroadcastTask.java @@ -0,0 +1,22 @@ +package org.qortal.network.task; + +import org.qortal.controller.Controller; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.Message; +import org.qortal.utils.ExecuteProduceConsume.Task; + +public class BroadcastTask implements Task { + public BroadcastTask() { + } + + @Override + public String getName() { + return "BroadcastTask"; + } + + @Override + public void perform() throws InterruptedException { + Controller.getInstance().doNetworkBroadcast(); + } +} diff --git a/src/main/java/org/qortal/network/task/ChannelAcceptTask.java b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java new file mode 100644 index 00000000..b98a881a --- /dev/null +++ b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java @@ -0,0 +1,99 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.PeerAddress; +import org.qortal.settings.Settings; +import org.qortal.utils.ExecuteProduceConsume.Task; +import org.qortal.utils.NTP; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.List; + +public class ChannelAcceptTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(ChannelAcceptTask.class); + + private final SelectionKey serverSelectionKey; + private final ServerSocketChannel serverSocketChannel; + + public ChannelAcceptTask(SelectionKey selectionKey) { + this.serverSelectionKey = selectionKey; + this.serverSocketChannel = (ServerSocketChannel) this.serverSelectionKey.channel(); + } + + @Override + public String getName() { + return "ChannelAcceptTask"; + } + + @Override + public void perform() throws InterruptedException { + Network network = Network.getInstance(); + SocketChannel socketChannel; + + try { + if (network.getImmutableConnectedPeers().size() >= network.getMaxPeers()) { + // We have enough peers + LOGGER.debug("Ignoring pending incoming connections because the server is full"); + return; + } + + socketChannel = serverSocketChannel.accept(); + + network.setInterestOps(serverSocketChannel, SelectionKey.OP_ACCEPT); + } catch (IOException e) { + return; + } + + // No connection actually accepted? + if (socketChannel == null) { + return; + } + + PeerAddress address = PeerAddress.fromSocket(socketChannel.socket()); + List fixedNetwork = Settings.getInstance().getFixedNetwork(); + if (fixedNetwork != null && !fixedNetwork.isEmpty() && network.ipNotInFixedList(address, fixedNetwork)) { + try { + LOGGER.debug("Connection discarded from peer {} as not in the fixed network list", address); + socketChannel.close(); + } catch (IOException e) { + // IGNORE + } + return; + } + + final Long now = NTP.getTime(); + Peer newPeer; + + try { + if (now == null) { + LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", address); + socketChannel.close(); + return; + } + + LOGGER.debug("Connection accepted from peer {}", address); + + newPeer = new Peer(socketChannel); + network.addConnectedPeer(newPeer); + + } catch (IOException e) { + if (socketChannel.isOpen()) { + try { + LOGGER.debug("Connection failed from peer {} while connecting/closing", address); + socketChannel.close(); + } catch (IOException ce) { + // Couldn't close? + } + } + return; + } + + network.onPeerReady(newPeer); + } +} diff --git a/src/main/java/org/qortal/network/task/ChannelReadTask.java b/src/main/java/org/qortal/network/task/ChannelReadTask.java new file mode 100644 index 00000000..ad190ef2 --- /dev/null +++ b/src/main/java/org/qortal/network/task/ChannelReadTask.java @@ -0,0 +1,55 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.utils.ExecuteProduceConsume.Task; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +public class ChannelReadTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(ChannelReadTask.class); + + private final SelectionKey selectionKey; + private final SocketChannel socketChannel; + private final Peer peer; + private final String name; + + public ChannelReadTask(SelectionKey selectionKey) { + this.selectionKey = selectionKey; + this.socketChannel = (SocketChannel) this.selectionKey.channel(); + this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel); + this.name = "ChannelReadTask::" + peer; + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + if (peer == null) { + return; + } + + try { + peer.readChannel(); + + Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_READ); + } catch (IOException e) { + if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) { + peer.disconnect("Connection reset"); + return; + } + + LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(), + Thread.currentThread().getId(), e.getMessage(), e); + peer.disconnect("I/O error"); + } + } +} diff --git a/src/main/java/org/qortal/network/task/ChannelWriteTask.java b/src/main/java/org/qortal/network/task/ChannelWriteTask.java new file mode 100644 index 00000000..757fa01d --- /dev/null +++ b/src/main/java/org/qortal/network/task/ChannelWriteTask.java @@ -0,0 +1,56 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.utils.ExecuteProduceConsume.Task; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +public class ChannelWriteTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(ChannelWriteTask.class); + + private final SelectionKey selectionKey; + private final SocketChannel socketChannel; + private final Peer peer; + private final String name; + + public ChannelWriteTask(SelectionKey selectionKey) { + this.selectionKey = selectionKey; + this.socketChannel = (SocketChannel) this.selectionKey.channel(); + this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel); + this.name = "ChannelWriteTask::" + peer; + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + if (peer == null) { + return; + } + + try { + boolean isMoreDataPending = peer.writeChannel(); + + if (isMoreDataPending) { + Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_WRITE); + } + } catch (IOException e) { + if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) { + peer.disconnect("Connection reset"); + return; + } + + LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(), + Thread.currentThread().getId(), e.getMessage(), e); + peer.disconnect("I/O error"); + } + } +} diff --git a/src/main/java/org/qortal/network/task/MessageTask.java b/src/main/java/org/qortal/network/task/MessageTask.java new file mode 100644 index 00000000..c1907b62 --- /dev/null +++ b/src/main/java/org/qortal/network/task/MessageTask.java @@ -0,0 +1,28 @@ +package org.qortal.network.task; + +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.Message; +import org.qortal.utils.ExecuteProduceConsume.Task; + +public class MessageTask implements Task { + private final Peer peer; + private final Message nextMessage; + private final String name; + + public MessageTask(Peer peer, Message nextMessage) { + this.peer = peer; + this.nextMessage = nextMessage; + this.name = "MessageTask::" + peer + "::" + nextMessage.getType(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + Network.getInstance().onMessage(peer, nextMessage); + } +} diff --git a/src/main/java/org/qortal/network/task/PeerConnectTask.java b/src/main/java/org/qortal/network/task/PeerConnectTask.java new file mode 100644 index 00000000..759cabce --- /dev/null +++ b/src/main/java/org/qortal/network/task/PeerConnectTask.java @@ -0,0 +1,33 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.Message; +import org.qortal.network.message.MessageType; +import org.qortal.network.message.PingMessage; +import org.qortal.utils.ExecuteProduceConsume.Task; +import org.qortal.utils.NTP; + +public class PeerConnectTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(PeerConnectTask.class); + + private final Peer peer; + private final String name; + + public PeerConnectTask(Peer peer) { + this.peer = peer; + this.name = "PeerConnectTask::" + peer; + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + Network.getInstance().connectPeer(peer); + } +} diff --git a/src/main/java/org/qortal/network/task/PingTask.java b/src/main/java/org/qortal/network/task/PingTask.java new file mode 100644 index 00000000..f47ecd32 --- /dev/null +++ b/src/main/java/org/qortal/network/task/PingTask.java @@ -0,0 +1,44 @@ +package org.qortal.network.task; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.network.Peer; +import org.qortal.network.message.Message; +import org.qortal.network.message.MessageType; +import org.qortal.network.message.PingMessage; +import org.qortal.utils.ExecuteProduceConsume.Task; +import org.qortal.utils.NTP; + +public class PingTask implements Task { + private static final Logger LOGGER = LogManager.getLogger(PingTask.class); + + private final Peer peer; + private final Long now; + private final String name; + + public PingTask(Peer peer, Long now) { + this.peer = peer; + this.now = now; + this.name = "PingTask::" + peer; + } + + @Override + public String getName() { + return name; + } + + @Override + public void perform() throws InterruptedException { + PingMessage pingMessage = new PingMessage(); + Message message = peer.getResponse(pingMessage); + + if (message == null || message.getType() != MessageType.PING) { + LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", + peer.getPeerConnectionId(), peer, pingMessage.getId()); + peer.disconnect("no ping received"); + return; + } + + peer.setLastPing(NTP.getTime() - now); + } +} diff --git a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java index d8e4dbf3..223d0e93 100644 --- a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java +++ b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java @@ -97,9 +97,9 @@ public abstract class ExecuteProduceConsume implements Runnable { */ protected abstract Task produceTask(boolean canBlock) throws InterruptedException; - @FunctionalInterface public interface Task { - public abstract void perform() throws InterruptedException; + String getName(); + void perform() throws InterruptedException; } @Override @@ -152,7 +152,7 @@ public abstract class ExecuteProduceConsume implements Runnable { if (this.logger.isDebugEnabled()) { final long productionPeriod = System.currentTimeMillis() - beforeProduce; - taskType = task == null ? "no task" : task.getClass().getCanonicalName(); + taskType = task == null ? "no task" : task.getName(); this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]", Thread.currentThread().getId(), diff --git a/src/test/java/org/qortal/test/EPCTests.java b/src/test/java/org/qortal/test/EPCTests.java index fe48af24..1a41b75d 100644 --- a/src/test/java/org/qortal/test/EPCTests.java +++ b/src/test/java/org/qortal/test/EPCTests.java @@ -13,9 +13,25 @@ import org.junit.Test; import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; +import static org.junit.Assert.fail; + public class EPCTests { - class RandomEPC extends ExecuteProduceConsume { + static class SleepTask implements ExecuteProduceConsume.Task { + private static final Random RANDOM = new Random(); + + @Override + public String getName() { + return "SleepTask"; + } + + @Override + public void perform() throws InterruptedException { + Thread.sleep(RANDOM.nextInt(500) + 100); + } + } + + static class RandomEPC extends ExecuteProduceConsume { private final int TASK_PERCENT; private final int PAUSE_PERCENT; @@ -37,9 +53,7 @@ public class EPCTests { // Sometimes produce a task if (percent < TASK_PERCENT) { - return () -> { - Thread.sleep(random.nextInt(500) + 100); - }; + return new SleepTask(); } else { // If we don't produce a task, then maybe simulate a pause until work arrives if (canIdle && percent < PAUSE_PERCENT) @@ -50,45 +64,6 @@ public class EPCTests { } } - private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException { - final int runTime = 60; // seconds - System.out.println(String.format("Testing EPC for %s seconds:", runTime)); - - final long start = System.currentTimeMillis(); - testEPC.start(); - - // Status reports every second (bar waiting for synchronization) - ScheduledExecutorService statusExecutor = Executors.newSingleThreadScheduledExecutor(); - - statusExecutor.scheduleAtFixedRate(() -> { - final StatsSnapshot snapshot = testEPC.getStatsSnapshot(); - final long seconds = (System.currentTimeMillis() - start) / 1000L; - System.out.print(String.format("After %d second%s, ", seconds, (seconds != 1 ? "s" : ""))); - printSnapshot(snapshot); - }, 1L, 1L, TimeUnit.SECONDS); - - // Let it run for a minute - Thread.sleep(runTime * 1000L); - statusExecutor.shutdownNow(); - - final long before = System.currentTimeMillis(); - testEPC.shutdown(30 * 1000); - final long after = System.currentTimeMillis(); - - System.out.println(String.format("Shutdown took %d milliseconds", after - before)); - - final StatsSnapshot snapshot = testEPC.getStatsSnapshot(); - System.out.print("After shutdown, "); - printSnapshot(snapshot); - } - - private void printSnapshot(final StatsSnapshot snapshot) { - System.out.println(String.format("threads: %d active (%d max, %d exhaustion%s), tasks: %d produced / %d consumed", - snapshot.activeThreadCount, snapshot.greatestActiveThreadCount, - snapshot.spawnFailures, (snapshot.spawnFailures != 1 ? "s": ""), - snapshot.tasksProduced, snapshot.tasksConsumed)); - } - @Test public void testRandomEPC() throws InterruptedException { final int TASK_PERCENT = 25; // Produce a task this % of the time @@ -131,18 +106,39 @@ public class EPCTests { final int MAX_PEERS = 20; - final List lastPings = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis())); + final List lastPingProduced = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis())); class PingTask implements ExecuteProduceConsume.Task { private final int peerIndex; + private final long lastPing; + private final long productionTimestamp; + private final String name; - public PingTask(int peerIndex) { + public PingTask(int peerIndex, long lastPing, long productionTimestamp) { this.peerIndex = peerIndex; + this.lastPing = lastPing; + this.productionTimestamp = productionTimestamp; + this.name = "PingTask::[" + this.peerIndex + "]"; + } + + @Override + public String getName() { + return name; } @Override public void perform() throws InterruptedException { - System.out.println("Pinging peer " + peerIndex); + long now = System.currentTimeMillis(); + + System.out.println(String.format("Pinging peer %d after post-production delay of %dms and ping interval of %dms", + peerIndex, + now - productionTimestamp, + now - lastPing + )); + + long threshold = now - PING_INTERVAL - PRODUCER_SLEEP_TIME; + if (lastPing < threshold) + fail("excessive peer ping interval for peer " + peerIndex); // At least half the worst case ping round-trip Random random = new Random(); @@ -155,32 +151,73 @@ public class EPCTests { class PingEPC extends ExecuteProduceConsume { @Override protected Task produceTask(boolean canIdle) throws InterruptedException { - // If we can idle, then we do, to simulate worst case - if (canIdle) - Thread.sleep(PRODUCER_SLEEP_TIME); - // Is there a peer that needs a ping? final long now = System.currentTimeMillis(); - synchronized (lastPings) { - for (int peerIndex = 0; peerIndex < lastPings.size(); ++peerIndex) { - long lastPing = lastPings.get(peerIndex); - - if (lastPing < now - PING_INTERVAL - PING_ROUND_TRIP_TIME - PRODUCER_SLEEP_TIME) - throw new RuntimeException("excessive peer ping interval for peer " + peerIndex); + synchronized (lastPingProduced) { + for (int peerIndex = 0; peerIndex < lastPingProduced.size(); ++peerIndex) { + long lastPing = lastPingProduced.get(peerIndex); if (lastPing < now - PING_INTERVAL) { - lastPings.set(peerIndex, System.currentTimeMillis()); - return new PingTask(peerIndex); + lastPingProduced.set(peerIndex, System.currentTimeMillis()); + return new PingTask(peerIndex, lastPing, now); } } } + // If we can idle, then we do, to simulate worst case + if (canIdle) + Thread.sleep(PRODUCER_SLEEP_TIME); + // No work to do return null; } } + System.out.println(String.format("Pings should start after %s seconds", PING_INTERVAL)); + testEPC(new PingEPC()); } + private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException { + final int runTime = 60; // seconds + System.out.println(String.format("Testing EPC for %s seconds:", runTime)); + + final long start = System.currentTimeMillis(); + + // Status reports every second (bar waiting for synchronization) + ScheduledExecutorService statusExecutor = Executors.newSingleThreadScheduledExecutor(); + + statusExecutor.scheduleAtFixedRate( + () -> { + final StatsSnapshot snapshot = testEPC.getStatsSnapshot(); + final long seconds = (System.currentTimeMillis() - start) / 1000L; + System.out.println(String.format("After %d second%s, %s", seconds, seconds != 1 ? "s" : "", formatSnapshot(snapshot))); + }, + 0L, 1L, TimeUnit.SECONDS + ); + + testEPC.start(); + + // Let it run for a minute + Thread.sleep(runTime * 1000L); + statusExecutor.shutdownNow(); + + final long before = System.currentTimeMillis(); + testEPC.shutdown(30 * 1000); + final long after = System.currentTimeMillis(); + + System.out.println(String.format("Shutdown took %d milliseconds", after - before)); + + final StatsSnapshot snapshot = testEPC.getStatsSnapshot(); + System.out.println("After shutdown, " + formatSnapshot(snapshot)); + } + + private String formatSnapshot(StatsSnapshot snapshot) { + return String.format("threads: %d active (%d max, %d exhaustion%s), tasks: %d produced / %d consumed", + snapshot.activeThreadCount, snapshot.greatestActiveThreadCount, + snapshot.spawnFailures, (snapshot.spawnFailures != 1 ? "s": ""), + snapshot.tasksProduced, snapshot.tasksConsumed + ); + } + }