rewored pruning and implementation as task

This commit is contained in:
Jürg Schulthess 2025-04-20 19:59:40 +02:00
parent 13e3d81759
commit c1091cf9e6
5 changed files with 167 additions and 105 deletions

View File

@ -855,29 +855,29 @@ public class Controller extends Thread {
repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval(); repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval();
} }
// Prune stuck/slow/old peers //// Prune stuck/slow/old peers
if (now >= prunePeersTimestamp + prunePeersInterval) { //if (now >= prunePeersTimestamp + prunePeersInterval) {
prunePeersTimestamp = now + prunePeersInterval; // prunePeersTimestamp = now + prunePeersInterval;
//
// try {
// LOGGER.debug("Pruning peers...");
// Network.getInstance().prunePeers();
// } catch (DataException e) {
// LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
// }
//}
try { //// Q: Do we need global pruning?
LOGGER.debug("Pruning peers..."); //if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) {
Network.getInstance().prunePeers(); // pruneRNSPeersTimestamp = now + pruneRNSPeersInterval;
} catch (DataException e) { //
LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); // try {
} // LOGGER.debug("Pruning Reticulum peers...");
} // RNSNetwork.getInstance().prunePeers();
// } catch (DataException e) {
// Q: Do we need global pruning? // LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage()));
if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { // }
pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; //}
try {
LOGGER.debug("Pruning Reticulum peers...");
RNSNetwork.getInstance().prunePeers();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage()));
}
}
// Delete expired transactions // Delete expired transactions
if (now >= deleteExpiredTimestamp) { if (now >= deleteExpiredTimestamp) {
@ -1280,6 +1280,17 @@ public class Controller extends Thread {
} }
public void doRNSPrunePeers() {
RNSNetwork network = RNSNetwork.getInstance();
try {
LOGGER.debug("Pruning peers...");
network.prunePeers();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
}
}
public void onMintingPossibleChange(boolean isMintingPossible) { public void onMintingPossibleChange(boolean isMintingPossible) {
this.isMintingPossible = isMintingPossible; this.isMintingPossible = isMintingPossible;
requestSysTrayUpdate = true; requestSysTrayUpdate = true;

View File

@ -82,6 +82,7 @@ import org.qortal.network.message.BlockSummariesV2Message;
import org.qortal.network.message.TransactionSignaturesMessage; import org.qortal.network.message.TransactionSignaturesMessage;
import org.qortal.network.message.GetUnconfirmedTransactionsMessage; import org.qortal.network.message.GetUnconfirmedTransactionsMessage;
import org.qortal.network.task.RNSBroadcastTask; import org.qortal.network.task.RNSBroadcastTask;
import org.qortal.network.task.RNSPrunePeersTask;
import org.qortal.controller.Controller; import org.qortal.controller.Controller;
import org.qortal.repository.Repository; import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager; import org.qortal.repository.RepositoryManager;
@ -135,11 +136,15 @@ public class RNSNetwork {
* How long between informational broadcasts to all ACTIVE peers, in milliseconds. * How long between informational broadcasts to all ACTIVE peers, in milliseconds.
*/ */
private static final long BROADCAST_INTERVAL = 30 * 1000L; // ms private static final long BROADCAST_INTERVAL = 30 * 1000L; // ms
/**
* How log between pruning of peers
*/
private static final long PRUNE_INTERVAL = 2 * 60 * 1000L; // ms
/** /**
* Link low-level ping interval and timeout * Link low-level ping interval and timeout
*/ */
private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms private static final long LINK_PING_INTERVAL = 55 * 1000L; // ms
private static final long LINK_UNREACHABLE_TIMEOUT = 2 * LINK_PING_INTERVAL; private static final long LINK_UNREACHABLE_TIMEOUT = 3 * LINK_PING_INTERVAL;
//private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class);
@ -450,11 +455,14 @@ public class RNSNetwork {
private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs
private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs
private final AtomicLong nextPingTimestamp = new AtomicLong(0L); // ms - try first low-level Ping private final AtomicLong nextPingTimestamp = new AtomicLong(0L); // ms - try first low-level Ping
private final AtomicLong nextPruneTimestamp = new AtomicLong(0L); // ms - try first low-level Ping
private Iterator<SelectionKey> channelIterator = null; private Iterator<SelectionKey> channelIterator = null;
RNSNetworkProcessor(ExecutorService executor) { RNSNetworkProcessor(ExecutorService executor) {
super(executor); super(executor);
final Long now = NTP.getTime();
nextPruneTimestamp.set(now + PRUNE_INTERVAL/2);
} }
@Override @Override
@ -482,10 +490,17 @@ public class RNSNetwork {
return task; return task;
} }
//task = maybeProduceBroadcastTask(now); task = maybeProduceBroadcastTask(now);
//if (task != null) { if (task != null) {
// return task; return task;
//} }
// Prune stuck/slow/old peers (moved from Controller)
task = maybeProduceRNSPrunePeersTask(now);
if (task != null) {
return task;
}
return null; return null;
} }
@ -545,6 +560,15 @@ public class RNSNetwork {
nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL); nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL);
return new RNSBroadcastTask(); return new RNSBroadcastTask();
} }
private Task maybeProduceRNSPrunePeersTask(Long now) {
if (now == null || now < nextPruneTimestamp.get()) {
return null;
}
nextPruneTimestamp.set(now + PRUNE_INTERVAL);
return new RNSPrunePeersTask();
}
} }
private static class SingletonContainer { private static class SingletonContainer {
@ -565,6 +589,9 @@ public class RNSNetwork {
} }
public void removeLinkedPeer(RNSPeer peer) { public void removeLinkedPeer(RNSPeer peer) {
if (nonNull(peer.getPeerBuffer())) {
peer.getPeerBuffer().close();
}
if (nonNull(peer.getPeerLink())) { if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown(); peer.getPeerLink().teardown();
} }
@ -619,30 +646,59 @@ public class RNSNetwork {
// } // }
//} //}
private Boolean isUnreachable(RNSPeer peer) {
var result = peer.getDeleteMe();
var now = Instant.now();
var peerLastAccessTimestamp = peer.getLastAccessTimestamp();
if (peerLastAccessTimestamp.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
result = true;
}
return result;
}
public List<RNSPeer> incomingNonActivePeers() {
var ips = getIncomingPeers();
List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>());
Link pl;
for (RNSPeer p: ips) {
pl = p.getPeerLink();
if (nonNull(pl)) {
if (pl.getStatus() != ACTIVE) {
result.add(p);
}
} else {
result.add(p);
}
}
return result;
}
//@Synchronized //@Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller) // run periodically (by the Controller)
var peerList = getLinkedPeers(); var peerList = getLinkedPeers();
//var peerList = getImmutableLinkedPeers(); var incomingPeerList = getIncomingPeers();
log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
incomingPeerList.size());
Link pLink; Link pLink;
LinkStatus lStatus; LinkStatus lStatus;
//final Long now = NTP.getTime(); var now = Instant.now();
Instant now = Instant.now();
for (RNSPeer p: peerList) { for (RNSPeer p: peerList) {
pLink = p.getPeerLink(); pLink = p.getPeerLink();
//var peerLastAccessTimestamp = p.getLastAccessTimestamp();
var peerLastPingResponseReceived = p.getLastPingResponseReceived();
log.info("prunePeers - pLink: {}, destinationHash: {}", log.info("prunePeers - pLink: {}, destinationHash: {}",
pLink, Hex.encodeHexString(p.getDestinationHash())); pLink, Hex.encodeHexString(p.getDestinationHash()));
log.debug("peer: {}", p); log.debug("peer: {}", p);
if (nonNull(pLink)) { if (nonNull(pLink)) {
if ((p.getPeerTimedOut()) || (p.getLastPingResponseReceived() > LINK_UNREACHABLE_TIMEOUT)) { if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) {
// close peer link for now // close peer link for now
pLink.teardown(); pLink.teardown();
} }
lStatus = pLink.getStatus(); lStatus = pLink.getStatus();
log.info("Link {} status: {}", pLink, lStatus); log.info("Link {} status: {}", pLink, lStatus);
// lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) { if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) {
//p.shutdown(); //p.shutdown();
//peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p); removeLinkedPeer(p);
@ -653,67 +709,29 @@ public class RNSNetwork {
//peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p); removeLinkedPeer(p);
} }
// either reach peer or disable link
p.pingRemote();
} else { } else {
if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
//peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p); removeLinkedPeer(p);
} }
} }
//var incomingPeerList = getImmutableIncomingPeers();
var incomingPeerList = getIncomingPeers();
for (RNSPeer ip: incomingPeerList) {
pLink = ip.getPeerLink();
//log.info("prunePeers - {} incoming peer: {}", pLink.getStatus(), ip);
if (nonNull(pLink)) {
if (pLink.getStatus() != ACTIVE) {
log.info("removing inactive incoming/non-initiator peer.");
removeIncomingPeer(ip);
} else {
log.info("prunePeers - {} incoming/non-initiator peer: {}", pLink.getStatus(), pLink);
}
}
else {
log.info("prunePeers - null incoming/non-initiator peer: {}", ip);
//removeIncomingPeer(ip);
} }
List<RNSPeer> inaps = incomingNonActivePeers();
//log.info("number of inactive incoming peers: {}", inaps.size());
//var incomingPeerList = getIncomingPeers();
//log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
// incomingPeerList.size());
for (RNSPeer p: inaps) {
incomingPeerList.remove(incomingPeerList.indexOf(p));
} }
//removeExpiredPeers(this.linkedPeers); //removeExpiredPeers(this.linkedPeers);
log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(),
incomingPeerList.size()); incomingPeerList.size());
//log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks);
var activePeerCount = 0;
//var lps = RNSNetwork.getInstance().getLinkedPeers();
var ips = getImmutableLinkedPeers();
for (RNSPeer p: ips) {
pLink = p.getPeerLink();
if (now.minusMillis(LINK_UNREACHABLE_TIMEOUT).isAfter(p.getLastAccessTimestamp())) {
// Link was not accessed for too long
pLink.teardown();
}
//p.pingRemote();
//try {
// TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully
//} catch (InterruptedException e) {
// log.error("exception: ", e);
//}
if ((nonNull(pLink) && (pLink.getStatus() == ACTIVE))) {
activePeerCount = activePeerCount + 1;
}
}
log.info("we have {} active peers (linkedPeers)", activePeerCount);
maybeAnnounce(getBaseDestination()); maybeAnnounce(getBaseDestination());
} }
//public void removeExpiredPeers(List<RNSPeer> peerList) {
// //List<RNSPeer> peerList = this.linkedPeers;
// for (RNSPeer p: peerList) {
// if (p.getPeerLink() == null) {
// peerList.remove(p);
// } else if (p.getPeerLink().getStatus() == STALE) {
// peerList.remove(p);
// }
// }
//}
public void maybeAnnounce(Destination d) { public void maybeAnnounce(Destination d) {
if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { if (getLinkedPeers().size() < MIN_DESIRED_PEERS) {
d.announce(); d.announce();

View File

@ -89,7 +89,8 @@ public class RNSPeer {
Destination peerDestination; // OUT destination created for this Destination peerDestination; // OUT destination created for this
private Identity serverIdentity; private Identity serverIdentity;
@Setter(AccessLevel.PACKAGE) private Instant creationTimestamp; @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp;
private Instant lastAccessTimestamp; @Setter(AccessLevel.PACKAGE) private Instant lastAccessTimestamp;
@Setter(AccessLevel.PACKAGE) private Instant lastLinkProbeTimestamp;
Link peerLink; Link peerLink;
byte[] peerLinkHash; byte[] peerLinkHash;
BufferedRWPair peerBuffer; BufferedRWPair peerBuffer;
@ -111,7 +112,7 @@ public class RNSPeer {
private byte[] messageMagic; // set in message creating classes private byte[] messageMagic; // set in message creating classes
private Long lastPing = null; // last (packet) ping roundtrip time [ms] private Long lastPing = null; // last (packet) ping roundtrip time [ms]
private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started. private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started.
@Setter(AccessLevel.PACKAGE) private Long lastPingResponseReceived = null; // time last (packet) ping succeeded @Setter(AccessLevel.PACKAGE) private Instant lastPingResponseReceived = null; // time last (packet) ping succeeded
private Map<Integer, BlockingQueue<Message>> replyQueues; private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages; private LinkedBlockingQueue<Message> pendingMessages;
// Versioning // Versioning
@ -155,7 +156,8 @@ public class RNSPeer {
this.serverIdentity = link.getRemoteIdentity(); this.serverIdentity = link.getRemoteIdentity();
this.creationTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
this.lastAccessTimestamp = null; this.lastAccessTimestamp = Instant.now();
this.lastLinkProbeTimestamp = null;
this.isInitiator = false; this.isInitiator = false;
this.isVacant = false; this.isVacant = false;
@ -175,7 +177,8 @@ public class RNSPeer {
peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL); peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
this.creationTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
this.lastAccessTimestamp = null; this.lastAccessTimestamp = Instant.now();
this.lastLinkProbeTimestamp = null;
this.isInitiator = true; this.isInitiator = true;
this.peerLink = new Link(peerDestination); this.peerLink = new Link(peerDestination);
@ -233,8 +236,9 @@ public class RNSPeer {
if (nonNull(this.peerLink)) { if (nonNull(this.peerLink)) {
log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus()); log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus());
if (peerLink.getStatus() == ACTIVE) { if (peerLink.getStatus() == ACTIVE) {
if (isFalse(this.isInitiator)) { if (nonNull(this.peerBuffer)) {
sendCloseToRemote(this.peerLink); this.peerBuffer.close();
this.peerBuffer = null;
} }
this.peerLink.teardown(); this.peerLink.teardown();
} else { } else {
@ -290,6 +294,7 @@ public class RNSPeer {
var msgText = new String(message, StandardCharsets.UTF_8); var msgText = new String(message, StandardCharsets.UTF_8);
if (msgText.equals("ping")) { if (msgText.equals("ping")) {
log.info("received ping on link"); log.info("received ping on link");
this.lastLinkProbeTimestamp = Instant.now();
} else if (msgText.startsWith("close::")) { } else if (msgText.startsWith("close::")) {
var targetPeerHash = subarray(message, 7, message.length); var targetPeerHash = subarray(message, 7, message.length);
log.info("peer dest hash: {}, target hash: {}", log.info("peer dest hash: {}, target hash: {}",
@ -297,6 +302,10 @@ public class RNSPeer {
encodeHexString(targetPeerHash)); encodeHexString(targetPeerHash));
if (Arrays.equals(destinationHash, targetPeerHash)) { if (Arrays.equals(destinationHash, targetPeerHash)) {
log.info("closing link: {}", peerLink.getDestination().getHexHash()); log.info("closing link: {}", peerLink.getDestination().getHexHash());
if (nonNull(this.peerBuffer)) {
this.peerBuffer.close();
this.peerBuffer = null;
}
peerLink.teardown(); peerLink.teardown();
} }
} else if (msgText.startsWith("open::")) { } else if (msgText.startsWith("open::")) {
@ -309,7 +318,6 @@ public class RNSPeer {
getOrInitPeerLink(); getOrInitPeerLink();
} }
} }
// TODO: process incoming packet....
} }
/* /*
@ -321,20 +329,20 @@ public class RNSPeer {
// get the message data // get the message data
byte[] data = this.peerBuffer.read(readyBytes); byte[] data = this.peerBuffer.read(readyBytes);
ByteBuffer bb = ByteBuffer.wrap(data); ByteBuffer bb = ByteBuffer.wrap(data);
log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb); //log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb);
//log.info("data length: {}, ByteBuffer: {}", data.length, bb); //log.info("data length: {}, MAGIC: {}, ByteBuffer: {}", data.length, this.messageMagic, bb);
//var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); //log.trace("peerBufferReady - data bytes: {}", data.length);
log.trace("peerBufferReady - data bytes: {}", data.length);
this.lastAccessTimestamp = Instant.now(); this.lastAccessTimestamp = Instant.now();
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length); log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
//this.peerBuffer.flush(); }
} else { else {
try { try {
//log.info("***> creating message from {} bytes", data.length); //log.info("***> creating message from {} bytes", data.length);
Message message = Message.fromByteBuffer(bb); Message message = Message.fromByteBuffer(bb);
log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); //log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message);
log.info("*=> type {} message received ({} bytes)", message.getType(), data.length);
// Handle message based on type // Handle message based on type
switch (message.getType()) { switch (message.getType()) {
// Do we need this ? (seems like a TCP scenario only thing) // Do we need this ? (seems like a TCP scenario only thing)
@ -345,6 +353,7 @@ public class RNSPeer {
// break; // break;
case PING: case PING:
this.lastPingResponseReceived = Instant.now();
if (isFalse(this.isInitiator)) { if (isFalse(this.isInitiator)) {
onPingMessage(this, message); onPingMessage(this, message);
// Note: buffer flush done in onPingMessage method // Note: buffer flush done in onPingMessage method
@ -353,13 +362,11 @@ public class RNSPeer {
case PONG: case PONG:
log.info("PONG received"); log.info("PONG received");
//this.peerBuffer.flush();
break; break;
// Do we need this ? (no need to relay peer list...) // Do we need this ? (no need to relay peer list...)
//case PEERS_V2: //case PEERS_V2:
// onPeersV2Message(peer, message); // onPeersV2Message(peer, message);
// this.peerBuffer.flush();
// break; // break;
default: default:
@ -367,7 +374,6 @@ public class RNSPeer {
// Bump up to controller for possible action // Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message); //Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message); Controller.getInstance().onRNSNetworkMessage(this, message);
//this.peerBuffer.flush();
break; break;
} }
} catch (MessageException e) { } catch (MessageException e) {
@ -375,7 +381,6 @@ public class RNSPeer {
log.error("{} from peer {}", e, this); log.error("{} from peer {}", e, this);
log.info("{} from peer {}", e, this); log.info("{} from peer {}", e, this);
} }
//this.peerBuffer.flush(); // clear buffer
} }
} }

View File

@ -0,0 +1,27 @@
package org.qortal.network.task;
import org.qortal.controller.Controller;
//import org.qortal.network.RNSNetwork;
//import org.qortal.repository.DataException;
import org.qortal.utils.ExecuteProduceConsume.Task;
public class RNSPrunePeersTask implements Task {
public RNSPrunePeersTask() {
}
@Override
public String getName() {
return "PrunePeersTask";
}
@Override
public void perform() throws InterruptedException {
Controller.getInstance().doRNSPrunePeers();
//try {
// log.debug("Pruning peers...");
// RNSNetwork.getInstance().prunePeers();
//} catch (DataException e) {
// log.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
//}
}
}

View File

@ -46,7 +46,8 @@ public class RNSPingTask implements Task {
// LOGGER.error(e.getMessage(), e); // LOGGER.error(e.getMessage(), e);
//} //}
// Note: We might use peer.sendMessage(pingMessage) instead // Note: We might use peer.sendMessage(pingMessage) instead
peer.getResponse(pingMessage); //peer.getResponse(pingMessage);
peer.sendMessage(pingMessage);
//// task is not over here (Reticulum is asynchronous) //// task is not over here (Reticulum is asynchronous)
//peer.setLastPing(NTP.getTime() - now); //peer.setLastPing(NTP.getTime() - now);