qdn relay optimizations

This commit is contained in:
kennycud 2025-07-09 12:34:47 -07:00
parent 44d26b513a
commit 4b56690118
2 changed files with 138 additions and 79 deletions

View File

@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager {
return true; return true;
} }
public void deleteFileListRequestsForSignature(byte[] signature) { public void deleteFileListRequestsForSignature(String signature58) {
String signature58 = Base58.encode(signature);
for (Iterator<Map.Entry<Integer, Triple<String, Peer, Long>>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) { for (Iterator<Map.Entry<Integer, Triple<String, Peer, Long>>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) {
Map.Entry<Integer, Triple<String, Peer, Long>> entry = it.next(); Map.Entry<Integer, Triple<String, Peer, Long>> entry = it.next();
if (entry == null || entry.getKey() == null || entry.getValue() == null) { if (entry == null || entry.getKey() == null || entry.getValue() == null) {

View File

@ -23,7 +23,6 @@ import org.qortal.utils.NTP;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -70,6 +69,7 @@ public class ArbitraryDataFileManager extends Thread {
private ArbitraryDataFileManager() { private ArbitraryDataFileManager() {
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS); this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
} }
public static ArbitraryDataFileManager getInstance() { public static ArbitraryDataFileManager getInstance() {
@ -140,7 +140,7 @@ public class ArbitraryDataFileManager extends Thread {
if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
LOGGER.debug("Requesting data file {} from peer {}", hash58, peer); LOGGER.debug("Requesting data file {} from peer {}", hash58, peer);
Long startTime = NTP.getTime(); Long startTime = NTP.getTime();
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null); ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash);
Long endTime = NTP.getTime(); Long endTime = NTP.getTime();
if (receivedArbitraryDataFile != null) { if (receivedArbitraryDataFile != null) {
LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime)); LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime));
@ -207,11 +207,13 @@ public class ArbitraryDataFileManager extends Thread {
} }
} }
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException { private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException {
ArbitraryDataFile arbitraryDataFile;
try {
ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
boolean fileAlreadyExists = existingFile.exists(); boolean fileAlreadyExists = existingFile.exists();
String hash58 = Base58.encode(hash); String hash58 = Base58.encode(hash);
ArbitraryDataFile arbitraryDataFile;
// Fetch the file if it doesn't exist locally // Fetch the file if it doesn't exist locally
if (!fileAlreadyExists) { if (!fileAlreadyExists) {
@ -228,8 +230,6 @@ public class ArbitraryDataFileManager extends Thread {
arbitraryDataFileRequests.remove(hash58); arbitraryDataFileRequests.remove(hash58);
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
if (response == null) { if (response == null) {
LOGGER.debug("Received null response from peer {}", peer); LOGGER.debug("Received null response from peer {}", peer);
return null; return null;
@ -246,25 +246,9 @@ public class ArbitraryDataFileManager extends Thread {
arbitraryDataFile = existingFile; arbitraryDataFile = existingFile;
} }
if (arbitraryDataFile == null) { if (arbitraryDataFile != null) {
// We don't have a file, so give up here
return null;
}
// We might want to forward the request to the peer that originally requested it
this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
boolean isRelayRequest = (requestingPeer != null);
if (isRelayRequest) {
if (!fileAlreadyExists) {
// File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists.
// It shouldn't exist on the filesystem yet, but leaving this here just in case.
arbitraryDataFile.delete(10);
}
}
else {
arbitraryDataFile.save(); arbitraryDataFile.save();
}
// If this is a metadata file then we need to update the cache // If this is a metadata file then we need to update the cache
if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) { if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) {
@ -275,30 +259,101 @@ public class ArbitraryDataFileManager extends Thread {
// We may need to remove the file list request, if we have all the files for this transaction // We may need to remove the file list request, if we have all the files for this transaction
this.handleFileListRequests(signature); this.handleFileListRequests(signature);
}
} catch (DataException e) {
LOGGER.error(e.getMessage(), e);
arbitraryDataFile = null;
}
return arbitraryDataFile; return arbitraryDataFile;
} }
private void handleFileListRequests(byte[] signature) { private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
try (final Repository repository = RepositoryManager.getRepository()) { try {
String hash58 = Base58.encode(hash);
// Fetch the transaction data LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer));
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); arbitraryDataFileRequests.put(hash58, NTP.getTime());
if (arbitraryTransactionData == null) { Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
Message response = null;
try {
response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
} catch (InterruptedException e) {
// Will return below due to null response
}
arbitraryDataFileRequests.remove(hash58);
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
if (response == null) {
LOGGER.debug("Received null response from peer {}", peer);
return;
}
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
return; return;
} }
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
if (arbitraryDataFile != null) {
// We might want to forward the request to the peer that originally requested it
this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
Map<String, byte[]> signatureBySignature58 = new HashMap<>();
// Lock to synchronize access to the list
private final Object handleFileListRequestsLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1);
private void handleFileListRequests(byte[] signature) {
synchronized (handleFileListRequestsLock) {
signatureBySignature58.put(Base58.encode(signature), signature);
}
}
private void handleFileListRequestProcess() {
Map<String, byte[]> signaturesToProcess;
synchronized (handleFileListRequestsLock) {
signaturesToProcess = new HashMap<>(signatureBySignature58);
signatureBySignature58.clear();
}
if( signaturesToProcess.isEmpty() ) return;
LOGGER.info("signatures to process = " + signaturesToProcess.size());
try (final Repository repository = RepositoryManager.getRepository()) {
// Fetch the transaction data
List<ArbitraryTransactionData> arbitraryTransactionDataList
= ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values()));
for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) {
boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData); boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData);
if (completeFileExists) { if (completeFileExists) {
String signature58 = Base58.encode(arbitraryTransactionData.getSignature()); String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
LOGGER.info("All chunks or complete file exist for transaction {}", signature58); LOGGER.info("All chunks or complete file exist for transaction {}", signature58);
ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature); ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58);
}
} }
} catch (DataException e) { } catch (Exception e) {
LOGGER.debug("Unable to handle file list requests: {}", e.getMessage()); LOGGER.error(e.getMessage(), e);
} }
} }
@ -315,15 +370,19 @@ public class ArbitraryDataFileManager extends Thread {
LOGGER.debug("Received arbitrary data file - forwarding is needed"); LOGGER.debug("Received arbitrary data file - forwarding is needed");
try {
// The ID needs to match that of the original request // The ID needs to match that of the original request
message.setId(originalMessage.getId()); message.setId(originalMessage.getId());
if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) { if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer); LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer);
requestingPeer.disconnect("failed to forward arbitrary data file"); requestingPeer.disconnect("failed to forward arbitrary data file");
} }
else { else {
LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer); LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} }
} }
@ -615,7 +674,7 @@ public class ArbitraryDataFileManager extends Thread {
LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58); LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58);
// No need to pass arbitraryTransactionData below because this is only used for metadata caching, // No need to pass arbitraryTransactionData below because this is only used for metadata caching,
// and metadata isn't retained when relaying. // and metadata isn't retained when relaying.
this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message); this.fetchFileForRelay(peerToAsk, peer, signature, hash, message);
} }
else { else {
LOGGER.debug("Peer {} not found in relay info", peer); LOGGER.debug("Peer {} not found in relay info", peer);