mirror of
https://github.com/Qortal/qortal.git
synced 2025-07-19 19:01:22 +00:00
commit
c19cad020e
31563
qortal.log
Normal file
31563
qortal.log
Normal file
File diff suppressed because it is too large
Load Diff
@ -124,8 +124,8 @@ public class ArbitraryDataFileListManager {
|
||||
if (timeSinceLastAttempt > 15 * 1000L) {
|
||||
// We haven't tried for at least 15 seconds
|
||||
|
||||
if (networkBroadcastCount < 3) {
|
||||
// We've made less than 3 total attempts
|
||||
if (networkBroadcastCount < 12) {
|
||||
// We've made less than 12 total attempts
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -134,8 +134,8 @@ public class ArbitraryDataFileListManager {
|
||||
if (timeSinceLastAttempt > 60 * 1000L) {
|
||||
// We haven't tried for at least 1 minute
|
||||
|
||||
if (networkBroadcastCount < 8) {
|
||||
// We've made less than 8 total attempts
|
||||
if (networkBroadcastCount < 40) {
|
||||
// We've made less than 40 total attempts
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -587,9 +587,7 @@ public class ArbitraryDataFileListManager {
|
||||
|
||||
// Forward to requesting peer
|
||||
LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
|
||||
if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
|
||||
requestingPeer.disconnect("failed to forward arbitrary data file list");
|
||||
}
|
||||
requestingPeer.sendMessage(forwardArbitraryDataFileListMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -787,7 +785,6 @@ public class ArbitraryDataFileListManager {
|
||||
|
||||
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
|
||||
LOGGER.debug("Couldn't send list of hashes");
|
||||
peer.disconnect("failed to send list of hashes");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package org.qortal.controller.arbitrary;
|
||||
|
||||
import com.google.common.net.InetAddresses;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||
@ -23,11 +24,15 @@ import org.qortal.utils.NTP;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.qortal.network.PeerSendManager;
|
||||
|
||||
public class ArbitraryDataFileManager extends Thread {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class);
|
||||
@ -65,11 +70,38 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
|
||||
|
||||
public static int MAX_FILE_HASH_RESPONSES = 1000;
|
||||
private final Map<Peer, PeerSendManager> peerSendManagers = new ConcurrentHashMap<>();
|
||||
|
||||
private PeerSendManager getOrCreateSendManager(Peer peer) {
|
||||
return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
private ArbitraryDataFileManager() {
|
||||
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
|
||||
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
|
||||
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
cleaner.scheduleAtFixedRate(() -> {
|
||||
long idleCutoff = TimeUnit.MINUTES.toMillis(2);
|
||||
Iterator<Map.Entry<Peer, PeerSendManager>> iterator = peerSendManagers.entrySet().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Peer, PeerSendManager> entry = iterator.next();
|
||||
Peer peer = entry.getKey();
|
||||
PeerSendManager manager = entry.getValue();
|
||||
|
||||
if (manager.isIdle(idleCutoff)) {
|
||||
iterator.remove(); // SAFE removal during iteration
|
||||
manager.shutdown();
|
||||
LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer);
|
||||
}
|
||||
}
|
||||
}, 0, 5, TimeUnit.MINUTES);
|
||||
|
||||
}
|
||||
|
||||
public static ArbitraryDataFileManager getInstance() {
|
||||
@ -79,6 +111,8 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
return instance;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Thread.currentThread().setName("Arbitrary Data File Manager");
|
||||
@ -374,13 +408,8 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
// The ID needs to match that of the original request
|
||||
message.setId(originalMessage.getId());
|
||||
|
||||
if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
|
||||
LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer);
|
||||
requestingPeer.disconnect("failed to forward arbitrary data file");
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer);
|
||||
}
|
||||
getOrCreateSendManager(requestingPeer).queueMessage(message);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
@ -656,13 +685,9 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
LOGGER.debug("Sending file {}...", arbitraryDataFile);
|
||||
ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile);
|
||||
arbitraryDataFileMessage.setId(message.getId());
|
||||
if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
|
||||
LOGGER.debug("Couldn't send file {}", arbitraryDataFile);
|
||||
peer.disconnect("failed to send file");
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("Sent file {}", arbitraryDataFile);
|
||||
}
|
||||
|
||||
getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage);
|
||||
|
||||
}
|
||||
else if (relayInfo != null) {
|
||||
LOGGER.debug("We have relay info for hash {}", Base58.encode(hash));
|
||||
@ -696,7 +721,6 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
fileUnknownMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(fileUnknownMessage)) {
|
||||
LOGGER.debug("Couldn't sent file-unknown response");
|
||||
peer.disconnect("failed to send file-unknown response");
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("Sent file-unknown response for file {}", arbitraryDataFile);
|
||||
|
@ -42,10 +42,10 @@ public class ArbitraryDataManager extends Thread {
|
||||
private int powDifficulty = 14; // Must not be final, as unit tests need to reduce this value
|
||||
|
||||
/** Request timeout when transferring arbitrary data */
|
||||
public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms
|
||||
public static final long ARBITRARY_REQUEST_TIMEOUT = 24 * 1000L; // ms
|
||||
|
||||
/** Maximum time to hold information about an in-progress relay */
|
||||
public static final long ARBITRARY_RELAY_TIMEOUT = 90 * 1000L; // ms
|
||||
public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms
|
||||
|
||||
/** Maximum time to hold direct peer connection information */
|
||||
public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms
|
||||
|
@ -360,9 +360,8 @@ public class ArbitraryMetadataManager {
|
||||
|
||||
// Forward to requesting peer
|
||||
LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer);
|
||||
if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) {
|
||||
requestingPeer.disconnect("failed to forward arbitrary metadata");
|
||||
}
|
||||
requestingPeer.sendMessage(forwardArbitraryMetadataMessage);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -479,7 +478,6 @@ public class ArbitraryMetadataManager {
|
||||
arbitraryMetadataMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(arbitraryMetadataMessage)) {
|
||||
LOGGER.debug("Couldn't send metadata");
|
||||
peer.disconnect("failed to send metadata");
|
||||
continue;
|
||||
}
|
||||
LOGGER.debug("Sent metadata");
|
||||
|
@ -640,10 +640,13 @@ public class Peer {
|
||||
return false;
|
||||
|
||||
try {
|
||||
this.outputBuffer = ByteBuffer.wrap(message.toBytes());
|
||||
byte[] messageBytes = message.toBytes();
|
||||
|
||||
this.outputBuffer = ByteBuffer.wrap(messageBytes);
|
||||
this.outputMessageType = message.getType().name();
|
||||
this.outputMessageId = message.getId();
|
||||
|
||||
|
||||
LOGGER.trace("[{}] Sending {} message with ID {} to peer {}",
|
||||
this.peerConnectionId, this.outputMessageType, this.outputMessageId, this);
|
||||
|
||||
@ -662,12 +665,22 @@ public class Peer {
|
||||
// If output byte buffer is not null, send from that
|
||||
int bytesWritten = this.socketChannel.write(outputBuffer);
|
||||
|
||||
LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId,
|
||||
bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit());
|
||||
int zeroSendCount = 0;
|
||||
|
||||
// If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again
|
||||
if (bytesWritten == 0) {
|
||||
return true;
|
||||
while (bytesWritten == 0) {
|
||||
if (zeroSendCount > 9) {
|
||||
LOGGER.debug("Socket write stuck for too long, returning");
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false; // optional, if you want to signal shutdown
|
||||
}
|
||||
zeroSendCount++;
|
||||
bytesWritten = this.socketChannel.write(outputBuffer);
|
||||
}
|
||||
|
||||
// If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
|
||||
@ -729,7 +742,7 @@ public class Peer {
|
||||
|
||||
try {
|
||||
// Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel()
|
||||
LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
|
||||
LOGGER.debug("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
|
||||
message.getType().name(), message.getId(), this);
|
||||
|
||||
// Check message properly constructed
|
||||
|
131
src/main/java/org/qortal/network/PeerSendManager.java
Normal file
131
src/main/java/org/qortal/network/PeerSendManager.java
Normal file
@ -0,0 +1,131 @@
|
||||
package org.qortal.network;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.network.message.Message;
|
||||
|
||||
public class PeerSendManager {
|
||||
private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class);
|
||||
|
||||
private static final int MAX_FAILURES = 15;
|
||||
private static final int MAX_MESSAGE_ATTEMPTS = 2;
|
||||
private static final int SEND_TIMEOUT_MS = 500;
|
||||
private static final int RETRY_DELAY_MS = 100;
|
||||
private static final long MAX_QUEUE_DURATION_MS = 20_000;
|
||||
private static final long COOLDOWN_DURATION_MS = 20_000;
|
||||
|
||||
private final Peer peer;
|
||||
private final BlockingQueue<TimedMessage> queue = new LinkedBlockingQueue<>();
|
||||
private final ExecutorService executor;
|
||||
private final AtomicInteger failureCount = new AtomicInteger(0);
|
||||
private static final AtomicInteger threadCount = new AtomicInteger(1);
|
||||
|
||||
private volatile boolean coolingDown = false;
|
||||
private volatile long lastUsed = System.currentTimeMillis();
|
||||
|
||||
public PeerSendManager(Peer peer) {
|
||||
this.peer = peer;
|
||||
this.executor = Executors.newSingleThreadExecutor(r -> {
|
||||
Thread t = new Thread(r);
|
||||
t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
|
||||
return t;
|
||||
});
|
||||
start();
|
||||
}
|
||||
|
||||
private void start() {
|
||||
executor.submit(() -> {
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
TimedMessage timedMessage = queue.take();
|
||||
long age = System.currentTimeMillis() - timedMessage.timestamp;
|
||||
|
||||
if (age > MAX_QUEUE_DURATION_MS) {
|
||||
LOGGER.debug("Dropping stale message {} ({}ms old)", timedMessage.message.getId(), age);
|
||||
continue;
|
||||
}
|
||||
|
||||
Message message = timedMessage.message;
|
||||
boolean success = false;
|
||||
|
||||
for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) {
|
||||
try {
|
||||
if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) {
|
||||
success = true;
|
||||
failureCount.set(0); // reset on success
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.debug("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage());
|
||||
}
|
||||
|
||||
Thread.sleep(RETRY_DELAY_MS);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
int totalFailures = failureCount.incrementAndGet();
|
||||
LOGGER.debug("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures);
|
||||
|
||||
if (totalFailures >= MAX_FAILURES) {
|
||||
LOGGER.debug("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures);
|
||||
peer.disconnect("Too many message send failures");
|
||||
coolingDown = true;
|
||||
queue.clear();
|
||||
|
||||
try {
|
||||
Thread.sleep(COOLDOWN_DURATION_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} finally {
|
||||
coolingDown = false;
|
||||
failureCount.set(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(50); // small throttle
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void queueMessage(Message message) {
|
||||
if (coolingDown) {
|
||||
LOGGER.debug("In cooldown, ignoring message {}", message.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
lastUsed = System.currentTimeMillis();
|
||||
if (!queue.offer(new TimedMessage(message))) {
|
||||
LOGGER.debug("Send queue full, dropping message {}", message.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isIdle(long cutoffMillis) {
|
||||
return System.currentTimeMillis() - lastUsed > cutoffMillis;
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
queue.clear();
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
private static class TimedMessage {
|
||||
final Message message;
|
||||
final long timestamp;
|
||||
|
||||
TimedMessage(Message message) {
|
||||
this.message = message;
|
||||
this.timestamp = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
}
|
@ -31,21 +31,28 @@ public class ChannelWriteTask implements Task {
|
||||
@Override
|
||||
public void perform() throws InterruptedException {
|
||||
try {
|
||||
|
||||
boolean isSocketClogged;
|
||||
|
||||
boolean isSocketClogged;
|
||||
int clogCounter = 0;
|
||||
do {
|
||||
isSocketClogged = peer.writeChannel();
|
||||
|
||||
if (clogCounter > 9) {
|
||||
LOGGER.warn("10 Socket Clogs - GIVING UP");
|
||||
break;
|
||||
}
|
||||
if (isSocketClogged) {
|
||||
LOGGER.info(
|
||||
LOGGER.debug(
|
||||
"socket is clogged: peer = {} {}, retrying",
|
||||
peer.getPeerData().getAddress().toString(),
|
||||
Thread.currentThread().getName()
|
||||
);
|
||||
Thread.sleep(1000);
|
||||
clogCounter++;
|
||||
}
|
||||
|
||||
} while( isSocketClogged );
|
||||
|
||||
|
||||
// Tell Network that we've finished
|
||||
Network.getInstance().notifyChannelNotWriting(socketChannel);
|
||||
|
||||
@ -62,4 +69,4 @@ public class ChannelWriteTask implements Task {
|
||||
peer.disconnect("I/O error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -44,11 +44,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
|
||||
// if the PrimaryTable is available, then use it
|
||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
||||
tableName = "PrimaryNames";
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("using Names for chat transactions");
|
||||
tableName = "Names";
|
||||
}
|
||||
|
||||
@ -164,11 +162,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
|
||||
// if the PrimaryTable is available, then use it
|
||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
||||
tableName = "PrimaryNames";
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("using Names for chat transactions");
|
||||
tableName = "Names";
|
||||
}
|
||||
|
||||
@ -322,11 +318,9 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
|
||||
// if the PrimaryTable is available, then use it
|
||||
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
|
||||
LOGGER.debug("using PrimaryNames for chat transactions");
|
||||
tableName = "PrimaryNames";
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("using Names for chat transactions");
|
||||
tableName = "Names";
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user