initial compileing/working with buffer

This commit is contained in:
Jürg Schulthess 2024-12-30 14:11:30 +01:00
parent 4a81fb1ad5
commit b9c4a0c467
7 changed files with 367 additions and 218 deletions

View File

@ -6,6 +6,34 @@ rootLogger.level = info
rootLogger.appenderRef.console.ref = stdout rootLogger.appenderRef.console.ref = stdout
rootLogger.appenderRef.rolling.ref = FILE rootLogger.appenderRef.rolling.ref = FILE
### additional debug options (in case of problems eg. 202411
#to see more QDN details - add the stuff below
#logger.arbitrary.name = org.qortal.arbitrary
#logger.arbitrary.level = trace
#to see more QDN networking details - add stuff below
#logger.arbitrarycontroller.name = org.qortal.controller.arbitrary
#logger.arbitrarycontroller.level = debug
# Support optional, Network Task debugging
#logger.networkTask.name = org.qortal.network.task
#logger.networkTask.level = debug
# Support optional, Network Task tracing
#logger.networkTask.name = org.qortal.network.task
#logger.networkTask.level = trace
# Support optional, Block debugging
#logger.block.name = org.qortal.block
#logger.block.level = debug
# Support optional, Block tracing
#logger.block.name = org.qortal.block
#logger.block.level = trace
### end additional debug options
# Suppress extraneous bitcoinj library output # Suppress extraneous bitcoinj library output
logger.bitcoinj.name = org.bitcoinj logger.bitcoinj.name = org.bitcoinj
logger.bitcoinj.level = error logger.bitcoinj.level = error
@ -18,6 +46,10 @@ logger.hsqldb.level = warn
logger.hsqldbRepository.name = org.qortal.repository.hsqldb logger.hsqldbRepository.name = org.qortal.repository.hsqldb
logger.hsqldbRepository.level = debug logger.hsqldbRepository.level = debug
## Support optional, controller repository debugging
#logger.controllerRepository.name = org.qortal.controller.repository
#logger.controllerRepository.level = debug
# Suppress extraneous Jersey warning # Suppress extraneous Jersey warning
logger.jerseyInject.name = org.glassfish.jersey.internal.inject.Providers logger.jerseyInject.name = org.glassfish.jersey.internal.inject.Providers
logger.jerseyInject.level = off logger.jerseyInject.level = off

View File

@ -590,17 +590,18 @@
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
<version>${guava.version}</version> <version>${guava.version}</version>
</dependency> </dependency>
<!-- logging: slf4j -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version> <version>${slf4j.version}</version>
</dependency> </dependency>
<!-- Logging: log4j2 -->
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId> <artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j.version}</version> <version>${log4j.version}</version>
</dependency> </dependency>
<!-- Logging: log4j2 -->
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId> <artifactId>log4j-core</artifactId>

View File

@ -1245,17 +1245,17 @@ public class Controller extends Thread {
} }
public void doRNSNetworkBroadcast() { public void doRNSNetworkBroadcast() {
if (Settings.getInstance().isLite()) { //if (Settings.getInstance().isLite()) {
// Lite nodes have nothing to broadcast // // Lite nodes have nothing to broadcast
return; // return;
} //}
RNSNetwork network = RNSNetwork.getInstance(); //RNSNetwork network = RNSNetwork.getInstance();
//// Send our current height //// Send our current height
//network.broadcastOurChain(); //network.broadcastOurChain();
//// Requiest unconfirmed transaction signatures, but only if we're up-to-date. //// Request unconfirmed transaction signatures, but only if we're up-to-date.
//// if we're not up-to-dat then then priority is synchronizing first //// if we're not up-to-dat then priority is synchronizing first
//if (isUpToDate()) { //if (isUpToDate()) {
// network.broadcast(network::buildGetUnconfirmedTransactionsMessage); // network.broadcast(network::buildGetUnconfirmedTransactionsMessage);
//} //}

View File

@ -18,7 +18,7 @@ public class RNSCommon {
* Default config * Default config
*/ */
public static String defaultRNSConfig = "reticulum_default_config.yml"; public static String defaultRNSConfig = "reticulum_default_config.yml";
public static String defaultRNSConfigTetnet = "reticulum_default_testnet_config.yml"; public static String defaultRNSConfigTestnet = "reticulum_default_testnet_config.yml";
///** ///**
// * Qortal RNS Destinations // * Qortal RNS Destinations

View File

@ -11,6 +11,9 @@ import io.reticulum.identity.Identity;
import io.reticulum.link.Link; import io.reticulum.link.Link;
import io.reticulum.link.LinkStatus; import io.reticulum.link.LinkStatus;
//import io.reticulum.constant.LinkConstant; //import io.reticulum.constant.LinkConstant;
//import static io.reticulum.constant.ReticulumConstant.MTU;
import io.reticulum.buffer.Buffer;
import io.reticulum.buffer.BufferedRWPair;
import io.reticulum.packet.Packet; import io.reticulum.packet.Packet;
import io.reticulum.packet.PacketReceipt; import io.reticulum.packet.PacketReceipt;
import io.reticulum.packet.PacketReceiptStatus; import io.reticulum.packet.PacketReceiptStatus;
@ -20,6 +23,7 @@ import io.reticulum.transport.AnnounceHandler;
import static io.reticulum.link.TeardownSession.TIMEOUT; import static io.reticulum.link.TeardownSession.TIMEOUT;
import static io.reticulum.link.LinkStatus.ACTIVE; import static io.reticulum.link.LinkStatus.ACTIVE;
import static io.reticulum.link.LinkStatus.STALE; import static io.reticulum.link.LinkStatus.STALE;
import static io.reticulum.link.LinkStatus.CLOSED;
//import static io.reticulum.link.LinkStatus.PENDING; //import static io.reticulum.link.LinkStatus.PENDING;
import static io.reticulum.link.LinkStatus.HANDSHAKE; import static io.reticulum.link.LinkStatus.HANDSHAKE;
//import static io.reticulum.packet.PacketContextType.LINKCLOSE; //import static io.reticulum.packet.PacketContextType.LINKCLOSE;
@ -42,11 +46,13 @@ import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE; import static java.nio.file.StandardOpenOption.WRITE;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.channels.SelectionKey;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
//import static java.util.Objects.isNull; //import static java.util.Objects.isNull;
//import static java.util.Objects.isNull; //import static java.util.Objects.isNull;
import static java.util.Objects.nonNull; import static java.util.Objects.nonNull;
//import static org.apache.commons.lang3.BooleanUtils.isTrue;
//import static org.apache.commons.lang3.BooleanUtils.isFalse; //import static org.apache.commons.lang3.BooleanUtils.isFalse;
import java.io.File; import java.io.File;
@ -54,11 +60,19 @@ import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Iterator;
//import java.util.Random; //import java.util.Random;
//import java.util.Scanner; //import java.util.Scanner;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
//import java.util.concurrent.locks.Lock;
//import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.binary.Hex;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NamedThreadFactory;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
// logging // logging
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -84,13 +98,21 @@ public class RNSNetwork {
public Destination baseDestination; public Destination baseDestination;
private volatile boolean isShuttingDown = false; private volatile boolean isShuttingDown = false;
private final List<RNSPeer> linkedPeers = Collections.synchronizedList(new ArrayList<>()); private final List<RNSPeer> linkedPeers = Collections.synchronizedList(new ArrayList<>());
private final List<Link> incomingLinks = Collections.synchronizedList(new ArrayList<>()); private List<RNSPeer> immutableLinkedPeers = Collections.emptyList();
//private final List<Link> incomingLinks = Collections.synchronizedList(new ArrayList<>());
private final List<RNSPeer> incomingPeers = Collections.synchronizedList(new ArrayList<>());
private List<RNSPeer> immutableIncomingPeers = Collections.emptyList();
////private final ExecuteProduceConsume rnsNetworkEPC; //private final ExecuteProduceConsume rnsNetworkEPC;
//private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second
//private volatile boolean isShuttingDown = false; //private volatile boolean isShuttingDown = false;
//private int totalThreadCount = 0; private int totalThreadCount = 0;
//// TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed) // TODO: settings - MaxReticulumPeers, MaxRNSNetworkThreadPoolSize (if needed)
// replicating a feature from Network.class needed in for base Message.java,
// just in case the classic TCP/IP Networking is turned off.
private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[]{0x51, 0x4f, 0x52, 0x54}; // QORT
private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[]{0x71, 0x6f, 0x72, 0x54}; // qorT
//private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class); //private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class);
@ -113,12 +135,12 @@ public class RNSNetwork {
log.info("reticulum instance created"); log.info("reticulum instance created");
log.info("reticulum instance created: {}", reticulum); log.info("reticulum instance created: {}", reticulum);
// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below //// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below
//ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1, //ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1,
// 5, // 5,
// NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS, // NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>(), // new SynchronousQueue<Runnable>(),
// new NamedThreadFactory("RNSNetwork-EPC")); // new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority()));
//rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor); //rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor);
} }
@ -170,16 +192,16 @@ public class RNSNetwork {
baseDestination.setAcceptLinkRequests(true); baseDestination.setAcceptLinkRequests(true);
baseDestination.setLinkEstablishedCallback(this::clientConnected); baseDestination.setLinkEstablishedCallback(this::clientConnected);
Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler()); Transport.getInstance().registerAnnounceHandler(new QAnnounceHandler());
log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers());
// do a first announce // do a first announce
baseDestination.announce(); baseDestination.announce();
log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
log.info("check point 1");
// Start up first networking thread (the "server loop") // Start up first networking thread (the "server loop")
//rnsNetworkEPC.start(); //rnsNetworkEPC.start();
log.info("check point 2");
} }
private void initConfig(String configDir) throws IOException { private void initConfig(String configDir) throws IOException {
@ -193,7 +215,7 @@ public class RNSNetwork {
if (Files.notExists(configFile)) { if (Files.notExists(configFile)) {
var defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfig); var defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfig);
if (Settings.getInstance().isTestNet()) { if (Settings.getInstance().isTestNet()) {
defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfigTetnet); defaultConfig = this.getClass().getClassLoader().getResourceAsStream(RNSCommon.defaultRNSConfigTestnet);
} }
Files.copy(defaultConfig, configFile, StandardCopyOption.REPLACE_EXISTING); Files.copy(defaultConfig, configFile, StandardCopyOption.REPLACE_EXISTING);
} }
@ -203,19 +225,25 @@ public class RNSNetwork {
isShuttingDown = true; isShuttingDown = true;
log.info("shutting down Reticulum"); log.info("shutting down Reticulum");
// Stop processing threads (the "server loop") // gracefully close links of peers that point to us
for (RNSPeer p: incomingPeers) {
var pl = p.getPeerLink();
if (nonNull(pl) & (pl.getStatus() == ACTIVE)) {
p.sendCloseToRemote(pl);
}
}
//// Stop processing threads (the "server loop")
//try { //try {
// if (!this.rnsNetworkEPC.shutdown(5000)) { // if (!this.rnsNetworkEPC.shutdown(5000)) {
// logger.warn("RNSNetwork threads failed to terminate"); // log.warn("RNSNetwork threads failed to terminate");
// } // }
//} catch (InterruptedException e) { //} catch (InterruptedException e) {
// logger.warn("Interrupted while waiting for RNS networking threads to terminate"); // log.warn("Interrupted while waiting for RNS networking threads to terminate");
//} //}
// Disconnect peers gracefully and terminate Reticulum // Disconnect peers gracefully and terminate Reticulum
for (RNSPeer p: linkedPeers) { for (RNSPeer p: linkedPeers) {
log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash()));
log.debug("peer: {}", p); //log.debug("peer: {}", p);
p.shutdown(); p.shutdown();
try { try {
TimeUnit.SECONDS.sleep(1); // allow for peers to disconnect gracefully TimeUnit.SECONDS.sleep(1); // allow for peers to disconnect gracefully
@ -223,10 +251,6 @@ public class RNSNetwork {
log.error("exception: ", e); log.error("exception: ", e);
} }
} }
// gracefully close links of peers that point to us
for (Link l: incomingLinks) {
sendCloseToRemote(l);
}
// Note: we still need to get the packet timeout callback to work... // Note: we still need to get the packet timeout callback to work...
reticulum.exitHandler(); reticulum.exitHandler();
} }
@ -264,61 +288,24 @@ public class RNSNetwork {
} }
public void clientConnected(Link link) { public void clientConnected(Link link) {
link.setLinkClosedCallback(this::clientDisconnected); //link.setLinkClosedCallback(this::clientDisconnected);
link.setPacketCallback(this::serverPacketReceived); //link.setPacketCallback(this::serverPacketReceived);
var peer = findPeerByLink(link); log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash()));
if (nonNull(peer)) { RNSPeer newPeer = new RNSPeer(link);
log.info("initiator peer {} opened link (link lookup: {}), link destination hash: {}", newPeer.setPeerLinkHash(link.getHash());
Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash())); // make sure the peer has a channel and buffer
// make sure the peerLink is actvive. newPeer.getOrInitPeerBuffer();
peer.getOrInitPeerLink(); incomingPeers.add(newPeer);
} else {
log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}",
link, Hex.encodeHexString(link.getDestination().getHash()));
}
incomingLinks.add(link);
log.info("***> Client connected, link: {}", link); log.info("***> Client connected, link: {}", link);
} }
public void clientDisconnected(Link link) { public void clientDisconnected(Link link) {
var peer = findPeerByLink(link);
if (nonNull(peer)) {
log.info("initiator peer {} closed link (link lookup: {}), link destination hash: {}",
Hex.encodeHexString(peer.getDestinationHash()), link, Hex.encodeHexString(link.getDestination().getHash()));
} else {
log.info("non-initiator closed link (link lookup: {}), link destination hash (initiator): {}",
link, Hex.encodeHexString(link.getDestination().getHash()));
}
// if we have a peer pointing to that destination, we can close and remove it
peer = findPeerByDestinationHash(link.getDestination().getHash());
if (nonNull(peer)) {
// Note: no shutdown as the remobe peer could be only rebooting.
// keep it to reopen link later if possible.
peer.getPeerLink().teardown();
}
incomingLinks.remove(link);
log.info("***> Client disconnected"); log.info("***> Client disconnected");
} }
public void serverPacketReceived(byte[] message, Packet packet) { public void serverPacketReceived(byte[] message, Packet packet) {
var msgText = new String(message, StandardCharsets.UTF_8); var msgText = new String(message, StandardCharsets.UTF_8);
log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash())); log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash()));
//var peer = findPeerByDestinationHash(packet.getDestinationHash());
//if (msgText.equals("ping")) {
// log.info("received ping");
// //if (nonNull(peer)) {
// // String replyText = "pong";
// // byte[] replyData = replyText.getBytes(StandardCharsets.UTF_8);
// // Packet reply = new Packet(peer.getPeerLink(), replyData);
// //}
//}
//if (msgText.equals("shutdown")) {
// log.info("shutdown packet received");
// var link = recall(packet.getDestinationHash());
// log.info("recalled destinationHash: {}", link);
// //...
//}
// TODO: process packet....
} }
//public void announceBaseDestination () { //public void announceBaseDestination () {
@ -328,9 +315,6 @@ public class RNSNetwork {
private class QAnnounceHandler implements AnnounceHandler { private class QAnnounceHandler implements AnnounceHandler {
@Override @Override
public String getAspectFilter() { public String getAspectFilter() {
// handle all announces
//return null;
// handle cortal.core announces
return "qortal.core"; return "qortal.core";
} }
@ -381,7 +365,8 @@ public class RNSNetwork {
RNSPeer newPeer = new RNSPeer(destinationHash); RNSPeer newPeer = new RNSPeer(destinationHash);
newPeer.setServerIdentity(announcedIdentity); newPeer.setServerIdentity(announcedIdentity);
newPeer.setIsInitiator(true); newPeer.setIsInitiator(true);
lps.add(newPeer); //lps.add(newPeer);
addLinkedPeer(newPeer);
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
} }
} }
@ -390,62 +375,60 @@ public class RNSNetwork {
} }
// Main thread // Main thread
//class RNSNetworkProcessor extends ExecuteProduceConsume { class RNSNetworkProcessor extends ExecuteProduceConsume {
//
// //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class); //private final Logger logger = LoggerFactory.getLogger(RNSNetworkProcessor.class);
//
// private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs
// private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs
//
// private Iterator<SelectionKey> channelIterator = null; private Iterator<SelectionKey> channelIterator = null;
//
// RNSNetworkProcessor(ExecutorService executor) { RNSNetworkProcessor(ExecutorService executor) {
// super(executor); super(executor);
// } }
//
// @Override @Override
// protected void onSpawnFailure() { protected void onSpawnFailure() {
// // For debugging: // For debugging:
// // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class); // ExecutorDumper.dump(this.executor, 3, ExecuteProduceConsume.class);
// } }
//
// @Override @Override
// protected Task produceTask(boolean canBlock) throws InterruptedException { protected Task produceTask(boolean canBlock) throws InterruptedException {
// Task task; Task task;
//
// //task = maybeProducePeerMessageTask(); //task = maybeProducePeerMessageTask();
// //if (task != null) { //if (task != null) {
// // return task; // return task;
// //}
// //
// //final Long now = NTP.getTime();
// //
// //task = maybeProducePeerPingTask(now);
// //if (task != null) {
// // return task;
// //}
// //
// //task = maybeProduceConnectPeerTask(now);
// //if (task != null) {
// // return task;
// //}
// //
// //task = maybeProduceBroadcastTask(now);
// //if (task != null) {
// // return task;
// //}
// //
// // Only this method can block to reduce CPU spin
// //return maybeProduceChannelTask(canBlock);
//
// // TODO: flesh out the tasks handled by Reticulum
// return null;
// }
// //...TODO: implement abstract methods...
//} //}
//
//final Long now = NTP.getTime();
//
//task = maybeProducePeerPingTask(now);
//if (task != null) {
// return task;
//}
//
//task = maybeProduceConnectPeerTask(now);
//if (task != null) {
// return task;
//}
//
//task = maybeProduceBroadcastTask(now);
//if (task != null) {
// return task;
//}
//
// Only this method can block to reduce CPU spin
//return maybeProduceChannelTask(canBlock);
// TODO: flesh out the tasks handled by Reticulum
return null;
}
//...TODO: implement abstract methods...
}
// getter / setter
private static class SingletonContainer { private static class SingletonContainer {
private static final RNSNetwork INSTANCE = new RNSNetwork(); private static final RNSNetwork INSTANCE = new RNSNetwork();
} }
@ -454,30 +437,43 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE; return SingletonContainer.INSTANCE;
} }
//public Identity getServerIdentity() { //public List<RNSPeer> getImmutableLinkedPeers() {
// return this.serverIdentity; // return this.immutableLinkedPeers;
//} //}
//public Reticulum getReticulum() { public void addLinkedPeer(RNSPeer peer) {
// return this.reticulum; this.linkedPeers.add(peer);
//} this.immutableLinkedPeers = List.copyOf(this.linkedPeers); // thread safe
}
public void removeLinkedPeer(RNSPeer peer) {
if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown();
}
this.linkedPeers.remove(peer); // thread safe
this.immutableLinkedPeers = List.copyOf(this.linkedPeers);
}
public List<RNSPeer> getLinkedPeers() { public List<RNSPeer> getLinkedPeers() {
synchronized(this.linkedPeers) { //synchronized(this.linkedPeers) {
//return new ArrayList<>(this.linkedPeers); //return new ArrayList<>(this.linkedPeers);
return this.linkedPeers; return this.linkedPeers;
}
}
public Integer getTotalPeers() {
synchronized (this) {
return linkedPeers.size();
}
}
//public Destination getBaseDestination() {
// return baseDestination;
//} //}
}
public void removeIncommingPeer(RNSPeer peer) {
if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown();
}
this.incomingPeers.remove(peer);
this.immutableIncomingPeers = List.copyOf(this.incomingPeers);
}
public List<RNSPeer> getIncomingPeers() {
return this.incomingPeers;
}
// TODO, methods for: getAvailablePeer
// maintenance // maintenance
//public void removePeer(RNSPeer peer) { //public void removePeer(RNSPeer peer) {
@ -496,12 +492,12 @@ public class RNSNetwork {
// } // }
//} //}
@Synchronized //@Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller) // run periodically (by the Controller)
//List<Link> linkList = getLinkedPeers(); //List<Link> linkList = getLinkedPeers();
var peerList = getLinkedPeers(); var peerList = getLinkedPeers();
log.info("number of links (linkedPeers) before prunig: {}", peerList.size()); log.info("number of links (linkedPeers) before pruning: {}", peerList.size());
Link pLink; Link pLink;
LinkStatus lStatus; LinkStatus lStatus;
for (RNSPeer p: peerList) { for (RNSPeer p: peerList) {
@ -518,16 +514,19 @@ public class RNSNetwork {
log.info("Link {} status: {}", pLink, lStatus); log.info("Link {} status: {}", pLink, lStatus);
// lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) { if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (p.getDeleteMe())) {
p.shutdown(); //p.shutdown();
peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p);
} else if (lStatus == HANDSHAKE) { } else if (lStatus == HANDSHAKE) {
// stuck in handshake state (do we need to shutdown/remove it?) // stuck in handshake state (do we need to shutdown/remove it?)
log.info("peer status HANDSHAKE"); log.info("peer status HANDSHAKE");
p.shutdown(); //p.shutdown();
peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p);
} }
} else { } else {
peerList.remove(p); //peerList.remove(p);
removeLinkedPeer(p);
} }
} }
//removeExpiredPeers(this.linkedPeers); //removeExpiredPeers(this.linkedPeers);
@ -602,7 +601,8 @@ public class RNSNetwork {
//} //}
public RNSPeer findPeerByLink(Link link) { public RNSPeer findPeerByLink(Link link) {
List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers(); //List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
List<RNSPeer> lps = RNSNetwork.getInstance().getImmutableLinkedPeers();
RNSPeer peer = null; RNSPeer peer = null;
for (RNSPeer p : lps) { for (RNSPeer p : lps) {
var pLink = p.getPeerLink(); var pLink = p.getPeerLink();
@ -618,7 +618,8 @@ public class RNSNetwork {
} }
public RNSPeer findPeerByDestinationHash(byte[] dhash) { public RNSPeer findPeerByDestinationHash(byte[] dhash) {
List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers(); //List<RNSPeer> lps = RNSNetwork.getInstance().getLinkedPeers();
List<RNSPeer> lps = RNSNetwork.getInstance().getImmutableLinkedPeers();
RNSPeer peer = null; RNSPeer peer = null;
for (RNSPeer p : lps) { for (RNSPeer p : lps) {
if (Arrays.equals(p.getDestinationHash(), dhash)) { if (Arrays.equals(p.getDestinationHash(), dhash)) {
@ -630,11 +631,15 @@ public class RNSNetwork {
return peer; return peer;
} }
public void removePeer(RNSPeer peer) { //public void removePeer(RNSPeer peer) {
List<RNSPeer> peerList = this.linkedPeers; // List<RNSPeer> peerList = this.linkedPeers;
if (nonNull(peer)) { // if (nonNull(peer)) {
peerList.remove(peer); // peerList.remove(peer);
} // }
//}
public byte[] getMessageMagic() {
return Settings.getInstance().isTestNet() ? TESTNET_MESSAGE_MAGIC : MAINNET_MESSAGE_MAGIC;
} }
} }

View File

@ -30,13 +30,18 @@ import static io.reticulum.link.LinkStatus.ACTIVE;
//import static io.reticulum.link.LinkStatus.CLOSED; //import static io.reticulum.link.LinkStatus.CLOSED;
import static io.reticulum.identity.IdentityKnownDestination.recall; import static io.reticulum.identity.IdentityKnownDestination.recall;
//import static io.reticulum.identity.IdentityKnownDestination.recallAppData; //import static io.reticulum.identity.IdentityKnownDestination.recallAppData;
import io.reticulum.buffer.Buffer;
import io.reticulum.buffer.BufferedRWPair;
import static io.reticulum.utils.IdentityUtils.concatArrays;
import org.qortal.settings.Settings; import org.qortal.settings.Settings;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.commons.codec.binary.Hex; import static org.apache.commons.codec.binary.Hex.encodeHexString;
import static org.apache.commons.lang3.ArrayUtils.subarray; import static org.apache.commons.lang3.ArrayUtils.subarray;
import static org.apache.commons.lang3.BooleanUtils.isFalse;
import static org.apache.commons.lang3.BooleanUtils.isTrue;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import lombok.Setter; import lombok.Setter;
@ -57,20 +62,48 @@ public class RNSPeer {
@Setter(AccessLevel.PACKAGE) private Instant creationTimestamp; @Setter(AccessLevel.PACKAGE) private Instant creationTimestamp;
private Instant lastAccessTimestamp; private Instant lastAccessTimestamp;
Link peerLink; Link peerLink;
byte[] peerLinkHash;
BufferedRWPair peerBuffer;
int receiveStreamId = 0;
int sendStreamId = 0;
private Boolean isInitiator; private Boolean isInitiator;
private Boolean deleteMe = false; private Boolean deleteMe = false;
private Boolean isVacant = true;
private Double requestResponseProgress; private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
/**
* Constructor for initiator peers
*/
public RNSPeer(byte[] dhash) { public RNSPeer(byte[] dhash) {
destinationHash = dhash; this.destinationHash = dhash;
serverIdentity = recall(dhash); this.serverIdentity = recall(dhash);
initPeerLink(); initPeerLink();
//setCreationTimestamp(System.currentTimeMillis()); //setCreationTimestamp(System.currentTimeMillis());
creationTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
this.isVacant = true;
} }
/**
* Constructor for non-initiator peers
*/
public RNSPeer(Link link) {
this.peerLink = link;
//this.peerLinkId = link.getLinkId();
this.peerDestination = link.getDestination();
this.destinationHash = link.getDestination().getHash();
this.serverIdentity = link.getRemoteIdentity();
this.creationTimestamp = Instant.now();
this.lastAccessTimestamp = null;
this.isInitiator = false;
this.isVacant = false;
//this.peerLink.setLinkEstablishedCallback(this::linkEstablished);
//this.peerLink.setLinkClosedCallback(this::linkClosed);
//this.peerLink.setPacketCallback(this::linkPacketReceived);
}
public void initPeerLink() { public void initPeerLink() {
peerDestination = new Destination( peerDestination = new Destination(
this.serverIdentity, this.serverIdentity,
@ -81,16 +114,32 @@ public class RNSPeer {
); );
peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL); peerDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
lastAccessTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
isInitiator = true; this.lastAccessTimestamp = null;
this.isInitiator = true;
peerLink = new Link(peerDestination); this.peerLink = new Link(peerDestination);
this.peerLink.setLinkEstablishedCallback(this::linkEstablished); this.peerLink.setLinkEstablishedCallback(this::linkEstablished);
this.peerLink.setLinkClosedCallback(this::linkClosed); this.peerLink.setLinkClosedCallback(this::linkClosed);
this.peerLink.setPacketCallback(this::linkPacketReceived); this.peerLink.setPacketCallback(this::linkPacketReceived);
} }
public BufferedRWPair getOrInitPeerBuffer() {
var channel = this.peerLink.getChannel();
if (nonNull(this.peerBuffer)) {
log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
this.peerBuffer.close();
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
//return this.peerBuffer;
}
else {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
}
return getPeerBuffer();
}
public Link getOrInitPeerLink() { public Link getOrInitPeerLink() {
if (this.peerLink.getStatus() == ACTIVE) { if (this.peerLink.getStatus() == ACTIVE) {
lastAccessTimestamp = Instant.now(); lastAccessTimestamp = Instant.now();
@ -102,10 +151,15 @@ public class RNSPeer {
} }
public void shutdown() { public void shutdown() {
if (nonNull(peerLink)) { if (nonNull(this.peerLink)) {
log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus()); log.info("shutdown - peerLink: {}, status: {}", peerLink, peerLink.getStatus());
if (peerLink.getStatus() == ACTIVE) { if (peerLink.getStatus() == ACTIVE) {
if (isFalse(this.isInitiator)) {
sendCloseToRemote(this.peerLink);
}
peerLink.teardown(); peerLink.teardown();
}else {
log.info("shutdown - status (non-ACTIVE): {}", peerLink.getStatus());
} }
this.peerLink = null; this.peerLink = null;
} }
@ -125,8 +179,8 @@ public class RNSPeer {
public void linkEstablished(Link link) { public void linkEstablished(Link link) {
link.setLinkClosedCallback(this::linkClosed); link.setLinkClosedCallback(this::linkClosed);
log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}",
peerLink, link, Hex.encodeHexString(destinationHash), peerLink, link, encodeHexString(destinationHash),
Hex.encodeHexString(link.getDestination().getHash())); encodeHexString(link.getDestination().getHash()));
} }
public void linkClosed(Link link) { public void linkClosed(Link link) {
@ -136,11 +190,11 @@ public class RNSPeer {
} else if (link.getTeardownReason() == INITIATOR_CLOSED) { } else if (link.getTeardownReason() == INITIATOR_CLOSED) {
log.info("Link closed callback: The initiator closed the link"); log.info("Link closed callback: The initiator closed the link");
log.info("peerLink {} closed (link: {}), link destination hash: {}", log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); peerLink, link, encodeHexString(link.getDestination().getHash()));
} else if (link.getTeardownReason() == DESTINATION_CLOSED) { } else if (link.getTeardownReason() == DESTINATION_CLOSED) {
log.info("Link closed callback: The link was closed by the peer, removing peer"); log.info("Link closed callback: The link was closed by the peer, removing peer");
log.info("peerLink {} closed (link: {}), link destination hash: {}", log.info("peerLink {} closed (link: {}), link destination hash: {}",
peerLink, link, Hex.encodeHexString(link.getDestination().getHash())); peerLink, link, encodeHexString(link.getDestination().getHash()));
} else { } else {
log.info("Link closed callback"); log.info("Link closed callback");
} }
@ -153,8 +207,8 @@ public class RNSPeer {
} else if (msgText.startsWith("close::")) { } else if (msgText.startsWith("close::")) {
var targetPeerHash = subarray(message, 7, message.length); var targetPeerHash = subarray(message, 7, message.length);
log.info("peer dest hash: {}, target hash: {}", log.info("peer dest hash: {}, target hash: {}",
Hex.encodeHexString(destinationHash), encodeHexString(destinationHash),
Hex.encodeHexString(targetPeerHash)); encodeHexString(targetPeerHash));
if (Arrays.equals(destinationHash, targetPeerHash)) { if (Arrays.equals(destinationHash, targetPeerHash)) {
log.info("closing link: {}", peerLink.getDestination().getHexHash()); log.info("closing link: {}", peerLink.getDestination().getHexHash());
peerLink.teardown(); peerLink.teardown();
@ -162,8 +216,8 @@ public class RNSPeer {
} else if (msgText.startsWith("open::")) { } else if (msgText.startsWith("open::")) {
var targetPeerHash = subarray(message, 7, message.length); var targetPeerHash = subarray(message, 7, message.length);
log.info("peer dest hash: {}, target hash: {}", log.info("peer dest hash: {}, target hash: {}",
Hex.encodeHexString(destinationHash), encodeHexString(destinationHash),
Hex.encodeHexString(targetPeerHash)); encodeHexString(targetPeerHash));
if (Arrays.equals(destinationHash, targetPeerHash)) { if (Arrays.equals(destinationHash, targetPeerHash)) {
log.info("closing link: {}", peerLink.getDestination().getHexHash()); log.info("closing link: {}", peerLink.getDestination().getHexHash());
getOrInitPeerLink(); getOrInitPeerLink();
@ -172,8 +226,62 @@ public class RNSPeer {
// TODO: process incoming packet.... // TODO: process incoming packet....
} }
/*
* Callback from buffer when buffer has data available
*
* :param readyBytes: The number of bytes ready to read
*/
public void peerBufferReady(Integer readyBytes) {
var data = this.peerBuffer.read(readyBytes);
var decodedData = new String(data);
log.info("Received data over the buffer: {}", decodedData);
//if (isFalse(this.isInitiator)) {
// // TODO: process data and reply
//} else {
// this.peerBuffer.flush(); // clear buffer
//}
}
/**
* Set a packet to remote with the message format "close::<our_destination_hash>"
* This method is only useful for non-initiator links to close the remote initiator.
*
* @param link
*/
public void sendCloseToRemote(Link link) {
var baseDestination = RNSNetwork.getInstance().getBaseDestination();
if (nonNull(link) & (isFalse(link.isInitiator()))) {
// Note: if part of link we need to get the baseDesitination hash
//var data = concatArrays("close::".getBytes(UTF_8),link.getDestination().getHash());
var data = concatArrays("close::".getBytes(UTF_8), baseDestination.getHash());
Packet closePacket = new Packet(link, data);
var packetReceipt = closePacket.send();
packetReceipt.setDeliveryCallback(this::closePacketDelivered);
packetReceipt.setTimeout(1000L);
packetReceipt.setTimeoutCallback(this::packetTimedOut);
} else {
log.debug("can't send to null link");
}
}
/** PacketReceipt callbacks */ /** PacketReceipt callbacks */
public void closePacketDelivered(PacketReceipt receipt) {
var rttString = new String("");
if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) {
var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds
if (rtt >= 1000) {
rtt = Math.round(rtt / 1000);
rttString = String.format("%d seconds", rtt);
} else {
rttString = String.format("%d miliseconds", rtt);
}
log.info("Shutdown packet confirmation received from {}, round-trip time is {}",
encodeHexString(receipt.getDestination().getHash()), rttString);
}
}
public void packetDelivered(PacketReceipt receipt) { public void packetDelivered(PacketReceipt receipt) {
var rttString = ""; var rttString = "";
//log.info("packet delivered callback, receipt: {}", receipt); //log.info("packet delivered callback, receipt: {}", receipt);
@ -187,7 +295,7 @@ public class RNSPeer {
rttString = String.format("%d milliseconds", rtt); rttString = String.format("%d milliseconds", rtt);
} }
log.info("Valid reply received from {}, round-trip time is {}", log.info("Valid reply received from {}, round-trip time is {}",
Hex.encodeHexString(receipt.getDestination().getHash()), rttString); encodeHexString(receipt.getDestination().getHash()), rttString);
} }
} }
@ -242,7 +350,7 @@ public class RNSPeer {
packetReceipt.setDeliveryCallback(this::packetDelivered); packetReceipt.setDeliveryCallback(this::packetDelivered);
} else { } else {
log.info("can't send ping to a peer {} with (link) status: {}", log.info("can't send ping to a peer {} with (link) status: {}",
Hex.encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus()); encodeHexString(peerLink.getDestination().getHash()), peerLink.getStatus());
} }
} }
} }

View File

@ -48,6 +48,9 @@ JVM_MEMORY_ARGS="-XX:MaxRAMPercentage=50 -XX:+UseG1GC -Xss1024k"
nohup nice -n 20 java \ nohup nice -n 20 java \
-Djava.net.preferIPv4Stack=false \ -Djava.net.preferIPv4Stack=false \
${JVM_MEMORY_ARGS} \ ${JVM_MEMORY_ARGS} \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--illegal-access=warn \
-jar qortal.jar \ -jar qortal.jar \
1>run.log 2>&1 & 1>run.log 2>&1 &