PeerSendManger added

This commit is contained in:
PhilReact 2025-07-10 12:52:43 +03:00
parent 086ed6574f
commit f7cb4ce264
8 changed files with 316 additions and 38 deletions

View File

@ -124,7 +124,7 @@ public class ArbitraryDataFileListManager {
if (timeSinceLastAttempt > 15 * 1000L) {
// We haven't tried for at least 15 seconds
if (networkBroadcastCount < 3) {
if (networkBroadcastCount < 12) {
// We've made less than 3 total attempts
return true;
}
@ -134,7 +134,7 @@ public class ArbitraryDataFileListManager {
if (timeSinceLastAttempt > 60 * 1000L) {
// We haven't tried for at least 1 minute
if (networkBroadcastCount < 8) {
if (networkBroadcastCount < 40) {
// We've made less than 8 total attempts
return true;
}
@ -588,7 +588,7 @@ public class ArbitraryDataFileListManager {
// Forward to requesting peer
LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
// requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
@ -787,7 +787,7 @@ public class ArbitraryDataFileListManager {
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.debug("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
// peer.disconnect("failed to send list of hashes");
continue;
}

View File

@ -1,6 +1,7 @@
package org.qortal.controller.arbitrary;
import com.google.common.net.InetAddresses;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.arbitrary.ArbitraryDataFile;
@ -23,11 +24,15 @@ import org.qortal.utils.NTP;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.qortal.network.PeerSendManager;
public class ArbitraryDataFileManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class);
@ -65,11 +70,40 @@ public class ArbitraryDataFileManager extends Thread {
public static int MAX_FILE_HASH_RESPONSES = 1000;
private final Map<Peer, PeerSendManager> peerSendManagers = new ConcurrentHashMap<>();
private PeerSendManager getOrCreateSendManager(Peer peer) {
return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
}
public void queueFileSendToPeer(Peer peer, Message fileMessage) {
getOrCreateSendManager(peer).queueMessage(fileMessage);
}
private ArbitraryDataFileManager() {
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
cleaner.scheduleAtFixedRate(() -> {
long idleCutoff = TimeUnit.MINUTES.toMillis(2);
Iterator<Map.Entry<Peer, PeerSendManager>> iterator = peerSendManagers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Peer, PeerSendManager> entry = iterator.next();
Peer peer = entry.getKey();
PeerSendManager manager = entry.getValue();
if (manager.isIdle(idleCutoff)) {
iterator.remove(); // SAFE removal during iteration
manager.shutdown();
LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer);
}
}
}, 0, 30, TimeUnit.MINUTES);
}
public static ArbitraryDataFileManager getInstance() {
@ -79,6 +113,8 @@ public class ArbitraryDataFileManager extends Thread {
return instance;
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data File Manager");
@ -231,7 +267,7 @@ public class ArbitraryDataFileManager extends Thread {
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
if (response == null) {
LOGGER.debug("Received null response from peer {}", peer);
LOGGER.info("Received null response from peer {}", peer);
return null;
}
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
@ -374,13 +410,16 @@ public class ArbitraryDataFileManager extends Thread {
// The ID needs to match that of the original request
message.setId(originalMessage.getId());
if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer);
requestingPeer.disconnect("failed to forward arbitrary data file");
}
else {
LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer);
}
// if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
// LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer);
// requestingPeer.disconnect("failed to forward arbitrary data file");
// }
// else {
// LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer);
// }
getOrCreateSendManager(requestingPeer).queueMessage(message);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
@ -643,7 +682,7 @@ public class ArbitraryDataFileManager extends Thread {
byte[] signature = getArbitraryDataFileMessage.getSignature();
Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet();
LOGGER.debug("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash));
LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash));
try {
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature);
@ -656,13 +695,15 @@ public class ArbitraryDataFileManager extends Thread {
LOGGER.debug("Sending file {}...", arbitraryDataFile);
ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile);
arbitraryDataFileMessage.setId(message.getId());
if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
LOGGER.debug("Couldn't send file {}", arbitraryDataFile);
peer.disconnect("failed to send file");
}
else {
LOGGER.debug("Sent file {}", arbitraryDataFile);
}
// if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
// LOGGER.info("Couldn't send file {}", arbitraryDataFile);
// // peer.disconnect("failed to send file");
// }
// else {
// LOGGER.debug("Sent file {}", arbitraryDataFile);
// }
getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage);
}
else if (relayInfo != null) {
LOGGER.debug("We have relay info for hash {}", Base58.encode(hash));
@ -696,7 +737,7 @@ public class ArbitraryDataFileManager extends Thread {
fileUnknownMessage.setId(message.getId());
if (!peer.sendMessage(fileUnknownMessage)) {
LOGGER.debug("Couldn't sent file-unknown response");
peer.disconnect("failed to send file-unknown response");
// peer.disconnect("failed to send file-unknown response");
}
else {
LOGGER.debug("Sent file-unknown response for file {}", arbitraryDataFile);

View File

@ -45,7 +45,7 @@ public class ArbitraryDataManager extends Thread {
public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms
/** Maximum time to hold information about an in-progress relay */
public static final long ARBITRARY_RELAY_TIMEOUT = 90 * 1000L; // ms
public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms
/** Maximum time to hold direct peer connection information */
public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms

View File

@ -361,7 +361,7 @@ public class ArbitraryMetadataManager {
// Forward to requesting peer
LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer);
if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) {
requestingPeer.disconnect("failed to forward arbitrary metadata");
// requestingPeer.disconnect("failed to forward arbitrary metadata");
}
}
}
@ -479,7 +479,7 @@ public class ArbitraryMetadataManager {
arbitraryMetadataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryMetadataMessage)) {
LOGGER.debug("Couldn't send metadata");
peer.disconnect("failed to send metadata");
// peer.disconnect("failed to send metadata");
continue;
}
LOGGER.debug("Sent metadata");

View File

@ -0,0 +1,100 @@
package org.qortal.data.arbitrary;
import org.qortal.network.Peer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.*;
public class FileFetchPeerStatsManager {
public static class FilePeerStats {
private static final int MAX_HISTORY = 20;
private static final int MIN_REQUIRED_ATTEMPTS = 10;
private final Deque<Boolean> resultHistory = new ArrayDeque<>(MAX_HISTORY);
private long lastUsed = System.currentTimeMillis();
public synchronized void recordResult(boolean success) {
if (resultHistory.size() >= MAX_HISTORY) {
resultHistory.removeFirst();
}
resultHistory.addLast(success);
lastUsed = System.currentTimeMillis();
}
public synchronized double getSuccessRate() {
if (resultHistory.isEmpty()) return 1.0;
long successCount = resultHistory.stream().filter(b -> b).count();
return (double) successCount / resultHistory.size();
}
public synchronized boolean hasEnoughHistory() {
return resultHistory.size() >= MIN_REQUIRED_ATTEMPTS;
}
public synchronized long getLastUsed() {
return lastUsed;
}
}
private final ConcurrentMap<String, FilePeerStats> statsMap = new ConcurrentHashMap<>();
private final long ttlMillis;
private final ScheduledExecutorService cleanupScheduler;
public FileFetchPeerStatsManager(long ttlMillis) {
this.ttlMillis = ttlMillis;
this.cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
startCleanupTask();
}
private String makeKey(String signature58, Peer peer, int hops) {
return signature58 + "|" + peer.toString() + "|hops=" + hops;
}
public void recordSuccess(String signature58, Peer peer, int hops) {
getOrCreateStats(signature58, peer, hops).recordResult(true);
}
public void recordFailure(String signature58, Peer peer, int hops) {
getOrCreateStats(signature58, peer, hops).recordResult(false);
}
private FilePeerStats getOrCreateStats(String signature58, Peer peer, int hops) {
String key = makeKey(signature58, peer, hops);
return statsMap.computeIfAbsent(key, k -> new FilePeerStats());
}
public FilePeerStats getStats(String signature58, Peer peer, int hops) {
String key = makeKey(signature58, peer, hops);
return statsMap.computeIfAbsent(key, k -> new FilePeerStats());
}
public double getSuccessRate(String signature58, Peer peer, int hops) {
return getStats(signature58, peer, hops).getSuccessRate();
}
public boolean hasEnoughHistory(String signature58, Peer peer, int hops) {
return getStats(signature58, peer, hops).hasEnoughHistory();
}
public void clearStatsForSignature(String signature58) {
statsMap.keySet().removeIf(key -> key.startsWith(signature58 + "|"));
}
private void startCleanupTask() {
cleanupScheduler.scheduleAtFixedRate(() -> {
try {
long now = System.currentTimeMillis();
statsMap.entrySet().removeIf(entry -> now - entry.getValue().getLastUsed() > ttlMillis);
} catch (Exception e) {
System.err.println("Error during FilePeerStats cleanup: " + e.getMessage());
e.printStackTrace();
}
}, 1, 1, TimeUnit.MINUTES);
}
public void shutdown() {
cleanupScheduler.shutdownNow();
}
}

View File

@ -640,10 +640,13 @@ public class Peer {
return false;
try {
this.outputBuffer = ByteBuffer.wrap(message.toBytes());
byte[] messageBytes = message.toBytes();
this.outputBuffer = ByteBuffer.wrap(messageBytes);
this.outputMessageType = message.getType().name();
this.outputMessageId = message.getId();
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}",
this.peerConnectionId, this.outputMessageType, this.outputMessageId, this);
@ -661,14 +664,31 @@ public class Peer {
// If output byte buffer is not null, send from that
int bytesWritten = this.socketChannel.write(outputBuffer);
LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId,
bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit());
// If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again
if (bytesWritten == 0) {
return true;
}
if ("ARBITRARY_DATA_FILE".equals(this.outputMessageType)) {
LOGGER.info("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)",
this.peerConnectionId,
bytesWritten,
this.outputMessageType,
this.outputMessageId,
this,
outputBuffer.limit());
}
int zeroSendCount = 0;
while (bytesWritten == 0) {
if (zeroSendCount > 9) {
LOGGER.warn("Socket write stuck for too long, returning");
return true;
}
try {
Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false; // optional, if you want to signal shutdown
}
zeroSendCount++;
bytesWritten = this.socketChannel.write(outputBuffer);
}
// If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
if (!this.outputBuffer.hasRemaining()) {
@ -729,7 +749,7 @@ public class Peer {
try {
// Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel()
LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
LOGGER.debug("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
message.getType().name(), message.getId(), this);
// Check message properly constructed

View File

@ -0,0 +1,110 @@
package org.qortal.network;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.message.Message;
public class PeerSendManager {
private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class);
private static final int MAX_RETRIES = 15;
private static final int BASE_RETRY_DELAY_MS = 100;
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
private final Peer peer;
private static final AtomicInteger threadCount = new AtomicInteger(1);
private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
return t;
}
});
public PeerSendManager(Peer peer) {
this.peer = peer;
start();
}
private void start() {
executor.submit(() -> {
while (true) {
try {
Message message = queue.take(); // Blocks until available
boolean success = false;
int attempt = 0;
while (attempt < MAX_RETRIES) {
try {
if (peer.sendMessageWithTimeout(message, 5000)) {
success = true;
break;
}
} catch (Exception e) {
LOGGER.warn("Send attempt {} failed for {} message ID {} to peer {}: {}",
attempt + 1,
message.getType().name(),
message.getId(),
peer,
e.getMessage());
}
attempt++;
try {
long delay = Math.min(BASE_RETRY_DELAY_MS * (1L << attempt), 2000);
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!success) {
LOGGER.warn("Failed to send {} message ID {} to peer {} after {} attempts. Disconnecting...",
message.getType().name(),
message.getId(),
peer,
MAX_RETRIES);
peer.disconnect("SendMessage retries exceeded");
queue.clear();
break;
}
// Throttle after successful send
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
LOGGER.warn("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage());
}
}
});
}
private volatile long lastUsed = System.currentTimeMillis();
public void queueMessage(Message message) {
lastUsed = System.currentTimeMillis();
this.queue.offer(message);
}
public boolean isIdle(long cutoffMillis) {
return System.currentTimeMillis() - lastUsed > cutoffMillis;
}
public void shutdown() {
queue.clear();
executor.shutdownNow();
}
}

View File

@ -31,11 +31,16 @@ public class ChannelWriteTask implements Task {
@Override
public void perform() throws InterruptedException {
try {
boolean isSocketClogged;
boolean isSocketClogged;
int clogCounter = 0;
do {
isSocketClogged = peer.writeChannel();
if (clogCounter > 9) {
LOGGER.warn("10 Socket Clogs - GIVING UP");
break;
}
if (isSocketClogged) {
LOGGER.info(
"socket is clogged: peer = {} {}, retrying",
@ -43,9 +48,11 @@ public class ChannelWriteTask implements Task {
Thread.currentThread().getName()
);
Thread.sleep(1000);
clogCounter++;
}
} while( isSocketClogged );
// Tell Network that we've finished
Network.getInstance().notifyChannelNotWriting(socketChannel);
@ -62,4 +69,4 @@ public class ChannelWriteTask implements Task {
peer.disconnect("I/O error");
}
}
}
}