improve prune and more

This commit is contained in:
Jürg Schulthess 2025-04-13 10:57:26 +02:00
parent 096daa691a
commit eb244bb45b
2 changed files with 105 additions and 45 deletions

View File

@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicLong;
//import java.util.concurrent.locks.ReentrantLock;
import java.util.Objects;
import java.util.function.Function;
import java.time.Instant;
import org.apache.commons.codec.binary.Hex;
import org.qortal.utils.ExecuteProduceConsume;
@ -129,11 +130,16 @@ public class RNSNetwork {
// just in case the classic TCP/IP Networking is turned off.
private static final byte[] MAINNET_MESSAGE_MAGIC = new byte[]{0x51, 0x4f, 0x52, 0x54}; // QORT
private static final byte[] TESTNET_MESSAGE_MAGIC = new byte[]{0x71, 0x6f, 0x72, 0x54}; // qorT
private static final int BROADCAST_CHAIN_TIP_DEPTH = 7; // Just enough to fill a SINGLE TCP packet (~1440 bytes)
private static final int BROADCAST_CHAIN_TIP_DEPTH = 7; // (~1440 bytes)
/**
* How long between informational broadcasts to all ACTIVE peers, in milliseconds.
*/
private static final long BROADCAST_INTERVAL = 30 * 1000L; // ms
/**
* Link low-level ping interval and timeout
*/
private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms
private static final long LINK_UNREACHABLE_TIMEOUT = 2 * LINK_PING_INTERVAL;
//private static final Logger logger = LoggerFactory.getLogger(RNSNetwork.class);
@ -279,7 +285,7 @@ public class RNSNetwork {
}
public void shutdown() {
isShuttingDown = true;
this.isShuttingDown = true;
log.info("shutting down Reticulum");
// gracefully close links of peers that point to us
@ -430,6 +436,9 @@ public class RNSNetwork {
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
}
}
// Chance to announce instead of waiting for next pruning.
// Note: good in theory but leads to ping-pong of announces => not a good idea!
//maybeAnnounce(getBaseDestination());
}
}
@ -440,6 +449,7 @@ public class RNSNetwork {
private final AtomicLong nextConnectTaskTimestamp = new AtomicLong(0L); // ms - try first connect once NTP syncs
private final AtomicLong nextBroadcastTimestamp = new AtomicLong(0L); // ms - try first broadcast once NTP syncs
private final AtomicLong nextPingTimestamp = new AtomicLong(0L); // ms - try first low-level Ping
private Iterator<SelectionKey> channelIterator = null;
@ -457,8 +467,8 @@ public class RNSNetwork {
protected Task produceTask(boolean canBlock) throws InterruptedException {
Task task;
//// TODO: enable this once we figure out how to add pending messages in RNSPeer
/// (RNSPeer: pendingMessages.offer(message))
//// TODO: Needed? Figure out how to add pending messages in RNSPeer
//// (RNSPeer: pendingMessages.offer(message))
//task = maybeProducePeerMessageTask();
//if (task != null) {
// return task;
@ -466,6 +476,7 @@ public class RNSNetwork {
final Long now = NTP.getTime();
// ping task (Link+Channel+Buffer)
task = maybeProducePeerPingTask(now);
if (task != null) {
return task;
@ -492,9 +503,12 @@ public class RNSNetwork {
//// .findFirst()
//// .orElse(null);
////}
//// Note: we might not need this. All messages handled asynchronously in Reticulum
//// (RNSPeer peerBufferReady callback)
//private Task maybeProducePeerMessageTask() {
// return getImmutableIncomingPeers().stream()
// return getImmutableLinkedPeers().stream()
// .map(RNSPeer::getMessageTask)
// .filter(Objects::nonNull)
// .findFirst()
// .orElse(null);
//}
@ -613,13 +627,15 @@ public class RNSNetwork {
log.info("number of links (linkedPeers) before pruning: {}", peerList.size());
Link pLink;
LinkStatus lStatus;
//final Long now = NTP.getTime();
Instant now = Instant.now();
for (RNSPeer p: peerList) {
pLink = p.getPeerLink();
log.info("prunePeers - pLink: {}, destinationHash: {}",
pLink, Hex.encodeHexString(p.getDestinationHash()));
log.debug("peer: {}", p);
if (nonNull(pLink)) {
if (p.getPeerTimedOut()) {
if ((p.getPeerTimedOut()) || (p.getLastPingResponseReceived() > LINK_UNREACHABLE_TIMEOUT)) {
// close peer link for now
pLink.teardown();
}
@ -669,12 +685,16 @@ public class RNSNetwork {
var ips = getImmutableLinkedPeers();
for (RNSPeer p: ips) {
pLink = p.getPeerLink();
p.pingRemote();
try {
TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully
} catch (InterruptedException e) {
log.error("exception: ", e);
if (now.minusMillis(LINK_UNREACHABLE_TIMEOUT).isAfter(p.getLastAccessTimestamp())) {
// Link was not accessed for too long
pLink.teardown();
}
//p.pingRemote();
//try {
// TimeUnit.SECONDS.sleep(2); // allow for peers to disconnect gracefully
//} catch (InterruptedException e) {
// log.error("exception: ", e);
//}
if ((nonNull(pLink) && (pLink.getStatus() == ACTIVE))) {
activePeerCount = activePeerCount + 1;
}

View File

@ -99,7 +99,7 @@ public class RNSPeer {
private Boolean deleteMe = false;
private Boolean isVacant = true;
private Long lastPacketRtt = null;
private byte[] emptyBuffer = {0,0,0,0,0,0,0,0};
private byte[] emptyBuffer = {0,0,0,0,0};
private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
@ -107,9 +107,11 @@ public class RNSPeer {
// for qortal networking
private static final int RESPONSE_TIMEOUT = 3000; // [ms]
private static final int PING_INTERVAL = 34_000; // [ms]
private byte[] messageMagic; // set in creating classes
private Long lastPing = null; // last ping roundtrip time [ms]
private Long lastPingSent = null; // time last ping was sent, or null if not started.
private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms
private byte[] messageMagic; // set in message creating classes
private Long lastPing = null; // last (packet) ping roundtrip time [ms]
private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started.
@Setter(AccessLevel.PACKAGE) private Long lastPingResponseReceived = null; // time last (packet) ping succeeded
private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages;
// Versioning
@ -186,27 +188,28 @@ public class RNSPeer {
@Override
public String toString() {
// for messages we want an address-like string representation
//return encodeHexString(this.getDestinationHash());
return this.getPeerLink().toString();
if (nonNull(this.peerLink)) {
return this.getPeerLink().toString();
} else {
return encodeHexString(this.getDestinationHash());
}
}
public BufferedRWPair getOrInitPeerBuffer() {
var channel = this.peerLink.getChannel();
if (nonNull(this.peerBuffer)) {
log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
//return this.peerBuffer;
//try {
// this.peerBuffer.close();
// this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
//} catch (IllegalStateException e) {
// // Exception thrown by Reticulum BufferedRWPair.close()
// // This is a chance to correct links status when doing a RNSPingTask
// log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}");
// this.peerLink.teardown();
// this.peerLink = null;
// //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e);
//}
try {
log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
} catch (IllegalStateException e) {
// Exception thrown by Reticulum if the buffer is unusable (Channel, Link, etc)
// This is a chance to correct links status when doing a RNSPingTask
log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}");
this.peerBuffer.close();
this.peerLink.teardown();
this.peerLink = null;
//log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e);
}
}
else {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
@ -219,7 +222,7 @@ public class RNSPeer {
public Link getOrInitPeerLink() {
if (this.peerLink.getStatus() == ACTIVE) {
lastAccessTimestamp = Instant.now();
return this.peerLink;
//return this.peerLink;
} else {
initPeerLink();
}
@ -318,18 +321,20 @@ public class RNSPeer {
// get the message data
byte[] data = this.peerBuffer.read(readyBytes);
ByteBuffer bb = ByteBuffer.wrap(data);
log.info("data length: {}, data: {}, ByteBuffer: {}", data.length, data, bb);
log.info("data length: {}, MAGIC: {}, data: {}, ByteBuffer: {}", data.length, this.messageMagic, data, bb);
//log.info("data length: {}, ByteBuffer: {}", data.length, bb);
//var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length);
log.trace("peerBufferReady - data bytes: {}", data.length);
this.lastAccessTimestamp = Instant.now();
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
//this.peerBuffer.flush();
} else {
try {
log.info("***> creating message from {} bytes", data.length);
//log.info("***> creating message from {} bytes", data.length);
Message message = Message.fromByteBuffer(bb);
log.info("type {} message received ({} bytes): {}", message.getType(), data.length, message);
log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message);
// Handle message based on type
switch (message.getType()) {
// Do we need this ? (seems like a TCP scenario only thing)
@ -348,31 +353,36 @@ public class RNSPeer {
case PONG:
log.info("PONG received");
//this.peerBuffer.flush();
break;
// Do we need this ? (We don't have RNSPeer versions)
// Do we need this ? (no need to relay peer list...)
//case PEERS_V2:
// onPeersV2Message(peer, message);
// this.peerBuffer.flush();
// break;
default:
//if (isFalse(this.isInitiator)) {
// Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message);
this.peerBuffer.flush();
//}
log.info("default - type {} message received ({} bytes)", message.getType(), data.length);
// Bump up to controller for possible action
//Controller.getInstance().onNetworkMessage(peer, message);
Controller.getInstance().onRNSNetworkMessage(this, message);
//this.peerBuffer.flush();
break;
}
} catch (MessageException e) {
//log.error("{} from peer {}", e.getMessage(), this);
log.error("{} from peer {}", e, this);
log.info("{} from peer {}", e, this);
}
//this.peerBuffer.flush(); // clear buffer
}
}
//public void handleMessage(Message message) {
//
//}
/**
* Set a packet to remote with the message format "close::<our_destination_hash>"
* This method is only useful for non-initiator links to close the remote initiator.
@ -427,6 +437,7 @@ public class RNSPeer {
}
log.info("Valid reply received from {}, round-trip time is {}",
encodeHexString(receipt.getDestination().getHash()), rttString);
this.lastAccessTimestamp = Instant.now();
}
}
@ -482,9 +493,10 @@ public class RNSPeer {
public void pingRemote() {
var link = this.peerLink;
//if (nonNull(link) & (isFalse(link.isInitiator()))) {
if (nonNull(link) & link.isInitiator()) {
//if (nonNull(link) & link.isInitiator()) {
if (nonNull(link)) {
if (peerLink.getStatus() == ACTIVE) {
log.info("pinging remote: {}", link);
log.info("pinging remote (direct, 1 packet): {}", link);
var data = "ping".getBytes(UTF_8);
link.setPacketCallback(this::linkPacketReceived);
Packet pingPacket = new Packet(link, data);
@ -519,6 +531,7 @@ public class RNSPeer {
pongMessage.setId(message.getId()); // use the ping message id
this.peerBuffer.write(pongMessage.toBytes());
this.peerBuffer.flush();
this.lastAccessTimestamp = Instant.now();
} catch (MessageException e) {
//log.error("{} from peer {}", e.getMessage(), this);
log.error("{} from peer {}", e, this);
@ -637,12 +650,14 @@ public class RNSPeer {
* @param message message to be sent
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
//@Synchronized
public boolean sendMessage(Message message) {
try {
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
var peerBuffer = getOrInitPeerBuffer();
this.peerBuffer.write(message.toBytes());
this.peerBuffer.flush();
peerBuffer.write(message.toBytes());
peerBuffer.flush();
return true;
} catch (IllegalStateException e) {
this.peerLink.teardown();
@ -687,6 +702,31 @@ public class RNSPeer {
return new RNSPingTask(this, now);
}
// low-level Link (packet) ping
protected Link getPingLinks(Long now) {
if (now == null || this.lastPingSent == null) {
return null;
}
// ping only possible over ACTIVE link
if (nonNull(this.peerLink)) {
if (this.peerLink.getStatus() != ACTIVE) {
return null;
}
} else {
return null;
}
if (now < this.lastPingSent + LINK_PING_INTERVAL) {
return null;
}
this.lastPingSent = now;
return this.peerLink;
}
// Peer methods reticulum implementations
public BlockSummaryData getChainTipData() {
List<BlockSummaryData> chainTipSummaries = this.peersChainTipData;