Merge pull request #252 from jschulthess/reticulum

Reticulum Branch Update
This commit is contained in:
crowetic 2025-05-01 17:05:04 -07:00 committed by GitHub
commit d253d7753d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1841 additions and 28 deletions

View File

@ -543,6 +543,9 @@ public class Controller extends Thread {
LOGGER.info("Starting synchronizer");
Synchronizer.getInstance().start();
//LOGGER.info("Starting synchronizer over Reticulum");
//RNSSynchronizer.getInstance().start();
LOGGER.info("Starting block minter");
blockMinter = new BlockMinter();
blockMinter.start();
@ -945,23 +948,47 @@ public class Controller extends Thread {
return peerChainTipData == null || peerChainTipData.getTimestamp() == null || peerChainTipData.getTimestamp() < minLatestBlockTimestamp;
};
public static final Predicate<RNSPeer> hasNoRecentBlock2 = peer -> {
final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
final BlockSummaryData peerChainTipData = peer.getChainTipData();
return peerChainTipData == null || peerChainTipData.getTimestamp() == null || peerChainTipData.getTimestamp() < minLatestBlockTimestamp;
};
public static final Predicate<Peer> hasNoOrSameBlock = peer -> {
final BlockData latestBlockData = getInstance().getChainTip();
final BlockSummaryData peerChainTipData = peer.getChainTipData();
return peerChainTipData == null || peerChainTipData.getSignature() == null || Arrays.equals(latestBlockData.getSignature(), peerChainTipData.getSignature());
};
public static final Predicate<RNSPeer> hasNoOrSameBlock2 = peer -> {
final BlockData latestBlockData = getInstance().getChainTip();
final BlockSummaryData peerChainTipData = peer.getChainTipData();
return peerChainTipData == null || peerChainTipData.getSignature() == null || Arrays.equals(latestBlockData.getSignature(), peerChainTipData.getSignature());
};
public static final Predicate<Peer> hasOnlyGenesisBlock = peer -> {
final BlockSummaryData peerChainTipData = peer.getChainTipData();
return peerChainTipData == null || peerChainTipData.getHeight() == 1;
};
public static final Predicate<RNSPeer> hasOnlyGenesisBlock2 = peer -> {
final BlockSummaryData peerChainTipData = peer.getChainTipData();
return peerChainTipData == null || peerChainTipData.getHeight() == 1;
};
public static final Predicate<Peer> hasInferiorChainTip = peer -> {
final BlockSummaryData peerChainTipData = peer.getChainTipData();
final List<ByteArray> inferiorChainTips = Synchronizer.getInstance().inferiorChainSignatures;
return peerChainTipData == null || peerChainTipData.getSignature() == null || inferiorChainTips.contains(ByteArray.wrap(peerChainTipData.getSignature()));
};
public static final Predicate<RNSPeer> hasInferiorChainTip2 = peer -> {
final BlockSummaryData peerChainTipData = peer.getChainTipData();
final List<ByteArray> inferiorChainTips = Synchronizer.getInstance().inferiorChainSignatures;
return peerChainTipData == null || peerChainTipData.getSignature() == null || inferiorChainTips.contains(ByteArray.wrap(peerChainTipData.getSignature()));
};
public static final Predicate<Peer> hasOldVersion = peer -> {
final String minPeerVersion = Settings.getInstance().getMinPeerVersion();
return !peer.isAtLeastVersion(minPeerVersion);
@ -979,6 +1006,18 @@ public class Controller extends Thread {
}
};
public static final Predicate<RNSPeer> hasInvalidSigner2 = peer -> {
final BlockSummaryData peerChainTipData = peer.getChainTipData();
if (peerChainTipData == null)
return true;
try (Repository repository = RepositoryManager.getRepository()) {
return Account.getRewardShareEffectiveMintingLevel(repository, peerChainTipData.getMinterPublicKey()) == 0;
} catch (DataException e) {
return true;
}
};
public static final Predicate<Peer> wasRecentlyTooDivergent = peer -> {
Long now = NTP.getTime();
Long peerLastTooDivergentTime = peer.getLastTooDivergentTime();
@ -2879,7 +2918,7 @@ public class Controller extends Thread {
return true;
// Needs a mutable copy of the unmodifiableList
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableLinkedPeers());
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers());
if (peers == null)
return false;

File diff suppressed because it is too large Load Diff

View File

@ -83,6 +83,7 @@ import org.qortal.network.message.TransactionSignaturesMessage;
import org.qortal.network.message.GetUnconfirmedTransactionsMessage;
import org.qortal.network.task.RNSBroadcastTask;
import org.qortal.network.task.RNSPrunePeersTask;
import org.qortal.data.network.RNSPeerData;
import org.qortal.controller.Controller;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@ -107,7 +108,7 @@ public class RNSNetwork {
private final int MAX_PEERS = Settings.getInstance().getReticulumMaxPeers();
private final int MIN_DESIRED_PEERS = Settings.getInstance().getReticulumMinDesiredPeers();
// How long [ms] between pruning of peers
private long PRUNE_INTERVAL = 1 * 60 * 1000L; // ms;
private long PRUNE_INTERVAL = 1 * 64 * 1000L; // ms;
Identity serverIdentity;
public Destination baseDestination;
@ -251,7 +252,7 @@ public class RNSNetwork {
}
public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) {
for (RNSPeer peer : getImmutableLinkedPeers()) {
for (RNSPeer peer : getImmutableActiveLinkedPeers()) {
if (this.isShuttingDown) {
return;
}
@ -530,7 +531,7 @@ public class RNSNetwork {
//// Note: we might not need this. All messages handled asynchronously in Reticulum
//// (RNSPeer peerBufferReady callback)
//private Task maybeProducePeerMessageTask() {
// return getImmutableLinkedPeers().stream()
// return getImmutableActiveLinkedPeers().stream()
// .map(RNSPeer::getMessageTask)
// .filter(Objects::nonNull)
// .findFirst()
@ -554,7 +555,7 @@ public class RNSNetwork {
// log.info("ilp - {}", ilp);
//}
//return ilp;
return getImmutableLinkedPeers().stream()
return getImmutableActiveLinkedPeers().stream()
.map(peer -> peer.getPingTask(now))
.filter(Objects::nonNull)
.findFirst()
@ -588,6 +589,16 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE;
}
public List<RNSPeer> getImmutableActiveLinkedPeers() {
List<RNSPeer> activePeers = Collections.synchronizedList(new ArrayList<>());
for (RNSPeer p: this.immutableLinkedPeers) {
if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) {
activePeers.add(p);
}
}
return activePeers;
}
public List<RNSPeer> getImmutableLinkedPeers() {
return this.immutableLinkedPeers;
}
@ -665,6 +676,23 @@ public class RNSNetwork {
return result;
}
public void peerMisbehaved(RNSPeer peer) {
RNSPeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(NTP.getTime());
//// Only update repository if outbound/initiator peer
//if (peer.getIsInitiator()) {
// try (Repository repository = RepositoryManager.getRepository()) {
// synchronized (this.allKnownPeers) {
// repository.getNetworkRepository().save(peerData);
// repository.saveChanges();
// }
// } catch (DataException e) {
// log.warn("Repository issue while updating peer synchronization info", e);
// }
//}
}
public List<RNSPeer> incomingNonActivePeers() {
var ips = getIncomingPeers();
List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>());

View File

@ -41,6 +41,7 @@ import org.qortal.data.block.BlockSummaryData;
import org.qortal.data.block.CommonBlockData;
import org.qortal.data.network.RNSPeerData;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
import org.qortal.network.message.*;
import org.qortal.network.message.MessageException;
@ -115,6 +116,7 @@ public class RNSPeer {
@Setter(AccessLevel.PACKAGE) private Instant lastPingResponseReceived = null; // time last (packet) ping succeeded
private Map<Integer, BlockingQueue<Message>> replyQueues;
private LinkedBlockingQueue<Message> pendingMessages;
private boolean syncInProgress = false;
// Versioning
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
@ -128,7 +130,10 @@ public class RNSPeer {
* Our common block with this peer
*/
private CommonBlockData commonBlockData;
/**
* Last time we detected this peer as TOO_DIVERGENT
*/
private Long lastTooDivergentTime;
/**
* Constructor for initiator peers
@ -346,6 +351,7 @@ public class RNSPeer {
Message message = Message.fromByteBuffer(bb);
//log.info("*=> type {} message received ({} bytes): {}", message.getType(), data.length, message);
log.info("*=> type {} message received ({} bytes)", message.getType(), data.length);
// Handle message based on type
switch (message.getType()) {
// Do we need this ? (seems like a TCP scenario only thing)
@ -372,6 +378,31 @@ public class RNSPeer {
// onPeersV2Message(peer, message);
// break;
case BLOCK_SUMMARIES:
// from Synchronizer
addToQueue(message);
break;
case BLOCK_SUMMARIES_V2:
// from Synchronizer
addToQueue(message);
break;
case SIGNATURES:
// from Synchronizer
addToQueue(message);
break;
case BLOCK:
// from Synchronizer
addToQueue(message);
break;
case BLOCK_V2:
// from Synchronizer
addToQueue(message);
break;
default:
log.info("default - type {} message received ({} bytes)", message.getType(), data.length);
// Bump up to controller for possible action
@ -387,9 +418,26 @@ public class RNSPeer {
}
}
//public void handleMessage(Message message) {
//
//}
/**
* we need to queue all incomming messages that follow request/response
* with explicit handling of the response message.
*/
public void addToQueue(Message message) {
if (message.getType() == MessageType.UNSUPPORTED) {
log.trace("discarding/skipping UNSUPPORTED message");
return;
}
BlockingQueue<Message> queue = this.replyQueues.get(message.getId());
if (queue != null) {
// Adding message to queue will unblock thread waiting for response
this.replyQueues.get(message.getId()).add(message);
// Consumed elsewhere
}
else if (!this.pendingMessages.offer(message)) {
log.info("[{}] Busy, no room to queue message from peer {} - discarding",
this.peerLink, this);
}
}
/**
* Set a packet to remote with the message format "close::<our_destination_hash>"
@ -558,9 +606,9 @@ public class RNSPeer {
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException if interrupted while waiting
*/
public void getResponse(Message message) throws InterruptedException {
public Message getResponse(Message message) throws InterruptedException {
log.info("RNSPingTask action - pinging peer {}", encodeHexString(getDestinationHash()));
getResponseWithTimeout(message, RESPONSE_TIMEOUT);
return getResponseWithTimeout(message, RESPONSE_TIMEOUT);
}
/**
@ -577,7 +625,7 @@ public class RNSPeer {
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException if interrupted while waiting
*/
public void getResponseWithTimeout(Message message, int timeout) throws InterruptedException {
public Message getResponseWithTimeout(Message message, int timeout) throws InterruptedException {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<>(1);
// TODO: implement equivalent of Peer class...
// Assign random ID to this message
@ -594,11 +642,11 @@ public class RNSPeer {
// Try to send message
if (!this.sendMessageWithTimeout(message, timeout)) {
this.replyQueues.remove(id);
return;
return null;
}
try {
blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
return blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
} finally {
this.replyQueues.remove(id);
}
@ -765,4 +813,20 @@ public class RNSPeer {
public void setCommonBlockData(CommonBlockData commonBlockData) {
this.commonBlockData = commonBlockData;
}
// Common block data
public boolean canUseCachedCommonBlockData() {
BlockSummaryData peerChainTipData = this.getChainTipData();
if (peerChainTipData == null || peerChainTipData.getSignature() == null)
return false;
CommonBlockData commonBlockData = this.getCommonBlockData();
if (commonBlockData == null)
return false;
BlockSummaryData commonBlockChainTipData = commonBlockData.getChainTipData();
if (commonBlockChainTipData == null || commonBlockChainTipData.getSignature() == null)
return false;
if (!Arrays.equals(peerChainTipData.getSignature(), commonBlockChainTipData.getSignature()))
return false;
return true;
}
}

View File

@ -6,7 +6,6 @@ import org.qortal.network.RNSPeer;
import org.qortal.network.message.Message;
import org.qortal.network.message.MessageType;
import org.qortal.network.message.PingMessage;
//import org.qortal.network.message.RNSPingMessage;
import org.qortal.network.message.MessageException;
import org.qortal.utils.ExecuteProduceConsume.Task;
import org.qortal.utils.NTP;
@ -31,21 +30,11 @@ public class RNSPingTask implements Task {
@Override
public void perform() throws InterruptedException {
//RNSPingMessage pingMessage = new RNSPingMessage();
PingMessage pingMessage = new PingMessage();
//try {
// var peerBuffer = this.peer.getOrInitPeerBuffer();
// LOGGER.info("message toBytes: {}", pingMessage.toBytes());
// peerBuffer.write(pingMessage.toBytes());
// peerBuffer.flush();
//} catch (IllegalStateException e) {
// //log.warn("Can't write to buffer (remote buffer down?)");
// LOGGER.error("IllegalStateException - can't write to buffer: e", e);
//} catch (MessageException e) {
// LOGGER.error(e.getMessage(), e);
//}
// Note: We might use peer.sendMessage(pingMessage) instead
// Note: Even though getResponse would work, we can use
// peer.sendMessage(pingMessage) using Reticulum buffer instead.
// More efficient and saves room for other request/response tasks.
//peer.getResponse(pingMessage);
peer.sendMessage(pingMessage);