diff --git a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml deleted file mode 100644 index 0c856aac..00000000 --- a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/maven-metadata-local.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - io.reticulum - reticulum-network-stack - 1.0-SNAPSHOT - - - true - - 20250418180444 - - - jar - 1.0-SNAPSHOT - 20250418180444 - - - pom - 1.0-SNAPSHOT - 20241218212752 - - - - diff --git a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar deleted file mode 100644 index e1d6ad48..00000000 Binary files a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.jar and /dev/null differ diff --git a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.pom b/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.pom deleted file mode 100644 index 1b1cc206..00000000 --- a/lib/io/reticulum/reticulum-network-stack/1.0-SNAPSHOT/reticulum-network-stack-1.0-SNAPSHOT.pom +++ /dev/null @@ -1,9 +0,0 @@ - - - 4.0.0 - io.reticulum - reticulum-network-stack - 1.0-SNAPSHOT - POM was created from install:install-file - diff --git a/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml b/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml deleted file mode 100644 index 45dbe562..00000000 --- a/lib/io/reticulum/reticulum-network-stack/maven-metadata-local.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - io.reticulum - reticulum-network-stack - - - 1.0-SNAPSHOT - - 20250418180444 - - diff --git a/pom.xml b/pom.xml index 82e0f42f..721892e5 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ 3.5.2 3.25.3 1.5.3 + ace00f8 1.17 2.0.10 5.18.2 @@ -512,6 +513,12 @@ altcoinj ${altcoinj.version} + + + com.github.sergst83 + reticulum-network-stack + ${reticulum.version} + com.googlecode.json-simple @@ -805,12 +812,6 @@ jaxb-runtime ${jaxb-runtime.version} - - - io.reticulum - reticulum-network-stack - 1.0-SNAPSHOT - com.fasterxml.jackson.core jackson-databind diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index ae2d6a99..37bc981d 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -543,8 +543,8 @@ public class Controller extends Thread { LOGGER.info("Starting synchronizer"); Synchronizer.getInstance().start(); - //LOGGER.info("Starting synchronizer over Reticulum"); - //RNSSynchronizer.getInstance().start(); + LOGGER.info("Starting synchronizer over Reticulum"); + RNSSynchronizer.getInstance().start(); LOGGER.info("Starting block minter"); blockMinter = new BlockMinter(); @@ -743,6 +743,73 @@ public class Controller extends Thread { } } }, 3*60*1000, 3*60*1000); + //Timer syncFromGenesisRNS = new Timer(); + //syncFromGenesisRNS.schedule(new TimerTask() { + // @Override + // public void run() { + // LOGGER.debug("Start sync from genesis check (RNS)."); + // boolean canBootstrap = Settings.getInstance().getBootstrap(); + // boolean needsArchiveRebuildRNS = false; + // int checkHeightRNS = 0; + // + // try (final Repository repository = RepositoryManager.getRepository()){ + // needsArchiveRebuildRNS = (repository.getBlockArchiveRepository().fromHeight(2) == null); + // checkHeightRNS = repository.getBlockRepository().getBlockchainHeight(); + // } catch (DataException e) { + // throw new RuntimeException(e); + // } + // + // if (canBootstrap || !needsArchiveRebuildRNS || checkHeightRNS > 3) { + // LOGGER.debug("Bootstrapping is enabled or we have more than 2 blocks, cancel sync from genesis check."); + // syncFromGenesisRNS.cancel(); + // return; + // } + // + // if (needsArchiveRebuildRNS && !canBootstrap) { + // LOGGER.info("Start syncing from genesis (RNS)!"); + // List seeds = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); + // + // // Check if have a qualified peer to sync + // if (seeds.isEmpty()) { + // LOGGER.info("No connected RNSPeer(s), will try again later."); + // return; + // } + // + // int index = new SecureRandom().nextInt(seeds.size()); + // RNSPeer syncPeer = seeds.get(index); + // var syncPeerLinkAsString = syncPeer.getPeerLink().toString(); + // //String syncNode = String.valueOf(seeds.get(index)); + // //PeerAddress peerAddress = PeerAddress.fromString(syncNode); + // //InetSocketAddress resolvedAddress = null; + // // + // //try { + // // resolvedAddress = peerAddress.toSocketAddress(); + // //} catch (UnknownHostException e) { + // // throw new RuntimeException(e); + // //} + // // + // //InetSocketAddress finalResolvedAddress = resolvedAddress; + // //Peer targetPeer = seeds.stream().filter(peer -> peer.getResolvedAddress().equals(finalResolvedAddress)).findFirst().orElse(null); + // //RNSPeer targetPeerRNS = seeds.stream().findFirst().orElse(null); + // RNSPeer targetPeerRNS = seeds.stream().filter(peer -> peer.getPeerLink().toString().equals(syncPeerLinkAsString)).findFirst().orElse(null); + // RNSSynchronizer.SynchronizationResult syncResultRNS; + // + // try { + // do { + // try { + // syncResultRNS = RNSSynchronizer.getInstance().actuallySynchronize(targetPeerRNS, true); + // } catch (InterruptedException e) { + // throw new RuntimeException(e); + // } + // } + // while (syncResultRNS == RNSSynchronizer.SynchronizationResult.OK); + // } finally { + // // We are syncing now, so can cancel the check + // syncFromGenesisRNS.cancel(); + // } + // } + // } + //}, 3*60*1000, 3*60*1000); } /** Called by AdvancedInstaller's launch EXE in single-instance mode, when an instance is already running. */ @@ -858,29 +925,29 @@ public class Controller extends Thread { repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval(); } - //// Prune stuck/slow/old peers - //if (now >= prunePeersTimestamp + prunePeersInterval) { - // prunePeersTimestamp = now + prunePeersInterval; - // - // try { - // LOGGER.debug("Pruning peers..."); - // Network.getInstance().prunePeers(); - // } catch (DataException e) { - // LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); - // } - //} + // Prune stuck/slow/old peers + if (now >= prunePeersTimestamp + prunePeersInterval) { + prunePeersTimestamp = now + prunePeersInterval; - //// Q: Do we need global pruning? - //if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { - // pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; - // - // try { - // LOGGER.debug("Pruning Reticulum peers..."); - // RNSNetwork.getInstance().prunePeers(); - // } catch (DataException e) { - // LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage())); - // } - //} + try { + LOGGER.debug("Pruning peers..."); + Network.getInstance().prunePeers(); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage())); + } + } + + // Q: Do we need global pruning? + if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { + pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; + + try { + LOGGER.debug("Pruning Reticulum peers..."); + RNSNetwork.getInstance().prunePeers(); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage())); + } + } // Delete expired transactions if (now >= deleteExpiredTimestamp) { @@ -1171,6 +1238,7 @@ public class Controller extends Thread { LOGGER.info("Shutting down synchronizer"); Synchronizer.getInstance().shutdown(); + RNSSynchronizer.getInstance().shutdown(); LOGGER.info("Shutting down API"); ApiService.getInstance().stop(); @@ -2311,22 +2379,22 @@ public class Controller extends Thread { // OnlineAccountsManager.getInstance().onNetworkOnlineAccountsV3Message(peer, message); // break; - //// TODO: Compiles but much of the Manager details need to be rethought for Reticulum - //case GET_ARBITRARY_DATA: - // // Not currently supported - // break; - //// - //case ARBITRARY_DATA_FILE_LIST: - // RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); - // break; - // - //case GET_ARBITRARY_DATA_FILE: - // RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); - // break; - // - //case GET_ARBITRARY_DATA_FILE_LIST: - // RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); - // break; + // TODO: Compiles but rethink for Reticulum + case GET_ARBITRARY_DATA: + // Not currently supported + break; + + case ARBITRARY_DATA_FILE_LIST: + RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); + break; + + case GET_ARBITRARY_DATA_FILE: + RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); + break; + + case GET_ARBITRARY_DATA_FILE_LIST: + RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); + break; // case ARBITRARY_SIGNATURES: // Not currently supported @@ -2665,7 +2733,7 @@ public class Controller extends Thread { peer.setChainTipData(newChainTipData); // Potentially synchronize - Synchronizer.getInstance().requestSync(); + RNSSynchronizer.getInstance().requestSync(); } private void onRNSNetworkBlockSummariesV2Message(RNSPeer peer, Message message) { @@ -2696,7 +2764,7 @@ public class Controller extends Thread { peer.setChainTipSummaries(blockSummariesV2Message.getBlockSummaries()); // Potentially synchronize - Synchronizer.getInstance().requestSync(); + RNSSynchronizer.getInstance().requestSync(); } // ************ @@ -2918,7 +2986,7 @@ public class Controller extends Thread { return true; // Needs a mutable copy of the unmodifiableList - List peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); + List peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); if (peers == null) return false; diff --git a/src/main/java/org/qortal/controller/RNSSynchronizer.java b/src/main/java/org/qortal/controller/RNSSynchronizer.java index c28413e8..29453b3f 100644 --- a/src/main/java/org/qortal/controller/RNSSynchronizer.java +++ b/src/main/java/org/qortal/controller/RNSSynchronizer.java @@ -218,7 +218,7 @@ public class RNSSynchronizer extends Thread { return true; // Needs a mutable copy of the unmodifiableList - List peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); + List peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers()); //// Disregard peers that have "misbehaved" recently //peers.removeIf(Controller.hasMisbehaved); @@ -395,7 +395,7 @@ public class RNSSynchronizer extends Thread { } private boolean checkRecoveryModeForPeers(List qualifiedPeers) { - List linkedPeers = RNSNetwork.getInstance().getImmutableActiveLinkedPeers(); + List linkedPeers = RNSNetwork.getInstance().getActiveImmutableLinkedPeers(); if (!linkedPeers.isEmpty()) { // There is at least one handshaked peer diff --git a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java index fc68fdec..ad2b1f89 100644 --- a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java @@ -204,12 +204,12 @@ public class RNSArbitraryDataFileManager extends Thread { Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); Message response = null; - //// TODO - revisit (doesn't work with Reticulum) - //try { - // response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); - //} catch (InterruptedException e) { - // // Will return below due to null response - //} + // TODO - revisit with RNS + try { + response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null response + } arbitraryDataFileRequests.remove(hash58); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); diff --git a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java new file mode 100644 index 00000000..2a477395 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java @@ -0,0 +1,130 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.RNSArbitraryFileListResponseInfo; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; +import org.qortal.network.RNSPeer; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.ArbitraryTransactionUtils; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; + +import static java.lang.Thread.NORM_PRIORITY; + +public class RNSArbitraryDataFileRequestThread implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + + public RNSArbitraryDataFileRequestThread() { + + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Request Thread"); + Thread.currentThread().setPriority(NORM_PRIORITY); + + try { + while (!Controller.isStopping()) { + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + private void processFileHashes(Long now) throws InterruptedException { + if (Controller.isStopping()) { + return; + } + + RNSArbitraryDataFileManager arbitraryDataFileManager = RNSArbitraryDataFileManager.getInstance(); + String signature58 = null; + String hash58 = null; + RNSPeer peer = null; + boolean shouldProcess = false; + + synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { + if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) { + + // Sort by lowest number of node hops first + Comparator lowestHopsFirstComparator = + Comparator.comparingInt(RNSArbitraryFileListResponseInfo::getRequestHops); + arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator); + + Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator(); + while (iterator.hasNext()) { + if (Controller.isStopping()) { + return; + } + + RNSArbitraryFileListResponseInfo responseInfo = (RNSArbitraryFileListResponseInfo) iterator.next(); + if (responseInfo == null) { + iterator.remove(); + continue; + } + + hash58 = responseInfo.getHash58(); + peer = responseInfo.getPeer(); + signature58 = responseInfo.getSignature58(); + Long timestamp = responseInfo.getTimestamp(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + iterator.remove(); + continue; + } + + // Skip if already requesting, but don't remove, as we might want to retry later + if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) { + // Already requesting - leave this attempt for later + continue; + } + + // We want to process this file + shouldProcess = true; + iterator.remove(); + break; + } + } + } + + if (!shouldProcess) { + // Nothing to do + Thread.sleep(1000L); + return; + } + + byte[] hash = Base58.decode(hash58); + byte[] signature = Base58.decode(signature58); + + // Fetch the transaction data + try (final Repository repository = RepositoryManager.getRepository()) { + ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + return; + } + + if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { + return; + } + + LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + + } catch (DataException e) { + LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java b/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java index cc8a0f90..c344f13f 100644 --- a/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java +++ b/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java @@ -61,7 +61,7 @@ public class RNSArbitraryRelayInfo { if (other == this) return true; - if (!(other instanceof ArbitraryRelayInfo)) + if (!(other instanceof RNSArbitraryRelayInfo)) return false; RNSArbitraryRelayInfo otherRelayInfo = (RNSArbitraryRelayInfo) other; diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 85d26bf6..5a3bf28f 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -24,7 +24,7 @@ import static io.reticulum.link.TeardownSession.TIMEOUT; import static io.reticulum.link.LinkStatus.ACTIVE; import static io.reticulum.link.LinkStatus.STALE; import static io.reticulum.link.LinkStatus.CLOSED; -//import static io.reticulum.link.LinkStatus.PENDING; +import static io.reticulum.link.LinkStatus.PENDING; import static io.reticulum.link.LinkStatus.HANDSHAKE; //import static io.reticulum.packet.PacketContextType.LINKCLOSE; //import static io.reticulum.identity.IdentityKnownDestination.recall; @@ -72,7 +72,7 @@ import java.util.Objects; import java.util.function.Function; import java.time.Instant; -import org.apache.commons.codec.binary.Hex; +import static org.apache.commons.codec.binary.Hex.encodeHexString; import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; @@ -218,7 +218,7 @@ public class RNSNetwork { // APP_NAME, // "qdn" //); - log.info("Destination {} {} running", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); + log.info("Destination {} {} running", encodeHexString(baseDestination.getHash()), baseDestination.getName()); baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL); baseDestination.setAcceptLinkRequests(true); @@ -228,7 +228,7 @@ public class RNSNetwork { log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); // do a first announce baseDestination.announce(); - log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); + log.debug("Sent initial announce from {} ({})", encodeHexString(baseDestination.getHash()), baseDestination.getName()); // Start up first networking thread (the "server loop", the "Tasks engine") rnsNetworkEPC.start(); @@ -252,7 +252,7 @@ public class RNSNetwork { } public void broadcast(Function peerMessageBuilder) { - for (RNSPeer peer : getImmutableActiveLinkedPeers()) { + for (RNSPeer peer : getActiveImmutableLinkedPeers()) { if (this.isShuttingDown) { return; } @@ -306,7 +306,7 @@ public class RNSNetwork { } // Disconnect peers gracefully and terminate Reticulum for (RNSPeer p: linkedPeers) { - log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); + log.info("shutting down peer: {}", encodeHexString(p.getDestinationHash())); //log.debug("peer: {}", p); p.shutdown(); try { @@ -355,7 +355,7 @@ public class RNSNetwork { rttString = String.format("%d miliseconds", rtt); } log.info("Shutdown packet confirmation received from {}, round-trip time is {}", - Hex.encodeHexString(receipt.getDestination().getHash()), rttString); + encodeHexString(receipt.getDestination().getHash()), rttString); } } @@ -366,14 +366,14 @@ public class RNSNetwork { public void clientConnected(Link link) { //link.setLinkClosedCallback(this::clientDisconnected); //link.setPacketCallback(this::serverPacketReceived); - log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash())); + log.info("clientConnected - link hash: {}, {}", link.getHash(), encodeHexString(link.getHash())); RNSPeer newPeer = new RNSPeer(link); newPeer.setPeerLinkHash(link.getHash()); newPeer.setMessageMagic(getMessageMagic()); // make sure the peer has a channel and buffer newPeer.getOrInitPeerBuffer(); - incomingPeers.add(newPeer); - log.info("***> Client connected, link: {}", link); + addIncomingPeer(newPeer); + log.info("***> Client connected, link: {}", encodeHexString(link.getLinkId())); } public void clientDisconnected(Link link) { @@ -382,7 +382,7 @@ public class RNSNetwork { public void serverPacketReceived(byte[] message, Packet packet) { var msgText = new String(message, StandardCharsets.UTF_8); - log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash())); + log.info("Received data on link - message: {}, destinationHash: {}", msgText, encodeHexString(packet.getDestinationHash())); } //public void announceBaseDestination () { @@ -401,7 +401,7 @@ public class RNSNetwork { var peerExists = false; var activePeerCount = 0; - log.info("Received an announce from {}", Hex.encodeHexString(destinationHash)); + log.info("Received an announce from {}", encodeHexString(destinationHash)); if (nonNull(appData)) { log.debug("The announce contained the following app data: {}", new String(appData, UTF_8)); @@ -409,7 +409,7 @@ public class RNSNetwork { // add to peer list if we can use more peers //synchronized (this) { - var lps = RNSNetwork.getInstance().getLinkedPeers(); + var lps = RNSNetwork.getInstance().getImmutableLinkedPeers(); for (RNSPeer p: lps) { var pl = p.getPeerLink(); if ((nonNull(pl) && (pl.getStatus() == ACTIVE))) { @@ -421,7 +421,8 @@ public class RNSNetwork { if (Arrays.equals(p.getDestinationHash(), destinationHash)) { log.info("QAnnounceHandler - peer exists - found peer matching destinationHash"); if (nonNull(p.getPeerLink())) { - log.info("peer link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + log.info("peer link: {}, status: {}", + encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus()); } peerExists = true; if (p.getPeerLink().getStatus() != ACTIVE) { @@ -430,7 +431,12 @@ public class RNSNetwork { break; } else { if (nonNull(p.getPeerLink())) { - log.info("QAnnounceHandler - other peer - link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + log.info("QAnnounceHandler - other peer - link: {}, status: {}", + encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus()); + if (p.getPeerLink().getStatus() == CLOSED) { + // mark peer for deletion on nexe pruning + p.setDeleteMe(true); + } } else { log.info("QAnnounceHandler - peer link is null"); } @@ -442,7 +448,7 @@ public class RNSNetwork { newPeer.setIsInitiator(true); newPeer.setMessageMagic(getMessageMagic()); addLinkedPeer(newPeer); - log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); + log.info("added new RNSPeer, destinationHash: {}", encodeHexString(destinationHash)); } } // Chance to announce instead of waiting for next pruning. @@ -487,12 +493,6 @@ public class RNSNetwork { //} final Long now = NTP.getTime(); - - // Prune stuck/slow/old peers (moved from Controller) - task = maybeProduceRNSPrunePeersTask(now); - if (task != null) { - return task; - } // ping task (Link+Channel+Buffer) task = maybeProducePeerPingTask(now); @@ -505,11 +505,11 @@ public class RNSNetwork { return task; } - // Prune stuck/slow/old peers (moved from Controller) - task = maybeProduceRNSPrunePeersTask(now); - if (task != null) { - return task; - } + //// Prune stuck/slow/old peers (moved from Controller) + //task = maybeProduceRNSPrunePeersTask(now); + //if (task != null) { + // return task; + //} return null; } @@ -531,7 +531,7 @@ public class RNSNetwork { //// Note: we might not need this. All messages handled asynchronously in Reticulum //// (RNSPeer peerBufferReady callback) //private Task maybeProducePeerMessageTask() { - // return getImmutableActiveLinkedPeers().stream() + // return getActiveImmutableLinkedPeers().stream() // .map(RNSPeer::getMessageTask) // .filter(Objects::nonNull) // .findFirst() @@ -555,7 +555,7 @@ public class RNSNetwork { // log.info("ilp - {}", ilp); //} //return ilp; - return getImmutableActiveLinkedPeers().stream() + return getActiveImmutableLinkedPeers().stream() .map(peer -> peer.getPingTask(now)) .filter(Objects::nonNull) .findFirst() @@ -589,7 +589,7 @@ public class RNSNetwork { return SingletonContainer.INSTANCE; } - public List getImmutableActiveLinkedPeers() { + public List getActiveImmutableLinkedPeers() { List activePeers = Collections.synchronizedList(new ArrayList<>()); for (RNSPeer p: this.immutableLinkedPeers) { if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) { @@ -599,9 +599,10 @@ public class RNSNetwork { return activePeers; } - public List getImmutableLinkedPeers() { - return this.immutableLinkedPeers; - } + // note: we already have a lobok getter for this + //public List getImmutableLinkedPeers() { + // return this.immutableLinkedPeers; + //} public void addLinkedPeer(RNSPeer peer) { this.linkedPeers.add(peer); @@ -609,22 +610,23 @@ public class RNSNetwork { } public void removeLinkedPeer(RNSPeer peer) { - if (nonNull(peer.getPeerBuffer())) { - peer.getPeerBuffer().close(); - } + //if (nonNull(peer.getPeerBuffer())) { + // peer.getPeerBuffer().close(); + //} if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } - this.linkedPeers.remove(peer); // thread safe + var p = this.linkedPeers.remove(this.linkedPeers.indexOf(peer)); // thread safe this.immutableLinkedPeers = List.copyOf(this.linkedPeers); } - public List getLinkedPeers() { - //synchronized(this.linkedPeers) { - //return new ArrayList<>(this.linkedPeers); - return this.linkedPeers; - //} - } + // note: we already have a lobok getter for this + //public List getLinkedPeers() { + // //synchronized(this.linkedPeers) { + // //return new ArrayList<>(this.linkedPeers); + // return this.linkedPeers; + // //} + //} public void addIncomingPeer(RNSPeer peer) { this.incomingPeers.add(peer); @@ -635,37 +637,20 @@ public class RNSNetwork { if (nonNull(peer.getPeerLink())) { peer.getPeerLink().teardown(); } - this.incomingPeers.remove(peer); + var p = this.incomingPeers.remove(this.incomingPeers.indexOf(peer)); this.immutableIncomingPeers = List.copyOf(this.incomingPeers); } - public List getIncomingPeers() { - return this.incomingPeers; - } - - public List getImmutableIncomingPeers() { - return this.immutableIncomingPeers; - } + // note: we already have a lobok getter for this + //public List getIncomingPeers() { + // return this.incomingPeers; + //} + //public List getImmutableIncomingPeers() { + // return this.immutableIncomingPeers; + //} // TODO, methods for: getAvailablePeer - // maintenance - //public void removePeer(RNSPeer peer) { - // synchronized(this) { - // List peerList = this.linkedPeers; - // log.info("removing peer {} on peer shutdown", peer); - // peerList.remove(peer); - // } - //} - - //public void pingPeer(RNSPeer peer) { - // if (nonNull(peer)) { - // peer.pingRemote(); - // } else { - // log.error("peer argument is null"); - // } - //} - private Boolean isUnreachable(RNSPeer peer) { var result = peer.getDeleteMe(); var now = Instant.now(); @@ -693,7 +678,7 @@ public class RNSNetwork { //} } - public List incomingNonActivePeers() { + public List getNonActiveIncomingPeers() { var ips = getIncomingPeers(); List result = Collections.synchronizedList(new ArrayList<>()); Link pl; @@ -712,76 +697,74 @@ public class RNSNetwork { //@Synchronized public void prunePeers() throws DataException { - // run periodically (by the Controller) - var peerList = getLinkedPeers(); - var incomingPeerList = getIncomingPeers(); - log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(), - incomingPeerList.size()); // prune initiator peers - List lps = getLinkedPeers(); - for (RNSPeer p : lps) { + //var peerList = getImmutableLinkedPeers(); + var initiatorPeerList = getImmutableLinkedPeers(); + var initiatorActivePeerList = getActiveImmutableLinkedPeers(); + var incomingPeerList = getImmutableIncomingPeers(); + var numActiveIncomingPeers = incomingPeerList.size() - getNonActiveIncomingPeers().size(); + log.info("number of links (linkedPeers (active) / incomingPeers (active) before prunig: {} ({}), {} ({})", + initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(), + incomingPeerList.size(), numActiveIncomingPeers); + for (RNSPeer p: initiatorActivePeerList) { + var pLink = p.getOrInitPeerLink(); + p.pingRemote(); + } + for (RNSPeer p : initiatorPeerList) { var pLink = p.getPeerLink(); if (nonNull(pLink)) { - log.info("peer link: {}, status: {}", pLink, pLink.getStatus()); - if (pLink.getStatus() == ACTIVE) { - p.pingRemote(); - } if (p.getPeerTimedOut()) { + // options: keep in case peer reconnects or remove => we'll remove it + removeLinkedPeer(p); + continue; + } + if (pLink.getStatus() == ACTIVE) { + continue; + } + if ((pLink.getStatus() == CLOSED) || (p.getDeleteMe())) { + removeLinkedPeer(p); + continue; + } + if (pLink.getStatus() == PENDING) { pLink.teardown(); + removeLinkedPeer(p); + continue; } } } - //Link pLink; - //LinkStatus lStatus; - //var now = Instant.now(); - //for (RNSPeer p: peerList) { - // pLink = p.getPeerLink(); - // var peerLastAccessTimestamp = p.getLastAccessTimestamp(); - // var peerLastPingResponseReceived = p.getLastPingResponseReceived(); - // log.info("peerLink: {}, status: {}", pLink, pLink.getStatus()); - // log.info("prunePeers - pLink: {}, destinationHash: {}", - // pLink, Hex.encodeHexString(p.getDestinationHash())); - // log.debug("peer: {}", p); - // if (nonNull(pLink)) { - // if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) { - // // close peer link for now - // pLink.teardown(); - // } - // lStatus = pLink.getStatus(); - // log.info("Link {} status: {}", pLink, lStatus); - // // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED - // if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) { - // //p.shutdown(); - // //peerList.remove(p); - // removeLinkedPeer(p); - // } else if (lStatus == HANDSHAKE) { - // // stuck in handshake state (do we need to shutdown/remove it?) - // log.info("peer status HANDSHAKE"); - // //p.shutdown(); - // //peerList.remove(p); - // removeLinkedPeer(p); - // } - // // either reach peer or disable link - // p.pingRemote(); - // } else { - // if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) { - // //peerList.remove(p); - // removeLinkedPeer(p); - // } - // } - //} - List inaps = incomingNonActivePeers(); - //log.info("number of inactive incoming peers: {}", inaps.size()); - for (RNSPeer p: inaps) { - incomingPeerList.remove(incomingPeerList.indexOf(p)); + // prune non-initiator peers + List inaps = getNonActiveIncomingPeers(); + incomingPeerList = this.incomingPeers; + for (RNSPeer p: incomingPeerList) { + var pLink = p.getOrInitPeerLink(); + if (nonNull(pLink) && (pLink.getStatus() == ACTIVE)) { + // make false active links to timeout (and teardown in timeout callback) + // note: actual removal of peer happens on the following pruning run. + p.pingRemote(); + } } - log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), - incomingPeerList.size()); + for (RNSPeer p: inaps) { + var pLink = p.getPeerLink(); + if (nonNull(pLink)) { + // could be eg. PENDING + pLink.teardown(); + } + removeIncomingPeer(p); + } + initiatorPeerList = getImmutableLinkedPeers(); + initiatorActivePeerList = getActiveImmutableLinkedPeers(); + incomingPeerList = getImmutableIncomingPeers(); + numActiveIncomingPeers = incomingPeerList.size() - getNonActiveIncomingPeers().size(); + log.info("number of links (linkedPeers (active) / incomingPeers (active) after prunig: {} ({}), {} ({})", + initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(), + incomingPeerList.size(), numActiveIncomingPeers); maybeAnnounce(getBaseDestination()); } public void maybeAnnounce(Destination d) { - if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { + var activePeers = getActiveImmutableLinkedPeers().size(); + if (activePeers <= MIN_DESIRED_PEERS) { + log.info("Active peers ({}) <= desired peers ({}). Announcing", activePeers, MIN_DESIRED_PEERS); d.announce(); } } @@ -798,7 +781,7 @@ public class RNSNetwork { var pLink = p.getPeerLink(); if (nonNull(pLink)) { if (Arrays.equals(pLink.getDestination().getHash(),link.getDestination().getHash())) { - log.info("found peer matching destinationHash: {}", Hex.encodeHexString(link.getDestination().getHash())); + log.info("found peer matching destinationHash: {}", encodeHexString(link.getDestination().getHash())); peer = p; break; } @@ -813,7 +796,7 @@ public class RNSNetwork { RNSPeer peer = null; for (RNSPeer p : lps) { if (Arrays.equals(p.getDestinationHash(), dhash)) { - log.info("found peer matching destinationHash: {}", Hex.encodeHexString(dhash)); + log.info("found peer matching destinationHash: {}", encodeHexString(dhash)); peer = p; break; } @@ -832,6 +815,14 @@ public class RNSNetwork { return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; } + public String getOurNodeId() { + return this.serverIdentity.toString(); + } + + protected byte[] getOurPublicKey() { + return this.serverIdentity.getPublicKey(); + } + // Network methods Reticulum implementation /** Builds either (legacy) HeightV2Message or (newer) BlockSummariesV2Message, depending on peer version. diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 81d693d5..a9de972e 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -7,9 +7,7 @@ import static java.util.Objects.isNull; import static java.util.Objects.nonNull; //import java.io.IOException; import java.time.Instant; -import java.util.Arrays; -import java.util.List; -import java.util.Collections; +import java.util.*; //import io.reticulum.Reticulum; //import org.qortal.network.RNSNetwork; @@ -53,8 +51,7 @@ import org.qortal.utils.NTP; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Random; +import static java.nio.charset.StandardCharsets.UTF_8; import java.util.concurrent.*; import java.util.Arrays; @@ -73,6 +70,7 @@ import lombok.AccessLevel; //import org.qortal.network.message.Message; //import org.qortal.network.message.MessageException; +import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -99,17 +97,17 @@ public class RNSPeer { int sendStreamId = 0; private Boolean isInitiator; private Boolean deleteMe = false; - private Boolean isVacant = true; + //private Boolean isVacant = true; private Long lastPacketRtt = null; - private byte[] emptyBuffer = {0,0,0,0,0}; + //private byte[] emptyBuffer = {0,0,0,0,0}; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; // for qortal networking private static final int RESPONSE_TIMEOUT = 3000; // [ms] - private static final int PING_INTERVAL = 34_000; // [ms] - private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms + private static final int PING_INTERVAL = 55_000; // [ms] + private static final long LINK_PING_INTERVAL = 55 * 1000L; // ms private byte[] messageMagic; // set in message creating classes private Long lastPing = null; // last (packet) ping roundtrip time [ms] private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started. @@ -117,11 +115,13 @@ public class RNSPeer { private Map> replyQueues; private LinkedBlockingQueue pendingMessages; private boolean syncInProgress = false; + private RNSPeerData peerData = null; + private long linkEstablishedTime = -1L; // equivalent of (tcpip) Peer 'handshakeComplete' // Versioning public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); - - private RNSPeerData peerData = null; + /* Pending signature requests */ + private List pendingSignatureRequests = Collections.synchronizedList(new ArrayList<>()); /** * Latest block info as reported by peer. */ @@ -134,6 +134,20 @@ public class RNSPeer { * Last time we detected this peer as TOO_DIVERGENT */ private Long lastTooDivergentTime; + ///** + // * Known starting sequences for data received over buffer + // */ + //private byte[] SEQ_REQUEST_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x71, 0x43, 0x49, 0x44}; // SReqCID + //private byte[] SEQ_RESPONSE_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x73, 0x70, 0x43, 0x49, 0x44}; // SRespCID + + // Message stats + private static class MessageStats { + public final LongAdder count = new LongAdder(); + public final LongAdder totalBytes = new LongAdder(); + } + + private final Map receivedMessageStats = new ConcurrentHashMap<>(); + private final Map sentMessageStats = new ConcurrentHashMap<>(); /** * Constructor for initiator peers @@ -144,7 +158,7 @@ public class RNSPeer { initPeerLink(); //setCreationTimestamp(System.currentTimeMillis()); this.creationTimestamp = Instant.now(); - this.isVacant = true; + //this.isVacant = true; this.replyQueues = new ConcurrentHashMap<>(); this.pendingMessages = new LinkedBlockingQueue<>(); this.peerData = new RNSPeerData(dhash); @@ -164,7 +178,7 @@ public class RNSPeer { this.lastAccessTimestamp = Instant.now(); this.lastLinkProbeTimestamp = null; this.isInitiator = false; - this.isVacant = false; + //this.isVacant = false; //this.peerLink.setLinkEstablishedCallback(this::linkEstablished); //this.peerLink.setLinkClosedCallback(this::linkClosed); @@ -206,7 +220,7 @@ public class RNSPeer { public BufferedRWPair getOrInitPeerBuffer() { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { - log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); + //log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); try { log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); } catch (IllegalStateException e) { @@ -223,8 +237,7 @@ public class RNSPeer { log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); } - //return getPeerBuffer(); - return this.peerBuffer; + return getPeerBuffer(); } public Link getOrInitPeerLink() { @@ -269,9 +282,10 @@ public class RNSPeer { /** Link callbacks */ public void linkEstablished(Link link) { + this.linkEstablishedTime = System.currentTimeMillis(); link.setLinkClosedCallback(this::linkClosed); log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", - peerLink, link, encodeHexString(destinationHash), + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(destinationHash), encodeHexString(link.getDestination().getHash())); if (isInitiator) { startPings(); @@ -286,12 +300,12 @@ public class RNSPeer { } else if (link.getTeardownReason() == INITIATOR_CLOSED) { log.info("Link closed callback: The initiator closed the link"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, encodeHexString(link.getDestination().getHash())); + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash())); this.peerBuffer = null; } else if (link.getTeardownReason() == DESTINATION_CLOSED) { log.info("Link closed callback: The link was closed by the peer, removing peer"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, encodeHexString(link.getDestination().getHash())); + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash())); this.peerBuffer = null; } else { log.info("Link closed callback"); @@ -342,15 +356,27 @@ public class RNSPeer { //log.trace("peerBufferReady - data bytes: {}", data.length); this.lastAccessTimestamp = Instant.now(); - if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { - log.info("peerBufferReady - empty buffer detected (length: {})", data.length); - } - else { + //if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { + // log.info("peerBufferReady - empty buffer detected (length: {})", data.length); + //} + //else { + //if (Arrays.equals(SEQ_REQUEST_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_REQUEST_CONFIRM_ID.length))) { + // // a non-initiator peer requested to confirm sending of a packet + // var messageId = subarray(data, SEQ_REQUEST_CONFIRM_ID.length + 1, data.length); + // log.info("received request to confirm message id, id: {}", messageId); + // var confirmData = concatArrays(SEQ_RESPONSE_CONFIRM_ID, "::",data.getBytes(UTF_8), messageId.getBytes(UTF_8)); + // this.peerBuffer.write(confirmData); + // this.peerBuffer.flush(); + //} else if (Arrays.equals(SEQ_RESPONSE_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_RESPONSE_CONFIRM_ID.lenth))) { + // // an initiator peer receiving the confirmation + // var messageId = subarray(data, SEQ_RESPONSE_CONFIRM_ID.length + 1, data.length); + // this.replyQueues.remove(messageId); + //} else { try { //log.info("***> creating message from {} bytes", data.length); Message message = Message.fromByteBuffer(bb); //log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); - log.info("*=> type {} message received ({} bytes)", message.getType(), data.length); + log.info("*=> type {} message received ({} bytes, id: {})", message.getType(), data.length, message.getId()); // Handle message based on type switch (message.getType()) { @@ -365,12 +391,12 @@ public class RNSPeer { this.lastPingResponseReceived = Instant.now(); if (isFalse(this.isInitiator)) { onPingMessage(this, message); - // Note: buffer flush done in onPingMessage method } break; case PONG: - //log.info("PONG received"); + log.trace("PONG received"); + addToQueue(message); // as response in blocking queue for ping getResponse break; // Do we need this ? (no need to relay peer list...) @@ -381,45 +407,44 @@ public class RNSPeer { case BLOCK_SUMMARIES: // from Synchronizer addToQueue(message); - break; case BLOCK_SUMMARIES_V2: // from Synchronizer addToQueue(message); - break; case SIGNATURES: // from Synchronizer addToQueue(message); - break; case BLOCK: // from Synchronizer addToQueue(message); - break; case BLOCK_V2: // from Synchronizer addToQueue(message); - break; default: log.info("default - type {} message received ({} bytes)", message.getType(), data.length); // Bump up to controller for possible action - //Controller.getInstance().onNetworkMessage(peer, message); + addToQueue(message); Controller.getInstance().onRNSNetworkMessage(this, message); break; } } catch (MessageException e) { //log.error("{} from peer {}", e.getMessage(), this); - log.error("{} from peer {}", e, this); - log.info("{} from peer {}", e, this); + log.error("{} from peer {}, closing link", e, this); + //log.info("{} from peer {}", e, this); + // don't take any chances: + // can happen if link is closed by peer in which case we close this side of the link + this.peerData.setLastMisbehaved(NTP.getTime()); + shutdown(); } - } + //} } /** - * we need to queue all incomming messages that follow request/response + * we need to queue all incoming messages that follow request/response * with explicit handling of the response message. */ public void addToQueue(Message message) { @@ -431,7 +456,8 @@ public class RNSPeer { if (queue != null) { // Adding message to queue will unblock thread waiting for response this.replyQueues.get(message.getId()).add(message); - // Consumed elsewhere + // Consumed elsewhere (getResponseWithTimeout) + log.info("addToQueue - queue size: {}, message type: {} (id: {})", queue.size(), message.getType(), message.getId()); } else if (!this.pendingMessages.offer(message)) { log.info("[{}] Busy, no room to queue message from peer {} - discarding", @@ -491,18 +517,24 @@ public class RNSPeer { } else { rttString = String.format("%d milliseconds", rtt); } - log.info("Valid reply received from {}, round-trip time is {}", - encodeHexString(receipt.getDestination().getHash()), rttString); + if (getIsInitiator()) { + // reporting round trip time in one direction is enough + log.info("Valid reply received from {}, round-trip time is {}", + encodeHexString(receipt.getDestination().getHash()), rttString); + } this.lastAccessTimestamp = Instant.now(); } } public void packetTimedOut(PacketReceipt receipt) { - log.info("packet timed out, receipt status: {}", receipt.getStatus()); + //log.info("packet timed out, receipt status: {}", receipt.getStatus()); if (receipt.getStatus() == PacketReceiptStatus.FAILED) { + log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED); this.peerTimedOut = true; this.peerLink.teardown(); } + //this.peerTimedOut = true; + //this.peerLink.teardown(); } /** Link Request callbacks */ @@ -532,27 +564,12 @@ public class RNSPeer { log.debug("Resource transfer complete"); } - ///** - // * Send a message using the peer buffer - // */ - //public Message getResponse(Message message) throws InterruptedException { - // var peerBuffer = getOrInitPeerBuffer(); - // - // //// send message - // //peerBuffer.write(...); - // //peerBuffer.flush(); - // - // // receive - peerBufferReady callback result - //} - /** Utility methods */ public void pingRemote() { var link = this.peerLink; - //if (nonNull(link) & (isFalse(link.isInitiator()))) { - //if (nonNull(link) & link.isInitiator()) { if (nonNull(link)) { if (peerLink.getStatus() == ACTIVE) { - log.info("pinging remote (direct, 1 packet): {}", link); + log.info("pinging remote (direct, 1 packet): {}", encodeHexString(link.getLinkId())); var data = "ping".getBytes(UTF_8); link.setPacketCallback(this::linkPacketReceived); Packet pingPacket = new Packet(link, data); @@ -584,7 +601,7 @@ public class RNSPeer { try { PongMessage pongMessage = new PongMessage(); - pongMessage.setId(message.getId()); // use the ping message id + pongMessage.setId(message.getId()); // use the ping message id (for ping getResponse) this.peerBuffer.write(pongMessage.toBytes()); this.peerBuffer.flush(); this.lastAccessTimestamp = Instant.now(); @@ -607,7 +624,7 @@ public class RNSPeer { * @throws InterruptedException if interrupted while waiting */ public Message getResponse(Message message) throws InterruptedException { - log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash())); + //log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash())); return getResponseWithTimeout(message, RESPONSE_TIMEOUT); } @@ -627,7 +644,6 @@ public class RNSPeer { */ public Message getResponseWithTimeout(Message message, int timeout) throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1); - // TODO: implement equivalent of Peer class... // Assign random ID to this message Random random = new Random(); int id; @@ -638,17 +654,20 @@ public class RNSPeer { // If putIfAbsent() doesn't return null, then this ID is already taken } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); message.setId(id); + //log.info("getResponse - before send {} message, random id is {}", message.getType(), id); // Try to send message if (!this.sendMessageWithTimeout(message, timeout)) { this.replyQueues.remove(id); return null; } + //log.info("getResponse - after send"); try { return blockingQueue.poll(timeout, TimeUnit.MILLISECONDS); } finally { this.replyQueues.remove(id); + //log.info("getResponse - regular - id removed from replyQueues"); } } @@ -660,10 +679,16 @@ public class RNSPeer { */ public boolean sendMessageWithTimeout(Message message, int timeout) { try { + // send the message log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); var peerBuffer = getOrInitPeerBuffer(); this.peerBuffer.write(message.toBytes()); this.peerBuffer.flush(); + //// send a message to confirm receipt over the buffer + //var messageId = message.getId(); + //var confirmData = concatArrays(SEQ_REQUEST_CONFIRM_ID,"::".getBytes(UTF_8), messageId.getBytes(UTF_8)); + //this.peerBuffer.write(confirmData); + //this.peerBuffer.flush(); return true; //} catch (InterruptedException e) { // // Send failure @@ -709,8 +734,8 @@ public class RNSPeer { //@Synchronized public boolean sendMessage(Message message) { try { - log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); - log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); + log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString()); + //log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString()); var peerBuffer = getOrInitPeerBuffer(); peerBuffer.write(message.toBytes()); peerBuffer.flush(); @@ -728,7 +753,7 @@ public class RNSPeer { protected void startPings() { log.trace("[{}] Enabling pings for peer {}", - peerLink.getDestination().getHexHash(), this); + peerLink.getDestination().getHexHash(), this.toString()); this.lastPingSent = NTP.getTime(); } @@ -829,4 +854,41 @@ public class RNSPeer { return false; return true; } + + // Pending signature requests + public void addPendingSignatureRequest(byte[] signature) { + // Check if we already have this signature in the list + for (byte[] existingSignature : this.pendingSignatureRequests) { + if (Arrays.equals(existingSignature, signature )) { + return; + } + } + this.pendingSignatureRequests.add(signature); + } + + public void removePendingSignatureRequest(byte[] signature) { + Iterator iterator = this.pendingSignatureRequests.iterator(); + while (iterator.hasNext()) { + byte[] existingSignature = (byte[]) iterator.next(); + if (Arrays.equals(existingSignature, signature)) { + iterator.remove(); + } + } + } + + public List getPendingSignatureRequests() { + return this.pendingSignatureRequests; + } + + // Details used by API + public long getConnectionEstablishedTime() { + return linkEstablishedTime; + } + + public long getConnectionAge() { + if (linkEstablishedTime > 0L) { + return System.currentTimeMillis() - linkEstablishedTime; + } + return linkEstablishedTime; + } } diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java index 5849d7f8..26b35378 100644 --- a/src/main/java/org/qortal/network/task/RNSPingTask.java +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -35,8 +35,8 @@ public class RNSPingTask implements Task { // Note: Even though getResponse would work, we can use // peer.sendMessage(pingMessage) using Reticulum buffer instead. // More efficient and saves room for other request/response tasks. - //peer.getResponse(pingMessage); - peer.sendMessage(pingMessage); + peer.getResponse(pingMessage); + //peer.sendMessage(pingMessage); //// task is not over here (Reticulum is asynchronous) //peer.setLastPing(NTP.getTime() - now); diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index f3f84e12..9a68c628 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -614,12 +614,14 @@ public class Settings { } } - // Related to Reticulum networking + // Related to mesh networking + /** Preferred network: "tcpip" or "reticulum" */ + private String preferredNetwork = "reticulum"; /** Maximum number of Reticulum peers allowed. */ private int reticulumMaxPeers = 55; /** Minimum number of Reticulum peers desired. */ - private int reticulumMinDesiredPeers = 3; + private int reticulumMinDesiredPeers = 8; /** Maximum number of task executor network threads */ private int reticulumMaxNetworkThreadPoolSize = 89; @@ -1380,6 +1382,10 @@ public class Settings { return connectionPoolMonitorEnabled; } + public String getPreferredNetwork () { + return this.preferredNetwork.toLowerCase(Locale.getDefault()); + } + public int getReticulumMaxPeers() { return this.reticulumMaxPeers; } diff --git a/src/main/resources/reticulum_default_config.yml b/src/main/resources/reticulum_default_config.yml index 18e8b729..8ec22a6d 100644 --- a/src/main/resources/reticulum_default_config.yml +++ b/src/main/resources/reticulum_default_config.yml @@ -64,9 +64,9 @@ interfaces: # local IPv6 is enabled in your operating system, which # should be enabled by default in almost any OS. See # the Reticulum Manual for more configuration options. - #"Default Interface": - # type: AutoInterface - # enabled: true + "Default Interface": + type: AutoInterface + enabled: true # This interface enables communication with a "backbone" # server over TCP. @@ -76,7 +76,7 @@ interfaces: enabled: true target_host: phantom.mobilefabrik.com target_port: 4242 - #network_name: qortal + network_name: qortal # This interface turns this Reticulum instance into a # server other clients can connect to over TCP. @@ -88,6 +88,6 @@ interfaces: # type: TCPServerInterface # enabled: true # listen_ip: 0.0.0.0 - # listen_port: 3434 - # #network_name: qortal + # listen_port: 4242 + # network_name: qortal diff --git a/src/main/resources/reticulum_default_testnet_config.yml b/src/main/resources/reticulum_default_testnet_config.yml index 18c7b3fb..17ec5e6d 100644 --- a/src/main/resources/reticulum_default_testnet_config.yml +++ b/src/main/resources/reticulum_default_testnet_config.yml @@ -64,9 +64,9 @@ interfaces: # local IPv6 is enabled in your operating system, which # should be enabled by default in almost any OS. See # the Reticulum Manual for more configuration options. - #"Default Interface": - # type: AutoInterface - # enabled: true + "Default Interface": + type: AutoInterface + enabled: true # This interface enables communication with a "backbone" # server over TCP. @@ -75,8 +75,8 @@ interfaces: type: TCPClientInterface enabled: true target_host: phantom.mobilefabrik.com - target_port: 3434 - #network_name: qortal + target_port: 4242 + network_name: qortaltest # This interface turns this Reticulum instance into a # server other clients can connect to over TCP. @@ -88,6 +88,6 @@ interfaces: # type: TCPServerInterface # enabled: true # listen_ip: 0.0.0.0 - # listen_port: 3434 - # #network_name: qortal + # listen_port: 4242 + # network_name: qortaltest