From b17e96e121bfd424b064d5639b41bc280acbf00c Mon Sep 17 00:00:00 2001 From: catbref Date: Sat, 13 Aug 2022 15:42:24 +0100 Subject: [PATCH] Log count & total size of peer messages sent & received when a peer is disconnected. Requires org.qortal.net.Peer logging level set to DEBUG --- src/main/java/org/qortal/network/Peer.java | 54 ++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index f99a94b1..31eb4569 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -27,6 +27,8 @@ import java.nio.channels.SocketChannel; import java.security.SecureRandom; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.LongAccumulator; +import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -153,6 +155,16 @@ public class Peer { */ private CommonBlockData commonBlockData; + // Message stats + + private static class MessageStats { + public final LongAdder count = new LongAdder(); + public final LongAdder totalBytes = new LongAdder(); + } + + private final Map receivedMessageStats = new ConcurrentHashMap<>(); + private final Map sentMessageStats = new ConcurrentHashMap<>(); + // Constructors /** @@ -542,11 +554,18 @@ public class Peer { // Tidy up buffers: this.byteBuffer.flip(); // Read-only, flipped buffer's position will be after end of message, so copy that + long messageByteSize = readOnlyBuffer.position(); this.byteBuffer.position(readOnlyBuffer.position()); // Copy bytes after read message to front of buffer, // adjusting position accordingly, reset limit to capacity this.byteBuffer.compact(); + // Record message stats + MessageStats messageStats = this.receivedMessageStats.computeIfAbsent(message.getType(), k -> new MessageStats()); + // Ideally these two operations would be atomic, we could pack 'count' in top X bits of the 64-bit long, but meh + messageStats.count.increment(); + messageStats.totalBytes.add(messageByteSize); + // Unsupported message type? Discard with no further processing if (message.getType() == MessageType.UNSUPPORTED) continue; @@ -609,6 +628,12 @@ public class Peer { LOGGER.trace("[{}] Sending {} message with ID {} to peer {}", this.peerConnectionId, this.outputMessageType, this.outputMessageId, this); + + // Record message stats + MessageStats messageStats = this.sentMessageStats.computeIfAbsent(message.getType(), k -> new MessageStats()); + // Ideally these two operations would be atomic, we could pack 'count' in top X bits of the 64-bit long, but meh + messageStats.count.increment(); + messageStats.totalBytes.add(this.outputBuffer.limit()); } catch (MessageException e) { // Something went wrong converting message to bytes, so discard but allow another round LOGGER.warn("[{}] Failed to send {} message with ID {} to peer {}: {}", this.peerConnectionId, @@ -799,8 +824,11 @@ public class Peer { } public void shutdown() { + boolean logStats = false; + if (!isStopping) { LOGGER.debug("[{}] Shutting down peer {}", this.peerConnectionId, this); + logStats = true; } isStopping = true; @@ -812,8 +840,34 @@ public class Peer { LOGGER.debug("[{}] IOException while trying to close peer {}", this.peerConnectionId, this); } } + + if (logStats) { + StringBuilder statsBuilder = new StringBuilder(1024); + statsBuilder.append("peer ").append(this).append(" message stats:\n=received="); + appendMessageStats(statsBuilder, this.receivedMessageStats); + statsBuilder.append("\n=sent="); + appendMessageStats(statsBuilder, this.sentMessageStats); + + LOGGER.debug(statsBuilder.toString()); + } } + private static void appendMessageStats(StringBuilder statsBuilder, Map messageStats) { + if (messageStats.isEmpty()) { + statsBuilder.append("\n none"); + return; + } + + messageStats.keySet().stream() + .sorted(Comparator.comparing(MessageType::name)) + .forEach(messageType -> { + MessageStats stats = messageStats.get(messageType); + + statsBuilder.append("\n ").append(messageType.name()) + .append(": count=").append(stats.count.sum()) + .append(", total bytes=").append(stats.totalBytes.sum()); + }); + } // Minimum version