Refactor: moved arbitrary data code from Controller to ArbitraryDataManager

This commit is contained in:
CalDescent 2021-07-11 10:17:38 +01:00
parent 2679252b04
commit 483557163e
2 changed files with 319 additions and 297 deletions

View File

@ -1,29 +1,62 @@
package org.qortal.controller; package org.qortal.controller;
import java.util.Arrays; import java.util.*;
import java.util.List;
import java.util.Random;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus;
import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData; import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.*;
import org.qortal.repository.DataException; import org.qortal.repository.DataException;
import org.qortal.repository.Repository; import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager; import org.qortal.repository.RepositoryManager;
import org.qortal.storage.DataFile;
import org.qortal.storage.DataFileChunk;
import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction.TransactionType; import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import org.qortal.utils.Triple;
public class ArbitraryDataManager extends Thread { public class ArbitraryDataManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataManager.class); private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataManager.class);
private static final List<TransactionType> ARBITRARY_TX_TYPE = Arrays.asList(TransactionType.ARBITRARY); private static final List<TransactionType> ARBITRARY_TX_TYPE = Arrays.asList(TransactionType.ARBITRARY);
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms
private static ArbitraryDataManager instance; private static ArbitraryDataManager instance;
private volatile boolean isStopping = false; private volatile boolean isStopping = false;
/**
* Map of recent requests for ARBITRARY transaction data file lists.
* <p>
* Key is original request's message ID<br>
* Value is Triple&lt;transaction signature in base58, first requesting peer, first request's timestamp&gt;
* <p>
* If peer is null then either:<br>
* <ul>
* <li>we are the original requesting peer</li>
* <li>we have already sent data payload to original requesting peer.</li>
* </ul>
* If signature is null then we have already received the file list and either:<br>
* <ul>
* <li>we are the original requesting peer and have processed it</li>
* <li>we have forwarded the file list</li>
* </ul>
*/
public Map<Integer, Triple<String, Peer, Long>> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>());
/**
* Array to keep track of in progress arbitrary data file requests
*/
private List<Object> arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>());
private ArbitraryDataManager() { private ArbitraryDataManager() {
} }
@ -62,7 +95,7 @@ public class ArbitraryDataManager extends Thread {
// Ask our connected peers if they have files for this signature // Ask our connected peers if they have files for this signature
// This process automatically then fetches the files themselves if a peer is found // This process automatically then fetches the files themselves if a peer is found
Controller.getInstance().fetchArbitraryDataFileList(signature); fetchArbitraryDataFileList(signature);
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e); LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
@ -93,4 +126,274 @@ public class ArbitraryDataManager extends Thread {
} }
} }
private boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException {
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
// Build request
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
// Save our request into requests map
String signature58 = Base58.encode(signature);
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
// Assign random ID to this message
int id;
do {
id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken
} while (arbitraryDataFileListRequests.put(id, requestEntry) != null);
getArbitraryDataFileListMessage.setId(id);
// Broadcast request
Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage);
// Poll to see if data has arrived
final long singleWait = 100;
long totalWait = 0;
while (totalWait < ARBITRARY_REQUEST_TIMEOUT) {
Thread.sleep(singleWait);
requestEntry = arbitraryDataFileListRequests.get(id);
if (requestEntry == null)
return false;
if (requestEntry.getA() == null)
break;
totalWait += singleWait;
}
return true;
}
private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
String hash58 = Base58.encode(hash);
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
arbitraryDataFileRequests.add(hash58);
Message getDataFileMessage = new GetDataFileMessage(hash);
Message message = peer.getResponse(getDataFileMessage);
arbitraryDataFileRequests.remove(hash58);
LOGGER.info(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
if (message == null || message.getType() != Message.MessageType.DATA_FILE) {
return null;
}
DataFileMessage dataFileMessage = (DataFileMessage) message;
return dataFileMessage.getDataFile();
}
public void cleanupRequestCache(long now) {
final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT;
arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
// TODO: cleanup arbitraryDataFileRequests
}
// Network handlers
public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {
GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message;
byte[] signature = getArbitraryDataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long timestamp = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null)
return;
// Do we even have this transaction?
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY)
return;
ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData);
// If we have the data then send it
if (transaction.isDataLocal()) {
byte[] data = transaction.fetchData();
if (data == null)
return;
// Update requests map to reflect that we've sent it
newEntry = new Triple<>(signature58, null, timestamp);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
arbitraryDataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataMessage))
peer.disconnect("failed to send arbitrary data");
return;
}
// Ask our other peers if they have it
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message);
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e);
}
}
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
LOGGER.info("Received hash list from peer {}", peer);
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
if (request == null || request.getA() == null) {
return;
}
// Does this message's signature match what we're expecting?
byte[] signature = arbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
if (!request.getA().equals(signature58)) {
return;
}
List<byte[]> hashes = arbitraryDataFileListMessage.getHashes();
if (hashes == null || hashes.isEmpty()) {
return;
}
// Check transaction exists and hashes are correct
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData))
return;
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
// Load data file(s)
DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData());
dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes());
// Check all hashes exist
for (byte[] hash : hashes) {
//LOGGER.info("Received hash {}", Base58.encode(hash));
if (!dataFile.containsChunk(hash)) {
LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58);
return;
}
}
// Update requests map to reflect that we've received it
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
arbitraryDataFileListRequests.put(message.getId(), newEntry);
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (!dataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) {
DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash);
LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer);
}
else {
LOGGER.info("Already requesting data file {}", dataFile);
}
}
}
} catch (DataException | InterruptedException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
}
// Forwarding (not yet used)
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
// Forward to requesting peer;
if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
public void onNetworkGetDataFileMessage(Peer peer, Message message) {
GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message;
byte[] hash = getDataFileMessage.getHash();
Controller.getInstance().stats.getDataFileMessageStats.requests.incrementAndGet();
DataFile dataFile = DataFile.fromHash(hash);
if (dataFile.exists()) {
DataFileMessage dataFileMessage = new DataFileMessage(dataFile);
dataFileMessage.setId(message.getId());
if (!peer.sendMessage(dataFileMessage)) {
LOGGER.info("Couldn't sent file");
peer.disconnect("failed to send file");
}
LOGGER.info("Sent file {}", dataFile);
}
else {
// We don't have this file
Controller.getInstance().stats.getDataFileMessageStats.unknownFiles.getAndIncrement();
// Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout
LOGGER.debug(() -> String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, dataFile));
// We'll send empty block summaries message as it's very short
// TODO: use a different message type here
Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList());
fileUnknownMessage.setId(message.getId());
if (!peer.sendMessage(fileUnknownMessage)) {
LOGGER.info("Couldn't sent file-unknown response");
peer.disconnect("failed to send file-unknown response");
}
LOGGER.info("Sent file-unknown response for file {}", dataFile);
}
}
public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature();
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
List<byte[]> hashes = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get a list of its hashes
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (transactionData instanceof ArbitraryTransactionData) {
byte[] hash = transactionData.getData();
byte[] chunkHashes = transactionData.getChunkHashes();
// Load file(s) and add any that exist to the list of hashes
DataFile dataFile = DataFile.fromHash(hash);
if (chunkHashes.length > 0) {
dataFile.addChunkHashes(chunkHashes);
for (DataFileChunk dataFileChunk : dataFile.getChunks()) {
if (dataFileChunk.exists()) {
hashes.add(dataFileChunk.getHash());
//LOGGER.info("Added hash {}", dataFileChunk.getHash58());
}
else {
LOGGER.info("Couldn't add hash {} because it doesn't exist", dataFileChunk.getHash58());
}
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
}
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.info("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
}
LOGGER.info("Sent list of hashes", hashes);
}
} }

View File

@ -21,7 +21,6 @@ import org.qortal.data.block.BlockSummaryData;
import org.qortal.data.network.OnlineAccountData; import org.qortal.data.network.OnlineAccountData;
import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData; import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.ChatTransactionData; import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.data.transaction.TransactionData; import org.qortal.data.transaction.TransactionData;
import org.qortal.event.Event; import org.qortal.event.Event;
@ -38,9 +37,6 @@ import org.qortal.repository.RepositoryFactory;
import org.qortal.repository.RepositoryManager; import org.qortal.repository.RepositoryManager;
import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory; import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qortal.settings.Settings; import org.qortal.settings.Settings;
import org.qortal.storage.DataFile;
import org.qortal.storage.DataFileChunk;
import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction; import org.qortal.transaction.Transaction;
import org.qortal.transaction.Transaction.TransactionType; import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.transaction.Transaction.ValidationResult; import org.qortal.transaction.Transaction.ValidationResult;
@ -85,7 +81,6 @@ public class Controller extends Thread {
private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks
private static final Object shutdownLock = new Object(); private static final Object shutdownLock = new Object();
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true"; private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true";
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms
private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
@ -149,27 +144,6 @@ public class Controller extends Thread {
private boolean peersAvailable = true; // peersAvailable must default to true private boolean peersAvailable = true; // peersAvailable must default to true
private long timePeersLastAvailable = 0; private long timePeersLastAvailable = 0;
/**
* Map of recent requests for ARBITRARY transaction data payloads.
* <p>
* Key is original request's message ID<br>
* Value is Triple&lt;transaction signature in base58, first requesting peer, first request's timestamp&gt;
* <p>
* If peer is null then either:<br>
* <ul>
* <li>we are the original requesting peer</li>
* <li>we have already sent data payload to original requesting peer.</li>
* </ul>
* If signature is null then we have already received the data payload and either:<br>
* <ul>
* <li>we are the original requesting peer and have saved it locally</li>
* <li>we have forwarded the data payload (and maybe also saved it locally)</li>
* </ul>
*/
private Map<Integer, Triple<String, Peer, Long>> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>());
private List<Object> arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>());
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock(); private final ReentrantLock blockchainLock = new ReentrantLock();
@ -247,7 +221,7 @@ public class Controller extends Thread {
public StatsSnapshot() { public StatsSnapshot() {
} }
} }
private final StatsSnapshot stats = new StatsSnapshot(); public final StatsSnapshot stats = new StatsSnapshot();
// Constructors // Constructors
@ -561,8 +535,7 @@ public class Controller extends Thread {
} }
// Clean up arbitrary data request cache // Clean up arbitrary data request cache
final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; ArbitraryDataManager.getInstance().cleanupRequestCache(now);
arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
// Time to 'checkpoint' uncommitted repository writes? // Time to 'checkpoint' uncommitted repository writes?
if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) { if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) {
@ -1241,14 +1214,6 @@ public class Controller extends Thread {
onNetworkTransactionSignaturesMessage(peer, message); onNetworkTransactionSignaturesMessage(peer, message);
break; break;
case GET_ARBITRARY_DATA:
onNetworkGetArbitraryDataMessage(peer, message);
break;
case ARBITRARY_DATA_FILE_LIST:
onNetworkArbitraryDataFileListMessage(peer, message);
break;
case GET_ONLINE_ACCOUNTS: case GET_ONLINE_ACCOUNTS:
onNetworkGetOnlineAccountsMessage(peer, message); onNetworkGetOnlineAccountsMessage(peer, message);
break; break;
@ -1257,12 +1222,20 @@ public class Controller extends Thread {
onNetworkOnlineAccountsMessage(peer, message); onNetworkOnlineAccountsMessage(peer, message);
break; break;
case GET_ARBITRARY_DATA:
ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataMessage(peer, message);
break;
case ARBITRARY_DATA_FILE_LIST:
ArbitraryDataManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message);
break;
case GET_DATA_FILE: case GET_DATA_FILE:
onNetworkGetDataFileMessage(peer, message); ArbitraryDataManager.getInstance().onNetworkGetDataFileMessage(peer, message);
break; break;
case GET_ARBITRARY_DATA_FILE_LIST: case GET_ARBITRARY_DATA_FILE_LIST:
onNetworkGetArbitraryDataFileListMessage(peer, message); ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message);
break; break;
default: default:
@ -1624,125 +1597,6 @@ public class Controller extends Thread {
} }
} }
private void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {
GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message;
byte[] signature = getArbitraryDataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long timestamp = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null)
return;
// Do we even have this transaction?
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY)
return;
ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData);
// If we have the data then send it
if (transaction.isDataLocal()) {
byte[] data = transaction.fetchData();
if (data == null)
return;
// Update requests map to reflect that we've sent it
newEntry = new Triple<>(signature58, null, timestamp);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
arbitraryDataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataMessage))
peer.disconnect("failed to send arbitrary data");
return;
}
// Ask our other peers if they have it
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message);
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e);
}
}
private void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
LOGGER.info("Received hash list from peer {}", peer);
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
if (request == null || request.getA() == null) {
return;
}
// Does this message's signature match what we're expecting?
byte[] signature = arbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
if (!request.getA().equals(signature58)) {
return;
}
List<byte[]> hashes = arbitraryDataFileListMessage.getHashes();
if (hashes == null || hashes.isEmpty()) {
return;
}
// Check transaction exists and hashes are correct
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData))
return;
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
// Load data file(s)
DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData());
dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes());
// Check all hashes exist
for (byte[] hash : hashes) {
if (!dataFile.containsChunk(hash)) {
LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58);
return;
}
}
// Update requests map to reflect that we've received it
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
arbitraryDataFileListRequests.put(message.getId(), newEntry);
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (!dataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) {
DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash);
LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer);
}
else {
LOGGER.info("Already requesting data file {}", dataFile);
}
}
}
} catch (DataException | InterruptedException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
}
// Forwarding (not yet used)
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
// Forward to requesting peer;
if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
private void onNetworkGetOnlineAccountsMessage(Peer peer, Message message) { private void onNetworkGetOnlineAccountsMessage(Peer peer, Message message) {
GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message; GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message;
@ -1790,84 +1644,6 @@ public class Controller extends Thread {
} }
} }
private void onNetworkGetDataFileMessage(Peer peer, Message message) {
GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message;
byte[] hash = getDataFileMessage.getHash();
this.stats.getDataFileMessageStats.requests.incrementAndGet();
DataFile dataFile = DataFile.fromHash(hash);
if (dataFile.exists()) {
DataFileMessage dataFileMessage = new DataFileMessage(dataFile);
dataFileMessage.setId(message.getId());
if (!peer.sendMessage(dataFileMessage)) {
LOGGER.info("Couldn't sent file");
peer.disconnect("failed to send file");
}
LOGGER.info("Sent file {}", dataFile);
}
else {
// We don't have this file
this.stats.getDataFileMessageStats.unknownFiles.getAndIncrement();
// Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout
LOGGER.debug(() -> String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, dataFile));
// We'll send empty block summaries message as it's very short
// TODO: use a different message type here
Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList());
fileUnknownMessage.setId(message.getId());
if (!peer.sendMessage(fileUnknownMessage)) {
LOGGER.info("Couldn't sent file-unknown response");
peer.disconnect("failed to send file-unknown response");
}
LOGGER.info("Sent file-unknown response for file {}", dataFile);
}
}
private void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature();
this.stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
List<byte[]> hashes = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get a list of its hashes
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (transactionData instanceof ArbitraryTransactionData) {
byte[] hash = transactionData.getData();
byte[] chunkHashes = transactionData.getChunkHashes();
// Load file(s) and add any that exist to the list of hashes
DataFile dataFile = DataFile.fromHash(hash);
if (chunkHashes.length > 0) {
dataFile.addChunkHashes(chunkHashes);
for (DataFileChunk dataFileChunk : dataFile.getChunks()) {
if (dataFileChunk.exists()) {
hashes.add(dataFileChunk.getHash());
}
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
}
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.info("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
}
LOGGER.info("Sent list of hashes", hashes);
}
// Utilities // Utilities
private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
@ -2115,63 +1891,6 @@ public class Controller extends Thread {
} }
} }
public boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException {
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
// Build request
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
// Save our request into requests map
String signature58 = Base58.encode(signature);
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
// Assign random ID to this message
int id;
do {
id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken
} while (arbitraryDataFileListRequests.put(id, requestEntry) != null);
getArbitraryDataFileListMessage.setId(id);
// Broadcast request
Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage);
// Poll to see if data has arrived
final long singleWait = 100;
long totalWait = 0;
while (totalWait < ARBITRARY_REQUEST_TIMEOUT) {
Thread.sleep(singleWait);
requestEntry = arbitraryDataFileListRequests.get(id);
if (requestEntry == null)
return false;
if (requestEntry.getA() == null)
break;
totalWait += singleWait;
}
return true;
}
private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
String hash58 = Base58.encode(hash);
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
arbitraryDataFileRequests.add(hash58);
Message getDataFileMessage = new GetDataFileMessage(hash);
Message message = peer.getResponse(getDataFileMessage);
arbitraryDataFileRequests.remove(hash58);
if (message == null || message.getType() != Message.MessageType.DATA_FILE) {
return null;
}
DataFileMessage dataFileMessage = (DataFileMessage) message;
return dataFileMessage.getDataFile();
}
/** Returns a list of peers that are not misbehaving, and have a recent block. */ /** Returns a list of peers that are not misbehaving, and have a recent block. */
public List<Peer> getRecentBehavingPeers() { public List<Peer> getRecentBehavingPeers() {
final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();