diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 733286c1..c69bb2eb 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -767,7 +767,7 @@ public class Controller extends Thread { // // if (needsArchiveRebuildRNS && !canBootstrap) { // LOGGER.info("Start syncing from genesis (RNS)!"); - // List seeds = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); + // List seeds = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); // // // Check if have a qualified peer to sync // if (seeds.isEmpty()) { @@ -2985,7 +2985,7 @@ public class Controller extends Thread { return true; // Needs a mutable copy of the unmodifiableList - List peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); + List peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); if (peers == null) return false; diff --git a/src/main/java/org/qortal/controller/RNSSynchronizer.java b/src/main/java/org/qortal/controller/RNSSynchronizer.java index c28413e8..29453b3f 100644 --- a/src/main/java/org/qortal/controller/RNSSynchronizer.java +++ b/src/main/java/org/qortal/controller/RNSSynchronizer.java @@ -218,7 +218,7 @@ public class RNSSynchronizer extends Thread { return true; // Needs a mutable copy of the unmodifiableList - List peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); + List peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); //// Disregard peers that have "misbehaved" recently //peers.removeIf(Controller.hasMisbehaved); @@ -395,7 +395,7 @@ public class RNSSynchronizer extends Thread { } private boolean checkRecoveryModeForPeers(List qualifiedPeers) { - List linkedPeers = RNSNetwork.getInstance().getImmutableActiveLinkedPeers(); + List linkedPeers = RNSNetwork.getInstance().getActiveImmutableLinkedPeers(); if (!linkedPeers.isEmpty()) { // There is at least one handshaked peer diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index d0e3708b..17a9e973 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -252,7 +252,7 @@ public class RNSNetwork { } public void broadcast(Function peerMessageBuilder) { - for (RNSPeer peer : getImmutableActiveLinkedPeers()) { + for (RNSPeer peer : getActiveImmutableLinkedPeers()) { if (this.isShuttingDown) { return; } @@ -531,7 +531,7 @@ public class RNSNetwork { //// Note: we might not need this. All messages handled asynchronously in Reticulum //// (RNSPeer peerBufferReady callback) //private Task maybeProducePeerMessageTask() { - // return getImmutableActiveLinkedPeers().stream() + // return getActiveImmutableLinkedPeers().stream() // .map(RNSPeer::getMessageTask) // .filter(Objects::nonNull) // .findFirst() @@ -555,7 +555,7 @@ public class RNSNetwork { // log.info("ilp - {}", ilp); //} //return ilp; - return getImmutableActiveLinkedPeers().stream() + return getActiveImmutableLinkedPeers().stream() .map(peer -> peer.getPingTask(now)) .filter(Objects::nonNull) .findFirst() @@ -589,7 +589,7 @@ public class RNSNetwork { return SingletonContainer.INSTANCE; } - public List getImmutableActiveLinkedPeers() { + public List getActiveImmutableLinkedPeers() { List activePeers = Collections.synchronizedList(new ArrayList<>()); for (RNSPeer p: this.immutableLinkedPeers) { if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) { @@ -615,7 +615,7 @@ public class RNSNetwork { if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } - this.linkedPeers.remove(peer); // thread safe + this.linkedPeers.remove(this.linkedPeers.indexOf(peer)); // thread safe this.immutableLinkedPeers = List.copyOf(this.linkedPeers); } @@ -635,7 +635,7 @@ public class RNSNetwork { if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } - this.incomingPeers.remove(peer); + this.incomingPeers.remove(this.incomingPeers.indexOf(peer)); this.immutableIncomingPeers = List.copyOf(this.incomingPeers); } @@ -676,7 +676,7 @@ public class RNSNetwork { //} } - public List incomingNonActivePeers() { + public List getNonActiveIncommingPeers() { var ips = getIncomingPeers(); List result = Collections.synchronizedList(new ArrayList<>()); Link pl; @@ -695,86 +695,61 @@ public class RNSNetwork { //@Synchronized public void prunePeers() throws DataException { - // run periodically (by the Controller) - var peerList = getImmutableLinkedPeers(); - var incomingPeerList = getImmutableIncomingPeers(); - //log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), - // incomingPeerList.size()); - log.info("number of links (linkedPeers (active) / incomingPeers before prunig: {} ({}), {}", - getImmutableLinkedPeers().size(), getImmutableActiveLinkedPeers().size(), - getImmutableIncomingPeers().size()); // prune initiator peers - List lps = getImmutableLinkedPeers(); - for (RNSPeer p : lps) { + //var peerList = getImmutableLinkedPeers(); + var initiatorPeerList = getImmutableLinkedPeers(); + var initiatorActivePeerList = getActiveImmutableLinkedPeers(); + var incomingPeerList = getImmutableIncomingPeers(); + log.info("number of links (linkedPeers (active) / incomingPeers before prunig: {} ({}), {}", + initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(), + incomingPeerList.size()); + for (RNSPeer p: initiatorActivePeerList) { + var pLink = p.getOrInitPeerLink(); + p.pingRemote(); + } + for (RNSPeer p : initiatorPeerList) { var pLink = p.getPeerLink(); if (nonNull(pLink)) { + if (p.getPeerTimedOut()) { + // options: keep in case peer reconnects or remove => we'll remove it + removeLinkedPeer(p); + continue; + } + if (pLink.getStatus() == ACTIVE) { + continue; + } + if (pLink.getStatus() == CLOSED) { + removeLinkedPeer(p); + continue; + } if (pLink.getStatus() == PENDING) { pLink.teardown(); - } - if (p.getPeerTimedOut()) { - pLink.teardown(); - } - log.info("peer link: {}, status: {}", pLink, pLink.getStatus()); - if (pLink.getStatus() == ACTIVE) { - p.pingRemote(); - } else { removeLinkedPeer(p); + continue; } } } - //Link pLink; - //LinkStatus lStatus; - //var now = Instant.now(); - //for (RNSPeer p: peerList) { - // pLink = p.getPeerLink(); - // var peerLastAccessTimestamp = p.getLastAccessTimestamp(); - // var peerLastPingResponseReceived = p.getLastPingResponseReceived(); - // log.info("peerLink: {}, status: {}", pLink, pLink.getStatus()); - // log.info("prunePeers - pLink: {}, destinationHash: {}", - // pLink, Hex.encodeHexString(p.getDestinationHash())); - // log.debug("peer: {}", p); - // if (nonNull(pLink)) { - // if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) { - // // close peer link for now - // pLink.teardown(); - // } - // lStatus = pLink.getStatus(); - // log.info("Link {} status: {}", pLink, lStatus); - // // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED - // if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) { - // //p.shutdown(); - // //peerList.remove(p); - // removeLinkedPeer(p); - // } else if (lStatus == HANDSHAKE) { - // // stuck in handshake state (do we need to shutdown/remove it?) - // log.info("peer status HANDSHAKE"); - // //p.shutdown(); - // //peerList.remove(p); - // removeLinkedPeer(p); - // } - // // either reach peer or disable link - // p.pingRemote(); - // } else { - // if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { - // //peerList.remove(p); - // removeLinkedPeer(p); - // } - // } - //} - List inaps = incomingNonActivePeers(); - //log.info("number of inactive incoming peers: {}", inaps.size()); + // prune non-initiator peers + List inaps = getNonActiveIncommingPeers(); for (RNSPeer p: inaps) { - //incomingPeerList.remove(incomingPeerList.indexOf(p)); + var pLink = p.getPeerLink(); + if (nonNull(pLink)) { + // could be eg. PENDING + pLink.teardown(); + } removeIncomingPeer(p); } + initiatorPeerList = getImmutableLinkedPeers(); + initiatorActivePeerList = getActiveImmutableLinkedPeers(); + incomingPeerList = getImmutableIncomingPeers(); log.info("number of links (linkedPeers (active) / incomingPeers after prunig: {} ({}), {}", - getImmutableLinkedPeers().size(), getImmutableActiveLinkedPeers().size(), - getImmutableIncomingPeers().size()); + initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(), + incomingPeerList.size()); maybeAnnounce(getBaseDestination()); } public void maybeAnnounce(Destination d) { - if (getImmutableActiveLinkedPeers().size() < MIN_DESIRED_PEERS) { + if (getActiveImmutableLinkedPeers().size() < MIN_DESIRED_PEERS) { d.announce(); } } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index dba40878..99177b64 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -99,9 +99,9 @@ public class RNSPeer { int sendStreamId = 0; private Boolean isInitiator; private Boolean deleteMe = false; - private Boolean isVacant = true; + //private Boolean isVacant = true; private Long lastPacketRtt = null; - private byte[] emptyBuffer = {0,0,0,0,0}; + //private byte[] emptyBuffer = {0,0,0,0,0}; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @@ -144,7 +144,7 @@ public class RNSPeer { initPeerLink(); //setCreationTimestamp(System.currentTimeMillis()); this.creationTimestamp = Instant.now(); - this.isVacant = true; + //this.isVacant = true; this.replyQueues = new ConcurrentHashMap<>(); this.pendingMessages = new LinkedBlockingQueue<>(); this.peerData = new RNSPeerData(dhash); @@ -164,7 +164,7 @@ public class RNSPeer { this.lastAccessTimestamp = Instant.now(); this.lastLinkProbeTimestamp = null; this.isInitiator = false; - this.isVacant = false; + //this.isVacant = false; //this.peerLink.setLinkEstablishedCallback(this::linkEstablished); //this.peerLink.setLinkClosedCallback(this::linkClosed); @@ -223,8 +223,7 @@ public class RNSPeer { log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); } - //return getPeerBuffer(); - return this.peerBuffer; + return getPeerBuffer(); } public Link getOrInitPeerLink() { @@ -342,10 +341,10 @@ public class RNSPeer { //log.trace("peerBufferReady - data bytes: {}", data.length); this.lastAccessTimestamp = Instant.now(); - if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { - log.info("peerBufferReady - empty buffer detected (length: {})", data.length); - } - else { + //if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { + // log.info("peerBufferReady - empty buffer detected (length: {})", data.length); + //} + //else { try { //log.info("***> creating message from {} bytes", data.length); Message message = Message.fromByteBuffer(bb); @@ -415,7 +414,7 @@ public class RNSPeer { log.error("{} from peer {}", e, this); log.info("{} from peer {}", e, this); } - } + //} } /** @@ -499,12 +498,13 @@ public class RNSPeer { public void packetTimedOut(PacketReceipt receipt) { log.info("packet timed out, receipt status: {}", receipt.getStatus()); - //if (receipt.getStatus() == PacketReceiptStatus.FAILED) { - // this.peerTimedOut = true; - // this.peerLink.teardown(); - //} - this.peerTimedOut = true; - this.peerLink.teardown(); + if (receipt.getStatus() == PacketReceiptStatus.FAILED) { + log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED); + this.peerTimedOut = true; + this.peerLink.teardown(); + } + //this.peerTimedOut = true; + //this.peerLink.teardown(); } /** Link Request callbacks */