From eb244bb45b1a52a1b8fed22cb512508255a85913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Schulthess?= Date: Sun, 13 Apr 2025 10:57:26 +0200 Subject: [PATCH] improve prune and more --- .../java/org/qortal/network/RNSNetwork.java | 42 +++++-- src/main/java/org/qortal/network/RNSPeer.java | 108 ++++++++++++------ 2 files changed, 105 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 08cdffd2..5e130b6b 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicLong; //import java.util.concurrent.locks.ReentrantLock; import java.util.Objects; import java.util.function.Function; +import java.time.Instant; import org.apache.commons.codec.binary.Hex; import org.qortal.utils.ExecuteProduceConsume; @@ -129,11 +130,16 @@ public class RNSNetwork { // just in case the classic TCP/IP Networking is turned off. private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[]{0x51, 0x4f, 0x52, 0x54}; // QORT private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[]{0x71, 0x6f, 0x72, 0x54}; // qorT - private static final int BROADCAST_CHAIN_TIP_DEPTH = 7; // Just enough to fill a SINGLE TCP packet (~1440 bytes) + private static final int BROADCAST_CHAIN_TIP_DEPTH = 7; // (~1440 bytes) /** * How long between informational broadcasts to all ACTIVE peers, in milliseconds. */ private static final long BROADCAST_INTERVAL = 30 * 1000L; // ms + /** + * Link low-level ping interval and timeout + */ + private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms + private static final long LINK_UNREACHABLE_TIMEOUT = 2 * LINK_PING_INTERVAL; //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); @@ -279,7 +285,7 @@ public class RNSNetwork { } public void shutdown() { - isShuttingDown = true; + this.isShuttingDown = true; log.info("shutting down Reticulum"); // gracefully close links of peers that point to us @@ -430,6 +436,9 @@ public class RNSNetwork { log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); } } + // Chance to announce instead of waiting for next pruning. + // Note: good in theory but leads to ping-pong of announces => not a good idea! + //maybeAnnounce(getBaseDestination()); } } @@ -440,6 +449,7 @@ public class RNSNetwork { private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs + private final AtomicLong nextPingTimestamp = new AtomicLong(0L); // ms - try first low-level Ping private Iterator channelIterator = null; @@ -457,8 +467,8 @@ public class RNSNetwork { protected Task produceTask(boolean canBlock) throws InterruptedException { Task task; - //// TODO: enable this once we figure out how to add pending messages in RNSPeer - /// (RNSPeer: pendingMessages.offer(message)) + //// TODO: Needed? Figure out how to add pending messages in RNSPeer + //// (RNSPeer: pendingMessages.offer(message)) //task = maybeProducePeerMessageTask(); //if (task != null) { // return task; @@ -466,6 +476,7 @@ public class RNSNetwork { final Long now = NTP.getTime(); + // ping task (Link+Channel+Buffer) task = maybeProducePeerPingTask(now); if (task != null) { return task; @@ -492,9 +503,12 @@ public class RNSNetwork { //// .findFirst() //// .orElse(null); ////} + //// Note: we might not need this. All messages handled asynchronously in Reticulum + //// (RNSPeer peerBufferReady callback) //private Task maybeProducePeerMessageTask() { - // return getImmutableIncomingPeers().stream() + // return getImmutableLinkedPeers().stream() // .map(RNSPeer::getMessageTask) + // .filter(Objects::nonNull) // .findFirst() // .orElse(null); //} @@ -613,13 +627,15 @@ public class RNSNetwork { log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); Link pLink; LinkStatus lStatus; + //final Long now = NTP.getTime(); + Instant now = Instant.now(); for (RNSPeer p: peerList) { pLink = p.getPeerLink(); log.info("prunePeers - pLink: {}, destinationHash: {}", pLink, Hex.encodeHexString(p.getDestinationHash())); log.debug("peer: {}", p); if (nonNull(pLink)) { - if (p.getPeerTimedOut()) { + if ((p.getPeerTimedOut()) || (p.getLastPingResponseReceived() > LINK_UNREACHABLE_TIMEOUT)) { // close peer link for now pLink.teardown(); } @@ -669,12 +685,16 @@ public class RNSNetwork { var ips = getImmutableLinkedPeers(); for (RNSPeer p: ips) { pLink = p.getPeerLink(); - p.pingRemote(); - try { - TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully - } catch (InterruptedException e) { - log.error("exception: ", e); + if (now.minusMillis(LINK_UNREACHABLE_TIMEOUT).isAfter(p.getLastAccessTimestamp())) { + // Link was not accessed for too long + pLink.teardown(); } + //p.pingRemote(); + //try { + // TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully + //} catch (InterruptedException e) { + // log.error("exception: ", e); + //} if ((nonNull(pLink) && (pLink.getStatus() == ACTIVE))) { activePeerCount = activePeerCount + 1; } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index c52baef7..45807692 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -99,7 +99,7 @@ public class RNSPeer { private Boolean deleteMe = false; private Boolean isVacant = true; private Long lastPacketRtt = null; - private byte[] emptyBuffer = {0,0,0,0,0,0,0,0}; + private byte[] emptyBuffer = {0,0,0,0,0}; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @@ -107,9 +107,11 @@ public class RNSPeer { // for qortal networking private static final int RESPONSE_TIMEOUT = 3000; // [ms] private static final int PING_INTERVAL = 34_000; // [ms] - private byte[] messageMagic; // set in creating classes - private Long lastPing = null; // last ping roundtrip time [ms] - private Long lastPingSent = null; // time last ping was sent, or null if not started. + private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms + private byte[] messageMagic; // set in message creating classes + private Long lastPing = null; // last (packet) ping roundtrip time [ms] + private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started. + @Setter(AccessLevel.PACKAGE) private Long lastPingResponseReceived = null; // time last (packet) ping succeeded private Map> replyQueues; private LinkedBlockingQueue pendingMessages; // Versioning @@ -186,27 +188,28 @@ public class RNSPeer { @Override public String toString() { // for messages we want an address-like string representation - //return encodeHexString(this.getDestinationHash()); - return this.getPeerLink().toString(); + if (nonNull(this.peerLink)) { + return this.getPeerLink().toString(); + } else { + return encodeHexString(this.getDestinationHash()); + } } public BufferedRWPair getOrInitPeerBuffer() { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { - log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); - //return this.peerBuffer; - //try { - // this.peerBuffer.close(); - // this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); - //} catch (IllegalStateException e) { - // // Exception thrown by Reticulum BufferedRWPair.close() - // // This is a chance to correct links status when doing a RNSPingTask - // log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); - // this.peerLink.teardown(); - // this.peerLink = null; - // //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); - //} + try { + log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); + } catch (IllegalStateException e) { + // Exception thrown by Reticulum if the buffer is unusable (Channel, Link, etc) + // This is a chance to correct links status when doing a RNSPingTask + log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); + this.peerBuffer.close(); + this.peerLink.teardown(); + this.peerLink = null; + //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); + } } else { log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); @@ -219,7 +222,7 @@ public class RNSPeer { public Link getOrInitPeerLink() { if (this.peerLink.getStatus() == ACTIVE) { lastAccessTimestamp = Instant.now(); - return this.peerLink; + //return this.peerLink; } else { initPeerLink(); } @@ -318,18 +321,20 @@ public class RNSPeer { // get the message data byte[] data = this.peerBuffer.read(readyBytes); ByteBuffer bb = ByteBuffer.wrap(data); - log.info("data length: {}, data: {}, ByteBuffer: {}", data.length, data, bb); + log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb); + //log.info("data length: {}, ByteBuffer: {}", data.length, bb); //var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); log.trace("peerBufferReady - data bytes: {}", data.length); + this.lastAccessTimestamp = Instant.now(); if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { log.info("peerBufferReady - empty buffer detected (length: {})", data.length); //this.peerBuffer.flush(); } else { try { - log.info("***> creating message from {} bytes", data.length); + //log.info("***> creating message from {} bytes", data.length); Message message = Message.fromByteBuffer(bb); - log.info("type {} message received ({} bytes): {}", message.getType(), data.length, message); + log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); // Handle message based on type switch (message.getType()) { // Do we need this ? (seems like a TCP scenario only thing) @@ -348,31 +353,36 @@ public class RNSPeer { case PONG: log.info("PONG received"); + //this.peerBuffer.flush(); break; - // Do we need this ? (We don't have RNSPeer versions) + // Do we need this ? (no need to relay peer list...) //case PEERS_V2: // onPeersV2Message(peer, message); // this.peerBuffer.flush(); // break; default: - //if (isFalse(this.isInitiator)) { - // Bump up to controller for possible action - //Controller.getInstance().onNetworkMessage(peer, message); - Controller.getInstance().onRNSNetworkMessage(this, message); - this.peerBuffer.flush(); - //} + log.info("default - type {} message received ({} bytes)", message.getType(), data.length); + // Bump up to controller for possible action + //Controller.getInstance().onNetworkMessage(peer, message); + Controller.getInstance().onRNSNetworkMessage(this, message); + //this.peerBuffer.flush(); break; } } catch (MessageException e) { //log.error("{} from peer {}", e.getMessage(), this); log.error("{} from peer {}", e, this); + log.info("{} from peer {}", e, this); } //this.peerBuffer.flush(); // clear buffer } } + //public void handleMessage(Message message) { + // + //} + /** * Set a packet to remote with the message format "close::" * This method is only useful for non-initiator links to close the remote initiator. @@ -427,6 +437,7 @@ public class RNSPeer { } log.info("Valid reply received from {}, round-trip time is {}", encodeHexString(receipt.getDestination().getHash()), rttString); + this.lastAccessTimestamp = Instant.now(); } } @@ -482,9 +493,10 @@ public class RNSPeer { public void pingRemote() { var link = this.peerLink; //if (nonNull(link) & (isFalse(link.isInitiator()))) { - if (nonNull(link) & link.isInitiator()) { + //if (nonNull(link) & link.isInitiator()) { + if (nonNull(link)) { if (peerLink.getStatus() == ACTIVE) { - log.info("pinging remote: {}", link); + log.info("pinging remote (direct, 1 packet): {}", link); var data = "ping".getBytes(UTF_8); link.setPacketCallback(this::linkPacketReceived); Packet pingPacket = new Packet(link, data); @@ -519,6 +531,7 @@ public class RNSPeer { pongMessage.setId(message.getId()); // use the ping message id this.peerBuffer.write(pongMessage.toBytes()); this.peerBuffer.flush(); + this.lastAccessTimestamp = Instant.now(); } catch (MessageException e) { //log.error("{} from peer {}", e.getMessage(), this); log.error("{} from peer {}", e, this); @@ -637,12 +650,14 @@ public class RNSPeer { * @param message message to be sent * @return true if message successfully sent; false otherwise */ + //@Synchronized public boolean sendMessage(Message message) { try { log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); + log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); var peerBuffer = getOrInitPeerBuffer(); - this.peerBuffer.write(message.toBytes()); - this.peerBuffer.flush(); + peerBuffer.write(message.toBytes()); + peerBuffer.flush(); return true; } catch (IllegalStateException e) { this.peerLink.teardown(); @@ -687,6 +702,31 @@ public class RNSPeer { return new RNSPingTask(this, now); } + // low-level Link (packet) ping + protected Link getPingLinks(Long now) { + if (now == null || this.lastPingSent == null) { + return null; + } + + // ping only possible over ACTIVE link + if (nonNull(this.peerLink)) { + if (this.peerLink.getStatus() != ACTIVE) { + return null; + } + } else { + return null; + } + + if (now < this.lastPingSent + LINK_PING_INTERVAL) { + return null; + } + + this.lastPingSent = now; + + return this.peerLink; + + } + // Peer methods reticulum implementations public BlockSummaryData getChainTipData() { List chainTipSummaries = this.peersChainTipData;