improve incoming peer management, add pruning

This commit is contained in:
Jürg Schulthess 2025-01-12 21:02:50 +01:00
parent ef171c4d03
commit b5f51aa3fd
2 changed files with 40 additions and 6 deletions

View File

@ -520,6 +520,10 @@ public class RNSNetwork {
return this.incomingPeers; return this.incomingPeers;
} }
public List<RNSPeer> getImmutableIncomingPeers() {
return this.immutableIncomingPeers;
}
// TODO, methods for: getAvailablePeer // TODO, methods for: getAvailablePeer
// maintenance // maintenance
@ -542,8 +546,8 @@ public class RNSNetwork {
//@Synchronized //@Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller) // run periodically (by the Controller)
//List<Link> linkList = getLinkedPeers(); //var peerList = getLinkedPeers();
var peerList = getLinkedPeers(); var peerList = getImmutableLinkedPeers();
log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); log.info("number of links (linkedPeers) before pruning: {}", peerList.size());
Link pLink; Link pLink;
LinkStatus lStatus; LinkStatus lStatus;
@ -576,13 +580,32 @@ public class RNSNetwork {
removeLinkedPeer(p); removeLinkedPeer(p);
} }
} }
//var incomingPeerList = getImmutableIncomingPeers();
var incomingPeerList = getIncomingPeers();
for (RNSPeer ip: incomingPeerList) {
pLink = ip.getPeerLink();
//log.info("prunePeers - {} incoming peer: {}", pLink.getStatus(), ip);
if (nonNull(pLink)) {
if (pLink.getStatus() != ACTIVE) {
log.info("removing inactive incoming/non-initiator peer.");
removeIncomingPeer(ip);
} else {
log.info("prunePeers - {} incoming/non-initiator peer: {}", pLink.getStatus(), pLink);
}
}
else {
log.info("prunePeers - null incoming/non-initiator peer: {}", ip);
//removeIncomingPeer(ip);
}
}
//removeExpiredPeers(this.linkedPeers); //removeExpiredPeers(this.linkedPeers);
log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(),
getIncomingPeers().size()); incomingPeerList.size());
//log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks); //log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks);
var activePeerCount = 0; var activePeerCount = 0;
var lps = RNSNetwork.getInstance().getLinkedPeers(); //var lps = RNSNetwork.getInstance().getLinkedPeers();
for (RNSPeer p: lps) { var ips = getImmutableLinkedPeers();
for (RNSPeer p: ips) {
pLink = p.getPeerLink(); pLink = p.getPeerLink();
p.pingRemote(); p.pingRemote();
try { try {

View File

@ -559,7 +559,9 @@ public class RNSPeer {
// return false; // return false;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
//log.warn("Can't write to buffer (remote buffer down?)"); //log.warn("Can't write to buffer (remote buffer down?)");
log.error("IllegalStateException - can't write to buffer: e", e); this.peerLink.teardown();
this.peerBuffer = null;
log.error("IllegalStateException - can't write to buffer: {}", e);
return false; return false;
} catch (MessageException e) { } catch (MessageException e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
@ -579,6 +581,15 @@ public class RNSPeer {
return null; return null;
} }
// ping only possible over ACTIVE Link
if (nonNull(this.peerLink)) {
if (this.peerLink.getStatus() != ACTIVE) {
return null;
}
} else {
return null;
}
// Time to send another ping? // Time to send another ping?
if (now < this.lastPingSent + PING_INTERVAL) { if (now < this.lastPingSent + PING_INTERVAL) {
return null; // Not yet return null; // Not yet