3
0
mirror of https://github.com/Qortal/qortal.git synced 2025-02-12 10:15:49 +00:00

More improvements to networking:

As per work done by szisti in PR#45:
Extracted network 'Tasks' to their own classes.
Network.NetworkProcessor reduced to only producing Tasks.

Improved usage of SocketChannel interest-ops.
Eventually this might lead to reducing task-producing synchronization lock into more granular locks.
Work still needed to convert sending messages to a queue and to make use of OP_WRITE instead of sleeping to wait for socket buffer to empty.

Disabled PeerConnectTask producer from checking against connected peers via DNS as it's too slow.

Swapped Peer's replyQueues from SynchronizedMap(wrapped HashMap) to ConcurrentHashMap.

Other minor changes within networking.
This commit is contained in:
catbref 2022-03-28 21:11:16 +01:00
parent 00996b047f
commit 91e0c9b940
11 changed files with 582 additions and 275 deletions

View File

@ -13,6 +13,7 @@ import org.qortal.data.block.BlockData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.message.*;
import org.qortal.network.task.*;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@ -216,12 +217,16 @@ public class Network {
// Getters / setters
public static synchronized Network getInstance() {
if (instance == null) {
instance = new Network();
}
private static class SingletonContainer {
private static final Network INSTANCE = new Network();
}
return instance;
public static Network getInstance() {
return SingletonContainer.INSTANCE;
}
public int getMaxPeers() {
return this.maxPeers;
}
public byte[] getMessageMagic() {
@ -496,39 +501,19 @@ public class Network {
}
private Task maybeProducePeerMessageTask() {
for (Peer peer : getImmutableConnectedPeers()) {
Task peerTask = peer.getMessageTask();
if (peerTask != null) {
return peerTask;
}
}
return null;
return getImmutableConnectedPeers().stream()
.map(Peer::getMessageTask)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
private Task maybeProducePeerPingTask(Long now) {
// Ask connected peers whether they need a ping
for (Peer peer : getImmutableHandshakedPeers()) {
Task peerTask = peer.getPingTask(now);
if (peerTask != null) {
return peerTask;
}
}
return null;
}
class PeerConnectTask implements ExecuteProduceConsume.Task {
private final Peer peer;
PeerConnectTask(Peer peer) {
this.peer = peer;
}
@Override
public void perform() throws InterruptedException {
connectPeer(peer);
}
return getImmutableHandshakedPeers().stream()
.map(peer -> peer.getPingTask(now))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
private Task maybeProduceConnectPeerTask(Long now) throws InterruptedException {
@ -557,65 +542,10 @@ public class Network {
}
nextBroadcastTimestamp = now + BROADCAST_INTERVAL;
return () -> Controller.getInstance().doNetworkBroadcast();
}
class ChannelTask implements ExecuteProduceConsume.Task {
private final SelectionKey selectionKey;
ChannelTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void perform() throws InterruptedException {
try {
LOGGER.trace("Thread {} has pending channel: {}, with ops {}",
Thread.currentThread().getId(), selectionKey.channel(), selectionKey.readyOps());
// process pending channel task
if (selectionKey.isReadable()) {
connectionRead((SocketChannel) selectionKey.channel());
} else if (selectionKey.isAcceptable()) {
acceptConnection((ServerSocketChannel) selectionKey.channel());
}
LOGGER.trace("Thread {} processed channel: {}",
Thread.currentThread().getId(), selectionKey.channel());
} catch (CancelledKeyException e) {
LOGGER.trace("Thread {} encountered cancelled channel: {}",
Thread.currentThread().getId(), selectionKey.channel());
}
}
private void connectionRead(SocketChannel socketChannel) {
Peer peer = getPeerFromChannel(socketChannel);
if (peer == null) {
return;
}
try {
peer.readChannel();
LOGGER.trace("Thread {} re-registering OP_READ interestOps on channel: {}",
Thread.currentThread().getId(), socketChannel);
socketChannel.register(channelSelector, SelectionKey.OP_READ);
} catch (IOException e) {
if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) {
peer.disconnect("Connection reset");
return;
}
LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(),
Thread.currentThread().getId(), e.getMessage(), e);
peer.disconnect("I/O error");
}
}
return new BroadcastTask();
}
private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException {
final SelectionKey nextSelectionKey;
// Synchronization here to enforce thread-safety on channelIterator
synchronized (channelSelector) {
// anything to do?
@ -636,99 +566,45 @@ public class Network {
}
channelIterator = channelSelector.selectedKeys().iterator();
LOGGER.trace("Thread {}, after {} select, channelIterator now {}",
Thread.currentThread().getId(),
canBlock ? "blocking": "non-blocking",
channelIterator);
}
if (channelIterator.hasNext()) {
nextSelectionKey = channelIterator.next();
channelIterator.remove();
if (nextSelectionKey.isReadable()) {
LOGGER.trace("Thread {} clearing all interestOps on channel: {}",
Thread.currentThread().getId(), nextSelectionKey.channel());
nextSelectionKey.interestOps(0);
}
} else {
nextSelectionKey = null;
if (!channelIterator.hasNext()) {
channelIterator = null; // Nothing to do so reset iterator to cause new select
LOGGER.trace("Thread {}, channelIterator now null", Thread.currentThread().getId());
return null;
}
LOGGER.trace("Thread {}, nextSelectionKey {}, channelIterator now {}",
Thread.currentThread().getId(), nextSelectionKey, channelIterator);
final SelectionKey nextSelectionKey = channelIterator.next();
channelIterator.remove();
LOGGER.trace("Thread {}, nextSelectionKey {}", Thread.currentThread().getId(), nextSelectionKey);
if (nextSelectionKey.isReadable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_READ);
return new ChannelReadTask(nextSelectionKey);
}
if (nextSelectionKey.isWritable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_WRITE);
return new ChannelWriteTask(nextSelectionKey);
}
if (nextSelectionKey.isAcceptable()) {
clearInterestOps(nextSelectionKey, SelectionKey.OP_ACCEPT);
return new ChannelAcceptTask(nextSelectionKey);
}
}
if (nextSelectionKey == null) {
return null;
}
return new ChannelTask(nextSelectionKey);
return null;
}
}
private void acceptConnection(ServerSocketChannel serverSocketChannel) throws InterruptedException {
SocketChannel socketChannel;
try {
if (getImmutableConnectedPeers().size() >= maxPeers) {
// We have enough peers
if (serverSelectionKey.interestOps() != 0) {
LOGGER.debug("Ignoring pending incoming connections because the server is full");
serverSelectionKey.interestOps(0);
}
return;
}
socketChannel = serverSocketChannel.accept();
} catch (IOException e) {
return;
}
// No connection actually accepted?
if (socketChannel == null) {
return;
}
PeerAddress address = PeerAddress.fromSocket(socketChannel.socket());
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty() && ipNotInFixedList(address, fixedNetwork)) {
try {
LOGGER.debug("Connection discarded from peer {} as not in the fixed network list", address);
socketChannel.close();
} catch (IOException e) {
// IGNORE
}
return;
}
final Long now = NTP.getTime();
Peer newPeer;
try {
if (now == null) {
LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", address);
socketChannel.close();
return;
}
LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel, channelSelector);
this.addConnectedPeer(newPeer);
} catch (IOException e) {
if (socketChannel.isOpen()) {
try {
LOGGER.debug("Connection failed from peer {} while connecting/closing", address);
socketChannel.close();
} catch (IOException ce) {
// Couldn't close?
}
}
return;
}
this.onPeerReady(newPeer);
}
private boolean ipNotInFixedList(PeerAddress address, List<String> fixedNetwork) {
public boolean ipNotInFixedList(PeerAddress address, List<String> fixedNetwork) {
for (String ipAddress : fixedNetwork) {
String[] bits = ipAddress.split(":");
if (bits.length >= 1 && bits.length <= 2 && address.getHost().equals(bits[0])) {
@ -764,8 +640,9 @@ public class Network {
peers.removeIf(isConnectedPeer);
// Don't consider already connected peers (resolved address match)
// XXX This might be too slow if we end up waiting a long time for hostnames to resolve via DNS
peers.removeIf(isResolvedAsConnectedPeer);
// Disabled because this might be too slow if we end up waiting a long time for hostnames to resolve via DNS
// Which is ok because duplicate connections to the same peer are handled during handshaking
// peers.removeIf(isResolvedAsConnectedPeer);
this.checkLongestConnection(now);
@ -795,12 +672,12 @@ public class Network {
}
}
private boolean connectPeer(Peer newPeer) throws InterruptedException {
public boolean connectPeer(Peer newPeer) throws InterruptedException {
// NOT CORRECT:
if (getImmutableConnectedPeers().size() >= minOutboundPeers)
return false;
SocketChannel socketChannel = newPeer.connect(this.channelSelector);
SocketChannel socketChannel = newPeer.connect();
if (socketChannel == null) {
return false;
}
@ -815,7 +692,7 @@ public class Network {
return true;
}
private Peer getPeerFromChannel(SocketChannel socketChannel) {
public Peer getPeerFromChannel(SocketChannel socketChannel) {
for (Peer peer : this.getImmutableConnectedPeers()) {
if (peer.getSocketChannel() == socketChannel) {
return peer;
@ -848,6 +725,69 @@ public class Network {
nextDisconnectionCheck = now + DISCONNECTION_CHECK_INTERVAL;
}
// SocketChannel interest-ops manipulations
private static final String[] OP_NAMES = new String[SelectionKey.OP_ACCEPT * 2];
static {
for (int i = 0; i < OP_NAMES.length; i++) {
StringJoiner joiner = new StringJoiner(",");
if ((i & SelectionKey.OP_READ) != 0) joiner.add("OP_READ");
if ((i & SelectionKey.OP_WRITE) != 0) joiner.add("OP_WRITE");
if ((i & SelectionKey.OP_CONNECT) != 0) joiner.add("OP_CONNECT");
if ((i & SelectionKey.OP_ACCEPT) != 0) joiner.add("OP_ACCEPT");
OP_NAMES[i] = joiner.toString();
}
}
public void clearInterestOps(SelectableChannel socketChannel, int interestOps) {
SelectionKey selectionKey = socketChannel.keyFor(channelSelector);
if (selectionKey == null)
return;
clearInterestOps(selectionKey, interestOps);
}
private void clearInterestOps(SelectionKey selectionKey, int interestOps) {
if (!selectionKey.channel().isOpen())
return;
LOGGER.trace("Thread {} clearing {} interest-ops on channel: {}",
Thread.currentThread().getId(),
OP_NAMES[interestOps],
selectionKey.channel());
selectionKey.interestOpsAnd(~interestOps);
}
public void setInterestOps(SelectableChannel socketChannel, int interestOps) {
SelectionKey selectionKey = socketChannel.keyFor(channelSelector);
if (selectionKey == null) {
try {
selectionKey = socketChannel.register(this.channelSelector, interestOps);
} catch (ClosedChannelException e) {
// Channel already closed so ignore
return;
}
// Fall-through to allow logging
}
setInterestOps(selectionKey, interestOps);
}
private void setInterestOps(SelectionKey selectionKey, int interestOps) {
if (!selectionKey.channel().isOpen())
return;
LOGGER.trace("Thread {} setting {} interest-ops on channel: {}",
Thread.currentThread().getId(),
OP_NAMES[interestOps],
selectionKey.channel());
selectionKey.interestOpsOr(interestOps);
}
// Peer callbacks
protected void wakeupChannelSelector() {
@ -887,7 +827,7 @@ public class Network {
if (getImmutableConnectedPeers().size() < maxPeers - 1 && (serverSelectionKey.interestOps() & SelectionKey.OP_ACCEPT) == 0) {
try {
LOGGER.debug("Re-enabling accepting incoming connections because the server is not longer full");
serverSelectionKey.interestOps(SelectionKey.OP_ACCEPT);
setInterestOps(serverSelectionKey, SelectionKey.OP_ACCEPT);
} catch (CancelledKeyException e) {
LOGGER.error("Failed to re-enable accepting of incoming connections: {}", e.getMessage());
}

View File

@ -12,24 +12,20 @@ import org.qortal.data.network.PeerData;
import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageException;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
import org.qortal.network.task.MessageTask;
import org.qortal.network.task.PingTask;
import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -98,7 +94,7 @@ public class Peer {
/**
* When last PING message was sent, or null if pings not started yet.
*/
private Long lastPingSent;
private Long lastPingSent = null;
byte[] ourChallenge;
@ -160,10 +156,10 @@ public class Peer {
/**
* Construct Peer using existing, connected socket
*/
public Peer(SocketChannel socketChannel, Selector channelSelector) throws IOException {
public Peer(SocketChannel socketChannel) throws IOException {
this.isOutbound = false;
this.socketChannel = socketChannel;
sharedSetup(channelSelector);
sharedSetup();
this.resolvedAddress = ((InetSocketAddress) socketChannel.socket().getRemoteSocketAddress());
this.isLocal = isAddressLocal(this.resolvedAddress.getAddress());
@ -276,7 +272,7 @@ public class Peer {
}
}
protected void setLastPing(long lastPing) {
public void setLastPing(long lastPing) {
synchronized (this.peerInfoLock) {
this.lastPing = lastPing;
}
@ -396,13 +392,13 @@ public class Peer {
// Processing
private void sharedSetup(Selector channelSelector) throws IOException {
private void sharedSetup() throws IOException {
this.connectionTimestamp = NTP.getTime();
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
this.socketChannel.configureBlocking(false);
this.socketChannel.register(channelSelector, SelectionKey.OP_READ);
Network.getInstance().setInterestOps(this.socketChannel, SelectionKey.OP_READ);
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
this.replyQueues = new ConcurrentHashMap<>();
this.pendingMessages = new LinkedBlockingQueue<>();
Random random = new SecureRandom();
@ -410,7 +406,7 @@ public class Peer {
random.nextBytes(this.ourChallenge);
}
public SocketChannel connect(Selector channelSelector) {
public SocketChannel connect() {
LOGGER.trace("[{}] Connecting to peer {}", this.peerConnectionId, this);
try {
@ -432,7 +428,7 @@ public class Peer {
try {
LOGGER.debug("[{}] Connected to peer {}", this.peerConnectionId, this);
sharedSetup(channelSelector);
sharedSetup();
return socketChannel;
} catch (IOException e) {
LOGGER.trace("[{}] Post-connection setup failed, peer {}", this.peerConnectionId, this);
@ -450,7 +446,7 @@ public class Peer {
*
* @throws IOException If this channel is not yet connected
*/
protected void readChannel() throws IOException {
public void readChannel() throws IOException {
synchronized (this.byteBufferLock) {
while (true) {
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) {
@ -556,7 +552,16 @@ public class Peer {
}
}
protected ExecuteProduceConsume.Task getMessageTask() {
/** Maybe send some pending outgoing messages.
*
* @return true if more data is pending to be sent
*/
public boolean writeChannel() throws IOException {
// TODO
return false;
}
protected Task getMessageTask() {
/*
* If we are still handshaking and there is a message yet to be processed then
* don't produce another message task. This allows us to process handshake
@ -580,7 +585,7 @@ public class Peer {
}
// Return a task to process message in queue
return () -> Network.getInstance().onMessage(this, nextMessage);
return new MessageTask(this, nextMessage);
}
/**
@ -720,7 +725,7 @@ public class Peer {
this.lastPingSent = NTP.getTime();
}
protected ExecuteProduceConsume.Task getPingTask(Long now) {
protected Task getPingTask(Long now) {
// Pings not enabled yet?
if (now == null || this.lastPingSent == null) {
return null;
@ -734,19 +739,7 @@ public class Peer {
// Not strictly true, but prevents this peer from being immediately chosen again
this.lastPingSent = now;
return () -> {
PingMessage pingMessage = new PingMessage();
Message message = this.getResponse(pingMessage);
if (message == null || message.getType() != MessageType.PING) {
LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}", this.peerConnectionId, this,
pingMessage.getId());
this.disconnect("no ping received");
return;
}
this.setLastPing(NTP.getTime() - now);
};
return new PingTask(this, now);
}
public void disconnect(String reason) {

View File

@ -0,0 +1,22 @@
package org.qortal.network.task;
import org.qortal.controller.Controller;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.Message;
import org.qortal.utils.ExecuteProduceConsume.Task;
public class BroadcastTask implements Task {
public BroadcastTask() {
}
@Override
public String getName() {
return "BroadcastTask";
}
@Override
public void perform() throws InterruptedException {
Controller.getInstance().doNetworkBroadcast();
}
}

View File

@ -0,0 +1,99 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.PeerAddress;
import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
public class ChannelAcceptTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelAcceptTask.class);
private final SelectionKey serverSelectionKey;
private final ServerSocketChannel serverSocketChannel;
public ChannelAcceptTask(SelectionKey selectionKey) {
this.serverSelectionKey = selectionKey;
this.serverSocketChannel = (ServerSocketChannel) this.serverSelectionKey.channel();
}
@Override
public String getName() {
return "ChannelAcceptTask";
}
@Override
public void perform() throws InterruptedException {
Network network = Network.getInstance();
SocketChannel socketChannel;
try {
if (network.getImmutableConnectedPeers().size() >= network.getMaxPeers()) {
// We have enough peers
LOGGER.debug("Ignoring pending incoming connections because the server is full");
return;
}
socketChannel = serverSocketChannel.accept();
network.setInterestOps(serverSocketChannel, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
return;
}
// No connection actually accepted?
if (socketChannel == null) {
return;
}
PeerAddress address = PeerAddress.fromSocket(socketChannel.socket());
List<String> fixedNetwork = Settings.getInstance().getFixedNetwork();
if (fixedNetwork != null && !fixedNetwork.isEmpty() && network.ipNotInFixedList(address, fixedNetwork)) {
try {
LOGGER.debug("Connection discarded from peer {} as not in the fixed network list", address);
socketChannel.close();
} catch (IOException e) {
// IGNORE
}
return;
}
final Long now = NTP.getTime();
Peer newPeer;
try {
if (now == null) {
LOGGER.debug("Connection discarded from peer {} due to lack of NTP sync", address);
socketChannel.close();
return;
}
LOGGER.debug("Connection accepted from peer {}", address);
newPeer = new Peer(socketChannel);
network.addConnectedPeer(newPeer);
} catch (IOException e) {
if (socketChannel.isOpen()) {
try {
LOGGER.debug("Connection failed from peer {} while connecting/closing", address);
socketChannel.close();
} catch (IOException ce) {
// Couldn't close?
}
}
return;
}
network.onPeerReady(newPeer);
}
}

View File

@ -0,0 +1,55 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.utils.ExecuteProduceConsume.Task;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class ChannelReadTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelReadTask.class);
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private final Peer peer;
private final String name;
public ChannelReadTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.socketChannel = (SocketChannel) this.selectionKey.channel();
this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel);
this.name = "ChannelReadTask::" + peer;
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
if (peer == null) {
return;
}
try {
peer.readChannel();
Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_READ);
} catch (IOException e) {
if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) {
peer.disconnect("Connection reset");
return;
}
LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(),
Thread.currentThread().getId(), e.getMessage(), e);
peer.disconnect("I/O error");
}
}
}

View File

@ -0,0 +1,56 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.utils.ExecuteProduceConsume.Task;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class ChannelWriteTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(ChannelWriteTask.class);
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private final Peer peer;
private final String name;
public ChannelWriteTask(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.socketChannel = (SocketChannel) this.selectionKey.channel();
this.peer = Network.getInstance().getPeerFromChannel(this.socketChannel);
this.name = "ChannelWriteTask::" + peer;
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
if (peer == null) {
return;
}
try {
boolean isMoreDataPending = peer.writeChannel();
if (isMoreDataPending) {
Network.getInstance().setInterestOps(socketChannel, SelectionKey.OP_WRITE);
}
} catch (IOException e) {
if (e.getMessage() != null && e.getMessage().toLowerCase().contains("connection reset")) {
peer.disconnect("Connection reset");
return;
}
LOGGER.trace("[{}] Network thread {} encountered I/O error: {}", peer.getPeerConnectionId(),
Thread.currentThread().getId(), e.getMessage(), e);
peer.disconnect("I/O error");
}
}
}

View File

@ -0,0 +1,28 @@
package org.qortal.network.task;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.Message;
import org.qortal.utils.ExecuteProduceConsume.Task;
public class MessageTask implements Task {
private final Peer peer;
private final Message nextMessage;
private final String name;
public MessageTask(Peer peer, Message nextMessage) {
this.peer = peer;
this.nextMessage = nextMessage;
this.name = "MessageTask::" + peer + "::" + nextMessage.getType();
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
Network.getInstance().onMessage(peer, nextMessage);
}
}

View File

@ -0,0 +1,33 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
public class PeerConnectTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(PeerConnectTask.class);
private final Peer peer;
private final String name;
public PeerConnectTask(Peer peer) {
this.peer = peer;
this.name = "PeerConnectTask::" + peer;
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
Network.getInstance().connectPeer(peer);
}
}

View File

@ -0,0 +1,44 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.Peer;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
public class PingTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(PingTask.class);
private final Peer peer;
private final Long now;
private final String name;
public PingTask(Peer peer, Long now) {
this.peer = peer;
this.now = now;
this.name = "PingTask::" + peer;
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
PingMessage pingMessage = new PingMessage();
Message message = peer.getResponse(pingMessage);
if (message == null || message.getType() != MessageType.PING) {
LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}",
peer.getPeerConnectionId(), peer, pingMessage.getId());
peer.disconnect("no ping received");
return;
}
peer.setLastPing(NTP.getTime() - now);
}
}

View File

@ -97,9 +97,9 @@ public abstract class ExecuteProduceConsume implements Runnable {
*/
protected abstract Task produceTask(boolean canBlock) throws InterruptedException;
@FunctionalInterface
public interface Task {
public abstract void perform() throws InterruptedException;
String getName();
void perform() throws InterruptedException;
}
@Override
@ -152,7 +152,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
if (this.logger.isDebugEnabled()) {
final long productionPeriod = System.currentTimeMillis() - beforeProduce;
taskType = task == null ? "no task" : task.getClass().getCanonicalName();
taskType = task == null ? "no task" : task.getName();
this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]",
Thread.currentThread().getId(),

View File

@ -13,9 +13,25 @@ import org.junit.Test;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import static org.junit.Assert.fail;
public class EPCTests {
class RandomEPC extends ExecuteProduceConsume {
static class SleepTask implements ExecuteProduceConsume.Task {
private static final Random RANDOM = new Random();
@Override
public String getName() {
return "SleepTask";
}
@Override
public void perform() throws InterruptedException {
Thread.sleep(RANDOM.nextInt(500) + 100);
}
}
static class RandomEPC extends ExecuteProduceConsume {
private final int TASK_PERCENT;
private final int PAUSE_PERCENT;
@ -37,9 +53,7 @@ public class EPCTests {
// Sometimes produce a task
if (percent < TASK_PERCENT) {
return () -> {
Thread.sleep(random.nextInt(500) + 100);
};
return new SleepTask();
} else {
// If we don't produce a task, then maybe simulate a pause until work arrives
if (canIdle && percent < PAUSE_PERCENT)
@ -50,45 +64,6 @@ public class EPCTests {
}
}
private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException {
final int runTime = 60; // seconds
System.out.println(String.format("Testing EPC for %s seconds:", runTime));
final long start = System.currentTimeMillis();
testEPC.start();
// Status reports every second (bar waiting for synchronization)
ScheduledExecutorService statusExecutor = Executors.newSingleThreadScheduledExecutor();
statusExecutor.scheduleAtFixedRate(() -> {
final StatsSnapshot snapshot = testEPC.getStatsSnapshot();
final long seconds = (System.currentTimeMillis() - start) / 1000L;
System.out.print(String.format("After %d second%s, ", seconds, (seconds != 1 ? "s" : "")));
printSnapshot(snapshot);
}, 1L, 1L, TimeUnit.SECONDS);
// Let it run for a minute
Thread.sleep(runTime * 1000L);
statusExecutor.shutdownNow();
final long before = System.currentTimeMillis();
testEPC.shutdown(30 * 1000);
final long after = System.currentTimeMillis();
System.out.println(String.format("Shutdown took %d milliseconds", after - before));
final StatsSnapshot snapshot = testEPC.getStatsSnapshot();
System.out.print("After shutdown, ");
printSnapshot(snapshot);
}
private void printSnapshot(final StatsSnapshot snapshot) {
System.out.println(String.format("threads: %d active (%d max, %d exhaustion%s), tasks: %d produced / %d consumed",
snapshot.activeThreadCount, snapshot.greatestActiveThreadCount,
snapshot.spawnFailures, (snapshot.spawnFailures != 1 ? "s": ""),
snapshot.tasksProduced, snapshot.tasksConsumed));
}
@Test
public void testRandomEPC() throws InterruptedException {
final int TASK_PERCENT = 25; // Produce a task this % of the time
@ -131,18 +106,39 @@ public class EPCTests {
final int MAX_PEERS = 20;
final List<Long> lastPings = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis()));
final List<Long> lastPingProduced = new ArrayList<>(Collections.nCopies(MAX_PEERS, System.currentTimeMillis()));
class PingTask implements ExecuteProduceConsume.Task {
private final int peerIndex;
private final long lastPing;
private final long productionTimestamp;
private final String name;
public PingTask(int peerIndex) {
public PingTask(int peerIndex, long lastPing, long productionTimestamp) {
this.peerIndex = peerIndex;
this.lastPing = lastPing;
this.productionTimestamp = productionTimestamp;
this.name = "PingTask::[" + this.peerIndex + "]";
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
System.out.println("Pinging peer " + peerIndex);
long now = System.currentTimeMillis();
System.out.println(String.format("Pinging peer %d after post-production delay of %dms and ping interval of %dms",
peerIndex,
now - productionTimestamp,
now - lastPing
));
long threshold = now - PING_INTERVAL - PRODUCER_SLEEP_TIME;
if (lastPing < threshold)
fail("excessive peer ping interval for peer " + peerIndex);
// At least half the worst case ping round-trip
Random random = new Random();
@ -155,32 +151,73 @@ public class EPCTests {
class PingEPC extends ExecuteProduceConsume {
@Override
protected Task produceTask(boolean canIdle) throws InterruptedException {
// If we can idle, then we do, to simulate worst case
if (canIdle)
Thread.sleep(PRODUCER_SLEEP_TIME);
// Is there a peer that needs a ping?
final long now = System.currentTimeMillis();
synchronized (lastPings) {
for (int peerIndex = 0; peerIndex < lastPings.size(); ++peerIndex) {
long lastPing = lastPings.get(peerIndex);
if (lastPing < now - PING_INTERVAL - PING_ROUND_TRIP_TIME - PRODUCER_SLEEP_TIME)
throw new RuntimeException("excessive peer ping interval for peer " + peerIndex);
synchronized (lastPingProduced) {
for (int peerIndex = 0; peerIndex < lastPingProduced.size(); ++peerIndex) {
long lastPing = lastPingProduced.get(peerIndex);
if (lastPing < now - PING_INTERVAL) {
lastPings.set(peerIndex, System.currentTimeMillis());
return new PingTask(peerIndex);
lastPingProduced.set(peerIndex, System.currentTimeMillis());
return new PingTask(peerIndex, lastPing, now);
}
}
}
// If we can idle, then we do, to simulate worst case
if (canIdle)
Thread.sleep(PRODUCER_SLEEP_TIME);
// No work to do
return null;
}
}
System.out.println(String.format("Pings should start after %s seconds", PING_INTERVAL));
testEPC(new PingEPC());
}
private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException {
final int runTime = 60; // seconds
System.out.println(String.format("Testing EPC for %s seconds:", runTime));
final long start = System.currentTimeMillis();
// Status reports every second (bar waiting for synchronization)
ScheduledExecutorService statusExecutor = Executors.newSingleThreadScheduledExecutor();
statusExecutor.scheduleAtFixedRate(
() -> {
final StatsSnapshot snapshot = testEPC.getStatsSnapshot();
final long seconds = (System.currentTimeMillis() - start) / 1000L;
System.out.println(String.format("After %d second%s, %s", seconds, seconds != 1 ? "s" : "", formatSnapshot(snapshot)));
},
0L, 1L, TimeUnit.SECONDS
);
testEPC.start();
// Let it run for a minute
Thread.sleep(runTime * 1000L);
statusExecutor.shutdownNow();
final long before = System.currentTimeMillis();
testEPC.shutdown(30 * 1000);
final long after = System.currentTimeMillis();
System.out.println(String.format("Shutdown took %d milliseconds", after - before));
final StatsSnapshot snapshot = testEPC.getStatsSnapshot();
System.out.println("After shutdown, " + formatSnapshot(snapshot));
}
private String formatSnapshot(StatsSnapshot snapshot) {
return String.format("threads: %d active (%d max, %d exhaustion%s), tasks: %d produced / %d consumed",
snapshot.activeThreadCount, snapshot.greatestActiveThreadCount,
snapshot.spawnFailures, (snapshot.spawnFailures != 1 ? "s": ""),
snapshot.tasksProduced, snapshot.tasksConsumed
);
}
}