Update ArbitraryDataFileRequestThread.java

* Readability Improvements: Added comments and refactored long methods for better readability.
* Error Handling: Added more specific logging to help diagnose issues.
* Avoid Redundant Computations: Reorganized some logic to avoid unnecessary computations.
* Generics in Iterators: Replaced raw iterators with typed iterators.
* Thread-Safety: Minor adjustments to ensure synchronized operations are safe and well-encapsulated.
* Magic Numbers: Removed hardcoded values like 1000L, replacing them with constants for clarity.
This commit is contained in:
cwd.systems | 0KN 2024-11-27 22:08:44 +06:00 committed by GitHub
parent 8ffb0625a1
commit 9f41c5566d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,9 +13,9 @@ import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import static java.lang.Thread.NORM_PRIORITY;
@ -23,8 +23,11 @@ public class ArbitraryDataFileRequestThread implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
public ArbitraryDataFileRequestThread() {
private static final long SLEEP_INTERVAL = 1000L; // 1 second
private static final ArbitraryDataFileManager FILE_MANAGER = ArbitraryDataFileManager.getInstance();
public ArbitraryDataFileRequestThread() {
// Default constructor
}
@Override
@ -34,95 +37,105 @@ public class ArbitraryDataFileRequestThread implements Runnable {
try {
while (!Controller.isStopping()) {
Long now = NTP.getTime();
this.processFileHashes(now);
processPendingFileHashes(NTP.getTime());
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
Thread.currentThread().interrupt(); // Restore interrupt status
LOGGER.info("Arbitrary Data File Request Thread interrupted, exiting.");
}
}
private void processFileHashes(Long now) throws InterruptedException {
private void processPendingFileHashes(Long currentTime) throws InterruptedException {
if (Controller.isStopping()) {
return;
}
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
String signature58 = null;
String hash58 = null;
Peer peer = null;
boolean shouldProcess = false;
ArbitraryFileListResponseInfo responseInfo = fetchNextValidResponseInfo(currentTime);
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) {
if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) {
// Sort by lowest number of node hops first
Comparator<ArbitraryFileListResponseInfo> lowestHopsFirstComparator =
Comparator.comparingInt(ArbitraryFileListResponseInfo::getRequestHops);
arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator);
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator();
while (iterator.hasNext()) {
if (Controller.isStopping()) {
return;
}
ArbitraryFileListResponseInfo responseInfo = (ArbitraryFileListResponseInfo) iterator.next();
if (responseInfo == null) {
// No files to process, sleep and retry later
Thread.sleep(SLEEP_INTERVAL);
return;
}
// Extract and decode data
byte[] hash = decodeBase58(responseInfo.getHash58(), "hash");
byte[] signature = decodeBase58(responseInfo.getSignature58(), "signature");
Peer peer = responseInfo.getPeer();
if (hash == null || signature == null || peer == null) {
LOGGER.warn("Incomplete response info: skipping hash {}", responseInfo.getHash58());
return;
}
processFileFromPeer(hash, signature, peer);
}
private ArbitraryFileListResponseInfo fetchNextValidResponseInfo(Long currentTime) {
synchronized (FILE_MANAGER.arbitraryDataFileHashResponses) {
if (FILE_MANAGER.arbitraryDataFileHashResponses.isEmpty()) {
return null;
}
// Sort responses by the number of hops
FILE_MANAGER.arbitraryDataFileHashResponses.sort(
Comparator.comparingInt(ArbitraryFileListResponseInfo::getRequestHops)
);
Iterator<ArbitraryFileListResponseInfo> iterator = FILE_MANAGER.arbitraryDataFileHashResponses.iterator();
while (iterator.hasNext()) {
ArbitraryFileListResponseInfo responseInfo = iterator.next();
if (shouldRemoveResponse(responseInfo, currentTime)) {
iterator.remove();
continue;
}
hash58 = responseInfo.getHash58();
peer = responseInfo.getPeer();
signature58 = responseInfo.getSignature58();
if (!FILE_MANAGER.arbitraryDataFileRequests.containsKey(responseInfo.getHash58())) {
// Valid response to process
iterator.remove();
return responseInfo;
}
}
}
return null;
}
private boolean shouldRemoveResponse(ArbitraryFileListResponseInfo responseInfo, Long currentTime) {
if (responseInfo == null) {
return true;
}
Long timestamp = responseInfo.getTimestamp();
boolean isExpired = (currentTime - timestamp) >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT;
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
iterator.remove();
continue;
return isExpired || responseInfo.getSignature58() == null || responseInfo.getPeer() == null;
}
// Skip if already requesting, but don't remove, as we might want to retry later
if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) {
// Already requesting - leave this attempt for later
continue;
}
private void processFileFromPeer(byte[] hash, byte[] signature, Peer peer) {
try (Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData transactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
// We want to process this file
shouldProcess = true;
iterator.remove();
break;
}
}
}
if (!shouldProcess) {
// Nothing to do
Thread.sleep(1000L);
if (transactionData == null) {
LOGGER.warn("Transaction data not found for signature: {}", Base58.encode(signature));
return;
}
byte[] hash = Base58.decode(hash58);
byte[] signature = Base58.decode(signature58);
// Fetch the transaction data
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
LOGGER.trace("Fetching file {} from peer {} via request thread...", Base58.encode(hash), peer);
FILE_MANAGER.fetchArbitraryDataFiles(repository, peer, signature, transactionData, List.of(hash));
} catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
LOGGER.error("Error processing file from peer: {}", e.getMessage(), e);
}
}
private byte[] decodeBase58(String input, String type) {
try {
return Base58.decode(input);
} catch (Exception e) {
LOGGER.error("Failed to decode {}: {}", type, input, e);
return null;
}
}
}