RNSPingTask working

This commit is contained in:
Jürg Schulthess 2025-01-10 20:08:32 +01:00
parent b9c4a0c467
commit 5b519990cd
3 changed files with 366 additions and 80 deletions

View File

@ -68,11 +68,13 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
//import java.util.concurrent.locks.Lock;
//import java.util.concurrent.locks.ReentrantLock;
import java.util.Objects;
import org.apache.commons.codec.binary.Hex;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.NamedThreadFactory;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import org.qortal.utils.NTP;
import org.qortal.utils.NamedThreadFactory;
// logging
import lombok.extern.slf4j.Slf4j;
@ -97,13 +99,20 @@ public class RNSNetwork {
Identity serverIdentity;
public Destination baseDestination;
private volatile boolean isShuttingDown = false;
/**
* Maintain two lists for each subset of peers
* => a synchronizedList, modified when peers are added/removed
* => an immutable List, automatically rebuild to mirror synchronizedList, served to consumers
* linkedPeers are "initiators" (containing initiator reticulum Link), actively doing work.
* incomimgPeers are "non-initiators", the passive end of bidirectional Reticulum Buffers.
*/
private final List<RNSPeer> linkedPeers = Collections.synchronizedList(new ArrayList<>());
private List<RNSPeer> immutableLinkedPeers = Collections.emptyList();
//private final List<Link> incomingLinks = Collections.synchronizedList(new ArrayList<>());
private final List<RNSPeer> incomingPeers = Collections.synchronizedList(new ArrayList<>());
private List<RNSPeer> immutableIncomingPeers = Collections.emptyList();
//private final ExecuteProduceConsume rnsNetworkEPC;
private final ExecuteProduceConsume rnsNetworkEPC;
private static final long NETWORK_EPC_KEEPALIVE = 1000L; // 1 second
//private volatile boolean isShuttingDown = false;
private int totalThreadCount = 0;
@ -135,13 +144,13 @@ public class RNSNetwork {
log.info("reticulum instance created");
log.info("reticulum instance created: {}", reticulum);
//// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below
//ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1,
// 5,
// NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS,
// new SynchronousQueue<Runnable>(),
// new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority()));
//rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor);
// Settings.getInstance().getMaxRNSNetworkThreadPoolSize(), // statically set to 5 below
ExecutorService RNSNetworkExecutor = new ThreadPoolExecutor(1,
5,
NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamedThreadFactory("RNSNetwork-EPC", Settings.getInstance().getNetworkThreadPriority()));
rnsNetworkEPC = new RNSNetworkProcessor(RNSNetworkExecutor);
}
// Note: potentially create persistent serverIdentity (utility rnid) and load it from file
@ -198,10 +207,8 @@ public class RNSNetwork {
baseDestination.announce();
log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
log.info("check point 1");
// Start up first networking thread (the "server loop")
//rnsNetworkEPC.start();
log.info("check point 2");
// Start up first networking thread (the "server loop", JS: the "Tasks engine")
rnsNetworkEPC.start();
}
private void initConfig(String configDir) throws IOException {
@ -232,14 +239,6 @@ public class RNSNetwork {
p.sendCloseToRemote(pl);
}
}
//// Stop processing threads (the "server loop")
//try {
// if (!this.rnsNetworkEPC.shutdown(5000)) {
// log.warn("RNSNetwork threads failed to terminate");
// }
//} catch (InterruptedException e) {
// log.warn("Interrupted while waiting for RNS networking threads to terminate");
//}
// Disconnect peers gracefully and terminate Reticulum
for (RNSPeer p: linkedPeers) {
log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash()));
@ -251,6 +250,14 @@ public class RNSNetwork {
log.error("exception: ", e);
}
}
// Stop processing threads (the "server loop")
try {
if (!this.rnsNetworkEPC.shutdown(5000)) {
log.warn("RNSNetwork threads failed to terminate");
}
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for RNS networking threads to terminate");
}
// Note: we still need to get the packet timeout callback to work...
reticulum.exitHandler();
}
@ -340,8 +347,6 @@ public class RNSNetwork {
}
}
if (activePeerCount < MAX_PEERS) {
//if (!peerExists) {
//var peer = findPeerByDestinationHash(destinationHash);
for (RNSPeer p: lps) {
if (Arrays.equals(p.getDestinationHash(), destinationHash)) {
log.info("QAnnounceHandler - peer exists - found peer matching destinationHash");
@ -365,12 +370,10 @@ public class RNSNetwork {
RNSPeer newPeer = new RNSPeer(destinationHash);
newPeer.setServerIdentity(announcedIdentity);
newPeer.setIsInitiator(true);
//lps.add(newPeer);
addLinkedPeer(newPeer);
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
}
}
//}
}
}
@ -402,31 +405,68 @@ public class RNSNetwork {
//if (task != null) {
// return task;
//}
//
//final Long now = NTP.getTime();
//
//task = maybeProducePeerPingTask(now);
//if (task != null) {
// return task;
//}
//
//task = maybeProduceConnectPeerTask(now);
//if (task != null) {
// return task;
//}
//
final Long now = NTP.getTime();
task = maybeProducePeerPingTask(now);
if (task != null) {
return task;
}
//task = maybeProduceBroadcastTask(now);
//if (task != null) {
// return task;
//}
//
// Only this method can block to reduce CPU spin
//return maybeProduceChannelTask(canBlock);
// TODO: flesh out the tasks handled by Reticulum
return null;
}
//...TODO: implement abstract methods...
//private Task maybeProducePeerMessageTask() {
// return getImmutableConnectedPeers().stream()
// .map(Peer::getMessageTask)
// .filter(Objects::nonNull)
// .findFirst()
// .orElse(null);
//}
//private Task maybeProducePeerMessageTask() {
// return getImmutableIncommingPeers().stream()
// .map(RNSPeer::getMessageTask)
// .filter(RNSPeer::isAvailable)
// .findFirst*()
// .orElse(null);
//}
//private Task maybeProducePeerPingTask(Long now) {
// return getImmutableHandshakedPeers().stream()
// .map(peer -> peer.getPingTask(now))
// .filter(Objects::nonNull)
// .findFirst()
// .orElse(null);
//}
private Task maybeProducePeerPingTask(Long now) {
var ilp = getImmutableLinkedPeers().stream()
.map(peer -> peer.getPingTask(now))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
if (nonNull(ilp)) {
log.info("ilp - {}", ilp);
}
return ilp;
//return getImmutableLinkedPeers().stream()
// .map(peer -> peer.getPingTask(now))
// .filter(Objects::nonNull)
// .findFirst()
// .orElse(null);
}
//private Task maybeProduceBroadcastTask(Long now) {
// if (now == null || now < nextBroadcastTimestamp.get()) {
// return null;
// }
//
// nextBroadcastTimestamp.set(now + BROADCAST_INTERVAL);
// return new BroadcastTask();
//}
}
private static class SingletonContainer {
@ -437,9 +477,9 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE;
}
//public List<RNSPeer> getImmutableLinkedPeers() {
// return this.immutableLinkedPeers;
//}
public List<RNSPeer> getImmutableLinkedPeers() {
return this.immutableLinkedPeers;
}
public void addLinkedPeer(RNSPeer peer) {
this.linkedPeers.add(peer);
@ -461,7 +501,12 @@ public class RNSNetwork {
//}
}
public void removeIncommingPeer(RNSPeer peer) {
public void addIncomingPeer(RNSPeer peer) {
this.incomingPeers.add(peer);
this.immutableIncomingPeers = List.copyOf(this.incomingPeers);
}
public void removeIncomingPeer(RNSPeer peer) {
if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown();
}
@ -530,7 +575,8 @@ public class RNSNetwork {
}
}
//removeExpiredPeers(this.linkedPeers);
log.info("number of links (linkedPeers) after prunig: {}", peerList.size());
log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(),
getIncomingPeers().size());
//log.info("we have {} non-initiator links, list: {}", incomingLinks.size(), incomingLinks);
var activePeerCount = 0;
var lps = RNSNetwork.getInstance().getLinkedPeers();

View File

@ -34,9 +34,25 @@ import io.reticulum.buffer.Buffer;
import io.reticulum.buffer.BufferedRWPair;
import static io.reticulum.utils.IdentityUtils.concatArrays;
import org.qortal.controller.Controller;
import org.qortal.data.block.BlockSummaryData;
import org.qortal.data.block.CommonBlockData;
import org.qortal.network.message.Message;
import org.qortal.network.message.PingMessage;
import org.qortal.network.message.*;
import org.qortal.network.message.MessageException;
import org.qortal.network.task.RNSMessageTask;
import org.qortal.network.task.RNSPingTask;
import org.qortal.settings.Settings;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.codec.binary.Hex.encodeHexString;
import static org.apache.commons.lang3.ArrayUtils.subarray;
@ -47,6 +63,14 @@ import lombok.extern.slf4j.Slf4j;
import lombok.Setter;
import lombok.Data;
import lombok.AccessLevel;
//
//import org.qortal.network.message.Message;
//import org.qortal.network.message.MessageException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.lang.IllegalStateException;
@Data
@Slf4j
@ -69,10 +93,22 @@ public class RNSPeer {
private Boolean isInitiator;
private Boolean deleteMe = false;
private Boolean isVacant = true;
private Long lastPacketRtt = null;
private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
// for qortal networking
private static final int RESPONSE_TIMEOUT = 3000; // [ms]
private static final int PING_INTERVAL = 34_000; // [ms]
private Long lastPing = null; // last ping roundtrip time [ms]
private Long lastPingSent = null; // time last ping was sent, or null if not started.
private Map<Integer, BlockingQueue<Message>> replyQueues;
//private LinkedBlockingQueue<Message> pendingMessages; // we might not need this
// Versioning
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
/**
* Constructor for initiator peers
*/
@ -83,6 +119,7 @@ public class RNSPeer {
//setCreationTimestamp(System.currentTimeMillis());
this.creationTimestamp = Instant.now();
this.isVacant = true;
this.replyQueues = new ConcurrentHashMap<>();
}
/**
@ -129,9 +166,17 @@ public class RNSPeer {
var channel = this.peerLink.getChannel();
if (nonNull(this.peerBuffer)) {
log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
this.peerBuffer.close();
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
//return this.peerBuffer;
try {
this.peerBuffer.close();
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
} catch (IllegalStateException e) {
// Exception thrown by Reticulum BufferedRWPair.close()
// This is a chance to correct links status when doing a RNSPingTask
log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}");
this.peerLink.teardown();
this.peerLink = null;
//log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e);
}
}
else {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
@ -181,6 +226,9 @@ public class RNSPeer {
log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}",
peerLink, link, encodeHexString(destinationHash),
encodeHexString(link.getDestination().getHash()));
if (isInitiator) {
startPings();
}
}
public void linkClosed(Link link) {
@ -232,10 +280,43 @@ public class RNSPeer {
* :param readyBytes: The number of bytes ready to read
*/
public void peerBufferReady(Integer readyBytes) {
// get the message data
var data = this.peerBuffer.read(readyBytes);
var decodedData = new String(data);
log.info("Received data over the buffer: {}", decodedData);
try {
Message message = Message.fromByteBuffer(ByteBuffer.wrap(data));
log.info("type {} message received: {}", message.getType(), message);
// TODO: Now what with message?
switch (message.getType()) {
//case GET_PEERS:
// onGetPeersMessage(peer, message);
// break;
case PING:
//onPingMessage(this, message);
PongMessage pongMessage = new PongMessage();
pongMessage.setId(message.getId());
//var peerBuffer = getOrInitPeerBuffer();
this.peerBuffer.write(message.toBytes());
this.peerBuffer.flush();
break;
case PONG:
//case PEERS_V2:
// onPeersV2Message(peer, message);
// break;
//
//default:
// // Bump up to controller for possible action
// Controller.getInstance().onNetworkMessage(peer, message);
// break;
}
} catch (MessageException e) {
log.error("{} from peer {}", e.getMessage(), this);
}
//var decodedData = new String(data);
//log.info("Received data over the buffer: {}", decodedData);
//if (isFalse(this.isInitiator)) {
// // TODO: process data and reply
@ -271,6 +352,7 @@ public class RNSPeer {
var rttString = new String("");
if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) {
var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds
this.lastPacketRtt = rtt;
if (rtt >= 1000) {
rtt = Math.round(rtt / 1000);
rttString = String.format("%d seconds", rtt);
@ -287,6 +369,7 @@ public class RNSPeer {
//log.info("packet delivered callback, receipt: {}", receipt);
if (receipt.getStatus() == PacketReceiptStatus.DELIVERED) {
var rtt = receipt.getRtt(); // rtt (Java) is in milliseconds
this.lastPacketRtt = rtt;
//log.info("qqp - packetDelivered - rtt: {}", rtt);
if (rtt >= 1000) {
rtt = Math.round((float) rtt / 1000);
@ -334,6 +417,19 @@ public class RNSPeer {
log.debug("Resource transfer complete");
}
///**
// * Send a message using the peer buffer
// */
//public Message getResponse(Message message) throws InterruptedException {
// var peerBuffer = getOrInitPeerBuffer();
//
// //// send message
// //peerBuffer.write(...);
// //peerBuffer.flush();
//
// // receive - peerBufferReady callback result
//}
/** Utility methods */
public void pingRemote() {
var link = this.peerLink;
@ -364,30 +460,121 @@ public class RNSPeer {
// packetReceipt.setDeliveryCallback(this::shutdownPacketDelivered);
//}
///** check if a link is available (ACTIVE)
// * link: a certain peer link, or null (default link == link to Qortal node RNS baseDestination)
// */
//public Boolean peerLinkIsAlive(Link link) {
// var result = false;
// if (isNull(link)) {
// // default link
// var defaultLink = getLink();
// if (nonNull(defaultLink) && defaultLink.getStatus() == ACTIVE) {
// result = true;
// log.info("Default link is available");
// } else {
// log.info("Default link {} is not available, status: {}", defaultLink, defaultLink.getStatus());
// }
// } else {
// // other link (future where we have multiple destinations...)
// if (link.getStatus() == ACTIVE) {
// result = true;
// log.info("Link {} is available (status: {})", link, link.getStatus());
// } else {
// log.info("Link {} is not available, status: {}", link, link.getStatus());
// }
// }
// return result;
/** qortal networking specific (Tasks) */
//private void onPingMessage(RNSPeer peer, Message message) {
// PingMessage pingMessage = (PingMessage) message;
//
// // Generate 'pong' using same ID
// PingMessage pongMessage = new PingMessage();
// pongMessage.setId(pingMessage.getId());
//
// sendMessageWithTimeout(pongMessage, RESPONSE_TIMEOUT);
//}
/**
* Send message to peer and await response, using default RESPONSE_TIMEOUT.
* <p>
* Message is assigned a random ID and sent.
* Responses are handled by registered callbacks.
* <p>
* Note: The method is called "get..." to match the original method name
*
* @param message message to send
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException if interrupted while waiting
*/
public void getResponse(Message message) throws InterruptedException {
getResponseWithTimeout(message, RESPONSE_TIMEOUT);
}
/**
* Send message to peer and await response.
* <p>
* Message is assigned a random ID and sent.
* If a response with matching ID is received then it is returned to caller.
* <p>
* If no response with matching ID within timeout, or some other error/exception occurs,
* then return <code>null</code>.<br>
* (Assume peer will be rapidly disconnected after this).
*
* @param message message to send
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException if interrupted while waiting
*/
public void getResponseWithTimeout(Message message, int timeout) throws InterruptedException {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
// TODO: implement equivalent of Peer class...
// Assign random ID to this message
Random random = new Random();
int id;
do {
id = random.nextInt(Integer.MAX_VALUE - 1) + 1;
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken
} while (this.replyQueues.putIfAbsent(id, blockingQueue) != null);
message.setId(id);
// Try to send message
if (!this.sendMessageWithTimeout(message, timeout)) {
this.replyQueues.remove(id);
return;
}
try {
blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
} finally {
this.replyQueues.remove(id);
}
}
/**
* Attempt to send Message to peer using the buffer and a custom timeout.
*
* @param message message to be sent
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessageWithTimeout(Message message, int timeout) {
try {
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
var peerBuffer = getOrInitPeerBuffer();
peerBuffer.write(message.toBytes());
peerBuffer.flush();
return true;
//} catch (InterruptedException e) {
// // Send failure
// return false;
} catch (IllegalStateException e) {
//log.warn("Can't write to buffer (remote buffer down?)");
log.error("IllegalStateException - can't write to buffer: e", e);
return false;
} catch (MessageException e) {
log.error(e.getMessage(), e);
return false;
}
}
protected void startPings() {
log.trace("[{}] Enabling pings for peer {}",
peerLink.getDestination().getHexHash(), this);
this.lastPingSent = NTP.getTime();
}
protected Task getPingTask(Long now) {
// Pings not enabled yet?
if (now == null || this.lastPingSent == null) {
return null;
}
// Time to send another ping?
if (now < this.lastPingSent + PING_INTERVAL) {
return null; // Not yet
}
// Not strictly true, but prevents this peer from being immediately chosen again
this.lastPingSent = now;
return new RNSPingTask(this, now);
}
}

View File

@ -0,0 +1,53 @@
package org.qortal.network.task;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.RNSPeer;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
//import org.qortal.network.message.RNSPingMessage;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
public class RNSPingTask implements Task {
private static final Logger LOGGER = LogManager.getLogger(PingTask.class);
private final RNSPeer peer;
private final Long now;
private final String name;
public RNSPingTask(RNSPeer peer, Long now) {
this.peer = peer;
this.now = now;
this.name = "PingTask::" + peer;
}
@Override
public String getName() {
return name;
}
@Override
public void perform() throws InterruptedException {
//RNSPingMessage pingMessage = new RNSPingMessage();
PingMessage pingMessage = new PingMessage();
//var peerBuffer = peer.getOrInitPeerBuffer();
//peerBuffer.write(...)
//peerBuffer.flush()
peer.getResponse(pingMessage);
//Message message = peer.getResponse(pingMessage);
//
//if (message == null || message.getType() != MessageType.PING) {
// LOGGER.debug("[{}] Didn't receive reply from {} for PING ID {}",
// peer.getPeerConnectionId(), peer, pingMessage.getId());
// peer.disconnect("no ping received");
// return;
//}
//// tast is not over here.
//peer.setLastPing(NTP.getTime() - now);
}
}