forked from Qortal/qortal
Added initial protocol methods for metadata requests and forwarding. Not tested yet.
This commit is contained in:
parent
a79ed02ccf
commit
dedf65bd4b
@ -12,6 +12,7 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
@ -33,10 +34,12 @@ import org.qortal.api.resource.TransactionsResource.ConfirmationStatus;
|
||||
import org.qortal.arbitrary.*;
|
||||
import org.qortal.arbitrary.ArbitraryDataFile.ResourceIdType;
|
||||
import org.qortal.arbitrary.exception.MissingDataException;
|
||||
import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata;
|
||||
import org.qortal.arbitrary.misc.Category;
|
||||
import org.qortal.arbitrary.misc.Service;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.arbitrary.ArbitraryDataStorageManager;
|
||||
import org.qortal.controller.arbitrary.ArbitraryMetadataManager;
|
||||
import org.qortal.data.account.AccountData;
|
||||
import org.qortal.data.arbitrary.ArbitraryCategoryInfo;
|
||||
import org.qortal.data.arbitrary.ArbitraryResourceInfo;
|
||||
@ -634,6 +637,42 @@ public class ArbitraryResource {
|
||||
}
|
||||
|
||||
|
||||
// Metadata
|
||||
|
||||
@GET
|
||||
@Path("/metadata/{service}/{name}/{identifier}")
|
||||
@Operation(
|
||||
summary = "Fetch raw metadata from resource with supplied service, name, identifier, and relative path",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
description = "Path to file structure containing requested data",
|
||||
content = @Content(
|
||||
mediaType = MediaType.APPLICATION_JSON,
|
||||
schema = @Schema(
|
||||
implementation = ArbitraryDataTransactionMetadata.class
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
)
|
||||
@SecurityRequirement(name = "apiKey")
|
||||
public String getMetadata(@HeaderParam(Security.API_KEY_HEADER) String apiKey,
|
||||
@PathParam("service") Service service,
|
||||
@PathParam("name") String name,
|
||||
@PathParam("identifier") String identifier) {
|
||||
Security.checkApiCallAllowed(request);
|
||||
|
||||
ArbitraryDataResource resource = new ArbitraryDataResource(name, ResourceIdType.NAME, service, identifier);
|
||||
|
||||
byte[] metadata = ArbitraryMetadataManager.getInstance().fetchMetadata(resource);
|
||||
if (metadata != null) {
|
||||
return new String(metadata, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Upload data at supplied path
|
||||
|
||||
|
@ -221,6 +221,15 @@ public class Controller extends Thread {
|
||||
}
|
||||
public GetArbitraryDataFileListMessageStats getArbitraryDataFileListMessageStats = new GetArbitraryDataFileListMessageStats();
|
||||
|
||||
public static class GetArbitraryMetadataMessageStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
public AtomicLong unknownFiles = new AtomicLong();
|
||||
|
||||
public GetArbitraryMetadataMessageStats() {
|
||||
}
|
||||
}
|
||||
public GetArbitraryMetadataMessageStats getArbitraryMetadataMessageStats = new GetArbitraryMetadataMessageStats();
|
||||
|
||||
public AtomicLong latestBlocksCacheRefills = new AtomicLong();
|
||||
|
||||
public StatsSnapshot() {
|
||||
@ -1396,6 +1405,14 @@ public class Controller extends Thread {
|
||||
ArbitraryDataManager.getInstance().onNetworkArbitrarySignaturesMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_ARBITRARY_METADATA:
|
||||
ArbitraryMetadataManager.getInstance().onNetworkGetArbitraryMetadataMessage(peer, message);
|
||||
break;
|
||||
|
||||
case ARBITRARY_METADATA:
|
||||
ArbitraryMetadataManager.getInstance().onNetworkArbitraryMetadataMessage(peer, message);
|
||||
break;
|
||||
|
||||
default:
|
||||
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
|
||||
break;
|
||||
|
@ -57,9 +57,9 @@ public class ArbitraryDataFileListManager {
|
||||
|
||||
|
||||
/** Maximum number of seconds that a file list relay request is able to exist on the network */
|
||||
private static long RELAY_REQUEST_MAX_DURATION = 5000L;
|
||||
public static long RELAY_REQUEST_MAX_DURATION = 5000L;
|
||||
/** Maximum number of hops that a file list relay request is allowed to make */
|
||||
private static int RELAY_REQUEST_MAX_HOPS = 3;
|
||||
public static int RELAY_REQUEST_MAX_HOPS = 3;
|
||||
|
||||
|
||||
private ArbitraryDataFileListManager() {
|
||||
|
@ -275,6 +275,9 @@ public class ArbitraryDataManager extends Thread {
|
||||
|
||||
// Cleanup file request caches
|
||||
ArbitraryDataFileManager.getInstance().cleanupRequestCache(now);
|
||||
|
||||
// Clean up metadata request caches
|
||||
ArbitraryMetadataManager.getInstance().cleanupRequestCache(now);
|
||||
}
|
||||
|
||||
public boolean isResourceCached(ArbitraryDataResource resource) {
|
||||
|
@ -0,0 +1,434 @@
|
||||
package org.qortal.controller.arbitrary;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||
import org.qortal.arbitrary.ArbitraryDataResource;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.data.transaction.ArbitraryTransactionData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.message.*;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.Base58;
|
||||
import org.qortal.utils.NTP;
|
||||
import org.qortal.utils.Triple;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.qortal.controller.arbitrary.ArbitraryDataFileListManager.RELAY_REQUEST_MAX_DURATION;
|
||||
import static org.qortal.controller.arbitrary.ArbitraryDataFileListManager.RELAY_REQUEST_MAX_HOPS;
|
||||
|
||||
public class ArbitraryMetadataManager {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ArbitraryMetadataManager.class);
|
||||
|
||||
private static ArbitraryMetadataManager instance;
|
||||
|
||||
/**
|
||||
* Map of recent incoming requests for ARBITRARY transaction metadata.
|
||||
* <p>
|
||||
* Key is original request's message ID<br>
|
||||
* Value is Triple<transaction signature in base58, first requesting peer, first request's timestamp>
|
||||
* <p>
|
||||
* If peer is null then either:<br>
|
||||
* <ul>
|
||||
* <li>we are the original requesting peer</li>
|
||||
* <li>we have already sent data payload to original requesting peer.</li>
|
||||
* </ul>
|
||||
* If signature is null then we have already received the file list and either:<br>
|
||||
* <ul>
|
||||
* <li>we are the original requesting peer and have processed it</li>
|
||||
* <li>we have forwarded the metadata</li>
|
||||
* </ul>
|
||||
*/
|
||||
public Map<Integer, Triple<String, Peer, Long>> arbitraryMetadataRequests = Collections.synchronizedMap(new HashMap<>());
|
||||
|
||||
/**
|
||||
* Map to keep track of in progress arbitrary metadata requests
|
||||
* Key: string - the signature encoded in base58
|
||||
* Value: Triple<networkBroadcastCount, directPeerRequestCount, lastAttemptTimestamp>
|
||||
*/
|
||||
private Map<String, Triple<Integer, Integer, Long>> arbitraryMetadataSignatureRequests = Collections.synchronizedMap(new HashMap<>());
|
||||
|
||||
|
||||
private ArbitraryMetadataManager() {
|
||||
}
|
||||
|
||||
public static ArbitraryMetadataManager getInstance() {
|
||||
if (instance == null)
|
||||
instance = new ArbitraryMetadataManager();
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void cleanupRequestCache(Long now) {
|
||||
if (now == null) {
|
||||
return;
|
||||
}
|
||||
final long requestMinimumTimestamp = now - ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT;
|
||||
arbitraryMetadataRequests.entrySet().removeIf(entry -> entry.getValue().getC() == null || entry.getValue().getC() < requestMinimumTimestamp);
|
||||
}
|
||||
|
||||
|
||||
public byte[] fetchMetadata(ArbitraryDataResource arbitraryDataResource) {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
// Find latest transaction
|
||||
ArbitraryTransactionData latestTransaction = repository.getArbitraryRepository()
|
||||
.getLatestTransaction(arbitraryDataResource.getResourceId(), arbitraryDataResource.getService(),
|
||||
null, arbitraryDataResource.getIdentifier());
|
||||
|
||||
if (latestTransaction != null) {
|
||||
byte[] signature = latestTransaction.getSignature();
|
||||
byte[] metadataHash = latestTransaction.getMetadataHash();
|
||||
if (metadataHash == null) {
|
||||
// This resource doesn't have metadata
|
||||
return null;
|
||||
}
|
||||
|
||||
ArbitraryDataFile metadataFile = ArbitraryDataFile.fromHash(metadataHash, signature);
|
||||
if (metadataFile.exists()) {
|
||||
// Use local copy
|
||||
return metadataFile.getBytes();
|
||||
}
|
||||
else {
|
||||
// Request from network
|
||||
this.fetchArbitraryMetadata(latestTransaction);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue when fetching arbitrary transaction metadata", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// Request metadata from network
|
||||
|
||||
public boolean fetchArbitraryMetadata(ArbitraryTransactionData arbitraryTransactionData) {
|
||||
byte[] signature = arbitraryTransactionData.getSignature();
|
||||
String signature58 = Base58.encode(signature);
|
||||
|
||||
// Require an NTP sync
|
||||
Long now = NTP.getTime();
|
||||
if (now == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we've already tried too many times in a short space of time, make sure to give up
|
||||
if (!this.shouldMakeMetadataRequestForSignature(signature58)) {
|
||||
LOGGER.trace("Skipping metadata request for signature {} due to rate limit", signature58);
|
||||
return false;
|
||||
}
|
||||
this.addToSignatureRequests(signature58, true, false);
|
||||
|
||||
List<Peer> handshakedPeers = Network.getInstance().getHandshakedPeers();
|
||||
LOGGER.debug(String.format("Sending metadata request for signature %s to %d peers...", signature58, handshakedPeers.size()));
|
||||
|
||||
// Build request
|
||||
Message getArbitraryMetadataMessage = new GetArbitraryMetadataMessage(signature, now, 0);
|
||||
|
||||
// Save our request into requests map
|
||||
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
|
||||
|
||||
// Assign random ID to this message
|
||||
int id;
|
||||
do {
|
||||
id = new Random().nextInt(Integer.MAX_VALUE - 1) + 1;
|
||||
|
||||
// Put queue into map (keyed by message ID) so we can poll for a response
|
||||
// If putIfAbsent() doesn't return null, then this ID is already taken
|
||||
} while (arbitraryMetadataRequests.put(id, requestEntry) != null);
|
||||
getArbitraryMetadataMessage.setId(id);
|
||||
|
||||
// Broadcast request
|
||||
Network.getInstance().broadcast(peer -> getArbitraryMetadataMessage);
|
||||
|
||||
// Poll to see if data has arrived
|
||||
final long singleWait = 100;
|
||||
long totalWait = 0;
|
||||
while (totalWait < ArbitraryDataManager.ARBITRARY_REQUEST_TIMEOUT) {
|
||||
try {
|
||||
Thread.sleep(singleWait);
|
||||
} catch (InterruptedException e) {
|
||||
break;
|
||||
}
|
||||
|
||||
requestEntry = arbitraryMetadataRequests.get(id);
|
||||
if (requestEntry == null)
|
||||
return false;
|
||||
|
||||
if (requestEntry.getA() == null)
|
||||
break;
|
||||
|
||||
totalWait += singleWait;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
// Track metadata lookups by signature
|
||||
|
||||
private boolean shouldMakeMetadataRequestForSignature(String signature58) {
|
||||
Triple<Integer, Integer, Long> request = arbitraryMetadataSignatureRequests.get(signature58);
|
||||
|
||||
if (request == null) {
|
||||
// Not attempted yet
|
||||
return true;
|
||||
}
|
||||
|
||||
// Extract the components
|
||||
Integer networkBroadcastCount = request.getA();
|
||||
// Integer directPeerRequestCount = request.getB();
|
||||
Long lastAttemptTimestamp = request.getC();
|
||||
|
||||
if (lastAttemptTimestamp == null) {
|
||||
// Not attempted yet
|
||||
return true;
|
||||
}
|
||||
|
||||
long timeSinceLastAttempt = NTP.getTime() - lastAttemptTimestamp;
|
||||
|
||||
// Allow a second attempt after 15 seconds, and another after 30 seconds
|
||||
if (timeSinceLastAttempt > 15 * 1000L) {
|
||||
// We haven't tried for at least 15 seconds
|
||||
|
||||
if (networkBroadcastCount < 3) {
|
||||
// We've made less than 3 total attempts
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Then allow another 5 attempts, each 5 minutes apart
|
||||
if (timeSinceLastAttempt > 5 * 60 * 1000L) {
|
||||
// We haven't tried for at least 5 minutes
|
||||
|
||||
if (networkBroadcastCount < 5) {
|
||||
// We've made less than 5 total attempts
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// From then on, only try once every 24 hours, to reduce network spam
|
||||
if (timeSinceLastAttempt > 24 * 60 * 60 * 1000L) {
|
||||
// We haven't tried for at least 24 hours
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isSignatureRateLimited(byte[] signature) {
|
||||
String signature58 = Base58.encode(signature);
|
||||
return !this.shouldMakeMetadataRequestForSignature(signature58);
|
||||
}
|
||||
|
||||
public long lastRequestForSignature(byte[] signature) {
|
||||
String signature58 = Base58.encode(signature);
|
||||
Triple<Integer, Integer, Long> request = arbitraryMetadataSignatureRequests.get(signature58);
|
||||
|
||||
if (request == null) {
|
||||
// Not attempted yet
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Extract the components
|
||||
Long lastAttemptTimestamp = request.getC();
|
||||
if (lastAttemptTimestamp != null) {
|
||||
return lastAttemptTimestamp;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void addToSignatureRequests(String signature58, boolean incrementNetworkRequests, boolean incrementPeerRequests) {
|
||||
Triple<Integer, Integer, Long> request = arbitraryMetadataSignatureRequests.get(signature58);
|
||||
Long now = NTP.getTime();
|
||||
|
||||
if (request == null) {
|
||||
// No entry yet
|
||||
Triple<Integer, Integer, Long> newRequest = new Triple<>(0, 0, now);
|
||||
arbitraryMetadataSignatureRequests.put(signature58, newRequest);
|
||||
}
|
||||
else {
|
||||
// There is an existing entry
|
||||
if (incrementNetworkRequests) {
|
||||
request.setA(request.getA() + 1);
|
||||
}
|
||||
if (incrementPeerRequests) {
|
||||
request.setB(request.getB() + 1);
|
||||
}
|
||||
request.setC(now);
|
||||
arbitraryMetadataSignatureRequests.put(signature58, request);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeFromSignatureRequests(String signature58) {
|
||||
arbitraryMetadataSignatureRequests.remove(signature58);
|
||||
}
|
||||
|
||||
|
||||
// Network handlers
|
||||
|
||||
public void onNetworkArbitraryMetadataMessage(Peer peer, Message message) {
|
||||
// Don't process if QDN is disabled
|
||||
if (!Settings.getInstance().isQdnEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ArbitraryMetadataMessage arbitraryMetadataMessage = (ArbitraryMetadataMessage) message;
|
||||
LOGGER.debug("Received metadata from peer {}", peer);
|
||||
|
||||
// Do we have a pending request for this data?
|
||||
Triple<String, Peer, Long> request = arbitraryMetadataRequests.get(message.getId());
|
||||
if (request == null || request.getA() == null) {
|
||||
return;
|
||||
}
|
||||
boolean isRelayRequest = (request.getB() != null);
|
||||
|
||||
// Does this message's signature match what we're expecting?
|
||||
byte[] signature = arbitraryMetadataMessage.getSignature();
|
||||
String signature58 = Base58.encode(signature);
|
||||
if (!request.getA().equals(signature58)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ArbitraryTransactionData arbitraryTransactionData = null;
|
||||
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
|
||||
|
||||
// Forwarding
|
||||
if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) {
|
||||
|
||||
// Get transaction info
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
|
||||
if (!(transactionData instanceof ArbitraryTransactionData))
|
||||
return;
|
||||
arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while finding arbitrary transaction metadata for peer %s", peer), e);
|
||||
}
|
||||
|
||||
// Check if the name is blocked
|
||||
boolean isBlocked = (arbitraryTransactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(arbitraryTransactionData.getName()));
|
||||
if (!isBlocked) {
|
||||
Peer requestingPeer = request.getB();
|
||||
if (requestingPeer != null) {
|
||||
|
||||
// Forward to requesting peer
|
||||
LOGGER.debug("Forwarding metadata to requesting peer: {}", requestingPeer);
|
||||
if (!requestingPeer.sendMessage(arbitraryMetadataMessage)) {
|
||||
requestingPeer.disconnect("failed to forward arbitrary metadata");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void onNetworkGetArbitraryMetadataMessage(Peer peer, Message message) {
|
||||
// Don't respond if QDN is disabled
|
||||
if (!Settings.getInstance().isQdnEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Controller.getInstance().stats.getArbitraryMetadataMessageStats.requests.incrementAndGet();
|
||||
|
||||
GetArbitraryMetadataMessage getArbitraryMetadataMessage = (GetArbitraryMetadataMessage) message;
|
||||
byte[] signature = getArbitraryMetadataMessage.getSignature();
|
||||
String signature58 = Base58.encode(signature);
|
||||
Long now = NTP.getTime();
|
||||
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, now);
|
||||
|
||||
// If we've seen this request recently, then ignore
|
||||
if (arbitraryMetadataRequests.putIfAbsent(message.getId(), newEntry) != null) {
|
||||
LOGGER.debug("Ignoring metadata request from peer {} for signature {}", peer, signature58);
|
||||
return;
|
||||
}
|
||||
|
||||
LOGGER.debug("Received metadata request from peer {} for signature {}", peer, signature58);
|
||||
|
||||
ArbitraryTransactionData transactionData = null;
|
||||
ArbitraryDataFile metadataFile = null;
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// Firstly we need to lookup this file on chain to get its metadata hash
|
||||
transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
|
||||
if (transactionData instanceof ArbitraryTransactionData) {
|
||||
|
||||
// Check if we're even allowed to serve metadata for this transaction
|
||||
if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) {
|
||||
|
||||
byte[] metadataHash = transactionData.getMetadataHash();
|
||||
if (metadataHash != null) {
|
||||
|
||||
// Load metadata file
|
||||
metadataFile = ArbitraryDataFile.fromHash(metadataHash, signature);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while fetching arbitrary metadata for peer %s", peer), e);
|
||||
}
|
||||
|
||||
// We should only respond if we have the metadata file
|
||||
if (metadataFile != null && metadataFile.exists()) {
|
||||
|
||||
// We have the metadata file, so update requests map to reflect that we've sent it
|
||||
newEntry = new Triple<>(null, null, now);
|
||||
arbitraryMetadataRequests.put(message.getId(), newEntry);
|
||||
|
||||
ArbitraryMetadataMessage arbitraryMetadataMessage = new ArbitraryMetadataMessage(signature, metadataFile);
|
||||
arbitraryMetadataMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(arbitraryMetadataMessage)) {
|
||||
LOGGER.debug("Couldn't send metadata");
|
||||
peer.disconnect("failed to send metadata");
|
||||
return;
|
||||
}
|
||||
LOGGER.debug("Sent metadata");
|
||||
|
||||
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
|
||||
LOGGER.debug("No need for any forwarding because metadata request is fully served");
|
||||
return;
|
||||
|
||||
}
|
||||
|
||||
// We may need to forward this request on
|
||||
boolean isBlocked = (transactionData == null || ArbitraryDataStorageManager.getInstance().isNameBlocked(transactionData.getName()));
|
||||
if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) {
|
||||
// In relay mode - so ask our other peers if they have it
|
||||
|
||||
long requestTime = getArbitraryMetadataMessage.getRequestTime();
|
||||
int requestHops = getArbitraryMetadataMessage.getRequestHops();
|
||||
getArbitraryMetadataMessage.setRequestHops(++requestHops);
|
||||
long totalRequestTime = now - requestTime;
|
||||
|
||||
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
|
||||
// Relay request hasn't timed out yet, so can potentially be rebroadcast
|
||||
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
|
||||
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
|
||||
|
||||
LOGGER.debug("Rebroadcasting metadata request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
|
||||
Network.getInstance().broadcast(
|
||||
broadcastPeer -> broadcastPeer == peer ||
|
||||
Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost())
|
||||
? null : getArbitraryMetadataMessage);
|
||||
|
||||
}
|
||||
else {
|
||||
// This relay request has reached the maximum number of allowed hops
|
||||
}
|
||||
}
|
||||
else {
|
||||
// This relay request has timed out
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.transform.Transformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class ArbitraryMetadataMessage extends Message {
|
||||
|
||||
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
|
||||
|
||||
private final byte[] signature;
|
||||
private final ArbitraryDataFile arbitraryMetadataFile;
|
||||
|
||||
public ArbitraryMetadataMessage(byte[] signature, ArbitraryDataFile arbitraryDataFile) {
|
||||
super(MessageType.ARBITRARY_METADATA);
|
||||
|
||||
this.signature = signature;
|
||||
this.arbitraryMetadataFile = arbitraryDataFile;
|
||||
}
|
||||
|
||||
public ArbitraryMetadataMessage(int id, byte[] signature, ArbitraryDataFile arbitraryDataFile) {
|
||||
super(id, MessageType.ARBITRARY_METADATA);
|
||||
|
||||
this.signature = signature;
|
||||
this.arbitraryMetadataFile = arbitraryDataFile;
|
||||
}
|
||||
|
||||
public byte[] getSignature() {
|
||||
return this.signature;
|
||||
}
|
||||
|
||||
public ArbitraryDataFile getArbitraryMetadataFile() {
|
||||
return this.arbitraryMetadataFile;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) throws UnsupportedEncodingException {
|
||||
byte[] signature = new byte[SIGNATURE_LENGTH];
|
||||
byteBuffer.get(signature);
|
||||
|
||||
int dataLength = byteBuffer.getInt();
|
||||
|
||||
if (byteBuffer.remaining() != dataLength)
|
||||
return null;
|
||||
|
||||
byte[] data = new byte[dataLength];
|
||||
byteBuffer.get(data);
|
||||
|
||||
try {
|
||||
ArbitraryDataFile arbitraryMetadataFile = new ArbitraryDataFile(data, signature);
|
||||
return new ArbitraryMetadataMessage(id, signature, arbitraryMetadataFile);
|
||||
}
|
||||
catch (DataException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] toData() {
|
||||
if (this.arbitraryMetadataFile == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
byte[] data = this.arbitraryMetadataFile.getBytes();
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
bytes.write(signature);
|
||||
|
||||
bytes.write(Ints.toByteArray(data.length));
|
||||
|
||||
bytes.write(data);
|
||||
|
||||
return bytes.toByteArray();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public ArbitraryMetadataMessage cloneWithNewId(int newId) {
|
||||
ArbitraryMetadataMessage clone = new ArbitraryMetadataMessage(this.signature, this.arbitraryMetadataFile);
|
||||
clone.setId(newId);
|
||||
return clone;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.qortal.transform.Transformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static org.qortal.transform.Transformer.INT_LENGTH;
|
||||
import static org.qortal.transform.Transformer.LONG_LENGTH;
|
||||
|
||||
public class GetArbitraryMetadataMessage extends Message {
|
||||
|
||||
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
|
||||
|
||||
private final byte[] signature;
|
||||
private final long requestTime;
|
||||
private int requestHops;
|
||||
|
||||
public GetArbitraryMetadataMessage(byte[] signature, long requestTime, int requestHops) {
|
||||
this(-1, signature, requestTime, requestHops);
|
||||
}
|
||||
|
||||
private GetArbitraryMetadataMessage(int id, byte[] signature, long requestTime, int requestHops) {
|
||||
super(id, MessageType.GET_ARBITRARY_METADATA);
|
||||
|
||||
this.signature = signature;
|
||||
this.requestTime = requestTime;
|
||||
this.requestHops = requestHops;
|
||||
}
|
||||
|
||||
public byte[] getSignature() {
|
||||
return this.signature;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
|
||||
if (bytes.remaining() != SIGNATURE_LENGTH + LONG_LENGTH + INT_LENGTH)
|
||||
return null;
|
||||
|
||||
byte[] signature = new byte[SIGNATURE_LENGTH];
|
||||
|
||||
bytes.get(signature);
|
||||
|
||||
long requestTime = bytes.getLong();
|
||||
|
||||
int requestHops = bytes.getInt();
|
||||
|
||||
return new GetArbitraryMetadataMessage(id, signature, requestTime, requestHops);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] toData() {
|
||||
try {
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
bytes.write(this.signature);
|
||||
|
||||
bytes.write(Longs.toByteArray(this.requestTime));
|
||||
|
||||
bytes.write(Ints.toByteArray(this.requestHops));
|
||||
|
||||
return bytes.toByteArray();
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public long getRequestTime() {
|
||||
return this.requestTime;
|
||||
}
|
||||
|
||||
public int getRequestHops() {
|
||||
return this.requestHops;
|
||||
}
|
||||
|
||||
public void setRequestHops(int requestHops) {
|
||||
this.requestHops = requestHops;
|
||||
}
|
||||
|
||||
}
|
@ -91,7 +91,10 @@ public abstract class Message {
|
||||
ARBITRARY_DATA_FILE_LIST(120),
|
||||
GET_ARBITRARY_DATA_FILE_LIST(121),
|
||||
|
||||
ARBITRARY_SIGNATURES(130);
|
||||
ARBITRARY_SIGNATURES(130),
|
||||
|
||||
ARBITRARY_METADATA(130),
|
||||
GET_ARBITRARY_METADATA(131);
|
||||
|
||||
public final int value;
|
||||
public final Method fromByteBufferMethod;
|
||||
|
Loading…
x
Reference in New Issue
Block a user