|
|
@ -27,6 +27,8 @@ import java.nio.channels.SocketChannel; |
|
|
|
import java.security.SecureRandom; |
|
|
|
import java.security.SecureRandom; |
|
|
|
import java.util.*; |
|
|
|
import java.util.*; |
|
|
|
import java.util.concurrent.*; |
|
|
|
import java.util.concurrent.*; |
|
|
|
|
|
|
|
import java.util.concurrent.atomic.LongAccumulator; |
|
|
|
|
|
|
|
import java.util.concurrent.atomic.LongAdder; |
|
|
|
import java.util.regex.Matcher; |
|
|
|
import java.util.regex.Matcher; |
|
|
|
import java.util.regex.Pattern; |
|
|
|
import java.util.regex.Pattern; |
|
|
|
|
|
|
|
|
|
|
@ -153,6 +155,16 @@ public class Peer { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
private CommonBlockData commonBlockData; |
|
|
|
private CommonBlockData commonBlockData; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Message stats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class MessageStats { |
|
|
|
|
|
|
|
public final LongAdder count = new LongAdder(); |
|
|
|
|
|
|
|
public final LongAdder totalBytes = new LongAdder(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Map<MessageType, MessageStats> receivedMessageStats = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private final Map<MessageType, MessageStats> sentMessageStats = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
|
|
// Constructors
|
|
|
|
// Constructors
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
@ -542,11 +554,18 @@ public class Peer { |
|
|
|
// Tidy up buffers:
|
|
|
|
// Tidy up buffers:
|
|
|
|
this.byteBuffer.flip(); |
|
|
|
this.byteBuffer.flip(); |
|
|
|
// Read-only, flipped buffer's position will be after end of message, so copy that
|
|
|
|
// Read-only, flipped buffer's position will be after end of message, so copy that
|
|
|
|
|
|
|
|
long messageByteSize = readOnlyBuffer.position(); |
|
|
|
this.byteBuffer.position(readOnlyBuffer.position()); |
|
|
|
this.byteBuffer.position(readOnlyBuffer.position()); |
|
|
|
// Copy bytes after read message to front of buffer,
|
|
|
|
// Copy bytes after read message to front of buffer,
|
|
|
|
// adjusting position accordingly, reset limit to capacity
|
|
|
|
// adjusting position accordingly, reset limit to capacity
|
|
|
|
this.byteBuffer.compact(); |
|
|
|
this.byteBuffer.compact(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Record message stats
|
|
|
|
|
|
|
|
MessageStats messageStats = this.receivedMessageStats.computeIfAbsent(message.getType(), k -> new MessageStats()); |
|
|
|
|
|
|
|
// Ideally these two operations would be atomic, we could pack 'count' in top X bits of the 64-bit long, but meh
|
|
|
|
|
|
|
|
messageStats.count.increment(); |
|
|
|
|
|
|
|
messageStats.totalBytes.add(messageByteSize); |
|
|
|
|
|
|
|
|
|
|
|
// Unsupported message type? Discard with no further processing
|
|
|
|
// Unsupported message type? Discard with no further processing
|
|
|
|
if (message.getType() == MessageType.UNSUPPORTED) |
|
|
|
if (message.getType() == MessageType.UNSUPPORTED) |
|
|
|
continue; |
|
|
|
continue; |
|
|
@ -609,6 +628,12 @@ public class Peer { |
|
|
|
|
|
|
|
|
|
|
|
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", |
|
|
|
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", |
|
|
|
this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); |
|
|
|
this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Record message stats
|
|
|
|
|
|
|
|
MessageStats messageStats = this.sentMessageStats.computeIfAbsent(message.getType(), k -> new MessageStats()); |
|
|
|
|
|
|
|
// Ideally these two operations would be atomic, we could pack 'count' in top X bits of the 64-bit long, but meh
|
|
|
|
|
|
|
|
messageStats.count.increment(); |
|
|
|
|
|
|
|
messageStats.totalBytes.add(this.outputBuffer.limit()); |
|
|
|
} catch (MessageException e) { |
|
|
|
} catch (MessageException e) { |
|
|
|
// Something went wrong converting message to bytes, so discard but allow another round
|
|
|
|
// Something went wrong converting message to bytes, so discard but allow another round
|
|
|
|
LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, |
|
|
|
LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, |
|
|
@ -799,8 +824,11 @@ public class Peer { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void shutdown() { |
|
|
|
public void shutdown() { |
|
|
|
|
|
|
|
boolean logStats = false; |
|
|
|
|
|
|
|
|
|
|
|
if (!isStopping) { |
|
|
|
if (!isStopping) { |
|
|
|
LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this); |
|
|
|
LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this); |
|
|
|
|
|
|
|
logStats = true; |
|
|
|
} |
|
|
|
} |
|
|
|
isStopping = true; |
|
|
|
isStopping = true; |
|
|
|
|
|
|
|
|
|
|
@ -812,8 +840,34 @@ public class Peer { |
|
|
|
LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this); |
|
|
|
LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (logStats) { |
|
|
|
|
|
|
|
StringBuilder statsBuilder = new StringBuilder(1024); |
|
|
|
|
|
|
|
statsBuilder.append("peer ").append(this).append(" message stats:\n=received="); |
|
|
|
|
|
|
|
appendMessageStats(statsBuilder, this.receivedMessageStats); |
|
|
|
|
|
|
|
statsBuilder.append("\n=sent="); |
|
|
|
|
|
|
|
appendMessageStats(statsBuilder, this.sentMessageStats); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LOGGER.debug(statsBuilder.toString()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static void appendMessageStats(StringBuilder statsBuilder, Map<MessageType, MessageStats> messageStats) { |
|
|
|
|
|
|
|
if (messageStats.isEmpty()) { |
|
|
|
|
|
|
|
statsBuilder.append("\n none"); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
messageStats.keySet().stream() |
|
|
|
|
|
|
|
.sorted(Comparator.comparing(MessageType::name)) |
|
|
|
|
|
|
|
.forEach(messageType -> { |
|
|
|
|
|
|
|
MessageStats stats = messageStats.get(messageType); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
statsBuilder.append("\n ").append(messageType.name()) |
|
|
|
|
|
|
|
.append(": count=").append(stats.count.sum()) |
|
|
|
|
|
|
|
.append(", total bytes=").append(stats.totalBytes.sum()); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Minimum version
|
|
|
|
// Minimum version
|
|
|
|
|
|
|
|
|
|
|
|