Attempt to request files directly from a peer if it isn't returned in the general network broadcast.

This commit is contained in:
CalDescent 2021-11-19 12:05:40 +00:00
parent e74dcff010
commit c0fedaa3a4
5 changed files with 316 additions and 54 deletions

View File

@ -221,6 +221,19 @@ public class ArbitraryDataFile {
}
}
public List<byte[]> getChunkHashes() {
List<byte[]> hashes = new ArrayList<>();
if (this.chunks == null || this.chunks.isEmpty()) {
return hashes;
}
for (ArbitraryDataFileChunk chunkData : this.chunks) {
hashes.add(chunkData.getHash());
}
return hashes;
}
public int split(int chunkSize) {
try {

View File

@ -1,7 +1,6 @@
package org.qortal.arbitrary;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

View File

@ -1,5 +1,6 @@
package org.qortal.controller.arbitrary;
import java.security.SecureRandom;
import java.util.*;
import org.apache.logging.log4j.LogManager;
@ -226,7 +227,7 @@ public class ArbitraryDataManager extends Thread {
// Ask our connected peers if they have files for this signature
// This process automatically then fetches the files themselves if a peer is found
fetchDataForSignature(signature); // TODO: keep track
fetchDataForSignature(signature);
} catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
@ -296,6 +297,46 @@ public class ArbitraryDataManager extends Thread {
return false;
}
private boolean shouldMakeDirectFileRequestsForSignature(String signature58) {
Triple<Integer, Integer, Long> request = arbitraryDataSignatureRequests.get(signature58);
if (request == null) {
// Not attempted yet
return true;
}
// Extract the components
//Integer networkBroadcastCount = request.getA();
Integer directPeerRequestCount = request.getB();
Long lastAttemptTimestamp = request.getC();
if (lastAttemptTimestamp == null) {
// Not attempted yet
return true;
}
if (directPeerRequestCount == 0) {
// We haven't tried asking peers directly yet, so we should
return true;
}
long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp;
if (timeSinceLastAttempt > 5 * 60 * 1000L) {
// We haven't tried for at least 5 minutes
if (directPeerRequestCount < 5) {
// We've made less than 5 total attempts
return true;
}
}
if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) {
// We haven't tried for at least 24 hours
return true;
}
return false;
}
private void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) {
Triple<Integer, Integer, Long> request = arbitraryDataSignatureRequests.get(signature58);
Long now = NTP.getTime();
@ -334,12 +375,17 @@ public class ArbitraryDataManager extends Thread {
// If we've already tried too many times in a short space of time, make sure to give up
if (!this.shouldMakeFileListRequestForSignature(signature58)) {
LOGGER.trace("Skipping file list request for signature {}", signature58);
// Check if we should make direct connections to peers
if (this.shouldMakeDirectFileRequestsForSignature(signature58)) {
return this.fetchDataFilesFromPeersForSignature(signature);
}
LOGGER.debug("Skipping file list request for signature {} due to rate limit", signature58);
return false;
}
this.addToSignatureRequests(signature58, true, false);
LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature)));
LOGGER.info(String.format("Sending data file list request for signature %s...", Base58.encode(signature)));
// Build request
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature);
@ -383,15 +429,51 @@ public class ArbitraryDataManager extends Thread {
}
// Fetch data directly from peers
private boolean fetchDataFilesFromPeersForSignature(byte[] signature) {
String signature58 = Base58.encode(signature);
this.addToSignatureRequests(signature58, false, true);
// Firstly fetch peers that claim to be hosting files for this signature
try (final Repository repository = RepositoryManager.getRepository()) {
List<ArbitraryPeerData> peers = repository.getArbitraryRepository().getArbitraryPeerDataForSignature(signature);
if (peers == null || peers.isEmpty()) {
LOGGER.info("No peers found for signature {}", signature58);
return false;
}
LOGGER.info("Attempting a direct peer connection for signature {}...", signature58);
// Peers found, so pick a random one and request data from it
int index = new SecureRandom().nextInt(peers.size());
ArbitraryPeerData arbitraryPeerData = peers.get(index);
String peerAddressString = arbitraryPeerData.getPeerAddress();
return Network.getInstance().requestDataFromPeer(peerAddressString, signature);
} catch (DataException e) {
LOGGER.info("Unable to fetch peer list from repository");
}
return false;
}
// Fetch data files by hash
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException {
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) {
String hash58 = Base58.encode(hash);
LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer));
arbitraryDataFileRequests.put(hash58, NTP.getTime());
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(hash);
Message message = peer.getResponse(getArbitraryDataFileMessage);
Message message = null;
try {
message = peer.getResponse(getArbitraryDataFileMessage);
} catch (InterruptedException e) {
// Will return below due to null message
}
arbitraryDataFileRequests.remove(hash58);
LOGGER.info(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58));
@ -488,6 +570,94 @@ public class ArbitraryDataManager extends Thread {
}
}
public boolean fetchAllArbitraryDataFiles(Repository repository, Peer peer, byte[] signature) {
try {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData))
return false;
ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
// We use null to represent all hashes associated with this transaction
return this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, null);
} catch (DataException e) {}
return false;
}
public boolean fetchArbitraryDataFiles(Repository repository,
Peer peer,
byte[] signature,
ArbitraryTransactionData arbitraryTransactionData,
List<byte[]> hashes) throws DataException {
// Load data file(s)
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData());
arbitraryDataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes());
// If hashes are null, we will treat this to mean all data hashes associated with this file
if (hashes == null) {
if (arbitraryTransactionData.getChunkHashes() == null) {
// This transaction has no chunks, so use the main file hash
hashes = Arrays.asList(arbitraryDataFile.getHash());
}
else {
// Add the chunk hashes
hashes = arbitraryDataFile.getChunkHashes();
}
}
boolean receivedAtLeastOneFile = false;
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (!arbitraryDataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, hash);
if (receivedArbitraryDataFile != null) {
LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFile, peer);
receivedAtLeastOneFile = true;
}
else {
LOGGER.info("Peer {} didn't respond with data file {}", peer, hash);
}
}
else {
LOGGER.info("Already requesting data file {}", arbitraryDataFile);
}
}
}
if (receivedAtLeastOneFile) {
// Update our lookup table to indicate that this peer holds data for this signature
String peerAddress = peer.getPeerData().getAddress().toString();
LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
repository.saveChanges();
}
// Check if we have all the chunks for this transaction
if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) {
// We have all the chunks for this transaction, so we should invalidate the transaction's name's
// data cache so that it is rebuilt the next time we serve it
invalidateCache(arbitraryTransactionData);
// We may also need to broadcast to the network that we are now hosting files for this transaction,
// but only if these files are in accordance with our storage policy
if (ArbitraryDataStorageManager.getInstance().canStoreDataForName(arbitraryTransactionData.getName())) {
// Use a null peer address to indicate our own
Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature));
Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage);
}
}
return receivedAtLeastOneFile;
}
// Network handlers
@ -537,8 +707,8 @@ public class ArbitraryDataManager extends Thread {
}
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
LOGGER.info("Received hash list from peer {}", peer);
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size());
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
@ -586,54 +756,11 @@ public class ArbitraryDataManager extends Thread {
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
arbitraryDataFileListRequests.put(message.getId(), newEntry);
boolean receivedAtLeastOneFile = false;
// Go and fetch the actual data
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes);
// FUTURE: handle response
// Now fetch actual data from this peer
for (byte[] hash : hashes) {
if (!arbitraryDataFile.chunkExists(hash)) {
// Only request the file if we aren't already requesting it from someone else
if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) {
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, hash);
if (receivedArbitraryDataFile != null) {
LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFile, peer);
receivedAtLeastOneFile = true;
}
else {
LOGGER.info("Peer {} didn't respond with data file {}", peer, hash);
}
}
else {
LOGGER.info("Already requesting data file {}", arbitraryDataFile);
}
}
}
if (receivedAtLeastOneFile) {
// Update our lookup table to indicate that this peer holds data for this signature
String peerAddress = peer.getPeerData().getAddress().toString();
LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature));
ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer);
repository.getArbitraryRepository().save(arbitraryPeerData);
repository.saveChanges();
}
// Check if we have all the chunks for this transaction
if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) {
// We have all the chunks for this transaction, so we should invalidate the transaction's name's
// data cache so that it is rebuilt the next time we serve it
invalidateCache(arbitraryTransactionData);
// We may also need to broadcast to the network that we are now hosting files for this transaction,
// but only if these files are in accordance with our storage policy
if (ArbitraryDataStorageManager.getInstance().canStoreDataForName(arbitraryTransactionData.getName())) {
// Use a null peer address to indicate our own
Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature));
Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage);
}
}
} catch (DataException | InterruptedException e) {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
}

View File

@ -6,6 +6,7 @@ import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters;
import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters;
import org.qortal.block.BlockChain;
import org.qortal.controller.Controller;
import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.crypto.Crypto;
import org.qortal.data.block.BlockData;
import org.qortal.data.network.PeerData;
@ -15,6 +16,7 @@ import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.Base58;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import org.qortal.utils.NTP;
@ -232,6 +234,83 @@ public class Network {
}
}
public boolean requestDataFromPeer(String peerAddressString, byte[] signature) {
if (peerAddressString != null) {
PeerAddress peerAddress = PeerAddress.fromString(peerAddressString);
// Reuse an existing PeerData instance if it's already in the known peers list
PeerData peerData = this.allKnownPeers.stream()
.filter(knownPeerData -> knownPeerData.getAddress().equals(peerAddress))
.findFirst()
.orElse(null);
if (peerData == null) {
// Not a known peer, so we need to create one
Long addedWhen = NTP.getTime();
String addedBy = "requestDataFromPeer";
peerData = new PeerData(peerAddress, addedWhen, addedBy);
}
if (peerData == null) {
LOGGER.info("PeerData is null when trying to request data from peer {}", peerAddressString);
return false;
}
// Check if we're already connected to and handshaked with this peer
Peer connectedPeer = this.connectedPeers.stream()
.filter(p -> p.getPeerData().getAddress().equals(peerAddress))
.findFirst()
.orElse(null);
boolean isConnected = (connectedPeer != null);
boolean isHandshaked = this.getHandshakedPeers().stream()
.anyMatch(p -> p.getPeerData().getAddress().equals(peerAddress));
if (isConnected && isHandshaked) {
// Already connected
return this.requestDataFromConnectedPeer(connectedPeer, signature);
}
else {
// We need to connect to this peer before we can request data
try {
if (!isConnected) {
// Add this signature to the list of pending requests for this peer
LOGGER.info("Making connection to peer {} to request files for signature {}...", peerAddressString, Base58.encode(signature));
Peer peer = new Peer(peerData);
peer.addPendingSignatureRequest(signature);
this.connectPeer(peer);
// If connection is successful, data will automatically be requested
// TODO: maybe we could block here (with a timeout) and return once we know the result of the file request
return true;
}
else if (!isHandshaked) {
LOGGER.info("Peer {} is connected but not handshaked. Not attempting a new connection.", peerAddress);
return false;
}
} catch (InterruptedException e) {
LOGGER.info("Interrupted when connecting to peer {}", peerAddress);
return false;
}
}
}
return false;
}
private boolean requestDataFromConnectedPeer(Peer connectedPeer, byte[] signature) {
if (signature == null) {
// Nothing to do
return false;
}
try (final Repository repository = RepositoryManager.getRepository()) {
return ArbitraryDataManager.getInstance().fetchAllArbitraryDataFiles(repository, connectedPeer, signature);
} catch (DataException e) {
LOGGER.info("Unable to fetch arbitrary data files");
}
return false;
}
/**
* Returns list of connected peers that have completed handshaking.
*/
@ -911,6 +990,17 @@ public class Network {
}
}
// Process any pending signature requests, as this peer may have been connected for this purpose only
List<byte[]> pendingSignatureRequests = new ArrayList<>(peer.getPendingSignatureRequests());
if (pendingSignatureRequests != null && !pendingSignatureRequests.isEmpty()) {
for (byte[] signature : pendingSignatureRequests) {
this.requestDataFromConnectedPeer(peer, signature);
peer.removePendingSignatureRequest(signature);
}
}
// FUTURE: we may want to disconnect from this peer if we've finished requesting data from it
// Start regular pings
peer.startPings();

View File

@ -104,6 +104,11 @@ public class Peer {
private boolean syncInProgress = false;
/* Pending signature requests */
private List<byte[]> pendingSignatureRequests = Collections.synchronizedList(new ArrayList<>());
// Versioning
public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX
+ "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})");
@ -355,6 +360,34 @@ public class Peer {
this.syncInProgress = syncInProgress;
}
// Pending signature requests
public void addPendingSignatureRequest(byte[] signature) {
// Check if we already have this signature in the list
for (byte[] existingSignature : this.pendingSignatureRequests) {
if (Arrays.equals(existingSignature, signature )) {
return;
}
}
this.pendingSignatureRequests.add(signature);
}
public void removePendingSignatureRequest(byte[] signature) {
Iterator iterator = this.pendingSignatureRequests.iterator();
while (iterator.hasNext()) {
byte[] existingSignature = (byte[]) iterator.next();
if (Arrays.equals(existingSignature, signature)) {
iterator.remove();
}
}
}
public List<byte[]> getPendingSignatureRequests() {
return this.pendingSignatureRequests;
}
@Override
public String toString() {
// Easier, and nicer output, than peer.getRemoteSocketAddress()