Merge 9f41c5566d379b2ba5d59d12b7df8c1de4cea471 into 8ffb0625a1edcf0b3d1ec2498b15a31ec38ade3c

This commit is contained in:
cwd.systems | 0KN 2024-11-27 22:09:08 +06:00 committed by GitHub
commit 1551516df5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,9 +13,9 @@ import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58; import org.qortal.utils.Base58;
import org.qortal.utils.NTP; import org.qortal.utils.NTP;
import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import static java.lang.Thread.NORM_PRIORITY; import static java.lang.Thread.NORM_PRIORITY;
@ -23,8 +23,11 @@ public class ArbitraryDataFileRequestThread implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class); private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
public ArbitraryDataFileRequestThread() { private static final long SLEEP_INTERVAL = 1000L; // 1 second
private static final ArbitraryDataFileManager FILE_MANAGER = ArbitraryDataFileManager.getInstance();
public ArbitraryDataFileRequestThread() {
// Default constructor
} }
@Override @Override
@ -34,95 +37,105 @@ public class ArbitraryDataFileRequestThread implements Runnable {
try { try {
while (!Controller.isStopping()) { while (!Controller.isStopping()) {
Long now = NTP.getTime(); processPendingFileHashes(NTP.getTime());
this.processFileHashes(now);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Fall-through to exit thread... Thread.currentThread().interrupt(); // Restore interrupt status
LOGGER.info("Arbitrary Data File Request Thread interrupted, exiting.");
} }
} }
private void processFileHashes(Long now) throws InterruptedException { private void processPendingFileHashes(Long currentTime) throws InterruptedException {
if (Controller.isStopping()) { if (Controller.isStopping()) {
return; return;
} }
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance(); ArbitraryFileListResponseInfo responseInfo = fetchNextValidResponseInfo(currentTime);
String signature58 = null;
String hash58 = null;
Peer peer = null;
boolean shouldProcess = false;
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) { if (responseInfo == null) {
if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) { // No files to process, sleep and retry later
Thread.sleep(SLEEP_INTERVAL);
return;
}
// Sort by lowest number of node hops first // Extract and decode data
Comparator<ArbitraryFileListResponseInfo> lowestHopsFirstComparator = byte[] hash = decodeBase58(responseInfo.getHash58(), "hash");
Comparator.comparingInt(ArbitraryFileListResponseInfo::getRequestHops); byte[] signature = decodeBase58(responseInfo.getSignature58(), "signature");
arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator); Peer peer = responseInfo.getPeer();
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator(); if (hash == null || signature == null || peer == null) {
while (iterator.hasNext()) { LOGGER.warn("Incomplete response info: skipping hash {}", responseInfo.getHash58());
if (Controller.isStopping()) { return;
return; }
}
ArbitraryFileListResponseInfo responseInfo = (ArbitraryFileListResponseInfo) iterator.next(); processFileFromPeer(hash, signature, peer);
if (responseInfo == null) { }
iterator.remove();
continue;
}
hash58 = responseInfo.getHash58(); private ArbitraryFileListResponseInfo fetchNextValidResponseInfo(Long currentTime) {
peer = responseInfo.getPeer(); synchronized (FILE_MANAGER.arbitraryDataFileHashResponses) {
signature58 = responseInfo.getSignature58(); if (FILE_MANAGER.arbitraryDataFileHashResponses.isEmpty()) {
Long timestamp = responseInfo.getTimestamp(); return null;
}
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) { // Sort responses by the number of hops
// Ignore - to be deleted FILE_MANAGER.arbitraryDataFileHashResponses.sort(
iterator.remove(); Comparator.comparingInt(ArbitraryFileListResponseInfo::getRequestHops)
continue; );
}
// Skip if already requesting, but don't remove, as we might want to retry later Iterator<ArbitraryFileListResponseInfo> iterator = FILE_MANAGER.arbitraryDataFileHashResponses.iterator();
if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) {
// Already requesting - leave this attempt for later
continue;
}
// We want to process this file while (iterator.hasNext()) {
shouldProcess = true; ArbitraryFileListResponseInfo responseInfo = iterator.next();
if (shouldRemoveResponse(responseInfo, currentTime)) {
iterator.remove(); iterator.remove();
break; continue;
}
if (!FILE_MANAGER.arbitraryDataFileRequests.containsKey(responseInfo.getHash58())) {
// Valid response to process
iterator.remove();
return responseInfo;
} }
} }
} }
if (!shouldProcess) { return null;
// Nothing to do }
Thread.sleep(1000L);
return; private boolean shouldRemoveResponse(ArbitraryFileListResponseInfo responseInfo, Long currentTime) {
if (responseInfo == null) {
return true;
} }
byte[] hash = Base58.decode(hash58); Long timestamp = responseInfo.getTimestamp();
byte[] signature = Base58.decode(signature58); boolean isExpired = (currentTime - timestamp) >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT;
// Fetch the transaction data return isExpired || responseInfo.getSignature58() == null || responseInfo.getPeer() == null;
try (final Repository repository = RepositoryManager.getRepository()) { }
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) { private void processFileFromPeer(byte[] hash, byte[] signature, Peer peer) {
try (Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData transactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (transactionData == null) {
LOGGER.warn("Transaction data not found for signature: {}", Base58.encode(signature));
return; return;
} }
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) { LOGGER.trace("Fetching file {} from peer {} via request thread...", Base58.encode(hash), peer);
return; FILE_MANAGER.fetchArbitraryDataFiles(repository, peer, signature, transactionData, List.of(hash));
}
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
} catch (DataException e) { } catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); LOGGER.error("Error processing file from peer: {}", e.getMessage(), e);
}
}
private byte[] decodeBase58(String input, String type) {
try {
return Base58.decode(input);
} catch (Exception e) {
LOGGER.error("Failed to decode {}: {}", type, input, e);
return null;
} }
} }
} }