diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 4f885e7e..b504c7fe 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -855,29 +855,29 @@ public class Controller extends Thread { repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval(); } - // Prune stuck/slow/old peers - if (now >= prunePeersTimestamp + prunePeersInterval) { - prunePeersTimestamp = now + prunePeersInterval; + //// Prune stuck/slow/old peers + //if (now >= prunePeersTimestamp + prunePeersInterval) { + // prunePeersTimestamp = now + prunePeersInterval; + // + // try { + // LOGGER.debug("Pruning peers..."); + // Network.getInstance().prunePeers(); + // } catch (DataException e) { + // LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); + // } + //} - try { - LOGGER.debug("Pruning peers..."); - Network.getInstance().prunePeers(); - } catch (DataException e) { - LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); - } - } - - // Q: Do we need global pruning? - if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { - pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; - - try { - LOGGER.debug("Pruning Reticulum peers..."); - RNSNetwork.getInstance().prunePeers(); - } catch (DataException e) { - LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage())); - } - } + //// Q: Do we need global pruning? + //if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { + // pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; + // + // try { + // LOGGER.debug("Pruning Reticulum peers..."); + // RNSNetwork.getInstance().prunePeers(); + // } catch (DataException e) { + // LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage())); + // } + //} // Delete expired transactions if (now >= deleteExpiredTimestamp) { @@ -1280,6 +1280,17 @@ public class Controller extends Thread { } + public void doRNSPrunePeers() { + RNSNetwork network = RNSNetwork.getInstance(); + + try { + LOGGER.debug("Pruning peers..."); + network.prunePeers(); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); + } + } + public void onMintingPossibleChange(boolean isMintingPossible) { this.isMintingPossible = isMintingPossible; requestSysTrayUpdate = true; diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 5e130b6b..c09af71c 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -82,6 +82,7 @@ import org.qortal.network.message.BlockSummariesV2Message; import org.qortal.network.message.TransactionSignaturesMessage; import org.qortal.network.message.GetUnconfirmedTransactionsMessage; import org.qortal.network.task.RNSBroadcastTask; +import org.qortal.network.task.RNSPrunePeersTask; import org.qortal.controller.Controller; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -135,11 +136,15 @@ public class RNSNetwork { * How long between informational broadcasts to all ACTIVE peers, in milliseconds. */ private static final long BROADCAST_INTERVAL = 30 * 1000L; // ms + /** + * How log between pruning of peers + */ + private static final long PRUNE_INTERVAL = 2 * 60 * 1000L; // ms /** * Link low-level ping interval and timeout */ - private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms - private static final long LINK_UNREACHABLE_TIMEOUT = 2 * LINK_PING_INTERVAL; + private static final long LINK_PING_INTERVAL = 55 * 1000L; // ms + private static final long LINK_UNREACHABLE_TIMEOUT = 3 * LINK_PING_INTERVAL; //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); @@ -450,11 +455,14 @@ public class RNSNetwork { private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs private final AtomicLong nextPingTimestamp = new AtomicLong(0L); // ms - try first low-level Ping + private final AtomicLong nextPruneTimestamp = new AtomicLong(0L); // ms - try first low-level Ping private Iterator channelIterator = null; RNSNetworkProcessor(ExecutorService executor) { super(executor); + final Long now = NTP.getTime(); + nextPruneTimestamp.set(now + PRUNE_INTERVAL/2); } @Override @@ -482,10 +490,17 @@ public class RNSNetwork { return task; } - //task = maybeProduceBroadcastTask(now); - //if (task != null) { - // return task; - //} + task = maybeProduceBroadcastTask(now); + if (task != null) { + return task; + } + + // Prune stuck/slow/old peers (moved from Controller) + task = maybeProduceRNSPrunePeersTask(now); + if (task != null) { + return task; + } + return null; } @@ -541,10 +556,19 @@ public class RNSNetwork { if (now == null || now < nextBroadcastTimestamp.get()) { return null; } - + nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL); return new RNSBroadcastTask(); } + + private Task maybeProduceRNSPrunePeersTask(Long now) { + if (now == null || now < nextPruneTimestamp.get()) { + return null; + } + + nextPruneTimestamp.set(now + PRUNE_INTERVAL); + return new RNSPrunePeersTask(); + } } private static class SingletonContainer { @@ -565,6 +589,9 @@ public class RNSNetwork { } public void removeLinkedPeer(RNSPeer peer) { + if (nonNull(peer.getPeerBuffer())) { + peer.getPeerBuffer().close(); + } if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } @@ -619,30 +646,59 @@ public class RNSNetwork { // } //} + private Boolean isUnreachable(RNSPeer peer) { + var result = peer.getDeleteMe(); + var now = Instant.now(); + var peerLastAccessTimestamp = peer.getLastAccessTimestamp(); + if (peerLastAccessTimestamp.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { + result = true; + } + return result; + } + + public List incomingNonActivePeers() { + var ips = getIncomingPeers(); + List result = Collections.synchronizedList(new ArrayList<>()); + Link pl; + for (RNSPeer p: ips) { + pl = p.getPeerLink(); + if (nonNull(pl)) { + if (pl.getStatus() != ACTIVE) { + result.add(p); + } + } else { + result.add(p); + } + } + return result; + } + //@Synchronized public void prunePeers() throws DataException { // run periodically (by the Controller) var peerList = getLinkedPeers(); - //var peerList = getImmutableLinkedPeers(); - log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); + var incomingPeerList = getIncomingPeers(); + log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), + incomingPeerList.size()); Link pLink; LinkStatus lStatus; - //final Long now = NTP.getTime(); - Instant now = Instant.now(); + var now = Instant.now(); for (RNSPeer p: peerList) { pLink = p.getPeerLink(); + //var peerLastAccessTimestamp = p.getLastAccessTimestamp(); + var peerLastPingResponseReceived = p.getLastPingResponseReceived(); log.info("prunePeers - pLink: {}, destinationHash: {}", pLink, Hex.encodeHexString(p.getDestinationHash())); log.debug("peer: {}", p); if (nonNull(pLink)) { - if ((p.getPeerTimedOut()) || (p.getLastPingResponseReceived() > LINK_UNREACHABLE_TIMEOUT)) { + if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) { // close peer link for now pLink.teardown(); } lStatus = pLink.getStatus(); log.info("Link {} status: {}", pLink, lStatus); // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED - if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) { + if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) { //p.shutdown(); //peerList.remove(p); removeLinkedPeer(p); @@ -653,67 +709,29 @@ public class RNSNetwork { //peerList.remove(p); removeLinkedPeer(p); } + // either reach peer or disable link + p.pingRemote(); } else { - //peerList.remove(p); - removeLinkedPeer(p); - } - } - //var incomingPeerList = getImmutableIncomingPeers(); - var incomingPeerList = getIncomingPeers(); - for (RNSPeer ip: incomingPeerList) { - pLink = ip.getPeerLink(); - //log.info("prunePeers - {} incoming peer: {}", pLink.getStatus(), ip); - if (nonNull(pLink)) { - if (pLink.getStatus() != ACTIVE) { - log.info("removing inactive incoming/non-initiator peer."); - removeIncomingPeer(ip); - } else { - log.info("prunePeers - {} incoming/non-initiator peer: {}", pLink.getStatus(), pLink); + if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { + //peerList.remove(p); + removeLinkedPeer(p); } } - else { - log.info("prunePeers - null incoming/non-initiator peer: {}", ip); - //removeIncomingPeer(ip); - } + } + List inaps = incomingNonActivePeers(); + //log.info("number of inactive incoming peers: {}", inaps.size()); + //var incomingPeerList = getIncomingPeers(); + //log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), + // incomingPeerList.size()); + for (RNSPeer p: inaps) { + incomingPeerList.remove(incomingPeerList.indexOf(p)); } //removeExpiredPeers(this.linkedPeers); log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), incomingPeerList.size()); - //log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks); - var activePeerCount = 0; - //var lps = RNSNetwork.getInstance().getLinkedPeers(); - var ips = getImmutableLinkedPeers(); - for (RNSPeer p: ips) { - pLink = p.getPeerLink(); - if (now.minusMillis(LINK_UNREACHABLE_TIMEOUT).isAfter(p.getLastAccessTimestamp())) { - // Link was not accessed for too long - pLink.teardown(); - } - //p.pingRemote(); - //try { - // TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully - //} catch (InterruptedException e) { - // log.error("exception: ", e); - //} - if ((nonNull(pLink) && (pLink.getStatus() == ACTIVE))) { - activePeerCount = activePeerCount + 1; - } - } - log.info("we have {} active peers (linkedPeers)", activePeerCount); maybeAnnounce(getBaseDestination()); } - //public void removeExpiredPeers(List peerList) { - // //List peerList = this.linkedPeers; - // for (RNSPeer p: peerList) { - // if (p.getPeerLink() == null) { - // peerList.remove(p); - // } else if (p.getPeerLink().getStatus() == STALE) { - // peerList.remove(p); - // } - // } - //} - public void maybeAnnounce(Destination d) { if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { d.announce(); diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 45807692..6171f043 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -89,7 +89,8 @@ public class RNSPeer { Destination peerDestination; // OUT destination created for this private Identity serverIdentity; @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp; - private Instant lastAccessTimestamp; + @Setter(AccessLevel.PACKAGE) private Instant lastAccessTimestamp; + @Setter(AccessLevel.PACKAGE) private Instant lastLinkProbeTimestamp; Link peerLink; byte[] peerLinkHash; BufferedRWPair peerBuffer; @@ -111,7 +112,7 @@ public class RNSPeer { private byte[] messageMagic; // set in message creating classes private Long lastPing = null; // last (packet) ping roundtrip time [ms] private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started. - @Setter(AccessLevel.PACKAGE) private Long lastPingResponseReceived = null; // time last (packet) ping succeeded + @Setter(AccessLevel.PACKAGE) private Instant lastPingResponseReceived = null; // time last (packet) ping succeeded private Map> replyQueues; private LinkedBlockingQueue pendingMessages; // Versioning @@ -155,7 +156,8 @@ public class RNSPeer { this.serverIdentity = link.getRemoteIdentity(); this.creationTimestamp = Instant.now(); - this.lastAccessTimestamp = null; + this.lastAccessTimestamp = Instant.now(); + this.lastLinkProbeTimestamp = null; this.isInitiator = false; this.isVacant = false; @@ -175,7 +177,8 @@ public class RNSPeer { peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL); this.creationTimestamp = Instant.now(); - this.lastAccessTimestamp = null; + this.lastAccessTimestamp = Instant.now(); + this.lastLinkProbeTimestamp = null; this.isInitiator = true; this.peerLink = new Link(peerDestination); @@ -233,8 +236,9 @@ public class RNSPeer { if (nonNull(this.peerLink)) { log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus()); if (peerLink.getStatus() == ACTIVE) { - if (isFalse(this.isInitiator)) { - sendCloseToRemote(this.peerLink); + if (nonNull(this.peerBuffer)) { + this.peerBuffer.close(); + this.peerBuffer = null; } this.peerLink.teardown(); } else { @@ -290,6 +294,7 @@ public class RNSPeer { var msgText = new String(message, StandardCharsets.UTF_8); if (msgText.equals("ping")) { log.info("received ping on link"); + this.lastLinkProbeTimestamp = Instant.now(); } else if (msgText.startsWith("close::")) { var targetPeerHash = subarray(message, 7, message.length); log.info("peer dest hash: {}, target hash: {}", @@ -297,6 +302,10 @@ public class RNSPeer { encodeHexString(targetPeerHash)); if (Arrays.equals(destinationHash, targetPeerHash)) { log.info("closing link: {}", peerLink.getDestination().getHexHash()); + if (nonNull(this.peerBuffer)) { + this.peerBuffer.close(); + this.peerBuffer = null; + } peerLink.teardown(); } } else if (msgText.startsWith("open::")) { @@ -309,7 +318,6 @@ public class RNSPeer { getOrInitPeerLink(); } } - // TODO: process incoming packet.... } /* @@ -321,20 +329,20 @@ public class RNSPeer { // get the message data byte[] data = this.peerBuffer.read(readyBytes); ByteBuffer bb = ByteBuffer.wrap(data); - log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb); - //log.info("data length: {}, ByteBuffer: {}", data.length, bb); - //var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); - log.trace("peerBufferReady - data bytes: {}", data.length); + //log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb); + //log.info("data length: {}, MAGIC: {}, ByteBuffer: {}", data.length, this.messageMagic, bb); + //log.trace("peerBufferReady - data bytes: {}", data.length); this.lastAccessTimestamp = Instant.now(); if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { log.info("peerBufferReady - empty buffer detected (length: {})", data.length); - //this.peerBuffer.flush(); - } else { + } + else { try { //log.info("***> creating message from {} bytes", data.length); Message message = Message.fromByteBuffer(bb); - log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); + //log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); + log.info("*=> type {} message received ({} bytes)", message.getType(), data.length); // Handle message based on type switch (message.getType()) { // Do we need this ? (seems like a TCP scenario only thing) @@ -345,6 +353,7 @@ public class RNSPeer { // break; case PING: + this.lastPingResponseReceived = Instant.now(); if (isFalse(this.isInitiator)) { onPingMessage(this, message); // Note: buffer flush done in onPingMessage method @@ -353,13 +362,11 @@ public class RNSPeer { case PONG: log.info("PONG received"); - //this.peerBuffer.flush(); break; // Do we need this ? (no need to relay peer list...) //case PEERS_V2: // onPeersV2Message(peer, message); - // this.peerBuffer.flush(); // break; default: @@ -367,7 +374,6 @@ public class RNSPeer { // Bump up to controller for possible action //Controller.getInstance().onNetworkMessage(peer, message); Controller.getInstance().onRNSNetworkMessage(this, message); - //this.peerBuffer.flush(); break; } } catch (MessageException e) { @@ -375,7 +381,6 @@ public class RNSPeer { log.error("{} from peer {}", e, this); log.info("{} from peer {}", e, this); } - //this.peerBuffer.flush(); // clear buffer } } diff --git a/src/main/java/org/qortal/network/RNSPrunePeersTask.java b/src/main/java/org/qortal/network/RNSPrunePeersTask.java new file mode 100644 index 00000000..f0da3ecc --- /dev/null +++ b/src/main/java/org/qortal/network/RNSPrunePeersTask.java @@ -0,0 +1,27 @@ +package org.qortal.network.task; + +import org.qortal.controller.Controller; +//import org.qortal.network.RNSNetwork; +//import org.qortal.repository.DataException; +import org.qortal.utils.ExecuteProduceConsume.Task; + +public class RNSPrunePeersTask implements Task { + public RNSPrunePeersTask() { + } + + @Override + public String getName() { + return "PrunePeersTask"; + } + + @Override + public void perform() throws InterruptedException { + Controller.getInstance().doRNSPrunePeers(); + //try { + // log.debug("Pruning peers..."); + // RNSNetwork.getInstance().prunePeers(); + //} catch (DataException e) { + // log.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); + //} + } +} diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java index 705cdd1d..acef59a6 100644 --- a/src/main/java/org/qortal/network/task/RNSPingTask.java +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -46,7 +46,8 @@ public class RNSPingTask implements Task { // LOGGER.error(e.getMessage(), e); //} // Note: We might use peer.sendMessage(pingMessage) instead - peer.getResponse(pingMessage); + //peer.getResponse(pingMessage); + peer.sendMessage(pingMessage); //// task is not over here (Reticulum is asynchronous) //peer.setLastPing(NTP.getTime() - now);