diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 536ff1a3..83c35df1 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -106,7 +106,9 @@ public class RNSNetwork { static final String defaultConfigPath = Settings.getInstance().isTestNet() ? RNSCommon.defaultRNSConfigPathTestnet: RNSCommon.defaultRNSConfigPath; private final int MAX_PEERS = Settings.getInstance().getReticulumMaxPeers(); private final int MIN_DESIRED_PEERS = Settings.getInstance().getReticulumMinDesiredPeers(); - private final long PRUNE_INTERVAL = Settings.getInstance().getReticulumPruneInterval(); + // How long [ms] between pruning of peers + private long PRUNE_INTERVAL = 1 * 60 * 1000L; // ms; + Identity serverIdentity; public Destination baseDestination; private volatile boolean isShuttingDown = false; @@ -250,8 +252,9 @@ public class RNSNetwork { public void broadcast(Function peerMessageBuilder) { for (RNSPeer peer : getImmutableLinkedPeers()) { - if (this.isShuttingDown) + if (this.isShuttingDown) { return; + } Message message = peerMessageBuilder.apply(peer); @@ -259,7 +262,10 @@ public class RNSNetwork { continue; } - peer.sendMessage(message); + var pl = peer.getPeerLink(); + if (nonNull(pl) && (pl.getStatus() == ACTIVE)) { + peer.sendMessage(message); + } } } @@ -480,6 +486,12 @@ public class RNSNetwork { //} final Long now = NTP.getTime(); + + // Prune stuck/slow/old peers (moved from Controller) + task = maybeProduceRNSPrunePeersTask(now); + if (task != null) { + return task; + } // ping task (Link+Channel+Buffer) task = maybeProducePeerPingTask(now); @@ -677,53 +689,64 @@ public class RNSNetwork { var incomingPeerList = getIncomingPeers(); log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), incomingPeerList.size()); - Link pLink; - LinkStatus lStatus; - var now = Instant.now(); - for (RNSPeer p: peerList) { - pLink = p.getPeerLink(); - //var peerLastAccessTimestamp = p.getLastAccessTimestamp(); - var peerLastPingResponseReceived = p.getLastPingResponseReceived(); - log.info("prunePeers - pLink: {}, destinationHash: {}", - pLink, Hex.encodeHexString(p.getDestinationHash())); - log.debug("peer: {}", p); + // prune initiator peers + List lps = getLinkedPeers(); + for (RNSPeer p : lps) { + var pLink = p.getPeerLink(); if (nonNull(pLink)) { - if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) { - // close peer link for now + log.info("peer link: {}, status: {}", pLink, pLink.getStatus()); + if (pLink.getStatus() == ACTIVE) { + p.pingRemote(); + } + if (p.getPeerTimedOut()) { pLink.teardown(); } - lStatus = pLink.getStatus(); - log.info("Link {} status: {}", pLink, lStatus); - // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED - if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) { - //p.shutdown(); - //peerList.remove(p); - removeLinkedPeer(p); - } else if (lStatus == HANDSHAKE) { - // stuck in handshake state (do we need to shutdown/remove it?) - log.info("peer status HANDSHAKE"); - //p.shutdown(); - //peerList.remove(p); - removeLinkedPeer(p); - } - // either reach peer or disable link - p.pingRemote(); - } else { - if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { - //peerList.remove(p); - removeLinkedPeer(p); - } } } + //Link pLink; + //LinkStatus lStatus; + //var now = Instant.now(); + //for (RNSPeer p: peerList) { + // pLink = p.getPeerLink(); + // var peerLastAccessTimestamp = p.getLastAccessTimestamp(); + // var peerLastPingResponseReceived = p.getLastPingResponseReceived(); + // log.info("peerLink: {}, status: {}", pLink, pLink.getStatus()); + // log.info("prunePeers - pLink: {}, destinationHash: {}", + // pLink, Hex.encodeHexString(p.getDestinationHash())); + // log.debug("peer: {}", p); + // if (nonNull(pLink)) { + // if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) { + // // close peer link for now + // pLink.teardown(); + // } + // lStatus = pLink.getStatus(); + // log.info("Link {} status: {}", pLink, lStatus); + // // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED + // if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) { + // //p.shutdown(); + // //peerList.remove(p); + // removeLinkedPeer(p); + // } else if (lStatus == HANDSHAKE) { + // // stuck in handshake state (do we need to shutdown/remove it?) + // log.info("peer status HANDSHAKE"); + // //p.shutdown(); + // //peerList.remove(p); + // removeLinkedPeer(p); + // } + // // either reach peer or disable link + // p.pingRemote(); + // } else { + // if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { + // //peerList.remove(p); + // removeLinkedPeer(p); + // } + // } + //} List inaps = incomingNonActivePeers(); //log.info("number of inactive incoming peers: {}", inaps.size()); - //var incomingPeerList = getIncomingPeers(); - //log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), - // incomingPeerList.size()); for (RNSPeer p: inaps) { incomingPeerList.remove(incomingPeerList.indexOf(p)); } - //removeExpiredPeers(this.linkedPeers); log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), incomingPeerList.size()); maybeAnnounce(getBaseDestination()); @@ -739,35 +762,6 @@ public class RNSNetwork { * Helper methods */ - //@Synchronized - //public RNSPeer getPeerIfExists(RNSPeer peer) { - // List lps = RNSNetwork.getInstance().getLinkedPeers(); - // RNSPeer result = null; - // for (RNSPeer p: lps) { - // if (nonNull(p.getDestinationHash()) && Arrays.equals(p.getDestinationHash(), peer.getDestinationHash())) { - // log.info("found match by destinationHash"); - // result = p; - // //break; - // } - // if (nonNull(p.getPeerDestinationHash()) && Arrays.equals(p.getPeerDestinationHash(), peer.getPeerDestinationHash())) { - // log.info("found match by peerDestinationHash"); - // result = p; - // //break; - // } - // if (nonNull(p.getPeerBaseDestinationHash()) && Arrays.equals(p.getPeerBaseDestinationHash(), peer.getPeerBaseDestinationHash())) { - // log.info("found match by peerBaseDestinationHash"); - // result = p; - // //break; - // } - // if (nonNull(p.getRemoteTestHash()) && Arrays.equals(p.getRemoteTestHash(), peer.getRemoteTestHash())) { - // log.info("found match by remoteTestHash"); - // result = p; - // //break; - // } - // } - // return result; - //} - public RNSPeer findPeerByLink(Link link) { //List lps = RNSNetwork.getInstance().getLinkedPeers(); List lps = RNSNetwork.getInstance().getImmutableLinkedPeers(); diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 6faa91cc..d803e754 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -277,14 +277,17 @@ public class RNSPeer { if (link.getTeardownReason() == TIMEOUT) { log.info("The link timed out"); this.peerTimedOut = true; + this.peerBuffer = null; } else if (link.getTeardownReason() == INITIATOR_CLOSED) { log.info("Link closed callback: The initiator closed the link"); log.info("peerLink {} closed (link: {}), link destination hash: {}", peerLink, link, encodeHexString(link.getDestination().getHash())); + this.peerBuffer = null; } else if (link.getTeardownReason() == DESTINATION_CLOSED) { log.info("Link closed callback: The link was closed by the peer, removing peer"); log.info("peerLink {} closed (link: {}), link destination hash: {}", peerLink, link, encodeHexString(link.getDestination().getHash())); + this.peerBuffer = null; } else { log.info("Link closed callback"); } @@ -306,7 +309,7 @@ public class RNSPeer { this.peerBuffer.close(); this.peerBuffer = null; } - peerLink.teardown(); + this.peerLink.teardown(); } } else if (msgText.startsWith("open::")) { var targetPeerHash = subarray(message, 7, message.length); diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index 8c18f21d..f3f84e12 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -622,8 +622,6 @@ public class Settings { private int reticulumMinDesiredPeers = 3; /** Maximum number of task executor network threads */ private int reticulumMaxNetworkThreadPoolSize = 89; - /** How long [ms] between pruning of peers */ - private long reticulumPruneInterval = 2 * 60 * 1000L; // ms; // Constructors @@ -1394,10 +1392,6 @@ public class Settings { return this.reticulumMaxNetworkThreadPoolSize; } - public long getReticulumPruneInterval() { - return this.reticulumPruneInterval; - } - public int getBuildArbitraryResourcesBatchSize() { return buildArbitraryResourcesBatchSize; }