diff --git a/src/main/java/org/qortal/controller/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/ArbitraryDataManager.java index 8292d2a5..63455124 100644 --- a/src/main/java/org/qortal/controller/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/ArbitraryDataManager.java @@ -1,29 +1,62 @@ package org.qortal.controller; -import java.util.Arrays; -import java.util.List; -import java.util.Random; +import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; +import org.qortal.network.Network; +import org.qortal.network.Peer; +import org.qortal.network.message.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; +import org.qortal.storage.DataFile; +import org.qortal.storage.DataFileChunk; import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction.TransactionType; +import org.qortal.utils.Base58; +import org.qortal.utils.NTP; +import org.qortal.utils.Triple; public class ArbitraryDataManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataManager.class); private static final List ARBITRARY_TX_TYPE = Arrays.asList(TransactionType.ARBITRARY); + private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms + private static ArbitraryDataManager instance; private volatile boolean isStopping = false; + + /** + * Map of recent requests for ARBITRARY transaction data file lists. + *

+ * Key is original request's message ID
+ * Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp> + *

+ * If peer is null then either:
+ *

+ * If signature is null then we have already received the file list and either:
+ * + */ + public Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); + + /** + * Array to keep track of in progress arbitrary data file requests + */ + private List arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>()); + private ArbitraryDataManager() { } @@ -62,7 +95,7 @@ public class ArbitraryDataManager extends Thread { // Ask our connected peers if they have files for this signature // This process automatically then fetches the files themselves if a peer is found - Controller.getInstance().fetchArbitraryDataFileList(signature); + fetchArbitraryDataFileList(signature); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); @@ -93,4 +126,274 @@ public class ArbitraryDataManager extends Thread { } } + private boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException { + LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature))); + // Build request + Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); + + // Save our request into requests map + String signature58 = Base58.encode(signature); + Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); + + // Assign random ID to this message + int id; + do { + id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1; + + // Put queue into map (keyed by message ID) so we can poll for a response + // If putIfAbsent() doesn't return null, then this ID is already taken + } while (arbitraryDataFileListRequests.put(id, requestEntry) != null); + getArbitraryDataFileListMessage.setId(id); + + // Broadcast request + Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage); + + // Poll to see if data has arrived + final long singleWait = 100; + long totalWait = 0; + while (totalWait < ARBITRARY_REQUEST_TIMEOUT) { + Thread.sleep(singleWait); + + requestEntry = arbitraryDataFileListRequests.get(id); + if (requestEntry == null) + return false; + + if (requestEntry.getA() == null) + break; + + totalWait += singleWait; + } + return true; + } + + private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException { + String hash58 = Base58.encode(hash); + LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.add(hash58); + Message getDataFileMessage = new GetDataFileMessage(hash); + + Message message = peer.getResponse(getDataFileMessage); + arbitraryDataFileRequests.remove(hash58); + LOGGER.info(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (message == null || message.getType() != Message.MessageType.DATA_FILE) { + return null; + } + + DataFileMessage dataFileMessage = (DataFileMessage) message; + return dataFileMessage.getDataFile(); + } + + public void cleanupRequestCache(long now) { + final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; + arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); + + // TODO: cleanup arbitraryDataFileRequests + } + + + // Network handlers + + public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { + GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; + + byte[] signature = getArbitraryDataMessage.getSignature(); + String signature58 = Base58.encode(signature); + Long timestamp = NTP.getTime(); + Triple newEntry = new Triple<>(signature58, peer, timestamp); + + // If we've seen this request recently, then ignore + if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) + return; + + // Do we even have this transaction? + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY) + return; + + ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData); + + // If we have the data then send it + if (transaction.isDataLocal()) { + byte[] data = transaction.fetchData(); + if (data == null) + return; + + // Update requests map to reflect that we've sent it + newEntry = new Triple<>(signature58, null, timestamp); + arbitraryDataFileListRequests.put(message.getId(), newEntry); + + Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); + arbitraryDataMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataMessage)) + peer.disconnect("failed to send arbitrary data"); + + return; + } + + // Ask our other peers if they have it + Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); + } + } + + public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { + LOGGER.info("Received hash list from peer {}", peer); + ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; + + // Do we have a pending request for this data? + Triple request = arbitraryDataFileListRequests.get(message.getId()); + if (request == null || request.getA() == null) { + return; + } + + // Does this message's signature match what we're expecting? + byte[] signature = arbitraryDataFileListMessage.getSignature(); + String signature58 = Base58.encode(signature); + if (!request.getA().equals(signature58)) { + return; + } + + List hashes = arbitraryDataFileListMessage.getHashes(); + if (hashes == null || hashes.isEmpty()) { + return; + } + + // Check transaction exists and hashes are correct + try (final Repository repository = RepositoryManager.getRepository()) { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (!(transactionData instanceof ArbitraryTransactionData)) + return; + + ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; + + // Load data file(s) + DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData()); + dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes()); + + // Check all hashes exist + for (byte[] hash : hashes) { + //LOGGER.info("Received hash {}", Base58.encode(hash)); + if (!dataFile.containsChunk(hash)) { + LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58); + return; + } + } + + // Update requests map to reflect that we've received it + Triple newEntry = new Triple<>(null, null, request.getC()); + arbitraryDataFileListRequests.put(message.getId(), newEntry); + + // Now fetch actual data from this peer + for (byte[] hash : hashes) { + if (!dataFile.chunkExists(hash)) { + // Only request the file if we aren't already requesting it from someone else + if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) { + DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash); + LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer); + } + else { + LOGGER.info("Already requesting data file {}", dataFile); + } + } + } + + } catch (DataException | InterruptedException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); + } + + // Forwarding (not yet used) + Peer requestingPeer = request.getB(); + if (requestingPeer != null) { + // Forward to requesting peer; + if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { + requestingPeer.disconnect("failed to forward arbitrary data file list"); + } + } + } + + public void onNetworkGetDataFileMessage(Peer peer, Message message) { + GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message; + byte[] hash = getDataFileMessage.getHash(); + Controller.getInstance().stats.getDataFileMessageStats.requests.incrementAndGet(); + + DataFile dataFile = DataFile.fromHash(hash); + if (dataFile.exists()) { + DataFileMessage dataFileMessage = new DataFileMessage(dataFile); + dataFileMessage.setId(message.getId()); + if (!peer.sendMessage(dataFileMessage)) { + LOGGER.info("Couldn't sent file"); + peer.disconnect("failed to send file"); + } + LOGGER.info("Sent file {}", dataFile); + } + else { + + // We don't have this file + Controller.getInstance().stats.getDataFileMessageStats.unknownFiles.getAndIncrement(); + + // Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout + LOGGER.debug(() -> String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, dataFile)); + + // We'll send empty block summaries message as it's very short + // TODO: use a different message type here + Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList()); + fileUnknownMessage.setId(message.getId()); + if (!peer.sendMessage(fileUnknownMessage)) { + LOGGER.info("Couldn't sent file-unknown response"); + peer.disconnect("failed to send file-unknown response"); + } + LOGGER.info("Sent file-unknown response for file {}", dataFile); + } + } + + public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { + GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; + byte[] signature = getArbitraryDataFileListMessage.getSignature(); + Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); + + LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); + + List hashes = new ArrayList<>(); + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Firstly we need to lookup this file on chain to get a list of its hashes + ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); + if (transactionData instanceof ArbitraryTransactionData) { + + byte[] hash = transactionData.getData(); + byte[] chunkHashes = transactionData.getChunkHashes(); + + // Load file(s) and add any that exist to the list of hashes + DataFile dataFile = DataFile.fromHash(hash); + if (chunkHashes.length > 0) { + dataFile.addChunkHashes(chunkHashes); + for (DataFileChunk dataFileChunk : dataFile.getChunks()) { + if (dataFileChunk.exists()) { + hashes.add(dataFileChunk.getHash()); + //LOGGER.info("Added hash {}", dataFileChunk.getHash58()); + } + else { + LOGGER.info("Couldn't add hash {} because it doesn't exist", dataFileChunk.getHash58()); + } + } + } + } + + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); + } + + ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); + arbitraryDataFileListMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataFileListMessage)) { + LOGGER.info("Couldn't send list of hashes"); + peer.disconnect("failed to send list of hashes"); + } + LOGGER.info("Sent list of hashes", hashes); + } + } diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 576356c4..df6babd1 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -21,7 +21,6 @@ import org.qortal.data.block.BlockSummaryData; import org.qortal.data.network.OnlineAccountData; import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerData; -import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ChatTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.event.Event; @@ -38,9 +37,6 @@ import org.qortal.repository.RepositoryFactory; import org.qortal.repository.RepositoryManager; import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory; import org.qortal.settings.Settings; -import org.qortal.storage.DataFile; -import org.qortal.storage.DataFileChunk; -import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction; import org.qortal.transaction.Transaction.TransactionType; import org.qortal.transaction.Transaction.ValidationResult; @@ -85,7 +81,6 @@ public class Controller extends Thread { private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks private static final Object shutdownLock = new Object(); private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true"; - private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms @@ -149,27 +144,6 @@ public class Controller extends Thread { private boolean peersAvailable = true; // peersAvailable must default to true private long timePeersLastAvailable = 0; - /** - * Map of recent requests for ARBITRARY transaction data payloads. - *

- * Key is original request's message ID
- * Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp> - *

- * If peer is null then either:
- *

    - *
  • we are the original requesting peer
  • - *
  • we have already sent data payload to original requesting peer.
  • - *
- * If signature is null then we have already received the data payload and either:
- *
    - *
  • we are the original requesting peer and have saved it locally
  • - *
  • we have forwarded the data payload (and maybe also saved it locally)
  • - *
- */ - private Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); - - private List arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>()); - /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); @@ -247,7 +221,7 @@ public class Controller extends Thread { public StatsSnapshot() { } } - private final StatsSnapshot stats = new StatsSnapshot(); + public final StatsSnapshot stats = new StatsSnapshot(); // Constructors @@ -561,8 +535,7 @@ public class Controller extends Thread { } // Clean up arbitrary data request cache - final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; - arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); + ArbitraryDataManager.getInstance().cleanupRequestCache(now); // Time to 'checkpoint' uncommitted repository writes? if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) { @@ -1241,14 +1214,6 @@ public class Controller extends Thread { onNetworkTransactionSignaturesMessage(peer, message); break; - case GET_ARBITRARY_DATA: - onNetworkGetArbitraryDataMessage(peer, message); - break; - - case ARBITRARY_DATA_FILE_LIST: - onNetworkArbitraryDataFileListMessage(peer, message); - break; - case GET_ONLINE_ACCOUNTS: onNetworkGetOnlineAccountsMessage(peer, message); break; @@ -1257,12 +1222,20 @@ public class Controller extends Thread { onNetworkOnlineAccountsMessage(peer, message); break; + case GET_ARBITRARY_DATA: + ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataMessage(peer, message); + break; + + case ARBITRARY_DATA_FILE_LIST: + ArbitraryDataManager.getInstance().onNetworkArbitraryDataFileListMessage(peer, message); + break; + case GET_DATA_FILE: - onNetworkGetDataFileMessage(peer, message); + ArbitraryDataManager.getInstance().onNetworkGetDataFileMessage(peer, message); break; case GET_ARBITRARY_DATA_FILE_LIST: - onNetworkGetArbitraryDataFileListMessage(peer, message); + ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataFileListMessage(peer, message); break; default: @@ -1624,125 +1597,6 @@ public class Controller extends Thread { } } - private void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { - GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; - - byte[] signature = getArbitraryDataMessage.getSignature(); - String signature58 = Base58.encode(signature); - Long timestamp = NTP.getTime(); - Triple newEntry = new Triple<>(signature58, peer, timestamp); - - // If we've seen this request recently, then ignore - if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) - return; - - // Do we even have this transaction? - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY) - return; - - ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData); - - // If we have the data then send it - if (transaction.isDataLocal()) { - byte[] data = transaction.fetchData(); - if (data == null) - return; - - // Update requests map to reflect that we've sent it - newEntry = new Triple<>(signature58, null, timestamp); - arbitraryDataFileListRequests.put(message.getId(), newEntry); - - Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); - arbitraryDataMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataMessage)) - peer.disconnect("failed to send arbitrary data"); - - return; - } - - // Ask our other peers if they have it - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); - } - } - - private void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { - LOGGER.info("Received hash list from peer {}", peer); - ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; - - // Do we have a pending request for this data? - Triple request = arbitraryDataFileListRequests.get(message.getId()); - if (request == null || request.getA() == null) { - return; - } - - // Does this message's signature match what we're expecting? - byte[] signature = arbitraryDataFileListMessage.getSignature(); - String signature58 = Base58.encode(signature); - if (!request.getA().equals(signature58)) { - return; - } - - List hashes = arbitraryDataFileListMessage.getHashes(); - if (hashes == null || hashes.isEmpty()) { - return; - } - - // Check transaction exists and hashes are correct - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (!(transactionData instanceof ArbitraryTransactionData)) - return; - - ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; - - // Load data file(s) - DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData()); - dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes()); - - // Check all hashes exist - for (byte[] hash : hashes) { - if (!dataFile.containsChunk(hash)) { - LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58); - return; - } - } - - // Update requests map to reflect that we've received it - Triple newEntry = new Triple<>(null, null, request.getC()); - arbitraryDataFileListRequests.put(message.getId(), newEntry); - - // Now fetch actual data from this peer - for (byte[] hash : hashes) { - if (!dataFile.chunkExists(hash)) { - // Only request the file if we aren't already requesting it from someone else - if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) { - DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash); - LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer); - } - else { - LOGGER.info("Already requesting data file {}", dataFile); - } - } - } - - } catch (DataException | InterruptedException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); - } - - // Forwarding (not yet used) - Peer requestingPeer = request.getB(); - if (requestingPeer != null) { - // Forward to requesting peer; - if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { - requestingPeer.disconnect("failed to forward arbitrary data file list"); - } - } - } - private void onNetworkGetOnlineAccountsMessage(Peer peer, Message message) { GetOnlineAccountsMessage getOnlineAccountsMessage = (GetOnlineAccountsMessage) message; @@ -1790,84 +1644,6 @@ public class Controller extends Thread { } } - private void onNetworkGetDataFileMessage(Peer peer, Message message) { - GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message; - byte[] hash = getDataFileMessage.getHash(); - this.stats.getDataFileMessageStats.requests.incrementAndGet(); - - DataFile dataFile = DataFile.fromHash(hash); - if (dataFile.exists()) { - DataFileMessage dataFileMessage = new DataFileMessage(dataFile); - dataFileMessage.setId(message.getId()); - if (!peer.sendMessage(dataFileMessage)) { - LOGGER.info("Couldn't sent file"); - peer.disconnect("failed to send file"); - } - LOGGER.info("Sent file {}", dataFile); - } - else { - - // We don't have this file - this.stats.getDataFileMessageStats.unknownFiles.getAndIncrement(); - - // Send valid, yet unexpected message type in response, so peer's synchronizer doesn't have to wait for timeout - LOGGER.debug(() -> String.format("Sending 'file unknown' response to peer %s for GET_FILE request for unknown file %s", peer, dataFile)); - - // We'll send empty block summaries message as it's very short - // TODO: use a different message type here - Message fileUnknownMessage = new BlockSummariesMessage(Collections.emptyList()); - fileUnknownMessage.setId(message.getId()); - if (!peer.sendMessage(fileUnknownMessage)) { - LOGGER.info("Couldn't sent file-unknown response"); - peer.disconnect("failed to send file-unknown response"); - } - LOGGER.info("Sent file-unknown response for file {}", dataFile); - } - } - - private void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { - GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; - byte[] signature = getArbitraryDataFileListMessage.getSignature(); - this.stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); - - LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); - - List hashes = new ArrayList<>(); - - try (final Repository repository = RepositoryManager.getRepository()) { - - // Firstly we need to lookup this file on chain to get a list of its hashes - ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); - if (transactionData instanceof ArbitraryTransactionData) { - - byte[] hash = transactionData.getData(); - byte[] chunkHashes = transactionData.getChunkHashes(); - - // Load file(s) and add any that exist to the list of hashes - DataFile dataFile = DataFile.fromHash(hash); - if (chunkHashes.length > 0) { - dataFile.addChunkHashes(chunkHashes); - for (DataFileChunk dataFileChunk : dataFile.getChunks()) { - if (dataFileChunk.exists()) { - hashes.add(dataFileChunk.getHash()); - } - } - } - } - - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); - } - - ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); - arbitraryDataFileListMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataFileListMessage)) { - LOGGER.info("Couldn't send list of hashes"); - peer.disconnect("failed to send list of hashes"); - } - LOGGER.info("Sent list of hashes", hashes); - } - // Utilities private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { @@ -2115,63 +1891,6 @@ public class Controller extends Thread { } } - public boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException { - LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature))); - // Build request - Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); - - // Save our request into requests map - String signature58 = Base58.encode(signature); - Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); - - // Assign random ID to this message - int id; - do { - id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1; - - // Put queue into map (keyed by message ID) so we can poll for a response - // If putIfAbsent() doesn't return null, then this ID is already taken - } while (arbitraryDataFileListRequests.put(id, requestEntry) != null); - getArbitraryDataFileListMessage.setId(id); - - // Broadcast request - Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage); - - // Poll to see if data has arrived - final long singleWait = 100; - long totalWait = 0; - while (totalWait < ARBITRARY_REQUEST_TIMEOUT) { - Thread.sleep(singleWait); - - requestEntry = arbitraryDataFileListRequests.get(id); - if (requestEntry == null) - return false; - - if (requestEntry.getA() == null) - break; - - totalWait += singleWait; - } - return true; - } - - private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException { - String hash58 = Base58.encode(hash); - LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); - arbitraryDataFileRequests.add(hash58); - Message getDataFileMessage = new GetDataFileMessage(hash); - - Message message = peer.getResponse(getDataFileMessage); - arbitraryDataFileRequests.remove(hash58); - - if (message == null || message.getType() != Message.MessageType.DATA_FILE) { - return null; - } - - DataFileMessage dataFileMessage = (DataFileMessage) message; - return dataFileMessage.getDataFile(); - } - /** Returns a list of peers that are not misbehaving, and have a recent block. */ public List getRecentBehavingPeers() { final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();