mirror of
https://github.com/Qortal/qortal.git
synced 2025-06-21 14:51:20 +00:00
implement getResponse over buffer, fixes, updates to config, switch to reticulum master
This commit is contained in:
parent
c9ed42b93c
commit
952764d908
2
pom.xml
2
pom.xml
@ -52,7 +52,7 @@
|
||||
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
|
||||
<protobuf.version>3.25.3</protobuf.version>
|
||||
<replacer.version>1.5.3</replacer.version>
|
||||
<reticulum.version>c0eeadf</reticulum.version>
|
||||
<reticulum.version>bb5f807</reticulum.version>
|
||||
<simplemagic.version>1.17</simplemagic.version>
|
||||
<swagger-api.version>2.0.10</swagger-api.version>
|
||||
<swagger-ui.version>5.18.2</swagger-ui.version>
|
||||
|
@ -543,8 +543,8 @@ public class Controller extends Thread {
|
||||
LOGGER.info("Starting synchronizer");
|
||||
Synchronizer.getInstance().start();
|
||||
|
||||
LOGGER.info("Starting synchronizer over Reticulum");
|
||||
RNSSynchronizer.getInstance().start();
|
||||
//LOGGER.info("Starting synchronizer over Reticulum");
|
||||
//RNSSynchronizer.getInstance().start();
|
||||
|
||||
LOGGER.info("Starting block minter");
|
||||
blockMinter = new BlockMinter();
|
||||
@ -2378,22 +2378,22 @@ public class Controller extends Thread {
|
||||
// OnlineAccountsManager.getInstance().onNetworkOnlineAccountsV3Message(peer, message);
|
||||
// break;
|
||||
|
||||
//// TODO: Compiles but much of the Manager details need to be rethought for Reticulum
|
||||
//case GET_ARBITRARY_DATA:
|
||||
// // Not currently supported
|
||||
// break;
|
||||
////
|
||||
//case ARBITRARY_DATA_FILE_LIST:
|
||||
// RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message);
|
||||
// break;
|
||||
//
|
||||
//case GET_ARBITRARY_DATA_FILE:
|
||||
// RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message);
|
||||
// break;
|
||||
//
|
||||
//case GET_ARBITRARY_DATA_FILE_LIST:
|
||||
// RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message);
|
||||
// break;
|
||||
// TODO: Compiles but rethink for Reticulum
|
||||
case GET_ARBITRARY_DATA:
|
||||
// Not currently supported
|
||||
break;
|
||||
|
||||
case ARBITRARY_DATA_FILE_LIST:
|
||||
RNSArbitraryDataFileListManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_ARBITRARY_DATA_FILE:
|
||||
RNSArbitraryDataFileManager.getInstance().onNetworkGetArbitraryDataFileMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_ARBITRARY_DATA_FILE_LIST:
|
||||
RNSArbitraryDataFileListManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message);
|
||||
break;
|
||||
//
|
||||
case ARBITRARY_SIGNATURES:
|
||||
// Not currently supported
|
||||
|
@ -204,12 +204,12 @@ public class RNSArbitraryDataFileManager extends Thread {
|
||||
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
|
||||
|
||||
Message response = null;
|
||||
//// TODO - revisit (doesn't work with Reticulum)
|
||||
//try {
|
||||
// response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
|
||||
//} catch (InterruptedException e) {
|
||||
// // Will return below due to null response
|
||||
//}
|
||||
// TODO - revisit with RNS
|
||||
try {
|
||||
response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
// Will return below due to null response
|
||||
}
|
||||
arbitraryDataFileRequests.remove(hash58);
|
||||
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
|
||||
|
||||
|
@ -0,0 +1,130 @@
|
||||
package org.qortal.controller.arbitrary;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.data.arbitrary.RNSArbitraryFileListResponseInfo;
|
||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||
import org.qortal.event.DataMonitorEvent;
|
||||
import org.qortal.event.EventBus;
|
||||
import org.qortal.network.RNSPeer;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.utils.ArbitraryTransactionUtils;
|
||||
import org.qortal.utils.Base58;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
|
||||
import static java.lang.Thread.NORM_PRIORITY;
|
||||
|
||||
public class RNSArbitraryDataFileRequestThread implements Runnable {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
|
||||
|
||||
public RNSArbitraryDataFileRequestThread() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("Arbitrary Data File Request Thread");
|
||||
Thread.currentThread().setPriority(NORM_PRIORITY);
|
||||
|
||||
try {
|
||||
while (!Controller.isStopping()) {
|
||||
Long now = NTP.getTime();
|
||||
this.processFileHashes(now);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Fall-through to exit thread...
|
||||
}
|
||||
}
|
||||
|
||||
private void processFileHashes(Long now) throws InterruptedException {
|
||||
if (Controller.isStopping()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RNSArbitraryDataFileManager arbitraryDataFileManager = RNSArbitraryDataFileManager.getInstance();
|
||||
String signature58 = null;
|
||||
String hash58 = null;
|
||||
RNSPeer peer = null;
|
||||
boolean shouldProcess = false;
|
||||
|
||||
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) {
|
||||
if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) {
|
||||
|
||||
// Sort by lowest number of node hops first
|
||||
Comparator<RNSArbitraryFileListResponseInfo> lowestHopsFirstComparator =
|
||||
Comparator.comparingInt(RNSArbitraryFileListResponseInfo::getRequestHops);
|
||||
arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator);
|
||||
|
||||
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
if (Controller.isStopping()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RNSArbitraryFileListResponseInfo responseInfo = (RNSArbitraryFileListResponseInfo) iterator.next();
|
||||
if (responseInfo == null) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
hash58 = responseInfo.getHash58();
|
||||
peer = responseInfo.getPeer();
|
||||
signature58 = responseInfo.getSignature58();
|
||||
Long timestamp = responseInfo.getTimestamp();
|
||||
|
||||
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
|
||||
// Ignore - to be deleted
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if already requesting, but don't remove, as we might want to retry later
|
||||
if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) {
|
||||
// Already requesting - leave this attempt for later
|
||||
continue;
|
||||
}
|
||||
|
||||
// We want to process this file
|
||||
shouldProcess = true;
|
||||
iterator.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!shouldProcess) {
|
||||
// Nothing to do
|
||||
Thread.sleep(1000L);
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] hash = Base58.decode(hash58);
|
||||
byte[] signature = Base58.decode(signature58);
|
||||
|
||||
// Fetch the transaction data
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
|
||||
if (arbitraryTransactionData == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
|
||||
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
@ -61,7 +61,7 @@ public class RNSArbitraryRelayInfo {
|
||||
if (other == this)
|
||||
return true;
|
||||
|
||||
if (!(other instanceof ArbitraryRelayInfo))
|
||||
if (!(other instanceof RNSArbitraryRelayInfo))
|
||||
return false;
|
||||
|
||||
RNSArbitraryRelayInfo otherRelayInfo = (RNSArbitraryRelayInfo) other;
|
||||
|
@ -72,7 +72,7 @@ import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.time.Instant;
|
||||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import static org.apache.commons.codec.binary.Hex.encodeHexString;
|
||||
import org.qortal.utils.ExecuteProduceConsume;
|
||||
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
|
||||
import org.qortal.utils.NTP;
|
||||
@ -218,7 +218,7 @@ public class RNSNetwork {
|
||||
// APP_NAME,
|
||||
// "qdn"
|
||||
//);
|
||||
log.info("Destination {} {} running", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
|
||||
log.info("Destination {} {} running", encodeHexString(baseDestination.getHash()), baseDestination.getName());
|
||||
|
||||
baseDestination.setProofStrategy(ProofStrategy.PROVE_ALL);
|
||||
baseDestination.setAcceptLinkRequests(true);
|
||||
@ -228,7 +228,7 @@ public class RNSNetwork {
|
||||
log.debug("announceHandlers: {}", Transport.getInstance().getAnnounceHandlers());
|
||||
// do a first announce
|
||||
baseDestination.announce();
|
||||
log.debug("Sent initial announce from {} ({})", Hex.encodeHexString(baseDestination.getHash()), baseDestination.getName());
|
||||
log.debug("Sent initial announce from {} ({})", encodeHexString(baseDestination.getHash()), baseDestination.getName());
|
||||
|
||||
// Start up first networking thread (the "server loop", the "Tasks engine")
|
||||
rnsNetworkEPC.start();
|
||||
@ -306,7 +306,7 @@ public class RNSNetwork {
|
||||
}
|
||||
// Disconnect peers gracefully and terminate Reticulum
|
||||
for (RNSPeer p: linkedPeers) {
|
||||
log.info("shutting down peer: {}", Hex.encodeHexString(p.getDestinationHash()));
|
||||
log.info("shutting down peer: {}", encodeHexString(p.getDestinationHash()));
|
||||
//log.debug("peer: {}", p);
|
||||
p.shutdown();
|
||||
try {
|
||||
@ -355,7 +355,7 @@ public class RNSNetwork {
|
||||
rttString = String.format("%d miliseconds", rtt);
|
||||
}
|
||||
log.info("Shutdown packet confirmation received from {}, round-trip time is {}",
|
||||
Hex.encodeHexString(receipt.getDestination().getHash()), rttString);
|
||||
encodeHexString(receipt.getDestination().getHash()), rttString);
|
||||
}
|
||||
}
|
||||
|
||||
@ -366,14 +366,14 @@ public class RNSNetwork {
|
||||
public void clientConnected(Link link) {
|
||||
//link.setLinkClosedCallback(this::clientDisconnected);
|
||||
//link.setPacketCallback(this::serverPacketReceived);
|
||||
log.info("clientConnected - link hash: {}, {}", link.getHash(), Hex.encodeHexString(link.getHash()));
|
||||
log.info("clientConnected - link hash: {}, {}", link.getHash(), encodeHexString(link.getHash()));
|
||||
RNSPeer newPeer = new RNSPeer(link);
|
||||
newPeer.setPeerLinkHash(link.getHash());
|
||||
newPeer.setMessageMagic(getMessageMagic());
|
||||
// make sure the peer has a channel and buffer
|
||||
newPeer.getOrInitPeerBuffer();
|
||||
addIncomingPeer(newPeer);
|
||||
log.info("***> Client connected, link: {}", link);
|
||||
log.info("***> Client connected, link: {}", encodeHexString(link.getLinkId()));
|
||||
}
|
||||
|
||||
public void clientDisconnected(Link link) {
|
||||
@ -382,7 +382,7 @@ public class RNSNetwork {
|
||||
|
||||
public void serverPacketReceived(byte[] message, Packet packet) {
|
||||
var msgText = new String(message, StandardCharsets.UTF_8);
|
||||
log.info("Received data on link - message: {}, destinationHash: {}", msgText, Hex.encodeHexString(packet.getDestinationHash()));
|
||||
log.info("Received data on link - message: {}, destinationHash: {}", msgText, encodeHexString(packet.getDestinationHash()));
|
||||
}
|
||||
|
||||
//public void announceBaseDestination () {
|
||||
@ -401,7 +401,7 @@ public class RNSNetwork {
|
||||
var peerExists = false;
|
||||
var activePeerCount = 0;
|
||||
|
||||
log.info("Received an announce from {}", Hex.encodeHexString(destinationHash));
|
||||
log.info("Received an announce from {}", encodeHexString(destinationHash));
|
||||
|
||||
if (nonNull(appData)) {
|
||||
log.debug("The announce contained the following app data: {}", new String(appData, UTF_8));
|
||||
@ -421,7 +421,8 @@ public class RNSNetwork {
|
||||
if (Arrays.equals(p.getDestinationHash(), destinationHash)) {
|
||||
log.info("QAnnounceHandler - peer exists - found peer matching destinationHash");
|
||||
if (nonNull(p.getPeerLink())) {
|
||||
log.info("peer link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus());
|
||||
log.info("peer link: {}, status: {}",
|
||||
encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus());
|
||||
}
|
||||
peerExists = true;
|
||||
if (p.getPeerLink().getStatus() != ACTIVE) {
|
||||
@ -430,7 +431,12 @@ public class RNSNetwork {
|
||||
break;
|
||||
} else {
|
||||
if (nonNull(p.getPeerLink())) {
|
||||
log.info("QAnnounceHandler - other peer - link: {}, status: {}", p.getPeerLink(), p.getPeerLink().getStatus());
|
||||
log.info("QAnnounceHandler - other peer - link: {}, status: {}",
|
||||
encodeHexString(p.getPeerLink().getLinkId()), p.getPeerLink().getStatus());
|
||||
if (p.getPeerLink().getStatus() == CLOSED) {
|
||||
// mark peer for deletion on nexe pruning
|
||||
p.setDeleteMe(true);
|
||||
}
|
||||
} else {
|
||||
log.info("QAnnounceHandler - peer link is null");
|
||||
}
|
||||
@ -442,7 +448,7 @@ public class RNSNetwork {
|
||||
newPeer.setIsInitiator(true);
|
||||
newPeer.setMessageMagic(getMessageMagic());
|
||||
addLinkedPeer(newPeer);
|
||||
log.info("added new RNSPeer, destinationHash: {}", Hex.encodeHexString(destinationHash));
|
||||
log.info("added new RNSPeer, destinationHash: {}", encodeHexString(destinationHash));
|
||||
}
|
||||
}
|
||||
// Chance to announce instead of waiting for next pruning.
|
||||
@ -487,12 +493,6 @@ public class RNSNetwork {
|
||||
//}
|
||||
|
||||
final Long now = NTP.getTime();
|
||||
|
||||
//// Prune stuck/slow/old peers (moved from Controller)
|
||||
//task = maybeProduceRNSPrunePeersTask(now);
|
||||
//if (task != null) {
|
||||
// return task;
|
||||
//}
|
||||
|
||||
// ping task (Link+Channel+Buffer)
|
||||
task = maybeProducePeerPingTask(now);
|
||||
@ -719,7 +719,7 @@ public class RNSNetwork {
|
||||
if (pLink.getStatus() == ACTIVE) {
|
||||
continue;
|
||||
}
|
||||
if (pLink.getStatus() == CLOSED) {
|
||||
if ((pLink.getStatus() == CLOSED) || (p.getDeleteMe())) {
|
||||
removeLinkedPeer(p);
|
||||
continue;
|
||||
}
|
||||
@ -732,6 +732,15 @@ public class RNSNetwork {
|
||||
}
|
||||
// prune non-initiator peers
|
||||
List<RNSPeer> inaps = getNonActiveIncomingPeers();
|
||||
incomingPeerList = this.incomingPeers;
|
||||
for (RNSPeer p: incomingPeerList) {
|
||||
var pLink = p.getOrInitPeerLink();
|
||||
if (nonNull(pLink) && (pLink.getStatus() == ACTIVE)) {
|
||||
// make false active links to timeout (and teardown in timeout callback)
|
||||
// note: actual removal of peer happens on the following pruning run.
|
||||
p.pingRemote();
|
||||
}
|
||||
}
|
||||
for (RNSPeer p: inaps) {
|
||||
var pLink = p.getPeerLink();
|
||||
if (nonNull(pLink)) {
|
||||
@ -747,6 +756,7 @@ public class RNSNetwork {
|
||||
log.info("number of links (linkedPeers (active) / incomingPeers (active) after prunig: {} ({}), {} ({})",
|
||||
initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(),
|
||||
incomingPeerList.size(), numActiveIncomingPeers);
|
||||
maybeAnnounce(getBaseDestination());
|
||||
}
|
||||
|
||||
public void maybeAnnounce(Destination d) {
|
||||
@ -769,7 +779,7 @@ public class RNSNetwork {
|
||||
var pLink = p.getPeerLink();
|
||||
if (nonNull(pLink)) {
|
||||
if (Arrays.equals(pLink.getDestination().getHash(),link.getDestination().getHash())) {
|
||||
log.info("found peer matching destinationHash: {}", Hex.encodeHexString(link.getDestination().getHash()));
|
||||
log.info("found peer matching destinationHash: {}", encodeHexString(link.getDestination().getHash()));
|
||||
peer = p;
|
||||
break;
|
||||
}
|
||||
@ -784,7 +794,7 @@ public class RNSNetwork {
|
||||
RNSPeer peer = null;
|
||||
for (RNSPeer p : lps) {
|
||||
if (Arrays.equals(p.getDestinationHash(), dhash)) {
|
||||
log.info("found peer matching destinationHash: {}", Hex.encodeHexString(dhash));
|
||||
log.info("found peer matching destinationHash: {}", encodeHexString(dhash));
|
||||
peer = p;
|
||||
break;
|
||||
}
|
||||
|
@ -7,9 +7,7 @@ import static java.util.Objects.isNull;
|
||||
import static java.util.Objects.nonNull;
|
||||
//import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
import java.util.*;
|
||||
|
||||
//import io.reticulum.Reticulum;
|
||||
//import org.qortal.network.RNSNetwork;
|
||||
@ -53,8 +51,7 @@ import org.qortal.utils.NTP;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.Arrays;
|
||||
|
||||
@ -73,6 +70,7 @@ import lombok.AccessLevel;
|
||||
//import org.qortal.network.message.Message;
|
||||
//import org.qortal.network.message.MessageException;
|
||||
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -117,11 +115,12 @@ public class RNSPeer {
|
||||
private Map<Integer, BlockingQueue<Message>> replyQueues;
|
||||
private LinkedBlockingQueue<Message> pendingMessages;
|
||||
private boolean syncInProgress = false;
|
||||
private RNSPeerData peerData = null;
|
||||
// Versioning
|
||||
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
|
||||
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
|
||||
|
||||
private RNSPeerData peerData = null;
|
||||
/* Pending signature requests */
|
||||
private List<byte[]> pendingSignatureRequests = Collections.synchronizedList(new ArrayList<>());
|
||||
/**
|
||||
* Latest block info as reported by peer.
|
||||
*/
|
||||
@ -134,6 +133,20 @@ public class RNSPeer {
|
||||
* Last time we detected this peer as TOO_DIVERGENT
|
||||
*/
|
||||
private Long lastTooDivergentTime;
|
||||
///**
|
||||
// * Known starting sequences for data received over buffer
|
||||
// */
|
||||
//private byte[] SEQ_REQUEST_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x71, 0x43, 0x49, 0x44}; // SReqCID
|
||||
//private byte[] SEQ_RESPONSE_CONFIRM_ID = new byte[]{0x53, 0x52, 0x65, 0x73, 0x70, 0x43, 0x49, 0x44}; // SRespCID
|
||||
|
||||
// Message stats
|
||||
private static class MessageStats {
|
||||
public final LongAdder count = new LongAdder();
|
||||
public final LongAdder totalBytes = new LongAdder();
|
||||
}
|
||||
|
||||
private final Map<MessageType, RNSPeer.MessageStats> receivedMessageStats = new ConcurrentHashMap<>();
|
||||
private final Map<MessageType, RNSPeer.MessageStats> sentMessageStats = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Constructor for initiator peers
|
||||
@ -206,7 +219,7 @@ public class RNSPeer {
|
||||
public BufferedRWPair getOrInitPeerBuffer() {
|
||||
var channel = this.peerLink.getChannel();
|
||||
if (nonNull(this.peerBuffer)) {
|
||||
log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
|
||||
//log.info("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
|
||||
try {
|
||||
log.trace("peerBuffer exists: {}, link status: {}", this.peerBuffer, this.peerLink.getStatus());
|
||||
} catch (IllegalStateException e) {
|
||||
@ -270,7 +283,7 @@ public class RNSPeer {
|
||||
public void linkEstablished(Link link) {
|
||||
link.setLinkClosedCallback(this::linkClosed);
|
||||
log.info("peerLink {} established (link: {}) with peer: hash - {}, link destination hash: {}",
|
||||
peerLink, link, encodeHexString(destinationHash),
|
||||
encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(destinationHash),
|
||||
encodeHexString(link.getDestination().getHash()));
|
||||
if (isInitiator) {
|
||||
startPings();
|
||||
@ -285,12 +298,12 @@ public class RNSPeer {
|
||||
} else if (link.getTeardownReason() == INITIATOR_CLOSED) {
|
||||
log.info("Link closed callback: The initiator closed the link");
|
||||
log.info("peerLink {} closed (link: {}), link destination hash: {}",
|
||||
peerLink, link, encodeHexString(link.getDestination().getHash()));
|
||||
encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash()));
|
||||
this.peerBuffer = null;
|
||||
} else if (link.getTeardownReason() == DESTINATION_CLOSED) {
|
||||
log.info("Link closed callback: The link was closed by the peer, removing peer");
|
||||
log.info("peerLink {} closed (link: {}), link destination hash: {}",
|
||||
peerLink, link, encodeHexString(link.getDestination().getHash()));
|
||||
encodeHexString(peerLink.getLinkId()), encodeHexString(link.getLinkId()), encodeHexString(link.getDestination().getHash()));
|
||||
this.peerBuffer = null;
|
||||
} else {
|
||||
log.info("Link closed callback");
|
||||
@ -345,11 +358,23 @@ public class RNSPeer {
|
||||
// log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
|
||||
//}
|
||||
//else {
|
||||
//if (Arrays.equals(SEQ_REQUEST_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_REQUEST_CONFIRM_ID.length))) {
|
||||
// // a non-initiator peer requested to confirm sending of a packet
|
||||
// var messageId = subarray(data, SEQ_REQUEST_CONFIRM_ID.length + 1, data.length);
|
||||
// log.info("received request to confirm message id, id: {}", messageId);
|
||||
// var confirmData = concatArrays(SEQ_RESPONSE_CONFIRM_ID, "::",data.getBytes(UTF_8), messageId.getBytes(UTF_8));
|
||||
// this.peerBuffer.write(confirmData);
|
||||
// this.peerBuffer.flush();
|
||||
//} else if (Arrays.equals(SEQ_RESPONSE_CONFIRM_ID, Arrays.copyOfRange(data, 0, SEQ_RESPONSE_CONFIRM_ID.lenth))) {
|
||||
// // an initiator peer receiving the confirmation
|
||||
// var messageId = subarray(data, SEQ_RESPONSE_CONFIRM_ID.length + 1, data.length);
|
||||
// this.replyQueues.remove(messageId);
|
||||
//} else {
|
||||
try {
|
||||
//log.info("***> creating message from {} bytes", data.length);
|
||||
Message message = Message.fromByteBuffer(bb);
|
||||
//log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message);
|
||||
log.info("*=> type {} message received ({} bytes)", message.getType(), data.length);
|
||||
log.info("*=> type {} message received ({} bytes, id: {}", message.getType(), data.length, message.getId());
|
||||
|
||||
// Handle message based on type
|
||||
switch (message.getType()) {
|
||||
@ -364,12 +389,12 @@ public class RNSPeer {
|
||||
this.lastPingResponseReceived = Instant.now();
|
||||
if (isFalse(this.isInitiator)) {
|
||||
onPingMessage(this, message);
|
||||
// Note: buffer flush done in onPingMessage method
|
||||
}
|
||||
break;
|
||||
|
||||
case PONG:
|
||||
log.trace("PONG received");
|
||||
addToQueue(message); // as response in blocking queue for ping getResponse
|
||||
break;
|
||||
|
||||
// Do we need this ? (no need to relay peer list...)
|
||||
@ -377,42 +402,41 @@ public class RNSPeer {
|
||||
// onPeersV2Message(peer, message);
|
||||
// break;
|
||||
|
||||
//case BLOCK_SUMMARIES:
|
||||
// // from Synchronizer
|
||||
// addToQueue(message);
|
||||
// break;
|
||||
//
|
||||
//case BLOCK_SUMMARIES_V2:
|
||||
// // from Synchronizer
|
||||
// addToQueue(message);
|
||||
// break;
|
||||
//
|
||||
//case SIGNATURES:
|
||||
// // from Synchronizer
|
||||
// addToQueue(message);
|
||||
// break;
|
||||
//
|
||||
//case BLOCK:
|
||||
// // from Synchronizer
|
||||
// addToQueue(message);
|
||||
// break;
|
||||
//
|
||||
//case BLOCK_V2:
|
||||
// // from Synchronizer
|
||||
// addToQueue(message);
|
||||
// break;
|
||||
case BLOCK_SUMMARIES:
|
||||
// from Synchronizer
|
||||
addToQueue(message);
|
||||
|
||||
case BLOCK_SUMMARIES_V2:
|
||||
// from Synchronizer
|
||||
addToQueue(message);
|
||||
|
||||
case SIGNATURES:
|
||||
// from Synchronizer
|
||||
addToQueue(message);
|
||||
|
||||
case BLOCK:
|
||||
// from Synchronizer
|
||||
addToQueue(message);
|
||||
|
||||
case BLOCK_V2:
|
||||
// from Synchronizer
|
||||
addToQueue(message);
|
||||
|
||||
default:
|
||||
log.info("default - type {} message received ({} bytes)", message.getType(), data.length);
|
||||
// Bump up to controller for possible action
|
||||
//Controller.getInstance().onNetworkMessage(peer, message);
|
||||
addToQueue(message);
|
||||
Controller.getInstance().onRNSNetworkMessage(this, message);
|
||||
break;
|
||||
}
|
||||
} catch (MessageException e) {
|
||||
//log.error("{} from peer {}", e.getMessage(), this);
|
||||
log.error("{} from peer {}", e, this);
|
||||
log.info("{} from peer {}", e, this);
|
||||
log.error("{} from peer {}, closing link", e, this);
|
||||
//log.info("{} from peer {}", e, this);
|
||||
// don't take any chances:
|
||||
// can happen if link is closed by peer in which case we close this side of the link
|
||||
this.peerData.setLastMisbehaved(NTP.getTime());
|
||||
shutdown();
|
||||
}
|
||||
//}
|
||||
}
|
||||
@ -430,7 +454,8 @@ public class RNSPeer {
|
||||
if (queue != null) {
|
||||
// Adding message to queue will unblock thread waiting for response
|
||||
this.replyQueues.get(message.getId()).add(message);
|
||||
// Consumed elsewhere
|
||||
// Consumed elsewhere (getResponseWithTimeout)
|
||||
log.info("addToQueue - queue size: {}, message type: {} (id: {})", queue.size(), message.getType(), message.getId());
|
||||
}
|
||||
else if (!this.pendingMessages.offer(message)) {
|
||||
log.info("[{}] Busy, no room to queue message from peer {} - discarding",
|
||||
@ -490,14 +515,17 @@ public class RNSPeer {
|
||||
} else {
|
||||
rttString = String.format("%d milliseconds", rtt);
|
||||
}
|
||||
log.info("Valid reply received from {}, round-trip time is {}",
|
||||
encodeHexString(receipt.getDestination().getHash()), rttString);
|
||||
if (getIsInitiator()) {
|
||||
// reporting round trip time in one direction is enough
|
||||
log.info("Valid reply received from {}, round-trip time is {}",
|
||||
encodeHexString(receipt.getDestination().getHash()), rttString);
|
||||
}
|
||||
this.lastAccessTimestamp = Instant.now();
|
||||
}
|
||||
}
|
||||
|
||||
public void packetTimedOut(PacketReceipt receipt) {
|
||||
log.info("packet timed out, receipt status: {}", receipt.getStatus());
|
||||
//log.info("packet timed out, receipt status: {}", receipt.getStatus());
|
||||
if (receipt.getStatus() == PacketReceiptStatus.FAILED) {
|
||||
log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED);
|
||||
this.peerTimedOut = true;
|
||||
@ -534,19 +562,6 @@ public class RNSPeer {
|
||||
log.debug("Resource transfer complete");
|
||||
}
|
||||
|
||||
///**
|
||||
// * Send a message using the peer buffer
|
||||
// */
|
||||
//public Message getResponse(Message message) throws InterruptedException {
|
||||
// var peerBuffer = getOrInitPeerBuffer();
|
||||
//
|
||||
// //// send message
|
||||
// //peerBuffer.write(...);
|
||||
// //peerBuffer.flush();
|
||||
//
|
||||
// // receive - peerBufferReady callback result
|
||||
//}
|
||||
|
||||
/** Utility methods */
|
||||
public void pingRemote() {
|
||||
var link = this.peerLink;
|
||||
@ -554,7 +569,7 @@ public class RNSPeer {
|
||||
//if (nonNull(link) & link.isInitiator()) {
|
||||
if (nonNull(link)) {
|
||||
if (peerLink.getStatus() == ACTIVE) {
|
||||
log.info("pinging remote (direct, 1 packet): {}", link);
|
||||
log.info("pinging remote (direct, 1 packet): {}", encodeHexString(link.getLinkId()));
|
||||
var data = "ping".getBytes(UTF_8);
|
||||
link.setPacketCallback(this::linkPacketReceived);
|
||||
Packet pingPacket = new Packet(link, data);
|
||||
@ -586,7 +601,7 @@ public class RNSPeer {
|
||||
|
||||
try {
|
||||
PongMessage pongMessage = new PongMessage();
|
||||
pongMessage.setId(message.getId()); // use the ping message id
|
||||
pongMessage.setId(message.getId()); // use the ping message id (for ping getResponse)
|
||||
this.peerBuffer.write(pongMessage.toBytes());
|
||||
this.peerBuffer.flush();
|
||||
this.lastAccessTimestamp = Instant.now();
|
||||
@ -609,7 +624,7 @@ public class RNSPeer {
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public Message getResponse(Message message) throws InterruptedException {
|
||||
log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash()));
|
||||
//log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash()));
|
||||
return getResponseWithTimeout(message, RESPONSE_TIMEOUT);
|
||||
}
|
||||
|
||||
@ -629,7 +644,6 @@ public class RNSPeer {
|
||||
*/
|
||||
public Message getResponseWithTimeout(Message message, int timeout) throws InterruptedException {
|
||||
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
|
||||
// TODO: implement equivalent of Peer class...
|
||||
// Assign random ID to this message
|
||||
Random random = new Random();
|
||||
int id;
|
||||
@ -640,17 +654,20 @@ public class RNSPeer {
|
||||
// If putIfAbsent() doesn't return null, then this ID is already taken
|
||||
} while (this.replyQueues.putIfAbsent(id, blockingQueue) != null);
|
||||
message.setId(id);
|
||||
//log.info("getResponse - before send {} message, random id is {}", message.getType(), id);
|
||||
|
||||
// Try to send message
|
||||
if (!this.sendMessageWithTimeout(message, timeout)) {
|
||||
this.replyQueues.remove(id);
|
||||
return null;
|
||||
}
|
||||
//log.info("getResponse - after send");
|
||||
|
||||
try {
|
||||
return blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
|
||||
} finally {
|
||||
this.replyQueues.remove(id);
|
||||
//log.info("getResponse - regular - id removed from replyQueues");
|
||||
}
|
||||
}
|
||||
|
||||
@ -662,10 +679,16 @@ public class RNSPeer {
|
||||
*/
|
||||
public boolean sendMessageWithTimeout(Message message, int timeout) {
|
||||
try {
|
||||
// send the message
|
||||
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
|
||||
var peerBuffer = getOrInitPeerBuffer();
|
||||
this.peerBuffer.write(message.toBytes());
|
||||
this.peerBuffer.flush();
|
||||
//// send a message to confirm receipt over the buffer
|
||||
//var messageId = message.getId();
|
||||
//var confirmData = concatArrays(SEQ_REQUEST_CONFIRM_ID,"::".getBytes(UTF_8), messageId.getBytes(UTF_8));
|
||||
//this.peerBuffer.write(confirmData);
|
||||
//this.peerBuffer.flush();
|
||||
return true;
|
||||
//} catch (InterruptedException e) {
|
||||
// // Send failure
|
||||
@ -711,8 +734,8 @@ public class RNSPeer {
|
||||
//@Synchronized
|
||||
public boolean sendMessage(Message message) {
|
||||
try {
|
||||
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
|
||||
log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this);
|
||||
log.trace("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString());
|
||||
//log.info("Sending {} message with ID {} to peer {}", message.getType().name(), message.getId(), this.toString());
|
||||
var peerBuffer = getOrInitPeerBuffer();
|
||||
peerBuffer.write(message.toBytes());
|
||||
peerBuffer.flush();
|
||||
@ -730,7 +753,7 @@ public class RNSPeer {
|
||||
|
||||
protected void startPings() {
|
||||
log.trace("[{}] Enabling pings for peer {}",
|
||||
peerLink.getDestination().getHexHash(), this);
|
||||
peerLink.getDestination().getHexHash(), this.toString());
|
||||
this.lastPingSent = NTP.getTime();
|
||||
}
|
||||
|
||||
@ -831,4 +854,29 @@ public class RNSPeer {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Pending signature requests
|
||||
public void addPendingSignatureRequest(byte[] signature) {
|
||||
// Check if we already have this signature in the list
|
||||
for (byte[] existingSignature : this.pendingSignatureRequests) {
|
||||
if (Arrays.equals(existingSignature, signature )) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
this.pendingSignatureRequests.add(signature);
|
||||
}
|
||||
|
||||
public void removePendingSignatureRequest(byte[] signature) {
|
||||
Iterator iterator = this.pendingSignatureRequests.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
byte[] existingSignature = (byte[]) iterator.next();
|
||||
if (Arrays.equals(existingSignature, signature)) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<byte[]> getPendingSignatureRequests() {
|
||||
return this.pendingSignatureRequests;
|
||||
}
|
||||
}
|
||||
|
@ -35,8 +35,8 @@ public class RNSPingTask implements Task {
|
||||
// Note: Even though getResponse would work, we can use
|
||||
// peer.sendMessage(pingMessage) using Reticulum buffer instead.
|
||||
// More efficient and saves room for other request/response tasks.
|
||||
//peer.getResponse(pingMessage);
|
||||
peer.sendMessage(pingMessage);
|
||||
peer.getResponse(pingMessage);
|
||||
//peer.sendMessage(pingMessage);
|
||||
|
||||
//// task is not over here (Reticulum is asynchronous)
|
||||
//peer.setLastPing(NTP.getTime() - now);
|
||||
|
@ -46,7 +46,7 @@ reticulum:
|
||||
# an optional directive, and can be left out for brevity.
|
||||
# This behaviour is disabled by default.
|
||||
|
||||
panic_on_interface_error: false
|
||||
panic_on_interface_error: true
|
||||
|
||||
|
||||
# The interfaces section defines the physical and virtual
|
||||
@ -64,9 +64,9 @@ interfaces:
|
||||
# local IPv6 is enabled in your operating system, which
|
||||
# should be enabled by default in almost any OS. See
|
||||
# the Reticulum Manual for more configuration options.
|
||||
#"Default Interface":
|
||||
# type: AutoInterface
|
||||
# enabled: true
|
||||
"Default Interface":
|
||||
type: AutoInterface
|
||||
enabled: true
|
||||
|
||||
# This interface enables communication with a "backbone"
|
||||
# server over TCP.
|
||||
@ -76,7 +76,7 @@ interfaces:
|
||||
enabled: true
|
||||
target_host: phantom.mobilefabrik.com
|
||||
target_port: 4242
|
||||
#network_name: qortal
|
||||
network_name: qortal
|
||||
|
||||
# This interface turns this Reticulum instance into a
|
||||
# server other clients can connect to over TCP.
|
||||
@ -88,6 +88,6 @@ interfaces:
|
||||
# type: TCPServerInterface
|
||||
# enabled: true
|
||||
# listen_ip: 0.0.0.0
|
||||
# listen_port: 3434
|
||||
# #network_name: qortal
|
||||
# listen_port: 4242
|
||||
# network_name: qortal
|
||||
|
||||
|
@ -46,7 +46,7 @@ reticulum:
|
||||
# an optional directive, and can be left out for brevity.
|
||||
# This behaviour is disabled by default.
|
||||
|
||||
panic_on_interface_error: false
|
||||
panic_on_interface_error: true
|
||||
|
||||
|
||||
# The interfaces section defines the physical and virtual
|
||||
@ -64,9 +64,9 @@ interfaces:
|
||||
# local IPv6 is enabled in your operating system, which
|
||||
# should be enabled by default in almost any OS. See
|
||||
# the Reticulum Manual for more configuration options.
|
||||
#"Default Interface":
|
||||
# type: AutoInterface
|
||||
# enabled: true
|
||||
"Default Interface":
|
||||
type: AutoInterface
|
||||
enabled: true
|
||||
|
||||
# This interface enables communication with a "backbone"
|
||||
# server over TCP.
|
||||
@ -75,8 +75,8 @@ interfaces:
|
||||
type: TCPClientInterface
|
||||
enabled: true
|
||||
target_host: phantom.mobilefabrik.com
|
||||
target_port: 3434
|
||||
#network_name: qortal
|
||||
target_port: 4242
|
||||
network_name: qortaltest
|
||||
|
||||
# This interface turns this Reticulum instance into a
|
||||
# server other clients can connect to over TCP.
|
||||
@ -88,6 +88,6 @@ interfaces:
|
||||
# type: TCPServerInterface
|
||||
# enabled: true
|
||||
# listen_ip: 0.0.0.0
|
||||
# listen_port: 3434
|
||||
# #network_name: qortal
|
||||
# listen_port: 4242
|
||||
# network_name: qortaltest
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user