diff --git a/src/main/java/org/qortal/controller/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/ArbitraryDataManager.java index 1b3311d0..8292d2a5 100644 --- a/src/main/java/org/qortal/controller/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/ArbitraryDataManager.java @@ -12,8 +12,6 @@ import org.qortal.data.transaction.TransactionData; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; -import org.qortal.storage.DataFile; -import org.qortal.storage.DataFileChunk; import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction.TransactionType; @@ -62,53 +60,9 @@ public class ArbitraryDataManager extends Thread { final int index = new Random().nextInt(signatures.size()); byte[] signature = signatures.get(index); - // Load the full transaction data so we can access the file hashes - ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); - if (!(transactionData instanceof ArbitraryTransactionData)) { - signatures.remove(signature); - continue; - } - - // Load hashes - byte[] digest = transactionData.getData(); - byte[] chunkHashes = transactionData.getChunkHashes(); - - // Load data file(s) - DataFile dataFile = DataFile.fromDigest(digest); - if (chunkHashes.length > 0) { - dataFile.addChunkHashes(chunkHashes); - - // Now try and fetch each chunk in turn if we don't have them already - for (DataFileChunk dataFileChunk : dataFile.getChunks()) { - if (!dataFileChunk.exists()) { - LOGGER.info("Requesting chunk {}...", dataFileChunk); - boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFileChunk.getHash()); - if (success) { - LOGGER.info("Chunk {} received", dataFileChunk); - } - else { - LOGGER.info("Couldn't retrieve chunk {}", dataFileChunk); - } - } - } - } - else if (transactionData.getSize() < DataFileChunk.CHUNK_SIZE) { - // Fetch the complete file, as it is less than the chunk size - LOGGER.info("Requesting file {}...", dataFile.getHash58()); - boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFile.getHash()); - if (success) { - LOGGER.info("File {} received", dataFile); - } - else { - LOGGER.info("Couldn't retrieve file {}", dataFile); - } - } - else { - // Invalid transaction (should have already failed validation) - LOGGER.info(String.format("Invalid arbitrary transaction: %.8s", signature)); - } - - signatures.remove(signature); + // Ask our connected peers if they have files for this signature + // This process automatically then fetches the files themselves if a peer is found + Controller.getInstance().fetchArbitraryDataFileList(signature); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 9e3f8903..576356c4 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -14,7 +14,6 @@ import org.qortal.block.Block; import org.qortal.block.BlockChain; import org.qortal.block.BlockChain.BlockTimingByHeight; import org.qortal.controller.Synchronizer.SynchronizationResult; -import org.qortal.crypto.Crypto; import org.qortal.data.account.MintingAccountData; import org.qortal.data.account.RewardShareData; import org.qortal.data.block.BlockData; @@ -23,7 +22,6 @@ import org.qortal.data.network.OnlineAccountData; import org.qortal.data.network.PeerChainTipData; import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; -import org.qortal.data.transaction.ArbitraryTransactionData.DataType; import org.qortal.data.transaction.ChatTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.event.Event; @@ -41,6 +39,7 @@ import org.qortal.repository.RepositoryManager; import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory; import org.qortal.settings.Settings; import org.qortal.storage.DataFile; +import org.qortal.storage.DataFileChunk; import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction; import org.qortal.transaction.Transaction.TransactionType; @@ -167,7 +166,9 @@ public class Controller extends Thread { *
  • we have forwarded the data payload (and maybe also saved it locally)
  • * */ - private Map> arbitraryDataRequests = Collections.synchronizedMap(new HashMap<>()); + private Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); + + private List arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>()); /** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */ private final ReentrantLock blockchainLock = new ReentrantLock(); @@ -232,6 +233,15 @@ public class Controller extends Thread { } public GetDataFileMessageStats getDataFileMessageStats = new GetDataFileMessageStats(); + public static class GetArbitraryDataFileListMessageStats { + public AtomicLong requests = new AtomicLong(); + public AtomicLong unknownFiles = new AtomicLong(); + + public GetArbitraryDataFileListMessageStats() { + } + } + public GetArbitraryDataFileListMessageStats getArbitraryDataFileListMessageStats = new GetArbitraryDataFileListMessageStats(); + public AtomicLong latestBlocksCacheRefills = new AtomicLong(); public StatsSnapshot() { @@ -552,7 +562,7 @@ public class Controller extends Thread { // Clean up arbitrary data request cache final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; - arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); + arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); // Time to 'checkpoint' uncommitted repository writes? if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) { @@ -1235,8 +1245,8 @@ public class Controller extends Thread { onNetworkGetArbitraryDataMessage(peer, message); break; - case ARBITRARY_DATA: - onNetworkArbitraryDataMessage(peer, message); + case ARBITRARY_DATA_FILE_LIST: + onNetworkArbitraryDataFileListMessage(peer, message); break; case GET_ONLINE_ACCOUNTS: @@ -1251,6 +1261,10 @@ public class Controller extends Thread { onNetworkGetDataFileMessage(peer, message); break; + case GET_ARBITRARY_DATA_FILE_LIST: + onNetworkGetArbitraryDataFileListMessage(peer, message); + break; + default: LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer)); break; @@ -1619,7 +1633,7 @@ public class Controller extends Thread { Triple newEntry = new Triple<>(signature58, peer, timestamp); // If we've seen this request recently, then ignore - if (arbitraryDataRequests.putIfAbsent(message.getId(), newEntry) != null) + if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) return; // Do we even have this transaction? @@ -1638,7 +1652,7 @@ public class Controller extends Thread { // Update requests map to reflect that we've sent it newEntry = new Triple<>(signature58, null, timestamp); - arbitraryDataRequests.put(message.getId(), newEntry); + arbitraryDataFileListRequests.put(message.getId(), newEntry); Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); arbitraryDataMessage.setId(message.getId()); @@ -1655,23 +1669,29 @@ public class Controller extends Thread { } } - private void onNetworkArbitraryDataMessage(Peer peer, Message message) { - ArbitraryDataMessage arbitraryDataMessage = (ArbitraryDataMessage) message; + private void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { + LOGGER.info("Received hash list from peer {}", peer); + ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; // Do we have a pending request for this data? - Triple request = arbitraryDataRequests.get(message.getId()); - if (request == null || request.getA() == null) + Triple request = arbitraryDataFileListRequests.get(message.getId()); + if (request == null || request.getA() == null) { return; + } // Does this message's signature match what we're expecting? - byte[] signature = arbitraryDataMessage.getSignature(); + byte[] signature = arbitraryDataFileListMessage.getSignature(); String signature58 = Base58.encode(signature); - if (!request.getA().equals(signature58)) + if (!request.getA().equals(signature58)) { return; + } - byte[] data = arbitraryDataMessage.getData(); + List hashes = arbitraryDataFileListMessage.getHashes(); + if (hashes == null || hashes.isEmpty()) { + return; + } - // Check transaction exists and payload hash is correct + // Check transaction exists and hashes are correct try (final Repository repository = RepositoryManager.getRepository()) { TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); if (!(transactionData instanceof ArbitraryTransactionData)) @@ -1679,31 +1699,47 @@ public class Controller extends Thread { ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; - byte[] actualHash = Crypto.digest(data); + // Load data file(s) + DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData()); + dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes()); - // "data" from repository will always be hash of actual raw data - if (!Arrays.equals(arbitraryTransactionData.getData(), actualHash)) - return; + // Check all hashes exist + for (byte[] hash : hashes) { + if (!dataFile.containsChunk(hash)) { + LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58); + return; + } + } // Update requests map to reflect that we've received it Triple newEntry = new Triple<>(null, null, request.getC()); - arbitraryDataRequests.put(message.getId(), newEntry); + arbitraryDataFileListRequests.put(message.getId(), newEntry); + + // Now fetch actual data from this peer + for (byte[] hash : hashes) { + if (!dataFile.chunkExists(hash)) { + // Only request the file if we aren't already requesting it from someone else + if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) { + DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash); + LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer); + } + else { + LOGGER.info("Already requesting data file {}", dataFile); + } + } + } - // Save payload locally - // TODO: storage policy - arbitraryTransactionData.setDataType(DataType.RAW_DATA); - arbitraryTransactionData.setData(data); - repository.getArbitraryRepository().save(arbitraryTransactionData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); + } catch (DataException | InterruptedException e) { + LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); } + // Forwarding (not yet used) Peer requestingPeer = request.getB(); if (requestingPeer != null) { // Forward to requesting peer; - if (!requestingPeer.sendMessage(arbitraryDataMessage)) - requestingPeer.disconnect("failed to forward arbitrary data"); + if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { + requestingPeer.disconnect("failed to forward arbitrary data file list"); + } } } @@ -1756,10 +1792,10 @@ public class Controller extends Thread { private void onNetworkGetDataFileMessage(Peer peer, Message message) { GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message; - byte[] digest = getDataFileMessage.getDigest(); + byte[] hash = getDataFileMessage.getHash(); this.stats.getDataFileMessageStats.requests.incrementAndGet(); - DataFile dataFile = DataFile.fromDigest(digest); + DataFile dataFile = DataFile.fromHash(hash); if (dataFile.exists()) { DataFileMessage dataFileMessage = new DataFileMessage(dataFile); dataFileMessage.setId(message.getId()); @@ -1789,6 +1825,49 @@ public class Controller extends Thread { } } + private void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { + GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; + byte[] signature = getArbitraryDataFileListMessage.getSignature(); + this.stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); + + LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); + + List hashes = new ArrayList<>(); + + try (final Repository repository = RepositoryManager.getRepository()) { + + // Firstly we need to lookup this file on chain to get a list of its hashes + ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature); + if (transactionData instanceof ArbitraryTransactionData) { + + byte[] hash = transactionData.getData(); + byte[] chunkHashes = transactionData.getChunkHashes(); + + // Load file(s) and add any that exist to the list of hashes + DataFile dataFile = DataFile.fromHash(hash); + if (chunkHashes.length > 0) { + dataFile.addChunkHashes(chunkHashes); + for (DataFileChunk dataFileChunk : dataFile.getChunks()) { + if (dataFileChunk.exists()) { + hashes.add(dataFileChunk.getHash()); + } + } + } + } + + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); + } + + ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); + arbitraryDataFileListMessage.setId(message.getId()); + if (!peer.sendMessage(arbitraryDataFileListMessage)) { + LOGGER.info("Couldn't send list of hashes"); + peer.disconnect("failed to send list of hashes"); + } + LOGGER.info("Sent list of hashes", hashes); + } + // Utilities private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { @@ -2036,13 +2115,14 @@ public class Controller extends Thread { } } - public boolean fetchArbitraryDataFile(byte[] hash) throws InterruptedException { + public boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException { + LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature))); // Build request - Message getDataFileMessage = new GetDataFileMessage(hash); + Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); // Save our request into requests map - String hash58 = Base58.encode(hash); - Triple requestEntry = new Triple<>(hash58, null, NTP.getTime()); + String signature58 = Base58.encode(signature); + Triple requestEntry = new Triple<>(signature58, null, NTP.getTime()); // Assign random ID to this message int id; @@ -2051,11 +2131,11 @@ public class Controller extends Thread { // Put queue into map (keyed by message ID) so we can poll for a response // If putIfAbsent() doesn't return null, then this ID is already taken - } while (arbitraryDataRequests.put(id, requestEntry) != null); - getDataFileMessage.setId(id); + } while (arbitraryDataFileListRequests.put(id, requestEntry) != null); + getArbitraryDataFileListMessage.setId(id); // Broadcast request - Network.getInstance().broadcast(peer -> getDataFileMessage); + Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage); // Poll to see if data has arrived final long singleWait = 100; @@ -2063,7 +2143,7 @@ public class Controller extends Thread { while (totalWait < ARBITRARY_REQUEST_TIMEOUT) { Thread.sleep(singleWait); - requestEntry = arbitraryDataRequests.get(id); + requestEntry = arbitraryDataFileListRequests.get(id); if (requestEntry == null) return false; @@ -2075,6 +2155,23 @@ public class Controller extends Thread { return true; } + private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException { + String hash58 = Base58.encode(hash); + LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.add(hash58); + Message getDataFileMessage = new GetDataFileMessage(hash); + + Message message = peer.getResponse(getDataFileMessage); + arbitraryDataFileRequests.remove(hash58); + + if (message == null || message.getType() != Message.MessageType.DATA_FILE) { + return null; + } + + DataFileMessage dataFileMessage = (DataFileMessage) message; + return dataFileMessage.getDataFile(); + } + /** Returns a list of peers that are not misbehaving, and have a recent block. */ public List getRecentBehavingPeers() { final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp(); diff --git a/src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java b/src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java new file mode 100644 index 00000000..36658a9f --- /dev/null +++ b/src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java @@ -0,0 +1,90 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import org.qortal.data.block.BlockSummaryData; +import org.qortal.transform.Transformer; +import org.qortal.transform.block.BlockTransformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ArbitraryDataFileListMessage extends Message { + + private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH; + private static final int HASH_LENGTH = Transformer.SHA256_LENGTH; + + private final byte[] signature; + private final List hashes; + + public ArbitraryDataFileListMessage(byte[] signature, List hashes) { + super(MessageType.ARBITRARY_DATA_FILE_LIST); + + this.signature = signature; + this.hashes = hashes; + } + + public ArbitraryDataFileListMessage(int id, byte[] signature, List hashes) { + super(id, MessageType.ARBITRARY_DATA_FILE_LIST); + + this.signature = signature; + this.hashes = hashes; + } + + public List getHashes() { + return this.hashes; + } + + public byte[] getSignature() { + return this.signature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + byte[] signature = new byte[SIGNATURE_LENGTH]; + bytes.get(signature); + + int count = bytes.getInt(); + + if (bytes.remaining() != count * HASH_LENGTH) + return null; + + List hashes = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + + byte[] hash = new byte[HASH_LENGTH]; + bytes.get(hash); + hashes.add(hash); + } + + return new ArbitraryDataFileListMessage(id, signature, hashes); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.signature); + + bytes.write(Ints.toByteArray(this.hashes.size())); + + for (byte[] hash : this.hashes) { + bytes.write(hash); + } + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + + public ArbitraryDataFileListMessage cloneWithNewId(int newId) { + ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.hashes); + clone.setId(newId); + return clone; + } + +} diff --git a/src/main/java/org/qortal/network/message/GetArbitraryDataFileListMessage.java b/src/main/java/org/qortal/network/message/GetArbitraryDataFileListMessage.java new file mode 100644 index 00000000..49fd60cb --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetArbitraryDataFileListMessage.java @@ -0,0 +1,54 @@ +package org.qortal.network.message; + +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +public class GetArbitraryDataFileListMessage extends Message { + + private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH; + + private final byte[] signature; + + public GetArbitraryDataFileListMessage(byte[] signature) { + this(-1, signature); + } + + private GetArbitraryDataFileListMessage(int id, byte[] signature) { + super(id, MessageType.GET_ARBITRARY_DATA_FILE_LIST); + + this.signature = signature; + } + + public byte[] getSignature() { + return this.signature; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + if (bytes.remaining() != SIGNATURE_LENGTH) + return null; + + byte[] signature = new byte[SIGNATURE_LENGTH]; + + bytes.get(signature); + + return new GetArbitraryDataFileListMessage(id, signature); + } + + @Override + protected byte[] toData() { + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + bytes.write(this.signature); + + return bytes.toByteArray(); + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index 4483caee..3ecb151b 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -86,7 +86,10 @@ public abstract class Message { GET_BLOCKS(101), DATA_FILE(110), - GET_DATA_FILE(111); + GET_DATA_FILE(111), + + ARBITRARY_DATA_FILE_LIST(120), + GET_ARBITRARY_DATA_FILE_LIST(121); public final int value; public final Method fromByteBufferMethod; diff --git a/src/main/java/org/qortal/storage/DataFile.java b/src/main/java/org/qortal/storage/DataFile.java index be78e6b0..2e872a51 100644 --- a/src/main/java/org/qortal/storage/DataFile.java +++ b/src/main/java/org/qortal/storage/DataFile.java @@ -372,8 +372,7 @@ public class DataFile { return chunk.exists(); } } - File file = new File(this.filePath); - return file.exists(); + return false; } public boolean allChunksExist(byte[] chunks) { @@ -389,6 +388,15 @@ public class DataFile { return true; } + public boolean containsChunk(byte[] hash) { + for (DataFileChunk chunk : this.chunks) { + if (Arrays.equals(hash, chunk.getHash())) { + return true; + } + } + return false; + } + public long size() { Path path = Paths.get(this.filePath); try {