diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 3f3a0462..41370f5d 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1391,7 +1391,7 @@ public class Controller extends Thread { break; case GET_ARBITRARY_DATA: - ArbitraryDataManager.getInstance().onNetworkGetArbitraryDataMessage(peer, message); + // Not currently supported break; case ARBITRARY_DATA_FILE_LIST: diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 1720d738..2046e2d9 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -32,7 +32,11 @@ public class ArbitraryDataManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataManager.class); private static final List ARBITRARY_TX_TYPE = Arrays.asList(TransactionType.ARBITRARY); - private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L; // ms + /** Request timeout when transferring arbitrary data */ + private static final long ARBITRARY_REQUEST_TIMEOUT = 6 * 1000L; // ms + + /** Maximum time to hold information about an in-progress relay */ + private static final long ARBITRARY_RELAY_TIMEOUT = 30 * 1000L; // ms private static ArbitraryDataManager instance; private final Object peerDataLock = new Object(); @@ -40,7 +44,7 @@ public class ArbitraryDataManager extends Thread { private volatile boolean isStopping = false; /** - * Map of recent requests for ARBITRARY transaction data file lists. + * Map of recent incoming requests for ARBITRARY transaction data file lists. *

* Key is original request's message ID
* Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp> @@ -59,10 +63,16 @@ public class ArbitraryDataManager extends Thread { public Map> arbitraryDataFileListRequests = Collections.synchronizedMap(new HashMap<>()); /** - * Map to keep track of in progress arbitrary data file requests + * Map to keep track of our in progress (outgoing) arbitrary data file requests */ private Map arbitraryDataFileRequests = Collections.synchronizedMap(new HashMap<>()); + /** + * Map to keep track of hashes that we might need to relay, keyed by the hash of the file (base58 encoded). + * Value is comprised of the base58-encoded signature, the peer that is hosting it, and the timestamp that it was added + */ + private Map> arbitraryRelayMap = Collections.synchronizedMap(new HashMap<>()); + /** * Map to keep track of in progress arbitrary data signature requests * Key: string - the signature encoded in base58 @@ -527,27 +537,46 @@ public class ArbitraryDataManager extends Thread { // Fetch data files by hash - private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, byte[] signature, byte[] hash) { - String hash58 = Base58.encode(hash); - LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); - arbitraryDataFileRequests.put(hash58, NTP.getTime()); - Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); - + private ArbitraryDataFileMessage fetchArbitraryDataFile(Peer peer, Peer requestingPeer, byte[] signature, byte[] hash, Message originalMessage) throws DataException { + ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature); + boolean fileAlreadyExists = existingFile.exists(); Message message = null; - try { - message = peer.getResponse(getArbitraryDataFileMessage); - } catch (InterruptedException e) { - // Will return below due to null message + + // Fetch the file if it doesn't exist locally + if (!fileAlreadyExists) { + String hash58 = Base58.encode(hash); + LOGGER.info(String.format("Fetching data file %.8s from peer %s", hash58, peer)); + arbitraryDataFileRequests.put(hash58, NTP.getTime()); + Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage(signature, hash); + + try { + message = peer.getResponseWithTimeout(getArbitraryDataFileMessage, (int) ARBITRARY_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + // Will return below due to null message + } + arbitraryDataFileRequests.remove(hash58); + LOGGER.trace(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + + if (message == null || message.getType() != Message.MessageType.ARBITRARY_DATA_FILE) { + return null; + } } - arbitraryDataFileRequests.remove(hash58); - LOGGER.info(String.format("Removed hash %.8s from arbitraryDataFileRequests", hash58)); + ArbitraryDataFileMessage arbitraryDataFileMessage = (ArbitraryDataFileMessage) message; - if (message == null || message.getType() != Message.MessageType.ARBITRARY_DATA_FILE) { - return null; + // We might want to forward the request to the peer that originally requested it + this.handleArbitraryDataFileForwarding(requestingPeer, message, originalMessage); + + boolean isRelayRequest = (requestingPeer != null); + if (isRelayRequest) { + if (!fileAlreadyExists) { + // File didn't exist locally before the request, and it's a forwarding request, so delete it + LOGGER.info("Deleting file {} because it was needed for forwarding only", Base58.encode(hash)); + ArbitraryDataFile dataFile = arbitraryDataFileMessage.getArbitraryDataFile(); + dataFile.delete(); + } } - ArbitraryDataFileMessage arbitraryDataFileMessage = (ArbitraryDataFileMessage) message; - return arbitraryDataFileMessage.getArbitraryDataFile(); + return arbitraryDataFileMessage; } @@ -560,6 +589,9 @@ public class ArbitraryDataManager extends Thread { final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT; arbitraryDataFileListRequests.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < requestMinimumTimestamp); arbitraryDataFileRequests.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue() < requestMinimumTimestamp); + + final long relayMinimumTimestamp = now - ARBITRARY_RELAY_TIMEOUT; + arbitraryRelayMap.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < relayMinimumTimestamp); } public boolean isResourceCached(String resourceId) { @@ -685,9 +717,9 @@ public class ArbitraryDataManager extends Thread { if (!arbitraryDataFile.chunkExists(hash)) { // Only request the file if we aren't already requesting it from someone else if (!arbitraryDataFileRequests.containsKey(Base58.encode(hash))) { - ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile(peer, signature, hash); - if (receivedArbitraryDataFile != null) { - LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFile, peer); + ArbitraryDataFileMessage receivedArbitraryDataFileMessage = fetchArbitraryDataFile(peer, null, signature, hash, null); + if (receivedArbitraryDataFileMessage != null) { + LOGGER.info("Received data file {} from peer {}", receivedArbitraryDataFileMessage.getArbitraryDataFile().getHash58(), peer); receivedAtLeastOneFile = true; } else { @@ -732,47 +764,39 @@ public class ArbitraryDataManager extends Thread { return receivedAtLeastOneFile; } + public void handleArbitraryDataFileForwarding(Peer requestingPeer, Message message, Message originalMessage) { + // Return if there is no originally requesting peer to forward to + if (requestingPeer == null) { + return; + } - // Network handlers - - public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { - GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; - byte[] signature = getArbitraryDataMessage.getSignature(); - - // Do we even have this transaction? - try (final Repository repository = RepositoryManager.getRepository()) { - TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); - if (transactionData == null || transactionData.getType() != TransactionType.ARBITRARY) - return; - - ArbitraryTransaction transaction = new ArbitraryTransaction(repository, transactionData); - - // If we have the data then send it - if (transaction.isDataLocal()) { - byte[] data = transaction.fetchData(); - if (data == null) - return; + // Return if we're not in relay mode or if this request doesn't need forwarding + if (!Settings.getInstance().isRelayModeEnabled()) { + return; + } - Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); - arbitraryDataMessage.setId(message.getId()); - if (!peer.sendMessage(arbitraryDataMessage)) - peer.disconnect("failed to send arbitrary data"); + LOGGER.info("Received arbitrary data file - forwarding is needed"); - return; - } + // The ID needs to match that of the original request + message.setId(originalMessage.getId()); - // Ask our other peers if they have it - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while finding arbitrary transaction data for peer %s", peer), e); + if (!requestingPeer.sendMessage(message)) { + LOGGER.info("Failed to forward arbitrary data file to peer {}", requestingPeer); + requestingPeer.disconnect("failed to forward arbitrary data file"); + } + else { + LOGGER.info("Forwarded arbitrary data file to peer {}", requestingPeer); } } + + // Network handlers + public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size()); - // Do we have a pending request for this data? + // Do we have a pending request for this data? // TODO: might we want to relay all of them anyway? Triple request = arbitraryDataFileListRequests.get(message.getId()); if (request == null || request.getA() == null) { return; @@ -832,7 +856,17 @@ public class ArbitraryDataManager extends Thread { if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) { Peer requestingPeer = request.getB(); if (requestingPeer != null) { - // Forward to requesting peer; + // Add each hash to our local mapping so we know who to ask later + Long now = NTP.getTime(); + for (byte[] hash : hashes) { + String hash58 = Base58.encode(hash); + Triple value = new Triple<>(signature58, peer, now); + this.arbitraryRelayMap.put(hash58, value); + LOGGER.debug("Added {} to relay map: {}, {}, {}", hash58, signature58, peer, now); + } + + // Forward to requesting peer + LOGGER.info("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer); if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { requestingPeer.disconnect("failed to forward arbitrary data file list"); } @@ -843,13 +877,20 @@ public class ArbitraryDataManager extends Thread { public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { GetArbitraryDataFileMessage getArbitraryDataFileMessage = (GetArbitraryDataFileMessage) message; byte[] hash = getArbitraryDataFileMessage.getHash(); + String hash58 = Base58.encode(hash); byte[] signature = getArbitraryDataFileMessage.getSignature(); Controller.getInstance().stats.getArbitraryDataFileMessageStats.requests.incrementAndGet(); + LOGGER.info("Received GetArbitraryDataFileMessage from peer {} for hash {}", peer, Base58.encode(hash)); + try { ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromHash(hash, signature); + Triple relayInfo = this.arbitraryRelayMap.get(hash58); if (arbitraryDataFile.exists()) { + LOGGER.info("Hash {} exists", hash58); + + // We can serve the file directly as we already have it ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage(signature, arbitraryDataFile); arbitraryDataFileMessage.setId(message.getId()); if (!peer.sendMessage(arbitraryDataFileMessage)) { @@ -858,7 +899,25 @@ public class ArbitraryDataManager extends Thread { } LOGGER.info("Sent file {}", arbitraryDataFile); } + else if (relayInfo != null) { + LOGGER.info("We have relay info for hash {}", Base58.encode(hash)); + // We need to ask this peer for the file + Peer peerToAsk = relayInfo.getB(); + //Peer peerToAsk = Network.getInstance().getConnectedPeerWithAddress(peerAddress); + if (peerToAsk != null) { + // Forward the message to this peer + LOGGER.info("Asking peer {} for hash {}", peerToAsk, hash58); + + ArbitraryDataFileMessage arbitraryDataFileMessage = this.fetchArbitraryDataFile(peerToAsk, peer, signature, hash, message); + // Remove from the map regardless of outcome, as the relay attempt is now considered complete + arbitraryRelayMap.remove(hash58); + } + else { + LOGGER.info("Peer {} not found in relay info", peer); + } + } else { + LOGGER.info("Hash {} doesn't exist and we don't have relay info", hash58); // We don't have this file Controller.getInstance().stats.getArbitraryDataFileMessageStats.unknownFiles.getAndIncrement(); @@ -874,11 +933,13 @@ public class ArbitraryDataManager extends Thread { LOGGER.info("Couldn't sent file-unknown response"); peer.disconnect("failed to send file-unknown response"); } - LOGGER.info("Sent file-unknown response for file {}", arbitraryDataFile); + else { + LOGGER.info("Sent file-unknown response for file {}", arbitraryDataFile); + } } } catch (DataException e) { - LOGGER.info("Unable to handle request for arbitrary data file: {}", Base58.encode(hash)); + LOGGER.info("Unable to handle request for arbitrary data file: {}", hash58); } } @@ -962,7 +1023,10 @@ public class ArbitraryDataManager extends Thread { else { // Ask our other peers if they have it LOGGER.info("Rebroadcasted hash list request from peer {} for signature {} to our other peers", peer, Base58.encode(signature)); - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : message); + Network.getInstance().broadcast( + broadcastPeer -> broadcastPeer == peer || + Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) + ? null : message); } }