From c0fedaa3a49bd99463638935ff9a3d4c08bb4f9d Mon Sep 17 00:00:00 2001 From: CalDescent Date: Fri, 19 Nov 2021 12:05:40 +0000 Subject: [PATCH] Attempt to request files directly from a peer if it isn't returned in the general network broadcast. --- .../qortal/arbitrary/ArbitraryDataFile.java | 13 + .../qortal/arbitrary/ArbitraryDataReader.java | 1 - .../arbitrary/ArbitraryDataManager.java | 233 ++++++++++++++---- src/main/java/org/qortal/network/Network.java | 90 +++++++ src/main/java/org/qortal/network/Peer.java | 33 +++ 5 files changed, 316 insertions(+), 54 deletions(-) diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataFile.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataFile.java index 8883da71..38f9d78e 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataFile.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataFile.java @@ -221,6 +221,19 @@ public class ArbitraryDataFile { } } + public List getChunkHashes() { + List hashes = new ArrayList<>(); + if (this.chunks == null || this.chunks.isEmpty()) { + return hashes; + } + + for (ArbitraryDataFileChunk chunkData : this.chunks) { + hashes.add(chunkData.getHash()); + } + + return hashes; + } + public int split(int chunkSize) { try { diff --git a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java index bf2cf116..9d450a76 100644 --- a/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java +++ b/src/main/java/org/qortal/arbitrary/ArbitraryDataReader.java @@ -1,7 +1,6 @@ package org.qortal.arbitrary; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.ArrayUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index e46593be..066edc87 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -1,5 +1,6 @@ package org.qortal.controller.arbitrary; +import java.security.SecureRandom; import java.util.*; import org.apache.logging.log4j.LogManager; @@ -226,7 +227,7 @@ public class ArbitraryDataManager extends Thread { // Ask our connected peers if they have files for this signature // This process automatically then fetches the files themselves if a peer is found - fetchDataForSignature(signature); // TODO: keep track + fetchDataForSignature(signature); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); @@ -296,6 +297,46 @@ public class ArbitraryDataManager extends Thread { return false; } + private boolean shouldMakeDirectFileRequestsForSignature(String signature58) { + Triple request = arbitraryDataSignatureRequests.get(signature58); + + if (request == null) { + // Not attempted yet + return true; + } + + // Extract the components + //Integer networkBroadcastCount = request.getA(); + Integer directPeerRequestCount = request.getB(); + Long lastAttemptTimestamp = request.getC(); + + if (lastAttemptTimestamp == null) { + // Not attempted yet + return true; + } + + if (directPeerRequestCount == 0) { + // We haven't tried asking peers directly yet, so we should + return true; + } + + long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp; + if (timeSinceLastAttempt > 5 * 60 * 1000L) { + // We haven't tried for at least 5 minutes + if (directPeerRequestCount < 5) { + // We've made less than 5 total attempts + return true; + } + } + + if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) { + // We haven't tried for at least 24 hours + return true; + } + + return false; + } + private void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) { Triple request = arbitraryDataSignatureRequests.get(signature58); Long now = NTP.getTime(); @@ -334,12 +375,17 @@ public class ArbitraryDataManager extends Thread { // If we've already tried too many times in a short space of time, make sure to give up if (!this.shouldMakeFileListRequestForSignature(signature58)) { - LOGGER.trace("Skipping file list request for signature {}", signature58); + // Check if we should make direct connections to peers + if (this.shouldMakeDirectFileRequestsForSignature(signature58)) { + return this.fetchDataFilesFromPeersForSignature(signature); + } + + LOGGER.debug("Skipping file list request for signature {} due to rate limit", signature58); return false; } this.addToSignatureRequests(signature58, true, false); - LOGGER.info(String.format("Sending data file list request for signature %s", Base58.encode(signature))); + LOGGER.info(String.format("Sending data file list request for signature %s...", Base58.encode(signature))); // Build request Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature); @@ -383,15 +429,51 @@ public class ArbitraryDataManager extends Thread { } + // Fetch data directly from peers + + private boolean fetchDataFilesFromPeersForSignature(byte[] signature) { + String signature58 = Base58.encode(signature); + this.addToSignatureRequests(signature58, false, true); + + // Firstly fetch peers that claim to be hosting files for this signature + try (final Repository repository = RepositoryManager.getRepository()) { + + List peers = repository.getArbitraryRepository().getArbitraryPeerDataForSignature(signature); + if (peers == null || peers.isEmpty()) { + LOGGER.info("No peers found for signature {}", signature58); + return false; + } + + LOGGER.info("Attempting a direct peer connection for signature {}...", signature58); + + // Peers found, so pick a random one and request data from it + int index = new SecureRandom().nextInt(peers.size()); + ArbitraryPeerData arbitraryPeerData = peers.get(index); + String peerAddressString = arbitraryPeerData.getPeerAddress(); + return Network.getInstance().requestDataFromPeer(peerAddressString, signature); + + } catch (DataException e) { + LOGGER.info("Unable to fetch peer list from repository"); + } + + return false; + } + + // Fetch data files by hash - private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) throws InterruptedException { + private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] hash) { String hash58 = Base58.encode(hash); LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); arbitraryDataFileRequests.put(hash58, NTP.getTime()); Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(hash); - Message message = peer.getResponse(getArbitraryDataFileMessage); + Message message = null; + try { + message = peer.getResponse(getArbitraryDataFileMessage); + } catch (InterruptedException e) { + // Will return below due to null message + } arbitraryDataFileRequests.remove(hash58); LOGGER.info(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); @@ -488,6 +570,94 @@ public class ArbitraryDataManager extends Thread { } } + public boolean fetchAllArbitraryDataFiles(Repository repository, Peer peer, byte[] signature) { + try { + TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); + if (!(transactionData instanceof ArbitraryTransactionData)) + return false; + + ArbitraryTransactionData arbitraryTransactionData = (ArbitraryTransactionData) transactionData; + + // We use null to represent all hashes associated with this transaction + return this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, null); + + } catch (DataException e) {} + + return false; + } + + public boolean fetchArbitraryDataFiles(Repository repository, + Peer peer, + byte[] signature, + ArbitraryTransactionData arbitraryTransactionData, + List hashes) throws DataException { + + // Load data file(s) + ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(arbitraryTransactionData.getData()); + arbitraryDataFile.addChunkHashes(arbitraryTransactionData.getChunkHashes()); + + // If hashes are null, we will treat this to mean all data hashes associated with this file + if (hashes == null) { + if (arbitraryTransactionData.getChunkHashes() == null) { + // This transaction has no chunks, so use the main file hash + hashes = Arrays.asList(arbitraryDataFile.getHash()); + } + else { + // Add the chunk hashes + hashes = arbitraryDataFile.getChunkHashes(); + } + } + + boolean receivedAtLeastOneFile = false; + + // Now fetch actual data from this peer + for (byte[] hash : hashes) { + if (!arbitraryDataFile.chunkExists(hash)) { + // Only request the file if we aren't already requesting it from someone else + if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { + ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, hash); + if (receivedArbitraryDataFile != null) { + LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFile, peer); + receivedAtLeastOneFile = true; + } + else { + LOGGER.info("Peer {} didn't respond with data file {}", peer, hash); + } + } + else { + LOGGER.info("Already requesting data file {}", arbitraryDataFile); + } + } + } + + if (receivedAtLeastOneFile) { + // Update our lookup table to indicate that this peer holds data for this signature + String peerAddress = peer.getPeerData().getAddress().toString(); + LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); + ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); + repository.getArbitraryRepository().save(arbitraryPeerData); + repository.saveChanges(); + } + + // Check if we have all the chunks for this transaction + if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) { + + // We have all the chunks for this transaction, so we should invalidate the transaction's name's + // data cache so that it is rebuilt the next time we serve it + invalidateCache(arbitraryTransactionData); + + // We may also need to broadcast to the network that we are now hosting files for this transaction, + // but only if these files are in accordance with our storage policy + if (ArbitraryDataStorageManager.getInstance().canStoreDataForName(arbitraryTransactionData.getName())) { + // Use a null peer address to indicate our own + Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature)); + Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); + } + } + + return receivedAtLeastOneFile; + } + // Network handlers @@ -537,8 +707,8 @@ public class ArbitraryDataManager extends Thread { } public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { - LOGGER.info("Received hash list from peer {}", peer); ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; + LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size()); // Do we have a pending request for this data? Triple request = arbitraryDataFileListRequests.get(message.getId()); @@ -586,54 +756,11 @@ public class ArbitraryDataManager extends Thread { Triple newEntry = new Triple<>(null, null, request.getC()); arbitraryDataFileListRequests.put(message.getId(), newEntry); - boolean receivedAtLeastOneFile = false; + // Go and fetch the actual data + this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); + // FUTURE: handle response - // Now fetch actual data from this peer - for (byte[] hash : hashes) { - if (!arbitraryDataFile.chunkExists(hash)) { - // Only request the file if we aren't already requesting it from someone else - if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { - ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, hash); - if (receivedArbitraryDataFile != null) { - LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFile, peer); - receivedAtLeastOneFile = true; - } - else { - LOGGER.info("Peer {} didn't respond with data file {}", peer, hash); - } - } - else { - LOGGER.info("Already requesting data file {}", arbitraryDataFile); - } - } - } - - if (receivedAtLeastOneFile) { - // Update our lookup table to indicate that this peer holds data for this signature - String peerAddress = peer.getPeerData().getAddress().toString(); - LOGGER.info("Adding arbitrary peer: {} for signature {}", peerAddress, Base58.encode(signature)); - ArbitraryPeerData arbitraryPeerData = new ArbitraryPeerData(signature, peer); - repository.getArbitraryRepository().save(arbitraryPeerData); - repository.saveChanges(); - } - - // Check if we have all the chunks for this transaction - if (arbitraryDataFile.exists() || arbitraryDataFile.allChunksExist(arbitraryTransactionData.getChunkHashes())) { - - // We have all the chunks for this transaction, so we should invalidate the transaction's name's - // data cache so that it is rebuilt the next time we serve it - invalidateCache(arbitraryTransactionData); - - // We may also need to broadcast to the network that we are now hosting files for this transaction, - // but only if these files are in accordance with our storage policy - if (ArbitraryDataStorageManager.getInstance().canStoreDataForName(arbitraryTransactionData.getName())) { - // Use a null peer address to indicate our own - Message newArbitrarySignatureMessage = new ArbitrarySignaturesMessage(null, Arrays.asList(signature)); - Network.getInstance().broadcast(broadcastPeer -> newArbitrarySignatureMessage); - } - } - - } catch (DataException | InterruptedException e) { + } catch (DataException e) { LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); } diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index b94659ed..49098daa 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -6,6 +6,7 @@ import org.bouncycastle.crypto.params.Ed25519PrivateKeyParameters; import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters; import org.qortal.block.BlockChain; import org.qortal.controller.Controller; +import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.crypto.Crypto; import org.qortal.data.block.BlockData; import org.qortal.data.network.PeerData; @@ -15,6 +16,7 @@ import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.settings.Settings; +import org.qortal.utils.Base58; import org.qortal.utils.ExecuteProduceConsume; import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot; import org.qortal.utils.NTP; @@ -232,6 +234,83 @@ public class Network { } } + public boolean requestDataFromPeer(String peerAddressString, byte[] signature) { + if (peerAddressString != null) { + PeerAddress peerAddress = PeerAddress.fromString(peerAddressString); + + // Reuse an existing PeerData instance if it's already in the known peers list + PeerData peerData = this.allKnownPeers.stream() + .filter(knownPeerData -> knownPeerData.getAddress().equals(peerAddress)) + .findFirst() + .orElse(null); + + if (peerData == null) { + // Not a known peer, so we need to create one + Long addedWhen = NTP.getTime(); + String addedBy = "requestDataFromPeer"; + peerData = new PeerData(peerAddress, addedWhen, addedBy); + } + + if (peerData == null) { + LOGGER.info("PeerData is null when trying to request data from peer {}", peerAddressString); + return false; + } + + // Check if we're already connected to and handshaked with this peer + Peer connectedPeer = this.connectedPeers.stream() + .filter(p -> p.getPeerData().getAddress().equals(peerAddress)) + .findFirst() + .orElse(null); + boolean isConnected = (connectedPeer != null); + + boolean isHandshaked = this.getHandshakedPeers().stream() + .anyMatch(p -> p.getPeerData().getAddress().equals(peerAddress)); + + if (isConnected && isHandshaked) { + // Already connected + return this.requestDataFromConnectedPeer(connectedPeer, signature); + } + else { + // We need to connect to this peer before we can request data + try { + if (!isConnected) { + // Add this signature to the list of pending requests for this peer + LOGGER.info("Making connection to peer {} to request files for signature {}...", peerAddressString, Base58.encode(signature)); + Peer peer = new Peer(peerData); + peer.addPendingSignatureRequest(signature); + this.connectPeer(peer); + // If connection is successful, data will automatically be requested + // TODO: maybe we could block here (with a timeout) and return once we know the result of the file request + return true; + } + else if (!isHandshaked) { + LOGGER.info("Peer {} is connected but not handshaked. Not attempting a new connection.", peerAddress); + return false; + } + + } catch (InterruptedException e) { + LOGGER.info("Interrupted when connecting to peer {}", peerAddress); + return false; + } + } + } + return false; + } + + private boolean requestDataFromConnectedPeer(Peer connectedPeer, byte[] signature) { + if (signature == null) { + // Nothing to do + return false; + } + + try (final Repository repository = RepositoryManager.getRepository()) { + return ArbitraryDataManager.getInstance().fetchAllArbitraryDataFiles(repository, connectedPeer, signature); + } catch (DataException e) { + LOGGER.info("Unable to fetch arbitrary data files"); + } + return false; + } + /** * Returns list of connected peers that have completed handshaking. */ @@ -911,6 +990,17 @@ public class Network { } } + // Process any pending signature requests, as this peer may have been connected for this purpose only + List pendingSignatureRequests = new ArrayList<>(peer.getPendingSignatureRequests()); + if (pendingSignatureRequests != null && !pendingSignatureRequests.isEmpty()) { + for (byte[] signature : pendingSignatureRequests) { + this.requestDataFromConnectedPeer(peer, signature); + peer.removePendingSignatureRequest(signature); + } + } + + // FUTURE: we may want to disconnect from this peer if we've finished requesting data from it + // Start regular pings peer.startPings(); diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index 8763c114..3b50b777 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -104,6 +104,11 @@ public class Peer { private boolean syncInProgress = false; + + /* Pending signature requests */ + private List pendingSignatureRequests = Collections.synchronizedList(new ArrayList<>()); + + // Versioning public static final Pattern VERSION_PATTERN = Pattern.compile(Controller.VERSION_PREFIX + "(\\d{1,3})\\.(\\d{1,5})\\.(\\d{1,5})"); @@ -355,6 +360,34 @@ public class Peer { this.syncInProgress = syncInProgress; } + + // Pending signature requests + + public void addPendingSignatureRequest(byte[] signature) { + // Check if we already have this signature in the list + for (byte[] existingSignature : this.pendingSignatureRequests) { + if (Arrays.equals(existingSignature, signature )) { + return; + } + } + this.pendingSignatureRequests.add(signature); + } + + public void removePendingSignatureRequest(byte[] signature) { + Iterator iterator = this.pendingSignatureRequests.iterator(); + while (iterator.hasNext()) { + byte[] existingSignature = (byte[]) iterator.next(); + if (Arrays.equals(existingSignature, signature)) { + iterator.remove(); + } + } + } + + public List getPendingSignatureRequests() { + return this.pendingSignatureRequests; + } + + @Override public String toString() { // Easier, and nicer output, than peer.getRemoteSocketAddress()