more pruning updates, changes in naming

This commit is contained in:
Jürg Schulthess 2025-05-12 07:37:25 +02:00
parent 8f663f2c83
commit 65e16896ef
4 changed files with 66 additions and 91 deletions

View File

@ -767,7 +767,7 @@ public class Controller extends Thread {
// //
// if (needsArchiveRebuildRNS && !canBootstrap) { // if (needsArchiveRebuildRNS && !canBootstrap) {
// LOGGER.info("Start syncing from genesis (RNS)!"); // LOGGER.info("Start syncing from genesis (RNS)!");
// List<RNSPeer> seeds = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); // List<RNSPeer> seeds = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
// //
// // Check if have a qualified peer to sync // // Check if have a qualified peer to sync
// if (seeds.isEmpty()) { // if (seeds.isEmpty()) {
@ -2985,7 +2985,7 @@ public class Controller extends Thread {
return true; return true;
// Needs a mutable copy of the unmodifiableList // Needs a mutable copy of the unmodifiableList
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
if (peers == null) if (peers == null)
return false; return false;

View File

@ -218,7 +218,7 @@ public class RNSSynchronizer extends Thread {
return true; return true;
// Needs a mutable copy of the unmodifiableList // Needs a mutable copy of the unmodifiableList
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
//// Disregard peers that have "misbehaved" recently //// Disregard peers that have "misbehaved" recently
//peers.removeIf(Controller.hasMisbehaved); //peers.removeIf(Controller.hasMisbehaved);
@ -395,7 +395,7 @@ public class RNSSynchronizer extends Thread {
} }
private boolean checkRecoveryModeForPeers(List<RNSPeer> qualifiedPeers) { private boolean checkRecoveryModeForPeers(List<RNSPeer> qualifiedPeers) {
List<RNSPeer> linkedPeers = RNSNetwork.getInstance().getImmutableActiveLinkedPeers(); List<RNSPeer> linkedPeers = RNSNetwork.getInstance().getActiveImmutableLinkedPeers();
if (!linkedPeers.isEmpty()) { if (!linkedPeers.isEmpty()) {
// There is at least one handshaked peer // There is at least one handshaked peer

View File

@ -252,7 +252,7 @@ public class RNSNetwork {
} }
public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) { public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) {
for (RNSPeer peer : getImmutableActiveLinkedPeers()) { for (RNSPeer peer : getActiveImmutableLinkedPeers()) {
if (this.isShuttingDown) { if (this.isShuttingDown) {
return; return;
} }
@ -531,7 +531,7 @@ public class RNSNetwork {
//// Note: we might not need this. All messages handled asynchronously in Reticulum //// Note: we might not need this. All messages handled asynchronously in Reticulum
//// (RNSPeer peerBufferReady callback) //// (RNSPeer peerBufferReady callback)
//private Task maybeProducePeerMessageTask() { //private Task maybeProducePeerMessageTask() {
// return getImmutableActiveLinkedPeers().stream() // return getActiveImmutableLinkedPeers().stream()
// .map(RNSPeer::getMessageTask) // .map(RNSPeer::getMessageTask)
// .filter(Objects::nonNull) // .filter(Objects::nonNull)
// .findFirst() // .findFirst()
@ -555,7 +555,7 @@ public class RNSNetwork {
// log.info("ilp - {}", ilp); // log.info("ilp - {}", ilp);
//} //}
//return ilp; //return ilp;
return getImmutableActiveLinkedPeers().stream() return getActiveImmutableLinkedPeers().stream()
.map(peer -> peer.getPingTask(now)) .map(peer -> peer.getPingTask(now))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.findFirst() .findFirst()
@ -589,7 +589,7 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE; return SingletonContainer.INSTANCE;
} }
public List<RNSPeer> getImmutableActiveLinkedPeers() { public List<RNSPeer> getActiveImmutableLinkedPeers() {
List<RNSPeer> activePeers = Collections.synchronizedList(new ArrayList<>()); List<RNSPeer> activePeers = Collections.synchronizedList(new ArrayList<>());
for (RNSPeer p: this.immutableLinkedPeers) { for (RNSPeer p: this.immutableLinkedPeers) {
if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) { if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) {
@ -615,7 +615,7 @@ public class RNSNetwork {
if (nonNull(peer.getPeerLink())) { if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown(); peer.getPeerLink().teardown();
} }
this.linkedPeers.remove(peer); // thread safe this.linkedPeers.remove(this.linkedPeers.indexOf(peer)); // thread safe
this.immutableLinkedPeers = List.copyOf(this.linkedPeers); this.immutableLinkedPeers = List.copyOf(this.linkedPeers);
} }
@ -635,7 +635,7 @@ public class RNSNetwork {
if (nonNull(peer.getPeerLink())) { if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown(); peer.getPeerLink().teardown();
} }
this.incomingPeers.remove(peer); this.incomingPeers.remove(this.incomingPeers.indexOf(peer));
this.immutableIncomingPeers = List.copyOf(this.incomingPeers); this.immutableIncomingPeers = List.copyOf(this.incomingPeers);
} }
@ -676,7 +676,7 @@ public class RNSNetwork {
//} //}
} }
public List<RNSPeer> incomingNonActivePeers() { public List<RNSPeer> getNonActiveIncommingPeers() {
var ips = getIncomingPeers(); var ips = getIncomingPeers();
List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>()); List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>());
Link pl; Link pl;
@ -695,86 +695,61 @@ public class RNSNetwork {
//@Synchronized //@Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller)
var peerList = getImmutableLinkedPeers();
var incomingPeerList = getImmutableIncomingPeers();
//log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
// incomingPeerList.size());
log.info("number of links (linkedPeers (active) / incomingPeers before prunig: {} ({}), {}",
getImmutableLinkedPeers().size(), getImmutableActiveLinkedPeers().size(),
getImmutableIncomingPeers().size());
// prune initiator peers // prune initiator peers
List<RNSPeer> lps = getImmutableLinkedPeers(); //var peerList = getImmutableLinkedPeers();
for (RNSPeer p : lps) { var initiatorPeerList = getImmutableLinkedPeers();
var initiatorActivePeerList = getActiveImmutableLinkedPeers();
var incomingPeerList = getImmutableIncomingPeers();
log.info("number of links (linkedPeers (active) / incomingPeers before prunig: {} ({}), {}",
initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(),
incomingPeerList.size());
for (RNSPeer p: initiatorActivePeerList) {
var pLink = p.getOrInitPeerLink();
p.pingRemote();
}
for (RNSPeer p : initiatorPeerList) {
var pLink = p.getPeerLink(); var pLink = p.getPeerLink();
if (nonNull(pLink)) { if (nonNull(pLink)) {
if (p.getPeerTimedOut()) {
// options: keep in case peer reconnects or remove => we'll remove it
removeLinkedPeer(p);
continue;
}
if (pLink.getStatus() == ACTIVE) {
continue;
}
if (pLink.getStatus() == CLOSED) {
removeLinkedPeer(p);
continue;
}
if (pLink.getStatus() == PENDING) { if (pLink.getStatus() == PENDING) {
pLink.teardown(); pLink.teardown();
removeLinkedPeer(p);
continue;
} }
if (p.getPeerTimedOut()) { }
}
// prune non-initiator peers
List<RNSPeer> inaps = getNonActiveIncommingPeers();
for (RNSPeer p: inaps) {
var pLink = p.getPeerLink();
if (nonNull(pLink)) {
// could be eg. PENDING
pLink.teardown(); pLink.teardown();
} }
log.info("peer link: {}, status: {}", pLink, pLink.getStatus());
if (pLink.getStatus() == ACTIVE) {
p.pingRemote();
} else {
removeLinkedPeer(p);
}
}
}
//Link pLink;
//LinkStatus lStatus;
//var now = Instant.now();
//for (RNSPeer p: peerList) {
// pLink = p.getPeerLink();
// var peerLastAccessTimestamp = p.getLastAccessTimestamp();
// var peerLastPingResponseReceived = p.getLastPingResponseReceived();
// log.info("peerLink: {}, status: {}", pLink, pLink.getStatus());
// log.info("prunePeers - pLink: {}, destinationHash: {}",
// pLink, Hex.encodeHexString(p.getDestinationHash()));
// log.debug("peer: {}", p);
// if (nonNull(pLink)) {
// if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) {
// // close peer link for now
// pLink.teardown();
// }
// lStatus = pLink.getStatus();
// log.info("Link {} status: {}", pLink, lStatus);
// // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
// if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) {
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// } else if (lStatus == HANDSHAKE) {
// // stuck in handshake state (do we need to shutdown/remove it?)
// log.info("peer status HANDSHAKE");
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// // either reach peer or disable link
// p.pingRemote();
// } else {
// if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// }
//}
List<RNSPeer> inaps = incomingNonActivePeers();
//log.info("number of inactive incoming peers: {}", inaps.size());
for (RNSPeer p: inaps) {
//incomingPeerList.remove(incomingPeerList.indexOf(p));
removeIncomingPeer(p); removeIncomingPeer(p);
} }
initiatorPeerList = getImmutableLinkedPeers();
initiatorActivePeerList = getActiveImmutableLinkedPeers();
incomingPeerList = getImmutableIncomingPeers();
log.info("number of links (linkedPeers (active) / incomingPeers after prunig: {} ({}), {}", log.info("number of links (linkedPeers (active) / incomingPeers after prunig: {} ({}), {}",
getImmutableLinkedPeers().size(), getImmutableActiveLinkedPeers().size(), initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(),
getImmutableIncomingPeers().size()); incomingPeerList.size());
maybeAnnounce(getBaseDestination()); maybeAnnounce(getBaseDestination());
} }
public void maybeAnnounce(Destination d) { public void maybeAnnounce(Destination d) {
if (getImmutableActiveLinkedPeers().size() < MIN_DESIRED_PEERS) { if (getActiveImmutableLinkedPeers().size() < MIN_DESIRED_PEERS) {
d.announce(); d.announce();
} }
} }

View File

@ -99,9 +99,9 @@ public class RNSPeer {
int sendStreamId = 0; int sendStreamId = 0;
private Boolean isInitiator; private Boolean isInitiator;
private Boolean deleteMe = false; private Boolean deleteMe = false;
private Boolean isVacant = true; //private Boolean isVacant = true;
private Long lastPacketRtt = null; private Long lastPacketRtt = null;
private byte[] emptyBuffer = {0,0,0,0,0}; //private byte[] emptyBuffer = {0,0,0,0,0};
private Double requestResponseProgress; private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
@ -144,7 +144,7 @@ public class RNSPeer {
initPeerLink(); initPeerLink();
//setCreationTimestamp(System.currentTimeMillis()); //setCreationTimestamp(System.currentTimeMillis());
this.creationTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
this.isVacant = true; //this.isVacant = true;
this.replyQueues = new ConcurrentHashMap<>(); this.replyQueues = new ConcurrentHashMap<>();
this.pendingMessages = new LinkedBlockingQueue<>(); this.pendingMessages = new LinkedBlockingQueue<>();
this.peerData = new RNSPeerData(dhash); this.peerData = new RNSPeerData(dhash);
@ -164,7 +164,7 @@ public class RNSPeer {
this.lastAccessTimestamp = Instant.now(); this.lastAccessTimestamp = Instant.now();
this.lastLinkProbeTimestamp = null; this.lastLinkProbeTimestamp = null;
this.isInitiator = false; this.isInitiator = false;
this.isVacant = false; //this.isVacant = false;
//this.peerLink.setLinkEstablishedCallback(this::linkEstablished); //this.peerLink.setLinkEstablishedCallback(this::linkEstablished);
//this.peerLink.setLinkClosedCallback(this::linkClosed); //this.peerLink.setLinkClosedCallback(this::linkClosed);
@ -223,8 +223,7 @@ public class RNSPeer {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
} }
//return getPeerBuffer(); return getPeerBuffer();
return this.peerBuffer;
} }
public Link getOrInitPeerLink() { public Link getOrInitPeerLink() {
@ -342,10 +341,10 @@ public class RNSPeer {
//log.trace("peerBufferReady - data bytes: {}", data.length); //log.trace("peerBufferReady - data bytes: {}", data.length);
this.lastAccessTimestamp = Instant.now(); this.lastAccessTimestamp = Instant.now();
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { //if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length); // log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
} //}
else { //else {
try { try {
//log.info("***> creating message from {} bytes", data.length); //log.info("***> creating message from {} bytes", data.length);
Message message = Message.fromByteBuffer(bb); Message message = Message.fromByteBuffer(bb);
@ -415,7 +414,7 @@ public class RNSPeer {
log.error("{} from peer {}", e, this); log.error("{} from peer {}", e, this);
log.info("{} from peer {}", e, this); log.info("{} from peer {}", e, this);
} }
} //}
} }
/** /**
@ -499,13 +498,14 @@ public class RNSPeer {
public void packetTimedOut(PacketReceipt receipt) { public void packetTimedOut(PacketReceipt receipt) {
log.info("packet timed out, receipt status: {}", receipt.getStatus()); log.info("packet timed out, receipt status: {}", receipt.getStatus());
//if (receipt.getStatus() == PacketReceiptStatus.FAILED) { if (receipt.getStatus() == PacketReceiptStatus.FAILED) {
// this.peerTimedOut = true; log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED);
// this.peerLink.teardown();
//}
this.peerTimedOut = true; this.peerTimedOut = true;
this.peerLink.teardown(); this.peerLink.teardown();
} }
//this.peerTimedOut = true;
//this.peerLink.teardown();
}
/** Link Request callbacks */ /** Link Request callbacks */
public void linkRequestResponseReceived(RequestReceipt rr) { public void linkRequestResponseReceived(RequestReceipt rr) {