remove purge timeout setting, updates to pruning

This commit is contained in:
Jürg Schulthess 2025-04-21 15:34:04 +02:00
parent ea06a7fe91
commit e6f349ca41
3 changed files with 67 additions and 76 deletions

View File

@ -106,7 +106,9 @@ public class RNSNetwork {
static final String defaultConfigPath = Settings.getInstance().isTestNet() ? RNSCommon.defaultRNSConfigPathTestnet: RNSCommon.defaultRNSConfigPath;
private final int MAX_PEERS = Settings.getInstance().getReticulumMaxPeers();
private final int MIN_DESIRED_PEERS = Settings.getInstance().getReticulumMinDesiredPeers();
private final long PRUNE_INTERVAL = Settings.getInstance().getReticulumPruneInterval();
// How long [ms] between pruning of peers
private long PRUNE_INTERVAL = 1 * 60 * 1000L; // ms;
Identity serverIdentity;
public Destination baseDestination;
private volatile boolean isShuttingDown = false;
@ -250,8 +252,9 @@ public class RNSNetwork {
public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) {
for (RNSPeer peer : getImmutableLinkedPeers()) {
if (this.isShuttingDown)
if (this.isShuttingDown) {
return;
}
Message message = peerMessageBuilder.apply(peer);
@ -259,7 +262,10 @@ public class RNSNetwork {
continue;
}
peer.sendMessage(message);
var pl = peer.getPeerLink();
if (nonNull(pl) && (pl.getStatus() == ACTIVE)) {
peer.sendMessage(message);
}
}
}
@ -480,6 +486,12 @@ public class RNSNetwork {
//}
final Long now = NTP.getTime();
// Prune stuck/slow/old peers (moved from Controller)
task = maybeProduceRNSPrunePeersTask(now);
if (task != null) {
return task;
}
// ping task (Link+Channel+Buffer)
task = maybeProducePeerPingTask(now);
@ -677,53 +689,64 @@ public class RNSNetwork {
var incomingPeerList = getIncomingPeers();
log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
incomingPeerList.size());
Link pLink;
LinkStatus lStatus;
var now = Instant.now();
for (RNSPeer p: peerList) {
pLink = p.getPeerLink();
//var peerLastAccessTimestamp = p.getLastAccessTimestamp();
var peerLastPingResponseReceived = p.getLastPingResponseReceived();
log.info("prunePeers - pLink: {}, destinationHash: {}",
pLink, Hex.encodeHexString(p.getDestinationHash()));
log.debug("peer: {}", p);
// prune initiator peers
List<RNSPeer> lps = getLinkedPeers();
for (RNSPeer p : lps) {
var pLink = p.getPeerLink();
if (nonNull(pLink)) {
if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) {
// close peer link for now
log.info("peer link: {}, status: {}", pLink, pLink.getStatus());
if (pLink.getStatus() == ACTIVE) {
p.pingRemote();
}
if (p.getPeerTimedOut()) {
pLink.teardown();
}
lStatus = pLink.getStatus();
log.info("Link {} status: {}", pLink, lStatus);
// lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) {
//p.shutdown();
//peerList.remove(p);
removeLinkedPeer(p);
} else if (lStatus == HANDSHAKE) {
// stuck in handshake state (do we need to shutdown/remove it?)
log.info("peer status HANDSHAKE");
//p.shutdown();
//peerList.remove(p);
removeLinkedPeer(p);
}
// either reach peer or disable link
p.pingRemote();
} else {
if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
//peerList.remove(p);
removeLinkedPeer(p);
}
}
}
//Link pLink;
//LinkStatus lStatus;
//var now = Instant.now();
//for (RNSPeer p: peerList) {
// pLink = p.getPeerLink();
// var peerLastAccessTimestamp = p.getLastAccessTimestamp();
// var peerLastPingResponseReceived = p.getLastPingResponseReceived();
// log.info("peerLink: {}, status: {}", pLink, pLink.getStatus());
// log.info("prunePeers - pLink: {}, destinationHash: {}",
// pLink, Hex.encodeHexString(p.getDestinationHash()));
// log.debug("peer: {}", p);
// if (nonNull(pLink)) {
// if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) {
// // close peer link for now
// pLink.teardown();
// }
// lStatus = pLink.getStatus();
// log.info("Link {} status: {}", pLink, lStatus);
// // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
// if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) {
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// } else if (lStatus == HANDSHAKE) {
// // stuck in handshake state (do we need to shutdown/remove it?)
// log.info("peer status HANDSHAKE");
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// // either reach peer or disable link
// p.pingRemote();
// } else {
// if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// }
//}
List<RNSPeer> inaps = incomingNonActivePeers();
//log.info("number of inactive incoming peers: {}", inaps.size());
//var incomingPeerList = getIncomingPeers();
//log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
// incomingPeerList.size());
for (RNSPeer p: inaps) {
incomingPeerList.remove(incomingPeerList.indexOf(p));
}
//removeExpiredPeers(this.linkedPeers);
log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(),
incomingPeerList.size());
maybeAnnounce(getBaseDestination());
@ -739,35 +762,6 @@ public class RNSNetwork {
* Helper methods
*/
//@Synchronized
//public RNSPeer getPeerIfExists(RNSPeer peer) {
// List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
// RNSPeer result = null;
// for (RNSPeer p: lps) {
// if (nonNull(p.getDestinationHash()) && Arrays.equals(p.getDestinationHash(), peer.getDestinationHash())) {
// log.info("found match by destinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getPeerDestinationHash()) && Arrays.equals(p.getPeerDestinationHash(), peer.getPeerDestinationHash())) {
// log.info("found match by peerDestinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getPeerBaseDestinationHash()) && Arrays.equals(p.getPeerBaseDestinationHash(), peer.getPeerBaseDestinationHash())) {
// log.info("found match by peerBaseDestinationHash");
// result = p;
// //break;
// }
// if (nonNull(p.getRemoteTestHash()) && Arrays.equals(p.getRemoteTestHash(), peer.getRemoteTestHash())) {
// log.info("found match by remoteTestHash");
// result = p;
// //break;
// }
// }
// return result;
//}
public RNSPeer findPeerByLink(Link link) {
//List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
List<RNSPeer> lps = RNSNetwork.getInstance().getImmutableLinkedPeers();

View File

@ -277,14 +277,17 @@ public class RNSPeer {
if (link.getTeardownReason() == TIMEOUT) {
log.info("The link timed out");
this.peerTimedOut = true;
this.peerBuffer = null;
} else if (link.getTeardownReason() == INITIATOR_CLOSED) {
log.info("Link closed callback: The initiator closed the link");
log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, encodeHexString(link.getDestination().getHash()));
this.peerBuffer = null;
} else if (link.getTeardownReason() == DESTINATION_CLOSED) {
log.info("Link closed callback: The link was closed by the peer, removing peer");
log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, encodeHexString(link.getDestination().getHash()));
this.peerBuffer = null;
} else {
log.info("Link closed callback");
}
@ -306,7 +309,7 @@ public class RNSPeer {
this.peerBuffer.close();
this.peerBuffer = null;
}
peerLink.teardown();
this.peerLink.teardown();
}
} else if (msgText.startsWith("open::")) {
var targetPeerHash = subarray(message, 7, message.length);

View File

@ -622,8 +622,6 @@ public class Settings {
private int reticulumMinDesiredPeers = 3;
/** Maximum number of task executor network threads */
private int reticulumMaxNetworkThreadPoolSize = 89;
/** How long [ms] between pruning of peers */
private long reticulumPruneInterval = 2 * 60 * 1000L; // ms;
// Constructors
@ -1394,10 +1392,6 @@ public class Settings {
return this.reticulumMaxNetworkThreadPoolSize;
}
public long getReticulumPruneInterval() {
return this.reticulumPruneInterval;
}
public int getBuildArbitraryResourcesBatchSize() {
return buildArbitraryResourcesBatchSize;
}