|
|
@ -61,6 +61,8 @@ public class Peer { |
|
|
|
private InetSocketAddress resolvedAddress = null; |
|
|
|
private InetSocketAddress resolvedAddress = null; |
|
|
|
/** True if remote address is loopback/link-local/site-local, false otherwise. */ |
|
|
|
/** True if remote address is loopback/link-local/site-local, false otherwise. */ |
|
|
|
private boolean isLocal; |
|
|
|
private boolean isLocal; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Object byteBufferLock = new Object(); |
|
|
|
private volatile ByteBuffer byteBuffer; |
|
|
|
private volatile ByteBuffer byteBuffer; |
|
|
|
private Map<Integer, BlockingQueue<Message>> replyQueues; |
|
|
|
private Map<Integer, BlockingQueue<Message>> replyQueues; |
|
|
|
private LinkedBlockingQueue<Message> pendingMessages; |
|
|
|
private LinkedBlockingQueue<Message> pendingMessages; |
|
|
@ -256,7 +258,7 @@ public class Peer { |
|
|
|
this.connectionTimestamp = NTP.getTime(); |
|
|
|
this.connectionTimestamp = NTP.getTime(); |
|
|
|
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); |
|
|
|
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); |
|
|
|
this.socketChannel.configureBlocking(false); |
|
|
|
this.socketChannel.configureBlocking(false); |
|
|
|
this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize()); |
|
|
|
this.byteBuffer = null; // Defer allocation to when we need it, to save memory. Sorry GC!
|
|
|
|
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>()); |
|
|
|
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>()); |
|
|
|
this.pendingMessages = new LinkedBlockingQueue<>(); |
|
|
|
this.pendingMessages = new LinkedBlockingQueue<>(); |
|
|
|
} |
|
|
|
} |
|
|
@ -292,11 +294,15 @@ public class Peer { |
|
|
|
* @throws IOException |
|
|
|
* @throws IOException |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
/* package */ void readChannel() throws IOException { |
|
|
|
/* package */ void readChannel() throws IOException { |
|
|
|
synchronized (this.byteBuffer) { |
|
|
|
synchronized (this.byteBufferLock) { |
|
|
|
while(true) { |
|
|
|
while(true) { |
|
|
|
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) |
|
|
|
if (!this.socketChannel.isOpen() || this.socketChannel.socket().isClosed()) |
|
|
|
return; |
|
|
|
return; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Do we need to allocate byteBuffer?
|
|
|
|
|
|
|
|
if (this.byteBuffer == null) |
|
|
|
|
|
|
|
this.byteBuffer = ByteBuffer.allocate(Network.getInstance().getMaxMessageSize()); |
|
|
|
|
|
|
|
|
|
|
|
final int bytesRead = this.socketChannel.read(this.byteBuffer); |
|
|
|
final int bytesRead = this.socketChannel.read(this.byteBuffer); |
|
|
|
if (bytesRead == -1) { |
|
|
|
if (bytesRead == -1) { |
|
|
|
this.disconnect("EOF"); |
|
|
|
this.disconnect("EOF"); |
|
|
@ -318,9 +324,15 @@ public class Peer { |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (message == null && bytesRead == 0 && !wasByteBufferFull) |
|
|
|
if (message == null && bytesRead == 0 && !wasByteBufferFull) { |
|
|
|
// No complete message in buffer, no more bytes to read from socket even though there was room to read bytes
|
|
|
|
// No complete message in buffer, no more bytes to read from socket even though there was room to read bytes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If byteBuffer is empty then we can deallocate it, to save memory, albeit costing GC
|
|
|
|
|
|
|
|
if (this.byteBuffer.remaining() == this.byteBuffer.capacity()) |
|
|
|
|
|
|
|
this.byteBuffer = null; |
|
|
|
|
|
|
|
|
|
|
|
return; |
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (message == null) |
|
|
|
if (message == null) |
|
|
|
// No complete message in buffer, but maybe more bytes to read from socket
|
|
|
|
// No complete message in buffer, but maybe more bytes to read from socket
|
|
|
|