Browse Source

Added arbitrary data file manager thread, which will ensure that all file list responses are tried until we receive the files.

Previously we would only try the first response and then discard the others due to being duplicates. They are now added to a queue and retried by the dedicated thread (up to the 60 second timeout).
block-minter-updates
CalDescent 3 years ago
parent
commit
114b1aac76
  1. 2
      src/main/java/org/qortal/controller/Controller.java
  2. 8
      src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java
  3. 84
      src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java

2
src/main/java/org/qortal/controller/Controller.java

@ -452,6 +452,7 @@ public class Controller extends Thread {
// Arbitrary data controllers
LOGGER.info("Starting arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().start();
ArbitraryDataFileManager.getInstance().start();
ArbitraryDataBuildManager.getInstance().start();
ArbitraryDataCleanupManager.getInstance().start();
ArbitraryDataStorageManager.getInstance().start();
@ -840,6 +841,7 @@ public class Controller extends Thread {
// Arbitrary data controllers
LOGGER.info("Shutting down arbitrary-transaction controllers");
ArbitraryDataManager.getInstance().shutdown();
ArbitraryDataFileManager.getInstance().shutdown();
ArbitraryDataBuildManager.getInstance().shutdown();
ArbitraryDataCleanupManager.getInstance().shutdown();
ArbitraryDataStorageManager.getInstance().shutdown();

8
src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java

@ -431,6 +431,14 @@ public class ArbitraryDataFileListManager {
// }
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
// Keep track of the hashes this peer reports to have access to
Long now = NTP.getTime();
for (byte[] hash : hashes) {
String hash58 = Base58.encode(hash);
String sig58 = Base58.encode(signature);
ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.put(hash58, new Triple<>(peer, sig58, now));
}
// Go and fetch the actual data, since this isn't a relay request
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes);
}

84
src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java

@ -7,7 +7,6 @@ import org.qortal.controller.Controller;
import org.qortal.data.network.ArbitraryPeerData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.*;
@ -24,11 +23,12 @@ import java.security.SecureRandom;
import java.util.*;
import java.util.stream.Collectors;
public class ArbitraryDataFileManager {
public class ArbitraryDataFileManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileManager.class);
private static ArbitraryDataFileManager instance;
private volatile boolean isStopping = false;
/**
@ -42,6 +42,13 @@ public class ArbitraryDataFileManager {
*/
public Map<String, Triple<String, Peer, Long>> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>());
/**
* Map to keep track of any arbitrary data file hash responses
* Key: string - the hash encoded in base58
* Value: Triple<respondingPeer, signature58, timeResponded>
*/
public Map<String, Triple<Peer, String, Long>> arbitraryDataFileHashResponses = Collections.synchronizedMap(new HashMap<>());
private ArbitraryDataFileManager() {
}
@ -53,6 +60,65 @@ public class ArbitraryDataFileManager {
return instance;
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data File Manager");
try {
while (!isStopping) {
Thread.sleep(1000);
Long now = NTP.getTime();
this.processFileHashes(now);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
}
}
public void shutdown() {
isStopping = true;
this.interrupt();
}
private void processFileHashes(Long now) {
try (final Repository repository = RepositoryManager.getRepository()) {
for (String hash58 : arbitraryDataFileHashResponses.keySet()) {
if (isStopping) {
return;
}
Triple<Peer, String, Long> value = arbitraryDataFileHashResponses.get(hash58);
if (value != null) {
Peer peer = value.getA();
String signature58 = value.getB();
Long timestamp = value.getC();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
continue;
}
byte[] hash = Base58.decode(hash58);
byte[] signature = Base58.decode(signature58);
// Fetch the transaction data
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
continue;
}
LOGGER.debug("Fetching file {} from peer {} via response queue...", hash58, peer);
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
}
}
} catch (DataException e) {
LOGGER.info("Unable to process file hashes: {}", e.getMessage());
}
}
public void cleanupRequestCache(Long now) {
if (now == null) {
@ -63,6 +129,7 @@ public class ArbitraryDataFileManager {
final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT;
arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp);
arbitraryDataFileHashResponses.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp);
}
@ -83,10 +150,14 @@ public class ArbitraryDataFileManager {
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (isStopping) {
return false;
}
String hash58 = Base58.encode(hash);
if (!arbitraryDataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
LOGGER.debug("Requesting data file {} from peer {}", Base58.encode(hash), peer);
LOGGER.debug("Requesting data file {} from peer {}", hash58, peer);
Long startTime = NTP.getTime();
ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null);
Long endTime = NTP.getTime();
@ -97,11 +168,18 @@ public class ArbitraryDataFileManager {
else {
LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime));
}
// Remove this hash from arbitraryDataFileHashResponses now that we have tried to request it
arbitraryDataFileHashResponses.remove(hash58);
}
else {
LOGGER.trace("Already requesting data file {} for signature {}", arbitraryDataFile, Base58.encode(signature));
}
}
else {
// Remove this hash from arbitraryDataFileHashResponses because we have a local copy
arbitraryDataFileHashResponses.remove(hash58);
}
}
if (receivedAtLeastOneFile) {

Loading…
Cancel
Save