diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 5407803c..1d9a3601 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -13,6 +13,7 @@ import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.network.Network; import org.qortal.network.Peer; +import org.qortal.network.PeerSendManagement; import org.qortal.network.message.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; @@ -31,10 +32,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.qortal.network.PeerSendManager; - public class ArbitraryDataFileManager extends Thread { + public static final int SEND_TIMEOUT_MS = 500; private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class); private static ArbitraryDataFileManager instance; @@ -70,38 +70,10 @@ public class ArbitraryDataFileManager extends Thread { public static int MAX_FILE_HASH_RESPONSES = 1000; -private final Map peerSendManagers = new ConcurrentHashMap<>(); - -private PeerSendManager getOrCreateSendManager(Peer peer) { - return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p)); -} - - - - private ArbitraryDataFileManager() { this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS); - ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); - - cleaner.scheduleAtFixedRate(() -> { - long idleCutoff = TimeUnit.MINUTES.toMillis(2); - Iterator> iterator = peerSendManagers.entrySet().iterator(); - - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - Peer peer = entry.getKey(); - PeerSendManager manager = entry.getValue(); - - if (manager.isIdle(idleCutoff)) { - iterator.remove(); // SAFE removal during iteration - manager.shutdown(); - LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer); - } - } - }, 0, 5, TimeUnit.MINUTES); - } public static ArbitraryDataFileManager getInstance() { @@ -406,7 +378,7 @@ private PeerSendManager getOrCreateSendManager(Peer peer) { // The ID needs to match that of the original request message.setId(originalMessage.getId()); - getOrCreateSendManager(requestingPeer).queueMessage(message); + PeerSendManagement.getInstance().getOrCreateSendManager(requestingPeer).queueMessage(message, SEND_TIMEOUT_MS); } catch (Exception e) { LOGGER.error(e.getMessage(), e); @@ -684,7 +656,7 @@ private PeerSendManager getOrCreateSendManager(Peer peer) { ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); arbitraryDataFileMessage.setId(message.getId()); - getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage); + PeerSendManagement.getInstance().getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage, SEND_TIMEOUT_MS); } else if (relayInfo != null) { diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index 6d22be3e..c783847d 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -736,6 +736,11 @@ public class Peer { * @return true if message successfully sent; false otherwise */ public boolean sendMessageWithTimeout(Message message, int timeout) { + + return PeerSendManagement.getInstance().getOrCreateSendManager(this).queueMessage(message, timeout); + } + + public boolean sendMessageWithTimeoutNow(Message message, int timeout) { if (!this.socketChannel.isOpen()) { return false; } diff --git a/src/main/java/org/qortal/network/PeerSendManagement.java b/src/main/java/org/qortal/network/PeerSendManagement.java new file mode 100644 index 00000000..d09ca69e --- /dev/null +++ b/src/main/java/org/qortal/network/PeerSendManagement.java @@ -0,0 +1,55 @@ +package org.qortal.network; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class PeerSendManagement { + + private static final Logger LOGGER = LogManager.getLogger(PeerSendManagement.class); + + private final Map peerSendManagers = new ConcurrentHashMap<>(); + + public PeerSendManager getOrCreateSendManager(Peer peer) { + return peerSendManagers.computeIfAbsent(peer.toString(), p -> new PeerSendManager(peer)); + } + + private PeerSendManagement() { + + ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); + + cleaner.scheduleAtFixedRate(() -> { + long idleCutoff = TimeUnit.MINUTES.toMillis(2); + Iterator> iterator = peerSendManagers.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + + PeerSendManager manager = entry.getValue(); + + if (manager.isIdle(idleCutoff)) { + iterator.remove(); // SAFE removal during iteration + manager.shutdown(); + LOGGER.debug("Cleaned up PeerSendManager for peer {}", entry.getKey()); + } + } + }, 0, 5, TimeUnit.MINUTES); + } + + private static PeerSendManagement instance; + + public static PeerSendManagement getInstance() { + + if( instance == null ) { + instance = new PeerSendManagement(); + } + + return instance; + } +} diff --git a/src/main/java/org/qortal/network/PeerSendManager.java b/src/main/java/org/qortal/network/PeerSendManager.java index 7f9a6fc9..b9922287 100644 --- a/src/main/java/org/qortal/network/PeerSendManager.java +++ b/src/main/java/org/qortal/network/PeerSendManager.java @@ -12,7 +12,6 @@ public class PeerSendManager { private static final int MAX_FAILURES = 15; private static final int MAX_MESSAGE_ATTEMPTS = 2; - private static final int SEND_TIMEOUT_MS = 500; private static final int RETRY_DELAY_MS = 100; private static final long MAX_QUEUE_DURATION_MS = 20_000; private static final long COOLDOWN_DURATION_MS = 20_000; @@ -49,11 +48,12 @@ public class PeerSendManager { } Message message = timedMessage.message; + int timeout = timedMessage.timeout; boolean success = false; for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) { try { - if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) { + if (peer.sendMessageWithTimeoutNow(message, timeout)) { success = true; failureCount.set(0); // reset on success break; @@ -98,16 +98,21 @@ public class PeerSendManager { }); } - public void queueMessage(Message message) { + public boolean queueMessage(Message message, int timeout) { if (coolingDown) { LOGGER.debug("In cooldown, ignoring message {}", message.getId()); - return; + + return false; } lastUsed = System.currentTimeMillis(); - if (!queue.offer(new TimedMessage(message))) { + if (!queue.offer(new TimedMessage(message, timeout))) { LOGGER.debug("Send queue full, dropping message {}", message.getId()); + + return false; } + + return true; } public boolean isIdle(long cutoffMillis) { @@ -122,10 +127,12 @@ public class PeerSendManager { private static class TimedMessage { final Message message; final long timestamp; + final int timeout; - TimedMessage(Message message) { + TimedMessage(Message message, int timeout) { this.message = message; this.timestamp = System.currentTimeMillis(); + this.timeout = timeout; } } }