Browse Source

Rework of arbitrary data requests

Previously we would ask all connected peers for the file itself, but this caused the network to be swamped when multiple peers responded with the same file.

This new approach instead asks all connected peers to send back a list of hashes for all files they have relating to a transaction signature. The requesting node then uses these lists to make separate requests for each missing file.
qdn
CalDescent 3 years ago
parent
commit
2679252b04
  1. 52
      src/main/java/org/qortal/controller/ArbitraryDataManager.java
  2. 179
      src/main/java/org/qortal/controller/Controller.java
  3. 90
      src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java
  4. 54
      src/main/java/org/qortal/network/message/GetArbitraryDataFileListMessage.java
  5. 5
      src/main/java/org/qortal/network/message/Message.java
  6. 12
      src/main/java/org/qortal/storage/DataFile.java

52
src/main/java/org/qortal/controller/ArbitraryDataManager.java

@ -12,8 +12,6 @@ import org.qortal.data.transaction.TransactionData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.storage.DataFile;
import org.qortal.storage.DataFileChunk;
import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction.TransactionType;
@ -62,53 +60,9 @@ public class ArbitraryDataManager extends Thread {
final int index = new Random().nextInt(signatures.size());
byte[] signature = signatures.get(index);
// Load the full transaction data so we can access the file hashes
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData)) {
signatures.remove(signature);
continue;
}
// Load hashes
byte[] digest = transactionData.getData();
byte[] chunkHashes = transactionData.getChunkHashes();
// Load data file(s)
DataFile dataFile = DataFile.fromDigest(digest);
if (chunkHashes.length > 0) {
dataFile.addChunkHashes(chunkHashes);
// Now try and fetch each chunk in turn if we don't have them already
for (DataFileChunk dataFileChunk : dataFile.getChunks()) {
if (!dataFileChunk.exists()) {
LOGGER.info("Requesting chunk {}...", dataFileChunk);
boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFileChunk.getHash());
if (success) {
LOGGER.info("Chunk {} received", dataFileChunk);
}
else {
LOGGER.info("Couldn't retrieve chunk {}", dataFileChunk);
}
}
}
}
else if (transactionData.getSize() < DataFileChunk.CHUNK_SIZE) {
// Fetch the complete file, as it is less than the chunk size
LOGGER.info("Requesting file {}...", dataFile.getHash58());
boolean success = Controller.getInstance().fetchArbitraryDataFile(dataFile.getHash());
if (success) {
LOGGER.info("File {} received", dataFile);
}
else {
LOGGER.info("Couldn't retrieve file {}", dataFile);
}
}
else {
// Invalid transaction (should have already failed validation)
LOGGER.info(String.format("Invalid arbitrary transaction: %.8s", signature));
}
signatures.remove(signature);
// Ask our connected peers if they have files for this signature
// This process automatically then fetches the files themselves if a peer is found
Controller.getInstance().fetchArbitraryDataFileList(signature);
} catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);

179
src/main/java/org/qortal/controller/Controller.java

@ -14,7 +14,6 @@ import org.qortal.block.Block;
import org.qortal.block.BlockChain;
import org.qortal.block.BlockChain.BlockTimingByHeight;
import org.qortal.controller.Synchronizer.SynchronizationResult;
import org.qortal.crypto.Crypto;
import org.qortal.data.account.MintingAccountData;
import org.qortal.data.account.RewardShareData;
import org.qortal.data.block.BlockData;
@ -23,7 +22,6 @@ import org.qortal.data.network.OnlineAccountData;
import org.qortal.data.network.PeerChainTipData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.ArbitraryTransactionData.DataType;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.event.Event;
@ -41,6 +39,7 @@ import org.qortal.repository.RepositoryManager;
import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qortal.settings.Settings;
import org.qortal.storage.DataFile;
import org.qortal.storage.DataFileChunk;
import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction;
import org.qortal.transaction.Transaction.TransactionType;
@ -167,7 +166,9 @@ public class Controller extends Thread {
* <li>we have forwarded the data payload (and maybe also saved it locally)</li>
* </ul>
*/
private Map<Integer, Triple<String, Peer, Long>> arbitraryDataRequests = Collections.synchronizedMap(new HashMap<>());
private Map<Integer, Triple<String, Peer, Long>> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>());
private List<Object> arbitraryDataFileRequests = Collections.synchronizedList(new ArrayList<>());
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly minted block. */
private final ReentrantLock blockchainLock = new ReentrantLock();
@ -232,6 +233,15 @@ public class Controller extends Thread {
}
public GetDataFileMessageStats getDataFileMessageStats = new GetDataFileMessageStats();
public static class GetArbitraryDataFileListMessageStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong unknownFiles = new AtomicLong();
public GetArbitraryDataFileListMessageStats() {
}
}
public GetArbitraryDataFileListMessageStats getArbitraryDataFileListMessageStats = new GetArbitraryDataFileListMessageStats();
public AtomicLong latestBlocksCacheRefills = new AtomicLong();
public StatsSnapshot() {
@ -552,7 +562,7 @@ public class Controller extends Thread {
// Clean up arbitrary data request cache
final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT;
arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
// Time to 'checkpoint' uncommitted repository writes?
if (now >= repositoryCheckpointTimestamp + repositoryCheckpointInterval) {
@ -1235,8 +1245,8 @@ public class Controller extends Thread {
onNetworkGetArbitraryDataMessage(peer, message);
break;
case ARBITRARY_DATA:
onNetworkArbitraryDataMessage(peer, message);
case ARBITRARY_DATA_FILE_LIST:
onNetworkArbitraryDataFileListMessage(peer, message);
break;
case GET_ONLINE_ACCOUNTS:
@ -1251,6 +1261,10 @@ public class Controller extends Thread {
onNetworkGetDataFileMessage(peer, message);
break;
case GET_ARBITRARY_DATA_FILE_LIST:
onNetworkGetArbitraryDataFileListMessage(peer, message);
break;
default:
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break;
@ -1619,7 +1633,7 @@ public class Controller extends Thread {
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
if (arbitraryDataRequests.putIfAbsent(message.getId(), newEntry) != null)
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null)
return;
// Do we even have this transaction?
@ -1638,7 +1652,7 @@ public class Controller extends Thread {
// Update requests map to reflect that we've sent it
newEntry = new Triple<>(signature58, null, timestamp);
arbitraryDataRequests.put(message.getId(), newEntry);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
arbitraryDataMessage.setId(message.getId());
@ -1655,23 +1669,29 @@ public class Controller extends Thread {
}
}
private void onNetworkArbitraryDataMessage(Peer peer, Message message) {
ArbitraryDataMessage arbitraryDataMessage = (ArbitraryDataMessage) message;
private void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
LOGGER.info("Received hash list from peer {}", peer);
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataRequests.get(message.getId());
if (request == null || request.getA() == null)
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
if (request == null || request.getA() == null) {
return;
}
// Does this message's signature match what we're expecting?
byte[] signature = arbitraryDataMessage.getSignature();
byte[] signature = arbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
if (!request.getA().equals(signature58))
if (!request.getA().equals(signature58)) {
return;
}
byte[] data = arbitraryDataMessage.getData();
List<byte[]> hashes = arbitraryDataFileListMessage.getHashes();
if (hashes == null || hashes.isEmpty()) {
return;
}
// Check transaction exists and payload hash is correct
// Check transaction exists and hashes are correct
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData))
@ -1679,31 +1699,47 @@ public class Controller extends Thread {
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
byte[] actualHash = Crypto.digest(data);
// Load data file(s)
DataFile dataFile = DataFile.fromHash(arbitraryTransactionData.getData());
dataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes());
// "data" from repository will always be hash of actual raw data
if (!Arrays.equals(arbitraryTransactionData.getData(), actualHash))
return;
// Check all hashes exist
for (byte[] hash : hashes) {
if (!dataFile.containsChunk(hash)) {
LOGGER.info("Received non-matching chunk hash {} for signature {}", Base58.encode(hash), signature58);
return;
}
}
// Update requests map to reflect that we've received it
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
arbitraryDataRequests.put(message.getId(), newEntry);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (!dataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.contains(Base58.encode(hash))) {
DataFile receivedDataFile = fetchArbitraryDataFile(peer, hash);
LOGGER.info("Received data file {} from peer {}", receivedDataFile, peer);
}
else {
LOGGER.info("Already requesting data file {}", dataFile);
}
}
}
// Save payload locally
// TODO: storage policy
arbitraryTransactionData.setDataType(DataType.RAW_DATA);
arbitraryTransactionData.setData(data);
repository.getArbitraryRepository().save(arbitraryTransactionData);
repository.saveChanges();
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e);
} catch (DataException | InterruptedException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
}
// Forwarding (not yet used)
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
// Forward to requesting peer;
if (!requestingPeer.sendMessage(arbitraryDataMessage))
requestingPeer.disconnect("failed to forward arbitrary data");
if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
@ -1756,10 +1792,10 @@ public class Controller extends Thread {
private void onNetworkGetDataFileMessage(Peer peer, Message message) {
GetDataFileMessage getDataFileMessage = (GetDataFileMessage) message;
byte[] digest = getDataFileMessage.getDigest();
byte[] hash = getDataFileMessage.getHash();
this.stats.getDataFileMessageStats.requests.incrementAndGet();
DataFile dataFile = DataFile.fromDigest(digest);
DataFile dataFile = DataFile.fromHash(hash);
if (dataFile.exists()) {
DataFileMessage dataFileMessage = new DataFileMessage(dataFile);
dataFileMessage.setId(message.getId());
@ -1789,6 +1825,49 @@ public class Controller extends Thread {
}
}
private void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature();
this.stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
List<byte[]> hashes = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get a list of its hashes
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (transactionData instanceof ArbitraryTransactionData) {
byte[] hash = transactionData.getData();
byte[] chunkHashes = transactionData.getChunkHashes();
// Load file(s) and add any that exist to the list of hashes
DataFile dataFile = DataFile.fromHash(hash);
if (chunkHashes.length > 0) {
dataFile.addChunkHashes(chunkHashes);
for (DataFileChunk dataFileChunk : dataFile.getChunks()) {
if (dataFileChunk.exists()) {
hashes.add(dataFileChunk.getHash());
}
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
}
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.info("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
}
LOGGER.info("Sent list of hashes", hashes);
}
// Utilities
private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
@ -2036,13 +2115,14 @@ public class Controller extends Thread {
}
}
public boolean fetchArbitraryDataFile(byte[] hash) throws InterruptedException {
public boolean fetchArbitraryDataFileList(byte[] signature) throws InterruptedException {
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
// Build request
Message getDataFileMessage = new GetDataFileMessage(hash);
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
// Save our request into requests map
String hash58 = Base58.encode(hash);
Triple<String, Peer, Long> requestEntry = new Triple<>(hash58, null, NTP.getTime());
String signature58 = Base58.encode(signature);
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
// Assign random ID to this message
int id;
@ -2051,11 +2131,11 @@ public class Controller extends Thread {
// Put queue into map (keyed by message ID) so we can poll for a response
// If putIfAbsent() doesn't return null, then this ID is already taken
} while (arbitraryDataRequests.put(id, requestEntry) != null);
getDataFileMessage.setId(id);
} while (arbitraryDataFileListRequests.put(id, requestEntry) != null);
getArbitraryDataFileListMessage.setId(id);
// Broadcast request
Network.getInstance().broadcast(peer -> getDataFileMessage);
Network.getInstance().broadcast(peer -> getArbitraryDataFileListMessage);
// Poll to see if data has arrived
final long singleWait = 100;
@ -2063,7 +2143,7 @@ public class Controller extends Thread {
while (totalWait < ARBITRARY_REQUEST_TIMEOUT) {
Thread.sleep(singleWait);
requestEntry = arbitraryDataRequests.get(id);
requestEntry = arbitraryDataFileListRequests.get(id);
if (requestEntry == null)
return false;
@ -2075,6 +2155,23 @@ public class Controller extends Thread {
return true;
}
private DataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
String hash58 = Base58.encode(hash);
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
arbitraryDataFileRequests.add(hash58);
Message getDataFileMessage = new GetDataFileMessage(hash);
Message message = peer.getResponse(getDataFileMessage);
arbitraryDataFileRequests.remove(hash58);
if (message == null || message.getType() != Message.MessageType.DATA_FILE) {
return null;
}
DataFileMessage dataFileMessage = (DataFileMessage) message;
return dataFileMessage.getDataFile();
}
/** Returns a list of peers that are not misbehaving, and have a recent block. */
public List<Peer> getRecentBehavingPeers() {
final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();

90
src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java

@ -0,0 +1,90 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.data.block.BlockSummaryData;
import org.qortal.transform.Transformer;
import org.qortal.transform.block.BlockTransformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ArbitraryDataFileListMessage extends Message {
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
private static final int HASH_LENGTH = Transformer.SHA256_LENGTH;
private final byte[] signature;
private final List<byte[]> hashes;
public ArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes) {
super(MessageType.ARBITRARY_DATA_FILE_LIST);
this.signature = signature;
this.hashes = hashes;
}
public ArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes) {
super(id, MessageType.ARBITRARY_DATA_FILE_LIST);
this.signature = signature;
this.hashes = hashes;
}
public List<byte[]> getHashes() {
return this.hashes;
}
public byte[] getSignature() {
return this.signature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature);
int count = bytes.getInt();
if (bytes.remaining() != count * HASH_LENGTH)
return null;
List<byte[]> hashes = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] hash = new byte[HASH_LENGTH];
bytes.get(hash);
hashes.add(hash);
}
return new ArbitraryDataFileListMessage(id, signature, hashes);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
bytes.write(Ints.toByteArray(this.hashes.size()));
for (byte[] hash : this.hashes) {
bytes.write(hash);
}
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
public ArbitraryDataFileListMessage cloneWithNewId(int newId) {
ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.hashes);
clone.setId(newId);
return clone;
}
}

54
src/main/java/org/qortal/network/message/GetArbitraryDataFileListMessage.java

@ -0,0 +1,54 @@
package org.qortal.network.message;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class GetArbitraryDataFileListMessage extends Message {
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
private final byte[] signature;
public GetArbitraryDataFileListMessage(byte[] signature) {
this(-1, signature);
}
private GetArbitraryDataFileListMessage(int id, byte[] signature) {
super(id, MessageType.GET_ARBITRARY_DATA_FILE_LIST);
this.signature = signature;
}
public byte[] getSignature() {
return this.signature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
if (bytes.remaining() != SIGNATURE_LENGTH)
return null;
byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature);
return new GetArbitraryDataFileListMessage(id, signature);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(this.signature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

5
src/main/java/org/qortal/network/message/Message.java

@ -86,7 +86,10 @@ public abstract class Message {
GET_BLOCKS(101),
DATA_FILE(110),
GET_DATA_FILE(111);
GET_DATA_FILE(111),
ARBITRARY_DATA_FILE_LIST(120),
GET_ARBITRARY_DATA_FILE_LIST(121);
public final int value;
public final Method fromByteBufferMethod;

12
src/main/java/org/qortal/storage/DataFile.java

@ -372,8 +372,7 @@ public class DataFile {
return chunk.exists();
}
}
File file = new File(this.filePath);
return file.exists();
return false;
}
public boolean allChunksExist(byte[] chunks) {
@ -389,6 +388,15 @@ public class DataFile {
return true;
}
public boolean containsChunk(byte[] hash) {
for (DataFileChunk chunk : this.chunks) {
if (Arrays.equals(hash, chunk.getHash())) {
return true;
}
}
return false;
}
public long size() {
Path path = Paths.get(this.filePath);
try {

Loading…
Cancel
Save