fix RNS ping task - working

This commit is contained in:
Jürg Schulthess 2025-01-11 22:16:52 +01:00
parent 5b519990cd
commit 1b4fffe0d2
3 changed files with 38 additions and 19 deletions

View File

@ -300,6 +300,7 @@ public class RNSNetwork {
log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash())); log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash()));
RNSPeer newPeer = new RNSPeer(link); RNSPeer newPeer = new RNSPeer(link);
newPeer.setPeerLinkHash(link.getHash()); newPeer.setPeerLinkHash(link.getHash());
newPeer.setMessageMagic(getMessageMagic());
// make sure the peer has a channel and buffer // make sure the peer has a channel and buffer
newPeer.getOrInitPeerBuffer(); newPeer.getOrInitPeerBuffer();
incomingPeers.add(newPeer); incomingPeers.add(newPeer);
@ -370,6 +371,7 @@ public class RNSNetwork {
RNSPeer newPeer = new RNSPeer(destinationHash); RNSPeer newPeer = new RNSPeer(destinationHash);
newPeer.setServerIdentity(announcedIdentity); newPeer.setServerIdentity(announcedIdentity);
newPeer.setIsInitiator(true); newPeer.setIsInitiator(true);
newPeer.setMessageMagic(getMessageMagic());
addLinkedPeer(newPeer); addLinkedPeer(newPeer);
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
} }

View File

@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.Arrays;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.codec.binary.Hex.encodeHexString; import static org.apache.commons.codec.binary.Hex.encodeHexString;
@ -101,6 +102,7 @@ public class RNSPeer {
// for qortal networking // for qortal networking
private static final int RESPONSE_TIMEOUT = 3000; // [ms] private static final int RESPONSE_TIMEOUT = 3000; // [ms]
private static final int PING_INTERVAL = 34_000; // [ms] private static final int PING_INTERVAL = 34_000; // [ms]
private byte[] messageMagic; // set in creating classes
private Long lastPing = null; // last ping roundtrip time [ms] private Long lastPing = null; // last ping roundtrip time [ms]
private Long lastPingSent = null; // time last ping was sent, or null if not started. private Long lastPingSent = null; // time last ping was sent, or null if not started.
private Map<Integer, BlockingQueue<Message>> replyQueues; private Map<Integer, BlockingQueue<Message>> replyQueues;
@ -166,17 +168,18 @@ public class RNSPeer {
var channel = this.peerLink.getChannel(); var channel = this.peerLink.getChannel();
if (nonNull(this.peerBuffer)) { if (nonNull(this.peerBuffer)) {
log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
try { return this.peerBuffer;
this.peerBuffer.close(); //try {
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); // this.peerBuffer.close();
} catch (IllegalStateException e) { // this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
// Exception thrown by Reticulum BufferedRWPair.close() //} catch (IllegalStateException e) {
// This is a chance to correct links status when doing a RNSPingTask // // Exception thrown by Reticulum BufferedRWPair.close()
log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); // // This is a chance to correct links status when doing a RNSPingTask
this.peerLink.teardown(); // log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}");
this.peerLink = null; // this.peerLink.teardown();
//log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); // this.peerLink = null;
} // //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e);
//}
} }
else { else {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
@ -282,6 +285,8 @@ public class RNSPeer {
public void peerBufferReady(Integer readyBytes) { public void peerBufferReady(Integer readyBytes) {
// get the message data // get the message data
var data = this.peerBuffer.read(readyBytes); var data = this.peerBuffer.read(readyBytes);
//var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length);
log.trace("peerBufferReady - data bytes: {}", data.length);
try { try {
Message message = Message.fromByteBuffer(ByteBuffer.wrap(data)); Message message = Message.fromByteBuffer(ByteBuffer.wrap(data));
@ -293,15 +298,17 @@ public class RNSPeer {
// break; // break;
case PING: case PING:
//log.info("sending PING response");
//onPingMessage(this, message); //onPingMessage(this, message);
PongMessage pongMessage = new PongMessage(); PongMessage pongMessage = new PongMessage();
pongMessage.setId(message.getId()); pongMessage.setId(message.getId());
//var peerBuffer = getOrInitPeerBuffer(); this.peerBuffer.write(pongMessage.toBytes());
this.peerBuffer.write(message.toBytes());
this.peerBuffer.flush(); this.peerBuffer.flush();
break; break;
case PONG: case PONG:
//log.info("PONG received");
//break;
//case PEERS_V2: //case PEERS_V2:
// onPeersV2Message(peer, message); // onPeersV2Message(peer, message);
@ -313,7 +320,8 @@ public class RNSPeer {
// break; // break;
} }
} catch (MessageException e) { } catch (MessageException e) {
log.error("{} from peer {}", e.getMessage(), this); //log.error("{} from peer {}", e.getMessage(), this);
log.error("{} from peer {}", e, this);
} }
//var decodedData = new String(data); //var decodedData = new String(data);
//log.info("Received data over the buffer: {}", decodedData); //log.info("Received data over the buffer: {}", decodedData);
@ -539,8 +547,8 @@ public class RNSPeer {
try { try {
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
var peerBuffer = getOrInitPeerBuffer(); var peerBuffer = getOrInitPeerBuffer();
peerBuffer.write(message.toBytes()); this.peerBuffer.write(message.toBytes());
peerBuffer.flush(); this.peerBuffer.flush();
return true; return true;
//} catch (InterruptedException e) { //} catch (InterruptedException e) {
// // Send failure // // Send failure

View File

@ -7,6 +7,7 @@ import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType; import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage; import org.qortal.network.message.PingMessage;
//import org.qortal.network.message.RNSPingMessage; //import org.qortal.network.message.RNSPingMessage;
import org.qortal.network.message.MessageException;
import org.qortal.utils.ExecuteProduceConsume.Task; import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP; import org.qortal.utils.NTP;
@ -33,9 +34,17 @@ public class RNSPingTask implements Task {
//RNSPingMessage pingMessage = new RNSPingMessage(); //RNSPingMessage pingMessage = new RNSPingMessage();
PingMessage pingMessage = new PingMessage(); PingMessage pingMessage = new PingMessage();
//var peerBuffer = peer.getOrInitPeerBuffer(); //try {
//peerBuffer.write(...) // var peerBuffer = this.peer.getOrInitPeerBuffer();
//peerBuffer.flush() // LOGGER.info("message toBytes: {}", pingMessage.toBytes());
// peerBuffer.write(pingMessage.toBytes());
// peerBuffer.flush();
//} catch (IllegalStateException e) {
// //log.warn("Can't write to buffer (remote buffer down?)");
// LOGGER.error("IllegalStateException - can't write to buffer: e", e);
//} catch (MessageException e) {
// LOGGER.error(e.getMessage(), e);
//}
peer.getResponse(pingMessage); peer.getResponse(pingMessage);
//Message message = peer.getResponse(pingMessage); //Message message = peer.getResponse(pingMessage);