mirror of
https://github.com/Qortal/qortal.git
synced 2025-07-19 19:01:22 +00:00
Merge remote-tracking branch 'kenny/master' into master-10
This commit is contained in:
commit
086ed6574f
@ -1,6 +1,7 @@
|
||||
package org.qortal.arbitrary;
|
||||
|
||||
import com.google.common.io.Resources;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -15,11 +16,13 @@ import org.qortal.settings.Settings;
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
@ -167,7 +170,14 @@ public class ArbitraryDataRenderer {
|
||||
if (HTMLParser.isHtmlFile(filename)) {
|
||||
// HTML file - needs to be parsed
|
||||
byte[] data = Files.readAllBytes(filePath); // TODO: limit file size that can be read into memory
|
||||
HTMLParser htmlParser = new HTMLParser(resourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang);
|
||||
String encodedResourceId;
|
||||
|
||||
if (resourceIdType == ResourceIdType.NAME) {
|
||||
encodedResourceId = resourceId.replace(" ", "%20");
|
||||
} else {
|
||||
encodedResourceId = resourceId;
|
||||
}
|
||||
HTMLParser htmlParser = new HTMLParser(encodedResourceId, inPath, prefix, includeResourceIdInPrefix, data, qdnContext, service, identifier, theme, usingCustomRouting, lang);
|
||||
htmlParser.addAdditionalHeaderTags();
|
||||
response.addHeader("Content-Security-Policy", "default-src 'self' 'unsafe-inline' 'unsafe-eval'; font-src 'self' data:; media-src 'self' data: blob:; img-src 'self' data: blob:; connect-src 'self' wss: blob:;");
|
||||
response.setContentType(context.getMimeType(filename));
|
||||
|
@ -567,6 +567,9 @@ public class Controller extends Thread {
|
||||
LOGGER.info("Starting foreign fees manager");
|
||||
ForeignFeesManager.getInstance().start();
|
||||
|
||||
LOGGER.info("Starting follower");
|
||||
Follower.getInstance().start();
|
||||
|
||||
LOGGER.info("Starting transaction importer");
|
||||
TransactionImporter.getInstance().start();
|
||||
|
||||
|
@ -402,8 +402,8 @@ public class ArbitraryDataFileListManager {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void deleteFileListRequestsForSignature(byte[] signature) {
|
||||
String signature58 = Base58.encode(signature);
|
||||
public void deleteFileListRequestsForSignature(String signature58) {
|
||||
|
||||
for (Iterator<Map.Entry<Integer, Triple<String, Peer, Long>>> it = arbitraryDataFileListRequests.entrySet().iterator(); it.hasNext();) {
|
||||
Map.Entry<Integer, Triple<String, Peer, Long>> entry = it.next();
|
||||
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
|
||||
|
@ -23,7 +23,6 @@ import org.qortal.utils.NTP;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -70,6 +69,7 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
|
||||
private ArbitraryDataFileManager() {
|
||||
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
|
||||
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate(this::handleFileListRequestProcess, 60, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public static ArbitraryDataFileManager getInstance() {
|
||||
@ -140,7 +140,7 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
|
||||
LOGGER.debug("Requesting data file {} from peer {}", hash58, peer);
|
||||
Long startTime = NTP.getTime();
|
||||
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, null, arbitraryTransactionData, signature, hash, null);
|
||||
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, arbitraryTransactionData, signature, hash);
|
||||
Long endTime = NTP.getTime();
|
||||
if (receivedArbitraryDataFile != null) {
|
||||
LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime));
|
||||
@ -207,14 +207,71 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
|
||||
ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
|
||||
boolean fileAlreadyExists = existingFile.exists();
|
||||
String hash58 = Base58.encode(hash);
|
||||
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash) throws DataException {
|
||||
ArbitraryDataFile arbitraryDataFile;
|
||||
|
||||
// Fetch the file if it doesn't exist locally
|
||||
if (!fileAlreadyExists) {
|
||||
try {
|
||||
ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
|
||||
boolean fileAlreadyExists = existingFile.exists();
|
||||
String hash58 = Base58.encode(hash);
|
||||
|
||||
// Fetch the file if it doesn't exist locally
|
||||
if (!fileAlreadyExists) {
|
||||
LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer));
|
||||
arbitraryDataFileRequests.put(hash58, NTP.getTime());
|
||||
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
|
||||
|
||||
Message response = null;
|
||||
try {
|
||||
response = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
// Will return below due to null response
|
||||
}
|
||||
arbitraryDataFileRequests.remove(hash58);
|
||||
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
|
||||
|
||||
if (response == null) {
|
||||
LOGGER.debug("Received null response from peer {}", peer);
|
||||
return null;
|
||||
}
|
||||
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
|
||||
LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
|
||||
return null;
|
||||
}
|
||||
|
||||
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
|
||||
arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
|
||||
} else {
|
||||
LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
|
||||
arbitraryDataFile = existingFile;
|
||||
}
|
||||
|
||||
if (arbitraryDataFile != null) {
|
||||
|
||||
arbitraryDataFile.save();
|
||||
|
||||
// If this is a metadata file then we need to update the cache
|
||||
if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) {
|
||||
if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) {
|
||||
ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
|
||||
}
|
||||
}
|
||||
|
||||
// We may need to remove the file list request, if we have all the files for this transaction
|
||||
this.handleFileListRequests(signature);
|
||||
}
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
arbitraryDataFile = null;
|
||||
}
|
||||
|
||||
return arbitraryDataFile;
|
||||
}
|
||||
|
||||
private void fetchFileForRelay(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
|
||||
try {
|
||||
String hash58 = Base58.encode(hash);
|
||||
|
||||
LOGGER.debug(String.format("Fetching data file %.8s from peer %s", hash58, peer));
|
||||
arbitraryDataFileRequests.put(hash58, NTP.getTime());
|
||||
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash);
|
||||
@ -228,77 +285,75 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
arbitraryDataFileRequests.remove(hash58);
|
||||
LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
|
||||
|
||||
|
||||
|
||||
if (response == null) {
|
||||
LOGGER.debug("Received null response from peer {}", peer);
|
||||
return null;
|
||||
return;
|
||||
}
|
||||
if (response.getType() != MessageType.ARBITRARY_DATA_FILE) {
|
||||
LOGGER.debug("Received response with invalid type: {} from peer {}", response.getType(), peer);
|
||||
return null;
|
||||
}
|
||||
|
||||
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
|
||||
arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
|
||||
} else {
|
||||
LOGGER.debug(String.format("File hash %s already exists, so skipping the request", hash58));
|
||||
arbitraryDataFile = existingFile;
|
||||
}
|
||||
|
||||
if (arbitraryDataFile == null) {
|
||||
// We don't have a file, so give up here
|
||||
return null;
|
||||
}
|
||||
|
||||
// We might want to forward the request to the peer that originally requested it
|
||||
this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
|
||||
|
||||
boolean isRelayRequest = (requestingPeer != null);
|
||||
if (isRelayRequest) {
|
||||
if (!fileAlreadyExists) {
|
||||
// File didn't exist locally before the request, and it's a forwarding request, so delete it if it exists.
|
||||
// It shouldn't exist on the filesystem yet, but leaving this here just in case.
|
||||
arbitraryDataFile.delete(10);
|
||||
}
|
||||
}
|
||||
else {
|
||||
arbitraryDataFile.save();
|
||||
}
|
||||
|
||||
// If this is a metadata file then we need to update the cache
|
||||
if (arbitraryTransactionData != null && arbitraryTransactionData.getMetadataHash() != null) {
|
||||
if (Arrays.equals(arbitraryTransactionData.getMetadataHash(), hash)) {
|
||||
ArbitraryDataCacheManager.getInstance().addToUpdateQueue(arbitraryTransactionData);
|
||||
}
|
||||
}
|
||||
|
||||
// We may need to remove the file list request, if we have all the files for this transaction
|
||||
this.handleFileListRequests(signature);
|
||||
|
||||
return arbitraryDataFile;
|
||||
}
|
||||
|
||||
private void handleFileListRequests(byte[] signature) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// Fetch the transaction data
|
||||
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
|
||||
if (arbitraryTransactionData == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData);
|
||||
ArbitraryDataFileMessage peersArbitraryDataFileMessage = (ArbitraryDataFileMessage) response;
|
||||
ArbitraryDataFile arbitraryDataFile = peersArbitraryDataFileMessage.getArbitraryDataFile();
|
||||
|
||||
if (completeFileExists) {
|
||||
String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
|
||||
LOGGER.info("All chunks or complete file exist for transaction {}", signature58);
|
||||
|
||||
ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature);
|
||||
if (arbitraryDataFile != null) {
|
||||
|
||||
// We might want to forward the request to the peer that originally requested it
|
||||
this.handleArbitraryDataFileForwarding(requestingPeer, new ArbitraryDataFileMessage(signature, arbitraryDataFile), originalMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, byte[]> signatureBySignature58 = new HashMap<>();
|
||||
|
||||
// Lock to synchronize access to the list
|
||||
private final Object handleFileListRequestsLock = new Object();
|
||||
|
||||
// Scheduled executor service to process messages every second
|
||||
private final ScheduledExecutorService handleFileListRequestsScheduler = Executors.newScheduledThreadPool(1);
|
||||
|
||||
private void handleFileListRequests(byte[] signature) {
|
||||
|
||||
synchronized (handleFileListRequestsLock) {
|
||||
signatureBySignature58.put(Base58.encode(signature), signature);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFileListRequestProcess() {
|
||||
|
||||
Map<String, byte[]> signaturesToProcess;
|
||||
|
||||
synchronized (handleFileListRequestsLock) {
|
||||
signaturesToProcess = new HashMap<>(signatureBySignature58);
|
||||
signatureBySignature58.clear();
|
||||
}
|
||||
|
||||
if( signaturesToProcess.isEmpty() ) return;
|
||||
|
||||
LOGGER.info("signatures to process = " + signaturesToProcess.size());
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// Fetch the transaction data
|
||||
List<ArbitraryTransactionData> arbitraryTransactionDataList
|
||||
= ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signaturesToProcess.values()));
|
||||
|
||||
for( ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList ) {
|
||||
boolean completeFileExists = ArbitraryTransactionUtils.completeFileExists(arbitraryTransactionData);
|
||||
|
||||
if (completeFileExists) {
|
||||
String signature58 = Base58.encode(arbitraryTransactionData.getSignature());
|
||||
LOGGER.info("All chunks or complete file exist for transaction {}", signature58);
|
||||
|
||||
ArbitraryDataFileListManager.getInstance().deleteFileListRequestsForSignature(signature58);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.debug("Unable to handle file list requests: {}", e.getMessage());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -315,15 +370,19 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
|
||||
LOGGER.debug("Received arbitrary data file - forwarding is needed");
|
||||
|
||||
// The ID needs to match that of the original request
|
||||
message.setId(originalMessage.getId());
|
||||
try {
|
||||
// The ID needs to match that of the original request
|
||||
message.setId(originalMessage.getId());
|
||||
|
||||
if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
|
||||
LOGGER.debug("Failed to forward arbitrary data file to peer {}", requestingPeer);
|
||||
requestingPeer.disconnect("failed to forward arbitrary data file");
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("Forwarded arbitrary data file to peer {}", requestingPeer);
|
||||
if (!requestingPeer.sendMessageWithTimeout(message, (int) ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT)) {
|
||||
LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer);
|
||||
requestingPeer.disconnect("failed to forward arbitrary data file");
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -615,7 +674,7 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
LOGGER.debug("Asking peer {} for hash {}", peerToAsk, hash58);
|
||||
// No need to pass arbitraryTransactionData below because this is only used for metadata caching,
|
||||
// and metadata isn't retained when relaying.
|
||||
this.fetchArbitraryDataFile(peerToAsk, peer, null, signature, hash, message);
|
||||
this.fetchFileForRelay(peerToAsk, peer, signature, hash, message);
|
||||
}
|
||||
else {
|
||||
LOGGER.debug("Peer {} not found in relay info", peer);
|
||||
|
@ -15,6 +15,7 @@ import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.ArbitraryTransactionUtils;
|
||||
import org.qortal.utils.Base58;
|
||||
import org.qortal.utils.NTP;
|
||||
import org.qortal.utils.NamedThreadFactory;
|
||||
|
||||
import java.net.http.HttpResponse;
|
||||
import java.util.ArrayList;
|
||||
@ -38,6 +39,9 @@ public class ArbitraryDataFileRequestThread {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
|
||||
|
||||
private static final Integer FETCHER_LIMIT_PER_PEER = Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE);
|
||||
private static final String FETCHER_THREAD_PREFIX = "Arbitrary Data Fetcher ";
|
||||
|
||||
private ConcurrentHashMap<String, ExecutorService> executorByPeer = new ConcurrentHashMap<>();
|
||||
|
||||
private ArbitraryDataFileRequestThread() {
|
||||
@ -64,8 +68,9 @@ public class ArbitraryDataFileRequestThread {
|
||||
if (value instanceof ThreadPoolExecutor) {
|
||||
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) value;
|
||||
if (threadPoolExecutor.getActiveCount() == 0) {
|
||||
threadPoolExecutor.shutdown();
|
||||
if (this.executorByPeer.computeIfPresent(key, (k, v) -> null) == null) {
|
||||
LOGGER.info("removed executor: peer = " + key);
|
||||
LOGGER.trace("removed executor: peer = " + key);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -147,7 +152,9 @@ public class ArbitraryDataFileRequestThread {
|
||||
.computeIfAbsent(
|
||||
responseInfo.getPeer().toString(),
|
||||
peer -> Executors.newFixedThreadPool(
|
||||
Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE))
|
||||
FETCHER_LIMIT_PER_PEER,
|
||||
new NamedThreadFactory(FETCHER_THREAD_PREFIX + responseInfo.getPeer().toString(), NORM_PRIORITY)
|
||||
)
|
||||
)
|
||||
.execute(fetcher);
|
||||
}
|
||||
|
130
src/main/java/org/qortal/controller/arbitrary/Follower.java
Normal file
130
src/main/java/org/qortal/controller/arbitrary/Follower.java
Normal file
@ -0,0 +1,130 @@
|
||||
package org.qortal.controller.arbitrary;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.ListUtils;
|
||||
import org.qortal.utils.NamedThreadFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Follower {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(Follower.class);
|
||||
|
||||
private ScheduledExecutorService service
|
||||
= Executors.newScheduledThreadPool(2, new NamedThreadFactory("Follower", Thread.NORM_PRIORITY));
|
||||
|
||||
private Follower() {
|
||||
|
||||
}
|
||||
|
||||
private static Follower instance;
|
||||
|
||||
public static Follower getInstance() {
|
||||
|
||||
if( instance == null ) {
|
||||
instance = new Follower();
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
||||
// fetch arbitrary transactions from followed names from the last 100 blocks every 2 minutes
|
||||
service.scheduleWithFixedDelay(() -> fetch(OptionalInt.of(100)), 10, 2, TimeUnit.MINUTES);
|
||||
|
||||
// fetch arbitrary transaction from followed names from any block every 24 hours
|
||||
service.scheduleWithFixedDelay(() -> fetch(OptionalInt.empty()), 4, 24, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
private void fetch(OptionalInt limit) {
|
||||
|
||||
try {
|
||||
// for each followed name, get arbitraty transactions, then examine those transactions before fetching
|
||||
for (String name : ListUtils.followedNames()) {
|
||||
|
||||
List<ArbitraryTransactionData> transactionsInReverseOrder;
|
||||
|
||||
// open database to get the transactions in reverse order for the followed name
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
List<ArbitraryTransactionData> latestArbitraryTransactionsByName
|
||||
= repository.getArbitraryRepository().getLatestArbitraryTransactionsByName(name);
|
||||
|
||||
if (limit.isPresent()) {
|
||||
final int blockHeightThreshold = repository.getBlockRepository().getBlockchainHeight() - limit.getAsInt();
|
||||
|
||||
transactionsInReverseOrder
|
||||
= latestArbitraryTransactionsByName.stream().filter(tx -> tx.getBlockHeight() > blockHeightThreshold)
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
transactionsInReverseOrder = latestArbitraryTransactionsByName;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
transactionsInReverseOrder = new ArrayList<>(0);
|
||||
}
|
||||
|
||||
// collect process transaction hashes, so we don't fetch outdated transactions
|
||||
Set<ArbitraryTransactionDataHashWrapper> processedTransactions = new HashSet<>();
|
||||
|
||||
ArbitraryDataStorageManager storageManager = ArbitraryDataStorageManager.getInstance();
|
||||
|
||||
// for each arbitrary transaction for the followed name process, evaluate, fetch
|
||||
for (ArbitraryTransactionData arbitraryTransaction : transactionsInReverseOrder) {
|
||||
|
||||
boolean examined = false;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// if not processed
|
||||
if (!processedTransactions.contains(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction))) {
|
||||
boolean isLocal = repository.getArbitraryRepository().isDataLocal(arbitraryTransaction.getSignature());
|
||||
|
||||
// if not local, then continue to evaluate
|
||||
if (!isLocal) {
|
||||
|
||||
// evaluate fetching status for this transaction on this node
|
||||
ArbitraryDataExamination examination = storageManager.shouldPreFetchData(repository, arbitraryTransaction);
|
||||
|
||||
// if the evaluation passed, then fetch
|
||||
examined = examination.isPass();
|
||||
}
|
||||
// if locally stored, then nothing needs to be done
|
||||
|
||||
// add to processed transactions
|
||||
processedTransactions.add(new ArbitraryTransactionDataHashWrapper(arbitraryTransaction));
|
||||
}
|
||||
}
|
||||
|
||||
// if passed examination for fetching, then fetch
|
||||
if (examined) {
|
||||
LOGGER.info("for {} on {}, fetching {}", name, arbitraryTransaction.getService(), arbitraryTransaction.getIdentifier());
|
||||
boolean fetched = ArbitraryDataFileListManager.getInstance().fetchArbitraryDataFileList(arbitraryTransaction);
|
||||
|
||||
LOGGER.info("fetched = " + fetched);
|
||||
}
|
||||
|
||||
// pause a second before moving on to another transaction
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -31,7 +31,20 @@ public class ChannelWriteTask implements Task {
|
||||
@Override
|
||||
public void perform() throws InterruptedException {
|
||||
try {
|
||||
boolean isSocketClogged = peer.writeChannel();
|
||||
|
||||
boolean isSocketClogged;
|
||||
do {
|
||||
isSocketClogged = peer.writeChannel();
|
||||
|
||||
if (isSocketClogged) {
|
||||
LOGGER.info(
|
||||
"socket is clogged: peer = {} {}, retrying",
|
||||
peer.getPeerData().getAddress().toString(),
|
||||
Thread.currentThread().getName()
|
||||
);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
} while( isSocketClogged );
|
||||
|
||||
// Tell Network that we've finished
|
||||
Network.getInstance().notifyChannelNotWriting(socketChannel);
|
||||
|
Loading…
x
Reference in New Issue
Block a user