diff --git a/pom.xml b/pom.xml index 42fe7ea9..c60fe6ba 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ 3.5.2 3.25.3 1.5.3 - c0eeadf + bb5f807 1.17 2.0.10 5.18.2 diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index c69bb2eb..33a00383 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -543,8 +543,8 @@ public class Controller extends Thread { LOGGER.info("Starting synchronizer"); Synchronizer.getInstance().start(); - LOGGER.info("Starting synchronizer over Reticulum"); - RNSSynchronizer.getInstance().start(); + //LOGGER.info("Starting synchronizer over Reticulum"); + //RNSSynchronizer.getInstance().start(); LOGGER.info("Starting block minter"); blockMinter = new BlockMinter(); @@ -2378,22 +2378,22 @@ public class Controller extends Thread { // OnlineAccountsManager.getInstance().onNetworkOnlineAccountsV3Message(peer, message); // break; - //// TODO: Compiles but much of the Manager details need to be rethought for Reticulum - //case GET_ARBITRARY_DATA: - // // Not currently supported - // break; - //// - //case ARBITRARY_DATA_FILE_LIST: - // RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); - // break; - // - //case GET_ARBITRARY_DATA_FILE: - // RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); - // break; - // - //case GET_ARBITRARY_DATA_FILE_LIST: - // RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); - // break; + // TODO: Compiles but rethink for Reticulum + case GET_ARBITRARY_DATA: + // Not currently supported + break; + + case ARBITRARY_DATA_FILE_LIST: + RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); + break; + + case GET_ARBITRARY_DATA_FILE: + RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message); + break; + + case GET_ARBITRARY_DATA_FILE_LIST: + RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); + break; // case ARBITRARY_SIGNATURES: // Not currently supported diff --git a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java index fc68fdec..ad2b1f89 100644 --- a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileManager.java @@ -204,12 +204,12 @@ public class RNSArbitraryDataFileManager extends Thread { Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); Message response = null; - //// TODO - revisit (doesn't work with Reticulum) - //try { - // response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); - //} catch (InterruptedException e) { - // // Will return below due to null response - //} + // TODO - revisit with RNS + try { + response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null response + } arbitraryDataFileRequests.remove(hash58); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); diff --git a/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java new file mode 100644 index 00000000..2a477395 --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/RNSArbitraryDataFileRequestThread.java @@ -0,0 +1,130 @@ +package org.qortal.controller.arbitrary; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.RNSArbitraryFileListResponseInfo; +import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; +import org.qortal.network.RNSPeer; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.ArbitraryTransactionUtils; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; + +import static java.lang.Thread.NORM_PRIORITY; + +public class RNSArbitraryDataFileRequestThread implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); + + public RNSArbitraryDataFileRequestThread() { + + } + + @Override + public void run() { + Thread.currentThread().setName("Arbitrary Data File Request Thread"); + Thread.currentThread().setPriority(NORM_PRIORITY); + + try { + while (!Controller.isStopping()) { + Long now = NTP.getTime(); + this.processFileHashes(now); + } + } catch (InterruptedException e) { + // Fall-through to exit thread... + } + } + + private void processFileHashes(Long now) throws InterruptedException { + if (Controller.isStopping()) { + return; + } + + RNSArbitraryDataFileManager arbitraryDataFileManager = RNSArbitraryDataFileManager.getInstance(); + String signature58 = null; + String hash58 = null; + RNSPeer peer = null; + boolean shouldProcess = false; + + synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { + if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) { + + // Sort by lowest number of node hops first + Comparator lowestHopsFirstComparator = + Comparator.comparingInt(RNSArbitraryFileListResponseInfo::getRequestHops); + arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator); + + Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator(); + while (iterator.hasNext()) { + if (Controller.isStopping()) { + return; + } + + RNSArbitraryFileListResponseInfo responseInfo = (RNSArbitraryFileListResponseInfo) iterator.next(); + if (responseInfo == null) { + iterator.remove(); + continue; + } + + hash58 = responseInfo.getHash58(); + peer = responseInfo.getPeer(); + signature58 = responseInfo.getSignature58(); + Long timestamp = responseInfo.getTimestamp(); + + if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { + // Ignore - to be deleted + iterator.remove(); + continue; + } + + // Skip if already requesting, but don't remove, as we might want to retry later + if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) { + // Already requesting - leave this attempt for later + continue; + } + + // We want to process this file + shouldProcess = true; + iterator.remove(); + break; + } + } + } + + if (!shouldProcess) { + // Nothing to do + Thread.sleep(1000L); + return; + } + + byte[] hash = Base58.decode(hash58); + byte[] signature = Base58.decode(signature58); + + // Fetch the transaction data + try (final Repository repository = RepositoryManager.getRepository()) { + ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); + if (arbitraryTransactionData == null) { + return; + } + + if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { + return; + } + + LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); + arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + + } catch (DataException e) { + LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java b/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java index cc8a0f90..c344f13f 100644 --- a/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java +++ b/src/main/java/org/qortal/data/arbitrary/RNSArbitraryRelayInfo.java @@ -61,7 +61,7 @@ public class RNSArbitraryRelayInfo { if (other == this) return true; - if (!(other instanceof ArbitraryRelayInfo)) + if (!(other instanceof RNSArbitraryRelayInfo)) return false; RNSArbitraryRelayInfo otherRelayInfo = (RNSArbitraryRelayInfo) other; diff --git a/src/main/java/org/qortal/network/RNSNetwork.java b/src/main/java/org/qortal/network/RNSNetwork.java index 611625a1..e1412545 100644 --- a/src/main/java/org/qortal/network/RNSNetwork.java +++ b/src/main/java/org/qortal/network/RNSNetwork.java @@ -72,7 +72,7 @@ import java.util.Objects; import java.util.function.Function; import java.time.Instant; -import org.apache.commons.codec.binary.Hex; +import static org.apache.commons.codec.binary.Hex.encodeHexString; import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; @@ -218,7 +218,7 @@ public class RNSNetwork { // APP_NAME, // "qdn" //); - log.info("Destination {} {} running", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); + log.info("Destination {} {} running", encodeHexString(baseDestination.getHash()), baseDestination.getName()); baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL); baseDestination.setAcceptLinkRequests(true); @@ -228,7 +228,7 @@ public class RNSNetwork { log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers()); // do a first announce baseDestination.announce(); - log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName()); + log.debug("Sent initial announce from {} ({})", encodeHexString(baseDestination.getHash()), baseDestination.getName()); // Start up first networking thread (the "server loop", the "Tasks engine") rnsNetworkEPC.start(); @@ -306,7 +306,7 @@ public class RNSNetwork { } // Disconnect peers gracefully and terminate Reticulum for (RNSPeer p: linkedPeers) { - log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash())); + log.info("shutting down peer: {}", encodeHexString(p.getDestinationHash())); //log.debug("peer: {}", p); p.shutdown(); try { @@ -355,7 +355,7 @@ public class RNSNetwork { rttString = String.format("%d miliseconds", rtt); } log.info("Shutdown packet confirmation received from {}, round-trip time is {}", - Hex.encodeHexString(receipt.getDestination().getHash()), rttString); + encodeHexString(receipt.getDestination().getHash()), rttString); } } @@ -366,14 +366,14 @@ public class RNSNetwork { public void clientConnected(Link link) { //link.setLinkClosedCallback(this::clientDisconnected); //link.setPacketCallback(this::serverPacketReceived); - log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash())); + log.info("clientConnected - link hash: {}, {}", link.getHash(), encodeHexString(link.getHash())); RNSPeer newPeer = new RNSPeer(link); newPeer.setPeerLinkHash(link.getHash()); newPeer.setMessageMagic(getMessageMagic()); // make sure the peer has a channel and buffer newPeer.getOrInitPeerBuffer(); addIncomingPeer(newPeer); - log.info("***> Client connected, link: {}", link); + log.info("***> Client connected, link: {}", encodeHexString(link.getLinkId())); } public void clientDisconnected(Link link) { @@ -382,7 +382,7 @@ public class RNSNetwork { public void serverPacketReceived(byte[] message, Packet packet) { var msgText = new String(message, StandardCharsets.UTF_8); - log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash())); + log.info("Received data on link - message: {}, destinationHash: {}", msgText, encodeHexString(packet.getDestinationHash())); } //public void announceBaseDestination () { @@ -401,7 +401,7 @@ public class RNSNetwork { var peerExists = false; var activePeerCount = 0; - log.info("Received an announce from {}", Hex.encodeHexString(destinationHash)); + log.info("Received an announce from {}", encodeHexString(destinationHash)); if (nonNull(appData)) { log.debug("The announce contained the following app data: {}", new String(appData, UTF_8)); @@ -421,7 +421,8 @@ public class RNSNetwork { if (Arrays.equals(p.getDestinationHash(), destinationHash)) { log.info("QAnnounceHandler - peer exists - found peer matching destinationHash"); if (nonNull(p.getPeerLink())) { - log.info("peer link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + log.info("peer link: {}, status: {}", + encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus()); } peerExists = true; if (p.getPeerLink().getStatus() != ACTIVE) { @@ -430,7 +431,12 @@ public class RNSNetwork { break; } else { if (nonNull(p.getPeerLink())) { - log.info("QAnnounceHandler - other peer - link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus()); + log.info("QAnnounceHandler - other peer - link: {}, status: {}", + encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus()); + if (p.getPeerLink().getStatus() == CLOSED) { + // mark peer for deletion on nexe pruning + p.setDeleteMe(true); + } } else { log.info("QAnnounceHandler - peer link is null"); } @@ -442,7 +448,7 @@ public class RNSNetwork { newPeer.setIsInitiator(true); newPeer.setMessageMagic(getMessageMagic()); addLinkedPeer(newPeer); - log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash)); + log.info("added new RNSPeer, destinationHash: {}", encodeHexString(destinationHash)); } } // Chance to announce instead of waiting for next pruning. @@ -487,12 +493,6 @@ public class RNSNetwork { //} final Long now = NTP.getTime(); - - //// Prune stuck/slow/old peers (moved from Controller) - //task = maybeProduceRNSPrunePeersTask(now); - //if (task != null) { - // return task; - //} // ping task (Link+Channel+Buffer) task = maybeProducePeerPingTask(now); @@ -719,7 +719,7 @@ public class RNSNetwork { if (pLink.getStatus() == ACTIVE) { continue; } - if (pLink.getStatus() == CLOSED) { + if ((pLink.getStatus() == CLOSED) || (p.getDeleteMe())) { removeLinkedPeer(p); continue; } @@ -732,6 +732,15 @@ public class RNSNetwork { } // prune non-initiator peers List inaps = getNonActiveIncomingPeers(); + incomingPeerList = this.incomingPeers; + for (RNSPeer p: incomingPeerList) { + var pLink = p.getOrInitPeerLink(); + if (nonNull(pLink) && (pLink.getStatus() == ACTIVE)) { + // make false active links to timeout (and teardown in timeout callback) + // note: actual removal of peer happens on the following pruning run. + p.pingRemote(); + } + } for (RNSPeer p: inaps) { var pLink = p.getPeerLink(); if (nonNull(pLink)) { @@ -747,6 +756,7 @@ public class RNSNetwork { log.info("number of links (linkedPeers (active) / incomingPeers (active) after prunig: {} ({}), {} ({})", initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(), incomingPeerList.size(), numActiveIncomingPeers); + maybeAnnounce(getBaseDestination()); } public void maybeAnnounce(Destination d) { @@ -769,7 +779,7 @@ public class RNSNetwork { var pLink = p.getPeerLink(); if (nonNull(pLink)) { if (Arrays.equals(pLink.getDestination().getHash(),link.getDestination().getHash())) { - log.info("found peer matching destinationHash: {}", Hex.encodeHexString(link.getDestination().getHash())); + log.info("found peer matching destinationHash: {}", encodeHexString(link.getDestination().getHash())); peer = p; break; } @@ -784,7 +794,7 @@ public class RNSNetwork { RNSPeer peer = null; for (RNSPeer p : lps) { if (Arrays.equals(p.getDestinationHash(), dhash)) { - log.info("found peer matching destinationHash: {}", Hex.encodeHexString(dhash)); + log.info("found peer matching destinationHash: {}", encodeHexString(dhash)); peer = p; break; } diff --git a/src/main/java/org/qortal/network/RNSPeer.java b/src/main/java/org/qortal/network/RNSPeer.java index 99177b64..179946b8 100644 --- a/src/main/java/org/qortal/network/RNSPeer.java +++ b/src/main/java/org/qortal/network/RNSPeer.java @@ -7,9 +7,7 @@ import static java.util.Objects.isNull; import static java.util.Objects.nonNull; //import java.io.IOException; import java.time.Instant; -import java.util.Arrays; -import java.util.List; -import java.util.Collections; +import java.util.*; //import io.reticulum.Reticulum; //import org.qortal.network.RNSNetwork; @@ -53,8 +51,7 @@ import org.qortal.utils.NTP; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Random; +import static java.nio.charset.StandardCharsets.UTF_8; import java.util.concurrent.*; import java.util.Arrays; @@ -73,6 +70,7 @@ import lombok.AccessLevel; //import org.qortal.network.message.Message; //import org.qortal.network.message.MessageException; +import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -117,11 +115,12 @@ public class RNSPeer { private Map> replyQueues; private LinkedBlockingQueue pendingMessages; private boolean syncInProgress = false; + private RNSPeerData peerData = null; // Versioning public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); - - private RNSPeerData peerData = null; + /* Pending signature requests */ + private List pendingSignatureRequests = Collections.synchronizedList(new ArrayList<>()); /** * Latest block info as reported by peer. */ @@ -134,6 +133,20 @@ public class RNSPeer { * Last time we detected this peer as TOO_DIVERGENT */ private Long lastTooDivergentTime; + ///** + // * Known starting sequences for data received over buffer + // */ + //private byte[] SEQ_REQUEST_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x71, 0x43, 0x49, 0x44}; // SReqCID + //private byte[] SEQ_RESPONSE_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x73, 0x70, 0x43, 0x49, 0x44}; // SRespCID + + // Message stats + private static class MessageStats { + public final LongAdder count = new LongAdder(); + public final LongAdder totalBytes = new LongAdder(); + } + + private final Map receivedMessageStats = new ConcurrentHashMap<>(); + private final Map sentMessageStats = new ConcurrentHashMap<>(); /** * Constructor for initiator peers @@ -206,7 +219,7 @@ public class RNSPeer { public BufferedRWPair getOrInitPeerBuffer() { var channel = this.peerLink.getChannel(); if (nonNull(this.peerBuffer)) { - log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); + //log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); try { log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus()); } catch (IllegalStateException e) { @@ -270,7 +283,7 @@ public class RNSPeer { public void linkEstablished(Link link) { link.setLinkClosedCallback(this::linkClosed); log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}", - peerLink, link, encodeHexString(destinationHash), + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(destinationHash), encodeHexString(link.getDestination().getHash())); if (isInitiator) { startPings(); @@ -285,12 +298,12 @@ public class RNSPeer { } else if (link.getTeardownReason() == INITIATOR_CLOSED) { log.info("Link closed callback: The initiator closed the link"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, encodeHexString(link.getDestination().getHash())); + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash())); this.peerBuffer = null; } else if (link.getTeardownReason() == DESTINATION_CLOSED) { log.info("Link closed callback: The link was closed by the peer, removing peer"); log.info("peerLink {} closed (link: {}), link destination hash: {}", - peerLink, link, encodeHexString(link.getDestination().getHash())); + encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash())); this.peerBuffer = null; } else { log.info("Link closed callback"); @@ -345,11 +358,23 @@ public class RNSPeer { // log.info("peerBufferReady - empty buffer detected (length: {})", data.length); //} //else { + //if (Arrays.equals(SEQ_REQUEST_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_REQUEST_CONFIRM_ID.length))) { + // // a non-initiator peer requested to confirm sending of a packet + // var messageId = subarray(data, SEQ_REQUEST_CONFIRM_ID.length + 1, data.length); + // log.info("received request to confirm message id, id: {}", messageId); + // var confirmData = concatArrays(SEQ_RESPONSE_CONFIRM_ID, "::",data.getBytes(UTF_8), messageId.getBytes(UTF_8)); + // this.peerBuffer.write(confirmData); + // this.peerBuffer.flush(); + //} else if (Arrays.equals(SEQ_RESPONSE_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_RESPONSE_CONFIRM_ID.lenth))) { + // // an initiator peer receiving the confirmation + // var messageId = subarray(data, SEQ_RESPONSE_CONFIRM_ID.length + 1, data.length); + // this.replyQueues.remove(messageId); + //} else { try { //log.info("***> creating message from {} bytes", data.length); Message message = Message.fromByteBuffer(bb); //log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message); - log.info("*=> type {} message received ({} bytes)", message.getType(), data.length); + log.info("*=> type {} message received ({} bytes, id: {}", message.getType(), data.length, message.getId()); // Handle message based on type switch (message.getType()) { @@ -364,12 +389,12 @@ public class RNSPeer { this.lastPingResponseReceived = Instant.now(); if (isFalse(this.isInitiator)) { onPingMessage(this, message); - // Note: buffer flush done in onPingMessage method } break; case PONG: log.trace("PONG received"); + addToQueue(message); // as response in blocking queue for ping getResponse break; // Do we need this ? (no need to relay peer list...) @@ -377,42 +402,41 @@ public class RNSPeer { // onPeersV2Message(peer, message); // break; - //case BLOCK_SUMMARIES: - // // from Synchronizer - // addToQueue(message); - // break; - // - //case BLOCK_SUMMARIES_V2: - // // from Synchronizer - // addToQueue(message); - // break; - // - //case SIGNATURES: - // // from Synchronizer - // addToQueue(message); - // break; - // - //case BLOCK: - // // from Synchronizer - // addToQueue(message); - // break; - // - //case BLOCK_V2: - // // from Synchronizer - // addToQueue(message); - // break; + case BLOCK_SUMMARIES: + // from Synchronizer + addToQueue(message); + + case BLOCK_SUMMARIES_V2: + // from Synchronizer + addToQueue(message); + + case SIGNATURES: + // from Synchronizer + addToQueue(message); + + case BLOCK: + // from Synchronizer + addToQueue(message); + + case BLOCK_V2: + // from Synchronizer + addToQueue(message); default: log.info("default - type {} message received ({} bytes)", message.getType(), data.length); // Bump up to controller for possible action - //Controller.getInstance().onNetworkMessage(peer, message); + addToQueue(message); Controller.getInstance().onRNSNetworkMessage(this, message); break; } } catch (MessageException e) { //log.error("{} from peer {}", e.getMessage(), this); - log.error("{} from peer {}", e, this); - log.info("{} from peer {}", e, this); + log.error("{} from peer {}, closing link", e, this); + //log.info("{} from peer {}", e, this); + // don't take any chances: + // can happen if link is closed by peer in which case we close this side of the link + this.peerData.setLastMisbehaved(NTP.getTime()); + shutdown(); } //} } @@ -430,7 +454,8 @@ public class RNSPeer { if (queue != null) { // Adding message to queue will unblock thread waiting for response this.replyQueues.get(message.getId()).add(message); - // Consumed elsewhere + // Consumed elsewhere (getResponseWithTimeout) + log.info("addToQueue - queue size: {}, message type: {} (id: {})", queue.size(), message.getType(), message.getId()); } else if (!this.pendingMessages.offer(message)) { log.info("[{}] Busy, no room to queue message from peer {} - discarding", @@ -490,14 +515,17 @@ public class RNSPeer { } else { rttString = String.format("%d milliseconds", rtt); } - log.info("Valid reply received from {}, round-trip time is {}", - encodeHexString(receipt.getDestination().getHash()), rttString); + if (getIsInitiator()) { + // reporting round trip time in one direction is enough + log.info("Valid reply received from {}, round-trip time is {}", + encodeHexString(receipt.getDestination().getHash()), rttString); + } this.lastAccessTimestamp = Instant.now(); } } public void packetTimedOut(PacketReceipt receipt) { - log.info("packet timed out, receipt status: {}", receipt.getStatus()); + //log.info("packet timed out, receipt status: {}", receipt.getStatus()); if (receipt.getStatus() == PacketReceiptStatus.FAILED) { log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED); this.peerTimedOut = true; @@ -534,19 +562,6 @@ public class RNSPeer { log.debug("Resource transfer complete"); } - ///** - // * Send a message using the peer buffer - // */ - //public Message getResponse(Message message) throws InterruptedException { - // var peerBuffer = getOrInitPeerBuffer(); - // - // //// send message - // //peerBuffer.write(...); - // //peerBuffer.flush(); - // - // // receive - peerBufferReady callback result - //} - /** Utility methods */ public void pingRemote() { var link = this.peerLink; @@ -554,7 +569,7 @@ public class RNSPeer { //if (nonNull(link) & link.isInitiator()) { if (nonNull(link)) { if (peerLink.getStatus() == ACTIVE) { - log.info("pinging remote (direct, 1 packet): {}", link); + log.info("pinging remote (direct, 1 packet): {}", encodeHexString(link.getLinkId())); var data = "ping".getBytes(UTF_8); link.setPacketCallback(this::linkPacketReceived); Packet pingPacket = new Packet(link, data); @@ -586,7 +601,7 @@ public class RNSPeer { try { PongMessage pongMessage = new PongMessage(); - pongMessage.setId(message.getId()); // use the ping message id + pongMessage.setId(message.getId()); // use the ping message id (for ping getResponse) this.peerBuffer.write(pongMessage.toBytes()); this.peerBuffer.flush(); this.lastAccessTimestamp = Instant.now(); @@ -609,7 +624,7 @@ public class RNSPeer { * @throws InterruptedException if interrupted while waiting */ public Message getResponse(Message message) throws InterruptedException { - log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash())); + //log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash())); return getResponseWithTimeout(message, RESPONSE_TIMEOUT); } @@ -629,7 +644,6 @@ public class RNSPeer { */ public Message getResponseWithTimeout(Message message, int timeout) throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue<>(1); - // TODO: implement equivalent of Peer class... // Assign random ID to this message Random random = new Random(); int id; @@ -640,17 +654,20 @@ public class RNSPeer { // If putIfAbsent() doesn't return null, then this ID is already taken } while (this.replyQueues.putIfAbsent(id, blockingQueue) != null); message.setId(id); + //log.info("getResponse - before send {} message, random id is {}", message.getType(), id); // Try to send message if (!this.sendMessageWithTimeout(message, timeout)) { this.replyQueues.remove(id); return null; } + //log.info("getResponse - after send"); try { return blockingQueue.poll(timeout, TimeUnit.MILLISECONDS); } finally { this.replyQueues.remove(id); + //log.info("getResponse - regular - id removed from replyQueues"); } } @@ -662,10 +679,16 @@ public class RNSPeer { */ public boolean sendMessageWithTimeout(Message message, int timeout) { try { + // send the message log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); var peerBuffer = getOrInitPeerBuffer(); this.peerBuffer.write(message.toBytes()); this.peerBuffer.flush(); + //// send a message to confirm receipt over the buffer + //var messageId = message.getId(); + //var confirmData = concatArrays(SEQ_REQUEST_CONFIRM_ID,"::".getBytes(UTF_8), messageId.getBytes(UTF_8)); + //this.peerBuffer.write(confirmData); + //this.peerBuffer.flush(); return true; //} catch (InterruptedException e) { // // Send failure @@ -711,8 +734,8 @@ public class RNSPeer { //@Synchronized public boolean sendMessage(Message message) { try { - log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); - log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this); + log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString()); + //log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString()); var peerBuffer = getOrInitPeerBuffer(); peerBuffer.write(message.toBytes()); peerBuffer.flush(); @@ -730,7 +753,7 @@ public class RNSPeer { protected void startPings() { log.trace("[{}] Enabling pings for peer {}", - peerLink.getDestination().getHexHash(), this); + peerLink.getDestination().getHexHash(), this.toString()); this.lastPingSent = NTP.getTime(); } @@ -831,4 +854,29 @@ public class RNSPeer { return false; return true; } + + // Pending signature requests + public void addPendingSignatureRequest(byte[] signature) { + // Check if we already have this signature in the list + for (byte[] existingSignature : this.pendingSignatureRequests) { + if (Arrays.equals(existingSignature, signature )) { + return; + } + } + this.pendingSignatureRequests.add(signature); + } + + public void removePendingSignatureRequest(byte[] signature) { + Iterator iterator = this.pendingSignatureRequests.iterator(); + while (iterator.hasNext()) { + byte[] existingSignature = (byte[]) iterator.next(); + if (Arrays.equals(existingSignature, signature)) { + iterator.remove(); + } + } + } + + public List getPendingSignatureRequests() { + return this.pendingSignatureRequests; + } } diff --git a/src/main/java/org/qortal/network/task/RNSPingTask.java b/src/main/java/org/qortal/network/task/RNSPingTask.java index 5849d7f8..26b35378 100644 --- a/src/main/java/org/qortal/network/task/RNSPingTask.java +++ b/src/main/java/org/qortal/network/task/RNSPingTask.java @@ -35,8 +35,8 @@ public class RNSPingTask implements Task { // Note: Even though getResponse would work, we can use // peer.sendMessage(pingMessage) using Reticulum buffer instead. // More efficient and saves room for other request/response tasks. - //peer.getResponse(pingMessage); - peer.sendMessage(pingMessage); + peer.getResponse(pingMessage); + //peer.sendMessage(pingMessage); //// task is not over here (Reticulum is asynchronous) //peer.setLastPing(NTP.getTime() - now); diff --git a/src/main/resources/reticulum_default_config.yml b/src/main/resources/reticulum_default_config.yml index 18e8b729..c3d90194 100644 --- a/src/main/resources/reticulum_default_config.yml +++ b/src/main/resources/reticulum_default_config.yml @@ -46,7 +46,7 @@ reticulum: # an optional directive, and can be left out for brevity. # This behaviour is disabled by default. - panic_on_interface_error: false + panic_on_interface_error: true # The interfaces section defines the physical and virtual @@ -64,9 +64,9 @@ interfaces: # local IPv6 is enabled in your operating system, which # should be enabled by default in almost any OS. See # the Reticulum Manual for more configuration options. - #"Default Interface": - # type: AutoInterface - # enabled: true + "Default Interface": + type: AutoInterface + enabled: true # This interface enables communication with a "backbone" # server over TCP. @@ -76,7 +76,7 @@ interfaces: enabled: true target_host: phantom.mobilefabrik.com target_port: 4242 - #network_name: qortal + network_name: qortal # This interface turns this Reticulum instance into a # server other clients can connect to over TCP. @@ -88,6 +88,6 @@ interfaces: # type: TCPServerInterface # enabled: true # listen_ip: 0.0.0.0 - # listen_port: 3434 - # #network_name: qortal + # listen_port: 4242 + # network_name: qortal diff --git a/src/main/resources/reticulum_default_testnet_config.yml b/src/main/resources/reticulum_default_testnet_config.yml index 18c7b3fb..518abbd1 100644 --- a/src/main/resources/reticulum_default_testnet_config.yml +++ b/src/main/resources/reticulum_default_testnet_config.yml @@ -46,7 +46,7 @@ reticulum: # an optional directive, and can be left out for brevity. # This behaviour is disabled by default. - panic_on_interface_error: false + panic_on_interface_error: true # The interfaces section defines the physical and virtual @@ -64,9 +64,9 @@ interfaces: # local IPv6 is enabled in your operating system, which # should be enabled by default in almost any OS. See # the Reticulum Manual for more configuration options. - #"Default Interface": - # type: AutoInterface - # enabled: true + "Default Interface": + type: AutoInterface + enabled: true # This interface enables communication with a "backbone" # server over TCP. @@ -75,8 +75,8 @@ interfaces: type: TCPClientInterface enabled: true target_host: phantom.mobilefabrik.com - target_port: 3434 - #network_name: qortal + target_port: 4242 + network_name: qortaltest # This interface turns this Reticulum instance into a # server other clients can connect to over TCP. @@ -88,6 +88,6 @@ interfaces: # type: TCPServerInterface # enabled: true # listen_ip: 0.0.0.0 - # listen_port: 3434 - # #network_name: qortal + # listen_port: 4242 + # network_name: qortaltest