PeerSendManagement support for sending all messages through a queue

This commit is contained in:
kennycud 2025-07-12 14:02:19 -07:00
parent 5fabc7792c
commit cea63e7ec7
4 changed files with 77 additions and 38 deletions

View File

@ -13,6 +13,7 @@ import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.PeerSendManagement;
import org.qortal.network.message.*;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
@ -31,10 +32,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.qortal.network.PeerSendManager;
public class ArbitraryDataFileManager extends Thread {
public static final int SEND_TIMEOUT_MS = 500;
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class);
private static ArbitraryDataFileManager instance;
@ -70,38 +70,10 @@ public class ArbitraryDataFileManager extends Thread {
public static int MAX_FILE_HASH_RESPONSES = 1000;
private final Map<Peer, PeerSendManager> peerSendManagers = new ConcurrentHashMap<>();
private PeerSendManager getOrCreateSendManager(Peer peer) {
return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
}
private ArbitraryDataFileManager() {
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
cleaner.scheduleAtFixedRate(() -> {
long idleCutoff = TimeUnit.MINUTES.toMillis(2);
Iterator<Map.Entry<Peer, PeerSendManager>> iterator = peerSendManagers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Peer, PeerSendManager> entry = iterator.next();
Peer peer = entry.getKey();
PeerSendManager manager = entry.getValue();
if (manager.isIdle(idleCutoff)) {
iterator.remove(); // SAFE removal during iteration
manager.shutdown();
LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer);
}
}
}, 0, 5, TimeUnit.MINUTES);
}
public static ArbitraryDataFileManager getInstance() {
@ -406,7 +378,7 @@ private PeerSendManager getOrCreateSendManager(Peer peer) {
// The ID needs to match that of the original request
message.setId(originalMessage.getId());
getOrCreateSendManager(requestingPeer).queueMessage(message);
PeerSendManagement.getInstance().getOrCreateSendManager(requestingPeer).queueMessage(message, SEND_TIMEOUT_MS);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
@ -684,7 +656,7 @@ private PeerSendManager getOrCreateSendManager(Peer peer) {
ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile);
arbitraryDataFileMessage.setId(message.getId());
getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage);
PeerSendManagement.getInstance().getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage, SEND_TIMEOUT_MS);
}
else if (relayInfo != null) {

View File

@ -736,6 +736,11 @@ public class Peer {
* @return <code>true</code> if message successfully sent; <code>false</code> otherwise
*/
public boolean sendMessageWithTimeout(Message message, int timeout) {
return PeerSendManagement.getInstance().getOrCreateSendManager(this).queueMessage(message, timeout);
}
public boolean sendMessageWithTimeoutNow(Message message, int timeout) {
if (!this.socketChannel.isOpen()) {
return false;
}

View File

@ -0,0 +1,55 @@
package org.qortal.network;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class PeerSendManagement {
private static final Logger LOGGER = LogManager.getLogger(PeerSendManagement.class);
private final Map<String, PeerSendManager> peerSendManagers = new ConcurrentHashMap<>();
public PeerSendManager getOrCreateSendManager(Peer peer) {
return peerSendManagers.computeIfAbsent(peer.toString(), p -> new PeerSendManager(peer));
}
private PeerSendManagement() {
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
cleaner.scheduleAtFixedRate(() -> {
long idleCutoff = TimeUnit.MINUTES.toMillis(2);
Iterator<Map.Entry<String, PeerSendManager>> iterator = peerSendManagers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, PeerSendManager> entry = iterator.next();
PeerSendManager manager = entry.getValue();
if (manager.isIdle(idleCutoff)) {
iterator.remove(); // SAFE removal during iteration
manager.shutdown();
LOGGER.debug("Cleaned up PeerSendManager for peer {}", entry.getKey());
}
}
}, 0, 5, TimeUnit.MINUTES);
}
private static PeerSendManagement instance;
public static PeerSendManagement getInstance() {
if( instance == null ) {
instance = new PeerSendManagement();
}
return instance;
}
}

View File

@ -12,7 +12,6 @@ public class PeerSendManager {
private static final int MAX_FAILURES = 15;
private static final int MAX_MESSAGE_ATTEMPTS = 2;
private static final int SEND_TIMEOUT_MS = 500;
private static final int RETRY_DELAY_MS = 100;
private static final long MAX_QUEUE_DURATION_MS = 20_000;
private static final long COOLDOWN_DURATION_MS = 20_000;
@ -49,11 +48,12 @@ public class PeerSendManager {
}
Message message = timedMessage.message;
int timeout = timedMessage.timeout;
boolean success = false;
for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) {
try {
if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) {
if (peer.sendMessageWithTimeoutNow(message, timeout)) {
success = true;
failureCount.set(0); // reset on success
break;
@ -98,16 +98,21 @@ public class PeerSendManager {
});
}
public void queueMessage(Message message) {
public boolean queueMessage(Message message, int timeout) {
if (coolingDown) {
LOGGER.debug("In cooldown, ignoring message {}", message.getId());
return;
return false;
}
lastUsed = System.currentTimeMillis();
if (!queue.offer(new TimedMessage(message))) {
if (!queue.offer(new TimedMessage(message, timeout))) {
LOGGER.debug("Send queue full, dropping message {}", message.getId());
return false;
}
return true;
}
public boolean isIdle(long cutoffMillis) {
@ -122,10 +127,12 @@ public class PeerSendManager {
private static class TimedMessage {
final Message message;
final long timestamp;
final int timeout;
TimedMessage(Message message) {
TimedMessage(Message message, int timeout) {
this.message = message;
this.timestamp = System.currentTimeMillis();
this.timeout = timeout;
}
}
}