From b9c4a0c467b10b5fd76e24a9421134948eb6c8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Schulthess?= Date: Mon, 30 Dec 2024 14:11:30 +0100 Subject: [PATCH] initial compileing/working with buffer --- log4j2.properties | 32 ++ pom.xml | 81 ++--- .../org/qortal/controller/Controller.java | 14 +- .../java/org/qortal/network/RNSCommon.java | 2 +- .../java/org/qortal/network/RNSNetwork.java | 307 +++++++++--------- src/main/java/org/qortal/network/RNSPeer.java | 146 +++++++-- start.sh | 3 + 7 files changed, 367 insertions(+), 218 deletions(-) diff --git a/log4j2.properties b/log4j2.properties index 54f295c1..584e74b6 100644 --- a/log4j2.properties +++ b/log4j2.properties @@ -6,6 +6,34 @@ rootLogger.level = info rootLogger.appenderRef.console.ref = stdout rootLogger.appenderRef.rolling.ref = FILE +### additional debug options (in case of problems eg. 202411 + +#to see more QDN details - add the stuff below +#logger.arbitrary.name = org.qortal.arbitrary +#logger.arbitrary.level = trace + +#to see more QDN networking details - add stuff below +#logger.arbitrarycontroller.name = org.qortal.controller.arbitrary +#logger.arbitrarycontroller.level = debug + +# Support optional, Network Task debugging +#logger.networkTask.name = org.qortal.network.task +#logger.networkTask.level = debug + +# Support optional, Network Task tracing +#logger.networkTask.name = org.qortal.network.task +#logger.networkTask.level = trace + +# Support optional, Block debugging +#logger.block.name = org.qortal.block +#logger.block.level = debug + +# Support optional, Block tracing +#logger.block.name = org.qortal.block +#logger.block.level = trace + +### end additional debug options + # Suppress extraneous bitcoinj library output logger.bitcoinj.name = org.bitcoinj logger.bitcoinj.level = error @@ -18,6 +46,10 @@ logger.hsqldb.level = warn logger.hsqldbRepository.name = org.qortal.repository.hsqldb logger.hsqldbRepository.level = debug +## Support optional, controller repository debugging +#logger.controllerRepository.name = org.qortal.controller.repository +#logger.controllerRepository.level = debug + # Suppress extraneous Jersey warning logger.jerseyInject.name = org.glassfish.jersey.internal.inject.Providers logger.jerseyInject.level = off diff --git a/pom.xml b/pom.xml index c632e29d..89eeac75 100644 --- a/pom.xml +++ b/pom.xml @@ -445,26 +445,26 @@ - + @@ -590,32 +590,33 @@ guava ${guava.version} + org.slf4j slf4j-api ${slf4j.version} + + org.apache.logging.log4j + log4j-slf4j2-impl + ${log4j.version} + - - org.apache.logging.log4j - log4j-slf4j2-impl - ${log4j.version} - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - - - org.apache.logging.log4j - log4j-jul - ${log4j.version} - + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-jul + ${log4j.version} + javax.servlet diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 86e3d8ae..b3ca7f8d 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1245,17 +1245,17 @@ public class Controller extends Thread { } public void doRNSNetworkBroadcast() { - if (Settings.getInstance().isLite()) { - // Lite nodes have nothing to broadcast - return; - } - RNSNetwork network = RNSNetwork.getInstance(); + //if (Settings.getInstance().isLite()) { + // // Lite nodes have nothing to broadcast + // return; + //} + //RNSNetwork network = RNSNetwork.getInstance(); //// Send our current height //network.broadcastOurChain(); - //// Requiest unconfirmed transaction signatures, but only if we're up-to-date. - //// if we're not up-to-dat then then priority is synchronizing first + //// Request unconfirmed transaction signatures, but only if we're up-to-date. + //// if we're not up-to-dat then priority is synchronizing first //if (isUpToDate()) { // network.broadcast(network::buildGetUnconfirmedTransactionsMessage); //} diff --git a/src/main/java/org/qortal/network/RNSCommon.java b/src/main/java/org/qortal/network/RNSCommon.java index a87bd9e0..29395d5f 100644 --- a/src/main/java/org/qortal/network/RNSCommon.java +++ b/src/main/java/org/qortal/network/RNSCommon.java @@ -18,7 +18,7 @@ public class RNSCommon { * Default config */ public static String defaultRNSConfig = "reticulum_default_config.yml"; - public static String defaultRNSConfigTetnet = "reticulum_default_testnet_config.yml"; + public static String defaultRNSConfigTestnet = "reticulum_default_testnet_config.yml"; ///** // * Qortal RNS Destinations diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 9a472e0c..6453ad7f 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -11,6 +11,9 @@ import io.reticulum.identity.Identity; import io.reticulum.link.Link; import io.reticulum.link.LinkStatus; //import io.reticulum.constant.LinkConstant; +//import static io.reticulum.constant.ReticulumConstant.MTU; +import io.reticulum.buffer.Buffer; +import io.reticulum.buffer.BufferedRWPair; import io.reticulum.packet.Packet; import io.reticulum.packet.PacketReceipt; import io.reticulum.packet.PacketReceiptStatus; @@ -20,6 +23,7 @@ import io.reticulum.transport.AnnounceHandler; import static io.reticulum.link.TeardownSession.TIMEOUT; import static io.reticulum.link.LinkStatus.ACTIVE; import static io.reticulum.link.LinkStatus.STALE; +import static io.reticulum.link.LinkStatus.CLOSED; //import static io.reticulum.link.LinkStatus.PENDING; import static io.reticulum.link.LinkStatus.HANDSHAKE; //import static io.reticulum.packet.PacketContextType.LINKCLOSE; @@ -42,11 +46,13 @@ import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.WRITE; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.channels.SelectionKey; import static java.nio.charset.StandardCharsets.UTF_8; //import static java.util.Objects.isNull; //import static java.util.Objects.isNull; import static java.util.Objects.nonNull; +//import static org.apache.commons.lang3.BooleanUtils.isTrue; //import static org.apache.commons.lang3.BooleanUtils.isFalse; import java.io.File; @@ -54,11 +60,19 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Iterator; //import java.util.Random; //import java.util.Scanner; import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +//import java.util.concurrent.locks.Lock; +//import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.codec.binary.Hex; +import org.qortal.utils.ExecuteProduceConsume; +import org.qortal.utils.NamedThreadFactory; +import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; // logging import lombok.extern.slf4j.Slf4j; @@ -84,13 +98,21 @@ public class RNSNetwork { public Destination baseDestination; private volatile boolean isShuttingDown = false; private final List linkedPeers = Collections.synchronizedList(new ArrayList<>()); - private final List incomingLinks = Collections.synchronizedList(new ArrayList<>()); + private List immutableLinkedPeers = Collections.emptyList(); + //private final List incomingLinks = Collections.synchronizedList(new ArrayList<>()); + private final List incomingPeers = Collections.synchronizedList(new ArrayList<>()); + private List immutableIncomingPeers = Collections.emptyList(); - ////private final ExecuteProduceConsume rnsNetworkEPC; - //private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second + //private final ExecuteProduceConsume rnsNetworkEPC; + private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second //private volatile boolean isShuttingDown = false; - //private int totalThreadCount = 0; - //// TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) + private int totalThreadCount = 0; + // TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) + + // replicating a feature from Network.class needed in for base Message.java, + // just in case the classic TCP/IP Networking is turned off. + private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[]{0x51, 0x4f, 0x52, 0x54}; // QORT + private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[]{0x71, 0x6f, 0x72, 0x54}; // qorT //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); @@ -113,12 +135,12 @@ public class RNSNetwork { log.info("reticulum instance created"); log.info("reticulum instance created: {}", reticulum); - // Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below + //// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below //ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1, // 5, // NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, // new SynchronousQueue(), - // new NamedThreadFactory("RNSNetwork-EPC")); + // new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority())); //rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); } @@ -168,18 +190,18 @@ public class RNSNetwork { baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL); baseDestination.setAcceptLinkRequests(true); - - baseDestination.setLinkEstablishedCallback(this::clientConnected); + baseDestination.setLinkEstablishedCallback(this::clientConnected); Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler()); log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); - // do a first announce baseDestination.announce(); log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); - + + log.info("check point 1"); // Start up first networking thread (the "server loop") //rnsNetworkEPC.start(); + log.info("check point 2"); } private void initConfig(String configDir) throws IOException { @@ -193,7 +215,7 @@ public class RNSNetwork { if (Files.notExists(configFile)) { var defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfig); if (Settings.getInstance().isTestNet()) { - defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfigTetnet); + defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfigTestnet); } Files.copy(defaultConfig, configFile, StandardCopyOption.REPLACE_EXISTING); } @@ -203,19 +225,25 @@ public class RNSNetwork { isShuttingDown = true; log.info("shutting down Reticulum"); - // Stop processing threads (the "server loop") + // gracefully close links of peers that point to us + for (RNSPeer p: incomingPeers) { + var pl = p.getPeerLink(); + if (nonNull(pl) & (pl.getStatus() == ACTIVE)) { + p.sendCloseToRemote(pl); + } + } + //// Stop processing threads (the "server loop") //try { // if (!this.rnsNetworkEPC.shutdown(5000)) { - // logger.warn("RNSNetwork threads failed to terminate"); + // log.warn("RNSNetwork threads failed to terminate"); // } //} catch (InterruptedException e) { - // logger.warn("Interrupted while waiting for RNS networking threads to terminate"); + // log.warn("Interrupted while waiting for RNS networking threads to terminate"); //} - // Disconnect peers gracefully and terminate Reticulum for (RNSPeer p: linkedPeers) { log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); - log.debug("peer: {}", p); + //log.debug("peer: {}", p); p.shutdown(); try { TimeUnit.SECONDS.sleep(1); // allow for peers to disconnect gracefully @@ -223,10 +251,6 @@ public class RNSNetwork { log.error("exception: ", e); } } - // gracefully close links of peers that point to us - for (Link l: incomingLinks) { - sendCloseToRemote(l); - } // Note: we still need to get the packet timeout callback to work... reticulum.exitHandler(); } @@ -264,61 +288,24 @@ public class RNSNetwork { } public void clientConnected(Link link) { - link.setLinkClosedCallback(this::clientDisconnected); - link.setPacketCallback(this::serverPacketReceived); - var peer = findPeerByLink(link); - if (nonNull(peer)) { - log.info("initiator peer {} opened link (link lookup: {}), link destination hash: {}", - Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash())); - // make sure the peerLink is actvive. - peer.getOrInitPeerLink(); - } else { - log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}", - link, Hex.encodeHexString(link.getDestination().getHash())); - } - incomingLinks.add(link); + //link.setLinkClosedCallback(this::clientDisconnected); + //link.setPacketCallback(this::serverPacketReceived); + log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash())); + RNSPeer newPeer = new RNSPeer(link); + newPeer.setPeerLinkHash(link.getHash()); + // make sure the peer has a channel and buffer + newPeer.getOrInitPeerBuffer(); + incomingPeers.add(newPeer); log.info("***> Client connected, link: {}", link); } public void clientDisconnected(Link link) { - var peer = findPeerByLink(link); - if (nonNull(peer)) { - log.info("initiator peer {} closed link (link lookup: {}), link destination hash: {}", - Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash())); - } else { - log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}", - link, Hex.encodeHexString(link.getDestination().getHash())); - } - // if we have a peer pointing to that destination, we can close and remove it - peer = findPeerByDestinationHash(link.getDestination().getHash()); - if (nonNull(peer)) { - // Note: no shutdown as the remobe peer could be only rebooting. - // keep it to reopen link later if possible. - peer.getPeerLink().teardown(); - } - incomingLinks.remove(link); log.info("***> Client disconnected"); } public void serverPacketReceived(byte[] message, Packet packet) { var msgText = new String(message, StandardCharsets.UTF_8); log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash())); - //var peer = findPeerByDestinationHash(packet.getDestinationHash()); - //if (msgText.equals("ping")) { - // log.info("received ping"); - // //if (nonNull(peer)) { - // // String replyText = "pong"; - // // byte[] replyData = replyText.getBytes(StandardCharsets.UTF_8); - // // Packet reply = new Packet(peer.getPeerLink(), replyData); - // //} - //} - //if (msgText.equals("shutdown")) { - // log.info("shutdown packet received"); - // var link = recall(packet.getDestinationHash()); - // log.info("recalled destinationHash: {}", link); - // //... - //} - // TODO: process packet.... } //public void announceBaseDestination () { @@ -328,9 +315,6 @@ public class RNSNetwork { private class QAnnounceHandler implements AnnounceHandler { @Override public String getAspectFilter() { - // handle all announces - //return null; - // handle cortal.core announces return "qortal.core"; } @@ -381,7 +365,8 @@ public class RNSNetwork { RNSPeer newPeer = new RNSPeer(destinationHash); newPeer.setServerIdentity(announcedIdentity); newPeer.setIsInitiator(true); - lps.add(newPeer); + //lps.add(newPeer); + addLinkedPeer(newPeer); log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); } } @@ -390,62 +375,60 @@ public class RNSNetwork { } // Main thread - //class RNSNetworkProcessor extends ExecuteProduceConsume { - // - // //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class); - // - // private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs - // private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs - // - // private Iterator channelIterator = null; - // - // RNSNetworkProcessor(ExecutorService executor) { - // super(executor); - // } - // - // @Override - // protected void onSpawnFailure() { - // // For debugging: - // // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); - // } - // - // @Override - // protected Task produceTask(boolean canBlock) throws InterruptedException { - // Task task; - // - // //task = maybeProducePeerMessageTask(); - // //if (task != null) { - // // return task; - // //} - // // - // //final Long now = NTP.getTime(); - // // - // //task = maybeProducePeerPingTask(now); - // //if (task != null) { - // // return task; - // //} - // // - // //task = maybeProduceConnectPeerTask(now); - // //if (task != null) { - // // return task; - // //} - // // - // //task = maybeProduceBroadcastTask(now); - // //if (task != null) { - // // return task; - // //} - // // - // // Only this method can block to reduce CPU spin - // //return maybeProduceChannelTask(canBlock); - // - // // TODO: flesh out the tasks handled by Reticulum - // return null; - // } - // //...TODO: implement abstract methods... - //} + class RNSNetworkProcessor extends ExecuteProduceConsume { + //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class); + + private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs + private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs + + private Iterator channelIterator = null; + + RNSNetworkProcessor(ExecutorService executor) { + super(executor); + } + + @Override + protected void onSpawnFailure() { + // For debugging: + // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); + } + + @Override + protected Task produceTask(boolean canBlock) throws InterruptedException { + Task task; + + //task = maybeProducePeerMessageTask(); + //if (task != null) { + // return task; + //} + // + //final Long now = NTP.getTime(); + // + //task = maybeProducePeerPingTask(now); + //if (task != null) { + // return task; + //} + // + //task = maybeProduceConnectPeerTask(now); + //if (task != null) { + // return task; + //} + // + //task = maybeProduceBroadcastTask(now); + //if (task != null) { + // return task; + //} + // + // Only this method can block to reduce CPU spin + //return maybeProduceChannelTask(canBlock); + + // TODO: flesh out the tasks handled by Reticulum + return null; + } + //...TODO: implement abstract methods... + } - // getter / setter private static class SingletonContainer { private static final RNSNetwork INSTANCE = new RNSNetwork(); } @@ -454,30 +437,43 @@ public class RNSNetwork { return SingletonContainer.INSTANCE; } - //public Identity getServerIdentity() { - // return this.serverIdentity; + //public List getImmutableLinkedPeers() { + // return this.immutableLinkedPeers; //} - //public Reticulum getReticulum() { - // return this.reticulum; - //} + public void addLinkedPeer(RNSPeer peer) { + this.linkedPeers.add(peer); + this.immutableLinkedPeers = List.copyOf(this.linkedPeers); // thread safe + } + + public void removeLinkedPeer(RNSPeer peer) { + if (nonNull(peer.getPeerLink())) { + peer.getPeerLink().teardown(); + } + this.linkedPeers.remove(peer); // thread safe + this.immutableLinkedPeers = List.copyOf(this.linkedPeers); + } public List getLinkedPeers() { - synchronized(this.linkedPeers) { + //synchronized(this.linkedPeers) { //return new ArrayList<>(this.linkedPeers); return this.linkedPeers; - } + //} } - public Integer getTotalPeers() { - synchronized (this) { - return linkedPeers.size(); + public void removeIncommingPeer(RNSPeer peer) { + if (nonNull(peer.getPeerLink())) { + peer.getPeerLink().teardown(); } + this.incomingPeers.remove(peer); + this.immutableIncomingPeers = List.copyOf(this.incomingPeers); } - //public Destination getBaseDestination() { - // return baseDestination; - //} + public List getIncomingPeers() { + return this.incomingPeers; + } + + // TODO, methods for: getAvailablePeer // maintenance //public void removePeer(RNSPeer peer) { @@ -496,12 +492,12 @@ public class RNSNetwork { // } //} - @Synchronized + //@Synchronized public void prunePeers() throws DataException { // run periodically (by the Controller) //List linkList = getLinkedPeers(); var peerList = getLinkedPeers(); - log.info("number of links (linkedPeers) before prunig: {}", peerList.size()); + log.info("number of links (linkedPeers) before pruning: {}", peerList.size()); Link pLink; LinkStatus lStatus; for (RNSPeer p: peerList) { @@ -518,16 +514,19 @@ public class RNSNetwork { log.info("Link {} status: {}", pLink, lStatus); // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) { - p.shutdown(); - peerList.remove(p); + //p.shutdown(); + //peerList.remove(p); + removeLinkedPeer(p); } else if (lStatus == HANDSHAKE) { // stuck in handshake state (do we need to shutdown/remove it?) log.info("peer status HANDSHAKE"); - p.shutdown(); - peerList.remove(p); + //p.shutdown(); + //peerList.remove(p); + removeLinkedPeer(p); } } else { - peerList.remove(p); + //peerList.remove(p); + removeLinkedPeer(p); } } //removeExpiredPeers(this.linkedPeers); @@ -602,7 +601,8 @@ public class RNSNetwork { //} public RNSPeer findPeerByLink(Link link) { - List lps = RNSNetwork.getInstance().getLinkedPeers(); + //List lps = RNSNetwork.getInstance().getLinkedPeers(); + List lps = RNSNetwork.getInstance().getImmutableLinkedPeers(); RNSPeer peer = null; for (RNSPeer p : lps) { var pLink = p.getPeerLink(); @@ -618,7 +618,8 @@ public class RNSNetwork { } public RNSPeer findPeerByDestinationHash(byte[] dhash) { - List lps = RNSNetwork.getInstance().getLinkedPeers(); + //List lps = RNSNetwork.getInstance().getLinkedPeers(); + List lps = RNSNetwork.getInstance().getImmutableLinkedPeers(); RNSPeer peer = null; for (RNSPeer p : lps) { if (Arrays.equals(p.getDestinationHash(), dhash)) { @@ -630,11 +631,15 @@ public class RNSNetwork { return peer; } - public void removePeer(RNSPeer peer) { - List peerList = this.linkedPeers; - if (nonNull(peer)) { - peerList.remove(peer); - } + //public void removePeer(RNSPeer peer) { + // List peerList = this.linkedPeers; + // if (nonNull(peer)) { + // peerList.remove(peer); + // } + //} + + public byte[] getMessageMagic() { + return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC; } } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 37f8db14..ad0962bc 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -30,13 +30,18 @@ import static io.reticulum.link.LinkStatus.ACTIVE; //import static io.reticulum.link.LinkStatus.CLOSED; import static io.reticulum.identity.IdentityKnownDestination.recall; //import static io.reticulum.identity.IdentityKnownDestination.recallAppData; +import io.reticulum.buffer.Buffer; +import io.reticulum.buffer.BufferedRWPair; +import static io.reticulum.utils.IdentityUtils.concatArrays; import org.qortal.settings.Settings; import java.nio.charset.StandardCharsets; import static java.nio.charset.StandardCharsets.UTF_8; -import org.apache.commons.codec.binary.Hex; +import static org.apache.commons.codec.binary.Hex.encodeHexString; import static org.apache.commons.lang3.ArrayUtils.subarray; +import static org.apache.commons.lang3.BooleanUtils.isFalse; +import static org.apache.commons.lang3.BooleanUtils.isTrue; import lombok.extern.slf4j.Slf4j; import lombok.Setter; @@ -57,20 +62,48 @@ public class RNSPeer { @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp; private Instant lastAccessTimestamp; Link peerLink; + byte[] peerLinkHash; + BufferedRWPair peerBuffer; + int receiveStreamId = 0; + int sendStreamId = 0; private Boolean isInitiator; private Boolean deleteMe = false; + private Boolean isVacant = true; private Double requestResponseProgress; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; + /** + * Constructor for initiator peers + */ public RNSPeer(byte[] dhash) { - destinationHash = dhash; - serverIdentity = recall(dhash); + this.destinationHash = dhash; + this.serverIdentity = recall(dhash); initPeerLink(); //setCreationTimestamp(System.currentTimeMillis()); - creationTimestamp = Instant.now(); + this.creationTimestamp = Instant.now(); + this.isVacant = true; } + /** + * Constructor for non-initiator peers + */ + public RNSPeer(Link link) { + this.peerLink = link; + //this.peerLinkId = link.getLinkId(); + this.peerDestination = link.getDestination(); + this.destinationHash = link.getDestination().getHash(); + this.serverIdentity = link.getRemoteIdentity(); + + this.creationTimestamp = Instant.now(); + this.lastAccessTimestamp = null; + this.isInitiator = false; + this.isVacant = false; + + //this.peerLink.setLinkEstablishedCallback(this::linkEstablished); + //this.peerLink.setLinkClosedCallback(this::linkClosed); + //this.peerLink.setPacketCallback(this::linkPacketReceived); + } public void initPeerLink() { peerDestination = new Destination( this.serverIdentity, @@ -81,16 +114,32 @@ public class RNSPeer { ); peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL); - lastAccessTimestamp = Instant.now(); - isInitiator = true; + this.creationTimestamp = Instant.now(); + this.lastAccessTimestamp = null; + this.isInitiator = true; - peerLink = new Link(peerDestination); + this.peerLink = new Link(peerDestination); this.peerLink.setLinkEstablishedCallback(this::linkEstablished); this.peerLink.setLinkClosedCallback(this::linkClosed); this.peerLink.setPacketCallback(this::linkPacketReceived); } + public BufferedRWPair getOrInitPeerBuffer() { + var channel = this.peerLink.getChannel(); + if (nonNull(this.peerBuffer)) { + log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); + this.peerBuffer.close(); + this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); + //return this.peerBuffer; + } + else { + log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); + this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); + } + return getPeerBuffer(); + } + public Link getOrInitPeerLink() { if (this.peerLink.getStatus() == ACTIVE) { lastAccessTimestamp = Instant.now(); @@ -102,10 +151,15 @@ public class RNSPeer { } public void shutdown() { - if (nonNull(peerLink)) { + if (nonNull(this.peerLink)) { log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus()); if (peerLink.getStatus() == ACTIVE) { + if (isFalse(this.isInitiator)) { + sendCloseToRemote(this.peerLink); + } peerLink.teardown(); + }else { + log.info("shutdown - status (non-ACTIVE): {}", peerLink.getStatus()); } this.peerLink = null; } @@ -125,8 +179,8 @@ public class RNSPeer { public void linkEstablished(Link link) { link.setLinkClosedCallback(this::linkClosed); log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", - peerLink, link, Hex.encodeHexString(destinationHash), - Hex.encodeHexString(link.getDestination().getHash())); + peerLink, link, encodeHexString(destinationHash), + encodeHexString(link.getDestination().getHash())); } public void linkClosed(Link link) { @@ -136,11 +190,11 @@ public class RNSPeer { } else if (link.getTeardownReason() == INITIATOR_CLOSED) { log.info("Link closed callback: The initiator closed the link"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); + peerLink, link, encodeHexString(link.getDestination().getHash())); } else if (link.getTeardownReason() == DESTINATION_CLOSED) { log.info("Link closed callback: The link was closed by the peer, removing peer"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); + peerLink, link, encodeHexString(link.getDestination().getHash())); } else { log.info("Link closed callback"); } @@ -153,8 +207,8 @@ public class RNSPeer { } else if (msgText.startsWith("close::")) { var targetPeerHash = subarray(message, 7, message.length); log.info("peer dest hash: {}, target hash: {}", - Hex.encodeHexString(destinationHash), - Hex.encodeHexString(targetPeerHash)); + encodeHexString(destinationHash), + encodeHexString(targetPeerHash)); if (Arrays.equals(destinationHash, targetPeerHash)) { log.info("closing link: {}", peerLink.getDestination().getHexHash()); peerLink.teardown(); @@ -162,8 +216,8 @@ public class RNSPeer { } else if (msgText.startsWith("open::")) { var targetPeerHash = subarray(message, 7, message.length); log.info("peer dest hash: {}, target hash: {}", - Hex.encodeHexString(destinationHash), - Hex.encodeHexString(targetPeerHash)); + encodeHexString(destinationHash), + encodeHexString(targetPeerHash)); if (Arrays.equals(destinationHash, targetPeerHash)) { log.info("closing link: {}", peerLink.getDestination().getHexHash()); getOrInitPeerLink(); @@ -172,8 +226,62 @@ public class RNSPeer { // TODO: process incoming packet.... } + /* + * Callback from buffer when buffer has data available + * + * :param readyBytes: The number of bytes ready to read + */ + public void peerBufferReady(Integer readyBytes) { + var data = this.peerBuffer.read(readyBytes); + var decodedData = new String(data); + + log.info("Received data over the buffer: {}", decodedData); + + //if (isFalse(this.isInitiator)) { + // // TODO: process data and reply + //} else { + // this.peerBuffer.flush(); // clear buffer + //} + } + + /** + * Set a packet to remote with the message format "close::" + * This method is only useful for non-initiator links to close the remote initiator. + * + * @param link + */ + public void sendCloseToRemote(Link link) { + var baseDestination = RNSNetwork.getInstance().getBaseDestination(); + if (nonNull(link) & (isFalse(link.isInitiator()))) { + // Note: if part of link we need to get the baseDesitination hash + //var data = concatArrays("close::".getBytes(UTF_8),link.getDestination().getHash()); + var data = concatArrays("close::".getBytes(UTF_8), baseDestination.getHash()); + Packet closePacket = new Packet(link, data); + var packetReceipt = closePacket.send(); + packetReceipt.setDeliveryCallback(this::closePacketDelivered); + packetReceipt.setTimeout(1000L); + packetReceipt.setTimeoutCallback(this::packetTimedOut); + } else { + log.debug("can't send to null link"); + } + } /** PacketReceipt callbacks */ + public void closePacketDelivered(PacketReceipt receipt) { + var rttString = new String(""); + if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) { + var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds + if (rtt >= 1000) { + rtt = Math.round(rtt / 1000); + rttString = String.format("%d seconds", rtt); + } else { + rttString = String.format("%d miliseconds", rtt); + } + log.info("Shutdown packet confirmation received from {}, round-trip time is {}", + encodeHexString(receipt.getDestination().getHash()), rttString); + } + } + public void packetDelivered(PacketReceipt receipt) { var rttString = ""; //log.info("packet delivered callback, receipt: {}", receipt); @@ -187,7 +295,7 @@ public class RNSPeer { rttString = String.format("%d milliseconds", rtt); } log.info("Valid reply received from {}, round-trip time is {}", - Hex.encodeHexString(receipt.getDestination().getHash()), rttString); + encodeHexString(receipt.getDestination().getHash()), rttString); } } @@ -242,7 +350,7 @@ public class RNSPeer { packetReceipt.setDeliveryCallback(this::packetDelivered); } else { log.info("can't send ping to a peer {} with (link) status: {}", - Hex.encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus()); + encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus()); } } } @@ -282,4 +390,4 @@ public class RNSPeer { // return result; //} -} \ No newline at end of file +} diff --git a/start.sh b/start.sh index 88937026..20322e4f 100755 --- a/start.sh +++ b/start.sh @@ -48,6 +48,9 @@ JVM_MEMORY_ARGS="-XX:MaxRAMPercentage=50 -XX:+UseG1GC -Xss1024k" nohup nice -n 20 java \ -Djava.net.preferIPv4Stack=false \ ${JVM_MEMORY_ARGS} \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.net=ALL-UNNAMED \ + --illegal-access=warn \ -jar qortal.jar \ 1>run.log 2>&1 &