Merge pull request #260 from kennycud/master

Promising QDN Improvements
crowetic 2025-07-10 13:51:35 -07:00 committed by GitHub
commit 09d0af9b78
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 503 additions and 119 deletions

View File: ArbitraryDataRenderer

@ -1,6 +1,7 @@
package org.qortal.arbitrary;

import com.google.common.io.Resources;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.logging.log4j.LogManager;
@ -15,11 +16,13 @@ import org.qortal.settings.Settings;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@ -167,7 +170,14 @@ public class ArbitraryDataRenderer {
            if (HTMLParser.isHtmlFile(filename)) {
                // HTML file - needs to be parsed
                byte[] data = Files.readAllBytes(filePath); // TODO: limit file size that can be read into memory
-               HTMLParser htmlParser = new HTMLParser(resourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang);
+               String encodedResourceId;
+
+               if (resourceIdType == ResourceIdType.NAME) {
+                   encodedResourceId = resourceId.replace(" ", "%20");
+               } else {
+                   encodedResourceId = resourceId;
+               }
+               HTMLParser htmlParser = new HTMLParser(encodedResourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang);
                htmlParser.addAdditionalHeaderTags();
                response.addHeader("Content-Security-Policy", "default-src 'self' 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; media-src 'self' data: blob:; img-src 'self' data: blob:; connect-src 'self' wss: blob:;");
                response.setContentType(context.getMimeType(filename));
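
The only functional change in this file is the resource-ID handling: when a resource is addressed by registered name, spaces are percent-encoded before the ID reaches HTMLParser; all other ID types pass through unchanged. A minimal standalone sketch of that rule (the helper name is illustrative, not part of the commit):

    // Illustrative helper only: mirrors the branch added above.
    // Registered names may contain spaces; other resource-ID types do not.
    static String encodeResourceId(String resourceId, ResourceIdType resourceIdType) {
        return resourceIdType == ResourceIdType.NAME
                ? resourceId.replace(" ", "%20")
                : resourceId;
    }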

View File: Controller

@ -567,6 +567,9 @@ public class Controller extends Thread {
        LOGGER.info("Starting foreign fees manager");
        ForeignFeesManager.getInstance().start();

+       LOGGER.info("Starting follower");
+       Follower.getInstance().start();
+
        LOGGER.info("Starting transaction importer");
        TransactionImporter.getInstance().start();

View File: ArbitraryDataFileListManager

@ -124,8 +124,8 @@ public class ArbitraryDataFileListManager {
            if (timeSinceLastAttempt > 15 * 1000L) {
                // We haven't tried for at least 15 seconds
-               if (networkBroadcastCount < 3) {
-                   // We've made less than 3 total attempts
+               if (networkBroadcastCount < 12) {
+                   // We've made less than 12 total attempts
                    return true;
                }
            }
@ -134,8 +134,8 @@ public class ArbitraryDataFileListManager {
            if (timeSinceLastAttempt > 60 * 1000L) {
                // We haven't tried for at least 1 minute
-               if (networkBroadcastCount < 8) {
-                   // We've made less than 8 total attempts
+               if (networkBroadcastCount < 40) {
+                   // We've made less than 40 total attempts
                    return true;
                }
            }
@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager {
        return true;
    }

-   public void deleteFileListRequestsForSignature(byte[] signature) {
-       String signature58 = Base58.encode(signature);
+   public void deleteFileListRequestsForSignature(String signature58) {
        for (Iterator<Map.Entry<Integer, Triple<String, Peer, Long>>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) {
            Map.Entry<Integer, Triple<String, Peer, Long>> entry = it.next();
            if (entry == null || entry.getKey() == null || entry.getValue() == null) {
@ -587,9 +587,7 @@ public class ArbitraryDataFileListManager {
                    // Forward to requesting peer
                    LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
-                   if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
-                       requestingPeer.disconnect("failed to forward arbitrary data file list");
-                   }
+                   requestingPeer.sendMessage(forwardArbitraryDataFileListMessage);
                }
            }
        }
@ -787,7 +785,6 @@ public class ArbitraryDataFileListManager {
                if (!peer.sendMessage(arbitraryDataFileListMessage)) {
                    LOGGER.debug("Couldn't send list of hashes");
-                   peer.disconnect("failed to send list of hashes");
                    continue;
                }
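
Taken together, the two retry-budget hunks above raise the broadcast allowance from 3 to 12 attempts in the 15-second phase and from 8 to 40 attempts in the one-minute phase. A condensed sketch of the resulting policy, assuming the field names from the diff context (this flattens two separate branches of the real method into one predicate and is not a drop-in replacement):

    // Condensed sketch of the relaxed re-broadcast budget for file list requests.
    boolean shouldRebroadcast(long timeSinceLastAttempt, int networkBroadcastCount) {
        if (timeSinceLastAttempt > 15 * 1000L && networkBroadcastCount < 12)
            return true; // early phase: retry roughly every 15 seconds, up to 12 broadcasts
        // later phase: back off to roughly one minute between attempts, up to 40 broadcasts
        return timeSinceLastAttempt > 60 * 1000L && networkBroadcastCount < 40;
    }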

View File: ArbitraryDataFileManager

@ -1,6 +1,7 @@
package org.qortal.controller.arbitrary;

import com.google.common.net.InetAddresses;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.arbitrary.ArbitraryDataFile;
@ -23,12 +24,15 @@ import org.qortal.utils.NTP;
import java.security.SecureRandom;
import java.util.*;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

+import org.qortal.network.PeerSendManager;

public class ArbitraryDataFileManager extends Thread {

    private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class);
@ -66,10 +70,38 @@ public class ArbitraryDataFileManager extends Thread {
    public static int MAX_FILE_HASH_RESPONSES = 1000;

+   private final Map<Peer, PeerSendManager> peerSendManagers = new ConcurrentHashMap<>();
+
+   private PeerSendManager getOrCreateSendManager(Peer peer) {
+       return peerSendManagers.computeIfAbsent(peer, p -> new PeerSendManager(p));
+   }
+
    private ArbitraryDataFileManager() {
        this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
+       this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
+
+       ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
+
+       cleaner.scheduleAtFixedRate(() -> {
+           long idleCutoff = TimeUnit.MINUTES.toMillis(2);
+           Iterator<Map.Entry<Peer, PeerSendManager>> iterator = peerSendManagers.entrySet().iterator();
+
+           while (iterator.hasNext()) {
+               Map.Entry<Peer, PeerSendManager> entry = iterator.next();
+
+               Peer peer = entry.getKey();
+               PeerSendManager manager = entry.getValue();
+
+               if (manager.isIdle(idleCutoff)) {
+                   iterator.remove(); // SAFE removal during iteration
+                   manager.shutdown();
+                   LOGGER.debug("Cleaned up PeerSendManager for peer {}", peer);
+               }
+           }
+       }, 0, 5, TimeUnit.MINUTES);
    }
    public static ArbitraryDataFileManager getInstance() {
@ -79,6 +111,8 @@ public class ArbitraryDataFileManager extends Thread {
        return instance;
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Arbitrary Data File Manager");
@ -140,7 +174,7 @@ public class ArbitraryDataFileManager extends Thread {
                        if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
                            LOGGER.debug("Requesting data file {} from peer {}", hash58, peer);
                            Long startTime = NTP.getTime();
-                           ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null);
+                           ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash);
                            Long endTime = NTP.getTime();
                            if (receivedArbitraryDataFile != null) {
                                LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime));
@ -207,14 +241,71 @@ public class ArbitraryDataFileManager extends Thread {
        }
    }

-   private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
-       ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
-       boolean fileAlreadyExists = existingFile.exists();
-       String hash58 = Base58.encode(hash);
+   private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException {
        ArbitraryDataFile arbitraryDataFile;

-       // Fetch the file if it doesn't exist locally
-       if (!fileAlreadyExists) {
+       try {
+           ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
+           boolean fileAlreadyExists = existingFile.exists();
+           String hash58 = Base58.encode(hash);
+
+           // Fetch the file if it doesn't exist locally
+           if (!fileAlreadyExists) {
+               LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer));
+               arbitraryDataFileRequests.put(hash58, NTP.getTime());
+               Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
+
+               Message response = null;
+               try {
+                   response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
+               } catch (InterruptedException e) {
+                   // Will return below due to null response
+               }
+               arbitraryDataFileRequests.remove(hash58);
+               LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
+
+               if (response == null) {
+                   LOGGER.debug("Received null response from peer {}", peer);
+                   return null;
+               }
+
+               if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
+                   LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
+                   return null;
+               }
+
+               ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
+               arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
+           } else {
+               LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
+               arbitraryDataFile = existingFile;
+           }
+
+           if (arbitraryDataFile != null) {
+               arbitraryDataFile.save();
+
+               // If this is a metadata file then we need to update the cache
+               if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) {
+                   if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) {
+                       ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
+                   }
+               }
+
+               // We may need to remove the file list request, if we have all the files for this transaction
+               this.handleFileListRequests(signature);
+           }
+       } catch (DataException e) {
+           LOGGER.error(e.getMessage(), e);
+           arbitraryDataFile = null;
+       }
+
+       return arbitraryDataFile;
+   }
+
+   private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
+       try {
+           String hash58 = Base58.encode(hash);
+
            LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer));
            arbitraryDataFileRequests.put(hash58, NTP.getTime());
            Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
@ -228,77 +319,73 @@ public class ArbitraryDataFileManager extends Thread {
            arbitraryDataFileRequests.remove(hash58);
            LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));

            if (response == null) {
                LOGGER.debug("Received null response from peer {}", peer);
-               return null;
+               return;
            }

            if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
                LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
-               return null;
-           }
-
-           ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
-           arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
-       } else {
-           LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
-           arbitraryDataFile = existingFile;
-       }
-
-       if (arbitraryDataFile == null) {
-           // We don't have a file, so give up here
-           return null;
-       }
-
-       // We might want to forward the request to the peer that originally requested it
-       this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
-
-       boolean isRelayRequest = (requestingPeer != null);
-       if (isRelayRequest) {
-           if (!fileAlreadyExists) {
-               // File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists.
-               // It shouldn't exist on the filesystem yet, but leaving this here just in case.
-               arbitraryDataFile.delete(10);
-           }
-       }
-       else {
-           arbitraryDataFile.save();
-       }
-
-       // If this is a metadata file then we need to update the cache
-       if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) {
-           if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) {
-               ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
-           }
-       }
-
-       // We may need to remove the file list request, if we have all the files for this transaction
-       this.handleFileListRequests(signature);
-
-       return arbitraryDataFile;
-   }
-
-   private void handleFileListRequests(byte[] signature) {
-       try (final Repository repository = RepositoryManager.getRepository()) {
-           // Fetch the transaction data
-           ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
-           if (arbitraryTransactionData == null) {
                return;
            }

-           boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData);
-           if (completeFileExists) {
-               String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
-               LOGGER.info("All chunks or complete file exist for transaction {}", signature58);
-               ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature);
+           ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
+           ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
+
+           if (arbitraryDataFile != null) {
+
+               // We might want to forward the request to the peer that originally requested it
+               this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
+           }
+       } catch (Exception e) {
+           LOGGER.error(e.getMessage(), e);
+       }
+   }
+
+   Map<String, byte[]> signatureBySignature58 = new HashMap<>();
+
+   // Lock to synchronize access to the list
+   private final Object handleFileListRequestsLock = new Object();
+
+   // Scheduled executor service to process messages every second
+   private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1);
+
+   private void handleFileListRequests(byte[] signature) {
+       synchronized (handleFileListRequestsLock) {
+           signatureBySignature58.put(Base58.encode(signature), signature);
+       }
+   }
+
+   private void handleFileListRequestProcess() {
+       Map<String, byte[]> signaturesToProcess;
+
+       synchronized (handleFileListRequestsLock) {
+           signaturesToProcess = new HashMap<>(signatureBySignature58);
+           signatureBySignature58.clear();
+       }
+
+       if( signaturesToProcess.isEmpty() ) return;
+
+       try (final Repository repository = RepositoryManager.getRepository()) {
+           // Fetch the transaction data
+           List<ArbitraryTransactionData> arbitraryTransactionDataList
+               = ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values()));
+
+           for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) {
+               boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData);
+               if (completeFileExists) {
+                   String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
+                   LOGGER.debug("All chunks or complete file exist for transaction {}", signature58);
+
+                   ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58);
+               }
            }
-       } catch (DataException e) {
-           LOGGER.debug("Unable to handle file list requests: {}", e.getMessage());
+       } catch (Exception e) {
+           LOGGER.error(e.getMessage(), e);
        }
    }
@ -315,15 +402,14 @@ public class ArbitraryDataFileManager extends Thread {
            LOGGER.debug("Received arbitrary data file - forwarding is needed");

-           // The ID needs to match that of the original request
-           message.setId(originalMessage.getId());
-
-           if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
-               LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer);
-               requestingPeer.disconnect("failed to forward arbitrary data file");
-           }
-           else {
-               LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer);
+           try {
+               // The ID needs to match that of the original request
+               message.setId(originalMessage.getId());
+
+               getOrCreateSendManager(requestingPeer).queueMessage(message);
+
+           } catch (Exception e) {
+               LOGGER.error(e.getMessage(), e);
            }
        }
@ -597,13 +683,9 @@ public class ArbitraryDataFileManager extends Thread {
                LOGGER.debug("Sending file {}...", arbitraryDataFile);
                ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile);
                arbitraryDataFileMessage.setId(message.getId());

-               if (!peer.sendMessageWithTimeout(arbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
-                   LOGGER.debug("Couldn't send file {}", arbitraryDataFile);
-                   peer.disconnect("failed to send file");
-               }
-               else {
-                   LOGGER.debug("Sent file {}", arbitraryDataFile);
-               }
+               getOrCreateSendManager(peer).queueMessage(arbitraryDataFileMessage);
            }
            else if (relayInfo != null) {
                LOGGER.debug("We have relay info for hash {}", Base58.encode(hash));
@ -615,7 +697,7 @@ public class ArbitraryDataFileManager extends Thread {
                LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58);

                // No need to pass arbitraryTransactionData below because this is only used for metadata caching,
                // and metadata isn't retained when relaying.
-               this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message);
+               this.fetchFileForRelay(peerToAsk, peer, signature, hash, message);
            }
            else {
                LOGGER.debug("Peer {} not found in relay info", peer);
@ -637,7 +719,6 @@ public class ArbitraryDataFileManager extends Thread {
            fileUnknownMessage.setId(message.getId());
            if (!peer.sendMessage(fileUnknownMessage)) {
                LOGGER.debug("Couldn't sent file-unknown response");
-               peer.disconnect("failed to send file-unknown response");
            }
            else {
                LOGGER.debug("Sent file-unknown response for file {}", arbitraryDataFile);
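
The net effect of the changes in this file is that outbound ARBITRARY_DATA_FILE traffic no longer relies on blocking sendMessageWithTimeout() calls followed by an immediate disconnect on failure; every direct or relayed file response is handed to a per-peer PeerSendManager, created on demand and shut down after two minutes of idleness by the cleaner task above. Roughly, the send path now looks like this sketch, using only calls visible in the diff (variable names are illustrative):

    // Sketch of the new outbound path for a file response (relay or direct):
    ArbitraryDataFileMessage reply = new ArbitraryDataFileMessage(signature, arbitraryDataFile);
    reply.setId(message.getId());                      // keep the original request ID
    getOrCreateSendManager(peer).queueMessage(reply);  // queued; retries and disconnects are handled by PeerSendManager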

View File: ArbitraryDataFileRequestThread

@ -15,6 +15,7 @@ import org.qortal.settings.Settings;
import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
+import org.qortal.utils.NamedThreadFactory;

import java.net.http.HttpResponse;
import java.util.ArrayList;
@ -38,6 +39,9 @@ public class ArbitraryDataFileRequestThread {
    private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);

+   private static final Integer FETCHER_LIMIT_PER_PEER = Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE);
+   private static final String FETCHER_THREAD_PREFIX = "Arbitrary Data Fetcher ";
+
    private ConcurrentHashMap<String, ExecutorService> executorByPeer = new ConcurrentHashMap<>();

    private ArbitraryDataFileRequestThread() {
@ -64,8 +68,9 @@ public class ArbitraryDataFileRequestThread {
                if (value instanceof ThreadPoolExecutor) {
                    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) value;
                    if (threadPoolExecutor.getActiveCount() == 0) {
+                       threadPoolExecutor.shutdown();
                        if (this.executorByPeer.computeIfPresent(key, (k, v) -> null) == null) {
-                           LOGGER.info("removed executor: peer = " + key);
+                           LOGGER.trace("removed executor: peer = " + key);
                        }
                    }
                } else {
@ -147,7 +152,9 @@ public class ArbitraryDataFileRequestThread {
                .computeIfAbsent(
                    responseInfo.getPeer().toString(),
                    peer -> Executors.newFixedThreadPool(
-                       Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE))
+                       FETCHER_LIMIT_PER_PEER,
+                       new NamedThreadFactory(FETCHER_THREAD_PREFIX + responseInfo.getPeer().toString(), NORM_PRIORITY)
+                   )
                )
                .execute(fetcher);
    }

View File: ArbitraryDataManager

@ -42,10 +42,10 @@ public class ArbitraryDataManager extends Thread {
    private int powDifficulty = 14; // Must not be final, as unit tests need to reduce this value

    /** Request timeout when transferring arbitrary data */
-   public static final long ARBITRARY_REQUEST_TIMEOUT = 12 * 1000L; // ms
+   public static final long ARBITRARY_REQUEST_TIMEOUT = 24 * 1000L; // ms

    /** Maximum time to hold information about an in-progress relay */
-   public static final long ARBITRARY_RELAY_TIMEOUT = 90 * 1000L; // ms
+   public static final long ARBITRARY_RELAY_TIMEOUT = 120 * 1000L; // ms

    /** Maximum time to hold direct peer connection information */
    public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms

View File: ArbitraryMetadataManager

@ -360,9 +360,8 @@ public class ArbitraryMetadataManager {
                    // Forward to requesting peer
                    LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer);
-                   if (!requestingPeer.sendMessage(forwardArbitraryMetadataMessage)) {
-                       requestingPeer.disconnect("failed to forward arbitrary metadata");
-                   }
+                   requestingPeer.sendMessage(forwardArbitraryMetadataMessage);
                }
            }
        }
@ -479,7 +478,6 @@ public class ArbitraryMetadataManager {
                arbitraryMetadataMessage.setId(message.getId());
                if (!peer.sendMessage(arbitraryMetadataMessage)) {
                    LOGGER.debug("Couldn't send metadata");
-                   peer.disconnect("failed to send metadata");
                    continue;
                }
                LOGGER.debug("Sent metadata");

View File: Follower (new file)

@ -0,0 +1,130 @@
package org.qortal.controller.arbitrary;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.ListUtils;
import org.qortal.utils.NamedThreadFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Follower {

    private static final Logger LOGGER = LogManager.getLogger(Follower.class);

    private ScheduledExecutorService service
        = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Follower", Thread.NORM_PRIORITY));

    private Follower() {
    }

    private static Follower instance;

    public static Follower getInstance() {
        if( instance == null ) {
            instance = new Follower();
        }

        return instance;
    }

    public void start() {
        // fetch arbitrary transactions from followed names from the last 100 blocks every 2 minutes
        service.scheduleWithFixedDelay(() -> fetch(OptionalInt.of(100)), 10, 2, TimeUnit.MINUTES);

        // fetch arbitrary transactions from followed names from any block every 24 hours
        service.scheduleWithFixedDelay(() -> fetch(OptionalInt.empty()), 4, 24, TimeUnit.HOURS);
    }

    private void fetch(OptionalInt limit) {
        try {
            // for each followed name, get arbitrary transactions, then examine those transactions before fetching
            for (String name : ListUtils.followedNames()) {
                List<ArbitraryTransactionData> transactionsInReverseOrder;

                // open database to get the transactions in reverse order for the followed name
                try (final Repository repository = RepositoryManager.getRepository()) {
                    List<ArbitraryTransactionData> latestArbitraryTransactionsByName
                        = repository.getArbitraryRepository().getLatestArbitraryTransactionsByName(name);

                    if (limit.isPresent()) {
                        final int blockHeightThreshold = repository.getBlockRepository().getBlockchainHeight() - limit.getAsInt();

                        transactionsInReverseOrder
                            = latestArbitraryTransactionsByName.stream().filter(tx -> tx.getBlockHeight() > blockHeightThreshold)
                                .collect(Collectors.toList());
                    } else {
                        transactionsInReverseOrder = latestArbitraryTransactionsByName;
                    }
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                    transactionsInReverseOrder = new ArrayList<>(0);
                }

                // collect processed transaction hashes, so we don't fetch outdated transactions
                Set<ArbitraryTransactionDataHashWrapper> processedTransactions = new HashSet<>();

                ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance();

                // for each arbitrary transaction for the followed name: process, evaluate, fetch
                for (ArbitraryTransactionData arbitraryTransaction : transactionsInReverseOrder) {
                    boolean examined = false;

                    try (final Repository repository = RepositoryManager.getRepository()) {
                        // if not processed
                        if (!processedTransactions.contains(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction))) {
                            boolean isLocal = repository.getArbitraryRepository().isDataLocal(arbitraryTransaction.getSignature());

                            // if not local, then continue to evaluate
                            if (!isLocal) {
                                // evaluate fetching status for this transaction on this node
                                ArbitraryDataExamination examination = storageManager.shouldPreFetchData(repository, arbitraryTransaction);

                                // if the evaluation passed, then fetch
                                examined = examination.isPass();
                            }
                            // if locally stored, then nothing needs to be done

                            // add to processed transactions
                            processedTransactions.add(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction));
                        }
                    }

                    // if passed examination for fetching, then fetch
                    if (examined) {
                        LOGGER.info("for {} on {}, fetching {}", name, arbitraryTransaction.getService(), arbitraryTransaction.getIdentifier());
                        boolean fetched = ArbitraryDataFileListManager.getInstance().fetchArbitraryDataFileList(arbitraryTransaction);

                        LOGGER.info("fetched = " + fetched);
                    }

                    // pause a second before moving on to another transaction
                    Thread.sleep(1000);
                }
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
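
For context, the Controller change earlier in this commit is what activates this class at node startup; after that, the two schedules above drive everything. A short sketch of the wiring, using only calls present in the diff:

    // Started once from Controller:
    LOGGER.info("Starting follower");
    Follower.getInstance().start();
    // ...after which the follower, on its own threads:
    //   - every 2 minutes: fetches followed-name arbitrary transactions from the last 100 blocks
    //   - every 24 hours:  fetches followed-name arbitrary transactions from any block height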

View File: Peer

@ -640,10 +640,13 @@ public class Peer {
            return false;

        try {
-           this.outputBuffer = ByteBuffer.wrap(message.toBytes());
+           byte[] messageBytes = message.toBytes();
+
+           this.outputBuffer = ByteBuffer.wrap(messageBytes);
            this.outputMessageType = message.getType().name();
            this.outputMessageId = message.getId();

            LOGGER.trace("[{}] Sending {} message with ID {} to peer {}",
                    this.peerConnectionId, this.outputMessageType, this.outputMessageId, this);
@ -662,12 +665,22 @@ public class Peer {
            // If output byte buffer is not null, send from that
            int bytesWritten = this.socketChannel.write(outputBuffer);

-           LOGGER.trace("[{}] Sent {} bytes of {} message with ID {} to peer {} ({} total)", this.peerConnectionId,
-                   bytesWritten, this.outputMessageType, this.outputMessageId, this, outputBuffer.limit());
-
-           // If we've sent 0 bytes then socket buffer is full so we need to wait until it's empty again
-           if (bytesWritten == 0) {
-               return true;
+           int zeroSendCount = 0;
+
+           while (bytesWritten == 0) {
+               if (zeroSendCount > 9) {
+                   LOGGER.debug("Socket write stuck for too long, returning");
+                   return true;
+               }
+
+               try {
+                   Thread.sleep(10); // 10MS CPU Sleep to try and give it time to flush the socket
+               }
+               catch (InterruptedException e) {
+                   Thread.currentThread().interrupt();
+                   return false; // optional, if you want to signal shutdown
+               }
+
+               zeroSendCount++;
+               bytesWritten = this.socketChannel.write(outputBuffer);
            }

            // If we then exhaust the byte buffer, set it to null (otherwise loop and try to send more)
@ -729,7 +742,7 @@ public class Peer {
        try {
            // Queue message, to be picked up by ChannelWriteTask and then peer.writeChannel()
-           LOGGER.trace("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
+           LOGGER.debug("[{}] Queuing {} message with ID {} to peer {}", this.peerConnectionId,
                    message.getType().name(), message.getId(), this);

            // Check message properly constructed

View File: PeerSendManager (new file)

@ -0,0 +1,131 @@
package org.qortal.network;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.network.message.Message;

public class PeerSendManager {
    private static final Logger LOGGER = LogManager.getLogger(PeerSendManager.class);

    private static final int MAX_FAILURES = 15;
    private static final int MAX_MESSAGE_ATTEMPTS = 2;
    private static final int SEND_TIMEOUT_MS = 500;
    private static final int RETRY_DELAY_MS = 100;
    private static final long MAX_QUEUE_DURATION_MS = 20_000;
    private static final long COOLDOWN_DURATION_MS = 20_000;

    private final Peer peer;
    private final BlockingQueue<TimedMessage> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private static final AtomicInteger threadCount = new AtomicInteger(1);

    private volatile boolean coolingDown = false;
    private volatile long lastUsed = System.currentTimeMillis();

    public PeerSendManager(Peer peer) {
        this.peer = peer;
        this.executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setName("PeerSendManager-" + peer.getResolvedAddress().getHostString() + "-" + threadCount.getAndIncrement());
            return t;
        });
        start();
    }

    private void start() {
        executor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TimedMessage timedMessage = queue.take();
                    long age = System.currentTimeMillis() - timedMessage.timestamp;

                    if (age > MAX_QUEUE_DURATION_MS) {
                        LOGGER.debug("Dropping stale message {} ({}ms old)", timedMessage.message.getId(), age);
                        continue;
                    }

                    Message message = timedMessage.message;
                    boolean success = false;

                    for (int attempt = 1; attempt <= MAX_MESSAGE_ATTEMPTS; attempt++) {
                        try {
                            if (peer.sendMessageWithTimeout(message, SEND_TIMEOUT_MS)) {
                                success = true;
                                failureCount.set(0); // reset on success
                                break;
                            }
                        } catch (Exception e) {
                            LOGGER.debug("Attempt {} failed for message {} to peer {}: {}", attempt, message.getId(), peer, e.getMessage());
                        }

                        Thread.sleep(RETRY_DELAY_MS);
                    }

                    if (!success) {
                        int totalFailures = failureCount.incrementAndGet();
                        LOGGER.debug("Failed to send message {} to peer {}. Total failures: {}", message.getId(), peer, totalFailures);

                        if (totalFailures >= MAX_FAILURES) {
                            LOGGER.debug("Peer {} exceeded failure limit ({}). Disconnecting...", peer, totalFailures);
                            peer.disconnect("Too many message send failures");
                            coolingDown = true;
                            queue.clear();

                            try {
                                Thread.sleep(COOLDOWN_DURATION_MS);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return;
                            } finally {
                                coolingDown = false;
                                failureCount.set(0);
                            }
                        }
                    }

                    Thread.sleep(50); // small throttle
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    LOGGER.error("Unexpected error in PeerSendManager for peer {}: {}", peer, e.getMessage(), e);
                }
            }
        });
    }

    public void queueMessage(Message message) {
        if (coolingDown) {
            LOGGER.debug("In cooldown, ignoring message {}", message.getId());
            return;
        }

        lastUsed = System.currentTimeMillis();

        if (!queue.offer(new TimedMessage(message))) {
            LOGGER.debug("Send queue full, dropping message {}", message.getId());
        }
    }

    public boolean isIdle(long cutoffMillis) {
        return System.currentTimeMillis() - lastUsed > cutoffMillis;
    }

    public void shutdown() {
        queue.clear();
        executor.shutdownNow();
    }

    private static class TimedMessage {

        final Message message;
        final long timestamp;

        TimedMessage(Message message) {
            this.message = message;
            this.timestamp = System.currentTimeMillis();
        }
    }
}
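
ArbitraryDataFileManager, earlier in this commit, is the only caller shown here: it creates one PeerSendManager per peer on demand and shuts it down after two minutes of idleness. A brief usage sketch based on the constants above (the surrounding peer and message objects are assumed to come from the existing networking code):

    PeerSendManager sendManager = new PeerSendManager(peer); // spawns a single sender thread for this peer
    sendManager.queueMessage(message); // each message: up to 2 attempts with a 500ms timeout, 100ms between attempts
    // Messages older than 20s are dropped unsent; 15 consecutive failed messages disconnect the peer
    // and start a 20s cooldown during which queueMessage() silently discards new messages.
    if (sendManager.isIdle(2 * 60 * 1000L)) {
        sendManager.shutdown(); // callers are responsible for cleaning up idle managers
    }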

View File: ChannelWriteTask

@ -31,7 +31,27 @@ public class ChannelWriteTask implements Task {
    @Override
    public void perform() throws InterruptedException {
        try {
-           boolean isSocketClogged = peer.writeChannel();
+           boolean isSocketClogged;
+           int clogCounter = 0;
+           do {
+               isSocketClogged = peer.writeChannel();
+
+               if (clogCounter > 9) {
+                   LOGGER.warn("10 Socket Clogs - GIVING UP");
+                   break;
+               }
+
+               if (isSocketClogged) {
+                   LOGGER.debug(
+                       "socket is clogged: peer = {} {}, retrying",
+                       peer.getPeerData().getAddress().toString(),
+                       Thread.currentThread().getName()
+                   );
+                   Thread.sleep(1000);
+                   clogCounter++;
+               }
+           } while( isSocketClogged );

            // Tell Network that we've finished
            Network.getInstance().notifyChannelNotWriting(socketChannel);

View File: HSQLDBChatRepository

@ -44,11 +44,9 @@ public class HSQLDBChatRepository implements ChatRepository {
        // if the PrimaryTable is available, then use it
        if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
-           LOGGER.debug("using PrimaryNames for chat transactions");
            tableName = "PrimaryNames";
        }
        else {
-           LOGGER.debug("using Names for chat transactions");
            tableName = "Names";
        }
@ -164,11 +162,9 @@ public class HSQLDBChatRepository implements ChatRepository {
        // if the PrimaryTable is available, then use it
        if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
-           LOGGER.debug("using PrimaryNames for chat transactions");
            tableName = "PrimaryNames";
        }
        else {
-           LOGGER.debug("using Names for chat transactions");
            tableName = "Names";
        }
@ -322,11 +318,9 @@ public class HSQLDBChatRepository implements ChatRepository {
        // if the PrimaryTable is available, then use it
        if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
-           LOGGER.debug("using PrimaryNames for chat transactions");
            tableName = "PrimaryNames";
        }
        else {
-           LOGGER.debug("using Names for chat transactions");
            tableName = "Names";
        }