Potential fix for issue #22

This commit is contained in:
catbref 2020-08-24 14:27:03 +01:00
parent 9e52f20f71
commit b4301f125d

View File

@ -96,22 +96,24 @@ public class Network {
private final String ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded()); private final String ourNodeId = Crypto.toNodeAddress(edPublicKeyParams.getEncoded());
private final int maxMessageSize; private final int maxMessageSize;
private final int minOutboundPeers;
private final int maxPeers;
private final List<PeerData> allKnownPeers = new ArrayList<>(); private final List<PeerData> allKnownPeers = new ArrayList<>();
private final List<Peer> connectedPeers = new ArrayList<>(); private final List<Peer> connectedPeers = new ArrayList<>();
private final List<PeerAddress> selfPeers = new ArrayList<>(); private final List<PeerAddress> selfPeers = new ArrayList<>();
private ExecuteProduceConsume networkEPC; private final ExecuteProduceConsume networkEPC;
private Selector channelSelector; private Selector channelSelector;
private ServerSocketChannel serverChannel; private ServerSocketChannel serverChannel;
private Iterator<SelectionKey> channelIterator = null; private Iterator<SelectionKey> channelIterator = null;
private int minOutboundPeers; // volatile because value is updated inside any one of the EPC threads
private int maxPeers; private volatile long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs
private long nextConnectTaskTimestamp = 0L; // ms - try first connect once NTP syncs
private ExecutorService broadcastExecutor = Executors.newCachedThreadPool(); private ExecutorService broadcastExecutor = Executors.newCachedThreadPool();
private long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs // volatile because value is updated inside any one of the EPC threads
private volatile long nextBroadcastTimestamp = 0L; // ms - try first broadcast once NTP syncs
private final Lock mergePeersLock = new ReentrantLock(); private final Lock mergePeersLock = new ReentrantLock();
@ -429,35 +431,38 @@ public class Network {
private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException { private Task maybeProduceChannelTask(boolean canBlock) throws InterruptedException {
final SelectionKey nextSelectionKey; final SelectionKey nextSelectionKey;
// anything to do? // Synchronization here to enforce thread-safety on channelIterator
if (channelIterator == null) { synchronized (channelSelector) {
try { // anything to do?
if (canBlock) if (channelIterator == null) {
channelSelector.select(1000L); try {
else if (canBlock)
channelSelector.selectNow(); channelSelector.select(1000L);
} catch (IOException e) { else
LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage())); channelSelector.selectNow();
return null; } catch (IOException e) {
LOGGER.warn(String.format("Channel selection threw IOException: %s", e.getMessage()));
return null;
}
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
channelIterator = channelSelector.selectedKeys().iterator();
} }
if (Thread.currentThread().isInterrupted()) if (channelIterator.hasNext()) {
throw new InterruptedException(); nextSelectionKey = channelIterator.next();
channelIterator.remove();
} else {
nextSelectionKey = null;
channelIterator = null; // Nothing to do so reset iterator to cause new select
}
channelIterator = channelSelector.selectedKeys().iterator(); LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s",
Thread.currentThread().getId(), nextSelectionKey, channelIterator));
} }
if (channelIterator.hasNext()) {
nextSelectionKey = channelIterator.next();
channelIterator.remove();
} else {
nextSelectionKey = null;
channelIterator = null; // Nothing to do so reset iterator to cause new select
}
LOGGER.trace(() -> String.format("Thread %d, nextSelectionKey %s, channelIterator now %s",
Thread.currentThread().getId(), nextSelectionKey, channelIterator));
if (nextSelectionKey == null) if (nextSelectionKey == null)
return null; return null;