diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 7fe825e5..b941ab98 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -300,6 +300,7 @@ public class RNSNetwork { log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash())); RNSPeer newPeer = new RNSPeer(link); newPeer.setPeerLinkHash(link.getHash()); + newPeer.setMessageMagic(getMessageMagic()); // make sure the peer has a channel and buffer newPeer.getOrInitPeerBuffer(); incomingPeers.add(newPeer); @@ -370,6 +371,7 @@ public class RNSNetwork { RNSPeer newPeer = new RNSPeer(destinationHash); newPeer.setServerIdentity(announcedIdentity); newPeer.setIsInitiator(true); + newPeer.setMessageMagic(getMessageMagic()); addLinkedPeer(newPeer); log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index ce6c5877..2ece7364 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Random; import java.util.concurrent.*; +import java.util.Arrays; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.codec.binary.Hex.encodeHexString; @@ -101,6 +102,7 @@ public class RNSPeer { // for qortal networking private static final int RESPONSE_TIMEOUT = 3000; // [ms] private static final int PING_INTERVAL = 34_000; // [ms] + private byte[] messageMagic; // set in creating classes private Long lastPing = null; // last ping roundtrip time [ms] private Long lastPingSent = null; // time last ping was sent, or null if not started. private Map> replyQueues; @@ -166,17 +168,18 @@ public class RNSPeer { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); - try { - this.peerBuffer.close(); - this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); - } catch (IllegalStateException e) { - // Exception thrown by Reticulum BufferedRWPair.close() - // This is a chance to correct links status when doing a RNSPingTask - log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); - this.peerLink.teardown(); - this.peerLink = null; - //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); - } + return this.peerBuffer; + //try { + // this.peerBuffer.close(); + // this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); + //} catch (IllegalStateException e) { + // // Exception thrown by Reticulum BufferedRWPair.close() + // // This is a chance to correct links status when doing a RNSPingTask + // log.warn("can't establish Channel/Buffer (remote peer down?), closing link: {}"); + // this.peerLink.teardown(); + // this.peerLink = null; + // //log.error("(handled) IllegalStateException - can't establish Channel/Buffer: {}", e); + //} } else { log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); @@ -282,6 +285,8 @@ public class RNSPeer { public void peerBufferReady(Integer readyBytes) { // get the message data var data = this.peerBuffer.read(readyBytes); + //var pureData = Arrays.copyOfRange(data, this.messageMagic.length - 1, data.length); + log.trace("peerBufferReady - data bytes: {}", data.length); try { Message message = Message.fromByteBuffer(ByteBuffer.wrap(data)); @@ -293,15 +298,17 @@ public class RNSPeer { // break; case PING: + //log.info("sending PING response"); //onPingMessage(this, message); PongMessage pongMessage = new PongMessage(); pongMessage.setId(message.getId()); - //var peerBuffer = getOrInitPeerBuffer(); - this.peerBuffer.write(message.toBytes()); + this.peerBuffer.write(pongMessage.toBytes()); this.peerBuffer.flush(); break; case PONG: + //log.info("PONG received"); + //break; //case PEERS_V2: // onPeersV2Message(peer, message); @@ -313,7 +320,8 @@ public class RNSPeer { // break; } } catch (MessageException e) { - log.error("{} from peer {}", e.getMessage(), this); + //log.error("{} from peer {}", e.getMessage(), this); + log.error("{} from peer {}", e, this); } //var decodedData = new String(data); //log.info("Received data over the buffer: {}", decodedData); @@ -539,8 +547,8 @@ public class RNSPeer { try { log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); var peerBuffer = getOrInitPeerBuffer(); - peerBuffer.write(message.toBytes()); - peerBuffer.flush(); + this.peerBuffer.write(message.toBytes()); + this.peerBuffer.flush(); return true; //} catch (InterruptedException e) { // // Send failure diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java index 3c219ddd..b36d2e67 100644 --- a/src/main/java/org/qortal/network/task/RNSPingTask.java +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -7,6 +7,7 @@ import org.qortal.network.message.Message; import org.qortal.network.message.MessageType; import org.qortal.network.message.PingMessage; //import org.qortal.network.message.RNSPingMessage; +import org.qortal.network.message.MessageException; import org.qortal.utils.ExecuteProduceConsume.Task; import org.qortal.utils.NTP; @@ -33,9 +34,17 @@ public class RNSPingTask implements Task { //RNSPingMessage pingMessage = new RNSPingMessage(); PingMessage pingMessage = new PingMessage(); - //var peerBuffer = peer.getOrInitPeerBuffer(); - //peerBuffer.write(...) - //peerBuffer.flush() + //try { + // var peerBuffer = this.peer.getOrInitPeerBuffer(); + // LOGGER.info("message toBytes: {}", pingMessage.toBytes()); + // peerBuffer.write(pingMessage.toBytes()); + // peerBuffer.flush(); + //} catch (IllegalStateException e) { + // //log.warn("Can't write to buffer (remote buffer down?)"); + // LOGGER.error("IllegalStateException - can't write to buffer: e", e); + //} catch (MessageException e) { + // LOGGER.error(e.getMessage(), e); + //} peer.getResponse(pingMessage); //Message message = peer.getResponse(pingMessage);