Browse Source

Added relay mode for file list requests

This reuses most of the code already in place in the core related to forwarding.

- A node can opt into relay mode via the "relayModeEnabled": true setting
- From this time onwards, they will ask their peers if they ever receive a file list request that they cannot serve by themselves
- Whenever a peer responds with a file list, it is forwarded on to the originally requesting peer, complete with the peer address of the node that responded
- The original peer can then make a request for the data file(s) themselves using a similar approach, specifying the IP address of the ultimate peer so that the relay node knows who to ask. This part is not implemented yet.
qdn
CalDescent 3 years ago
parent
commit
f4b06fb834
  1. 63
      src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java
  2. 30
      src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java
  3. 7
      src/main/java/org/qortal/settings/Settings.java

63
src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java

@ -732,15 +732,7 @@ public class ArbitraryDataManager extends Thread {
public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) { public void onNetworkGetArbitraryDataMessage(Peer peer, Message message) {
GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message; GetArbitraryDataMessage getArbitraryDataMessage = (GetArbitraryDataMessage) message;
byte[] signature = getArbitraryDataMessage.getSignature(); byte[] signature = getArbitraryDataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long timestamp = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null)
return;
// Do we even have this transaction? // Do we even have this transaction?
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
@ -756,10 +748,6 @@ public class ArbitraryDataManager extends Thread {
if (data == null) if (data == null)
return; return;
// Update requests map to reflect that we've sent it
newEntry = new Triple<>(signature58, null, timestamp);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data); Message arbitraryDataMessage = new ArbitraryDataMessage(signature, data);
arbitraryDataMessage.setId(message.getId()); arbitraryDataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataMessage)) if (!peer.sendMessage(arbitraryDataMessage))
@ -777,10 +765,12 @@ public class ArbitraryDataManager extends Thread {
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) { public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message; ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
LOGGER.info("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size()); String sourcePeer = arbitraryDataFileListMessage.getPeerAddress();
LOGGER.info("Received hash list from peer {} with {} hashes, source peer: {}", peer, arbitraryDataFileListMessage.getHashes().size(), sourcePeer);
// Do we have a pending request for this data? // Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId()); Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
boolean isRelayRequest = (request.getB() != null);
if (request == null || request.getA() == null) { if (request == null || request.getA() == null) {
return; return;
} }
@ -825,22 +815,27 @@ public class ArbitraryDataManager extends Thread {
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC()); Triple<String, Peer, Long> newEntry = new Triple<>(null, null, request.getC());
arbitraryDataFileListRequests.put(message.getId(), newEntry); arbitraryDataFileListRequests.put(message.getId(), newEntry);
// Go and fetch the actual data if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
// Go and fetch the actual data, since this isn't a relay request
this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes); this.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, hashes);
// FUTURE: handle response }
} catch (DataException e) { } catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e); LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
} }
// // Forwarding (not yet used) // Forwarding
// Peer requestingPeer = request.getB(); if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) {
// if (requestingPeer != null) { Peer requestingPeer = request.getB();
// // Forward to requesting peer; if (requestingPeer != null) {
// if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) { // Add the source peer's address
// requestingPeer.disconnect("failed to forward arbitrary data file list"); arbitraryDataFileListMessage.setPeerAddress(peer.getPeerData().getAddress().toString());
// } // Forward to requesting peer;
// } if (!requestingPeer.sendMessage(arbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
} }
public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) {
@ -886,9 +881,18 @@ public class ArbitraryDataManager extends Thread {
} }
public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) { public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message; GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature(); byte[] signature = getArbitraryDataFileListMessage.getSignature();
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet(); String signature58 = Base58.encode(signature);
Long timestamp = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) {
return;
}
LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature)); LOGGER.info("Received hash list request from peer {} for signature {}", peer, Base58.encode(signature));
@ -937,13 +941,22 @@ public class ArbitraryDataManager extends Thread {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e); LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
} }
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes); // We should only respond if we have at least one hash
if (hashes.size() > 0) {
// Update requests map to reflect that we've sent it
newEntry = new Triple<>(signature58, null, timestamp);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, null, hashes);
arbitraryDataFileListMessage.setId(message.getId()); arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) { if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.info("Couldn't send list of hashes"); LOGGER.info("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes"); peer.disconnect("failed to send list of hashes");
} }
LOGGER.info("Sent list of hashes (count: {})", hashes.size()); LOGGER.info("Sent list of hashes (count: {})", hashes.size());
}
} }
public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) { public void onNetworkArbitrarySignaturesMessage(Peer peer, Message message) {

30
src/main/java/org/qortal/network/message/ArbitraryDataFileListMessage.java

@ -1,9 +1,9 @@
package org.qortal.network.message; package org.qortal.network.message;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.qortal.data.block.BlockSummaryData; import org.qortal.transform.TransformationException;
import org.qortal.transform.Transformer; import org.qortal.transform.Transformer;
import org.qortal.transform.block.BlockTransformer; import org.qortal.utils.Serialization;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -19,18 +19,21 @@ public class ArbitraryDataFileListMessage extends Message {
private final byte[] signature; private final byte[] signature;
private final List<byte[]> hashes; private final List<byte[]> hashes;
private String peerAddress;
public ArbitraryDataFileListMessage(byte[] signature, List<byte[]> hashes) { public ArbitraryDataFileListMessage(byte[] signature, String peerAddress, List<byte[]> hashes) {
super(MessageType.ARBITRARY_DATA_FILE_LIST); super(MessageType.ARBITRARY_DATA_FILE_LIST);
this.signature = signature; this.signature = signature;
this.peerAddress = peerAddress;
this.hashes = hashes; this.hashes = hashes;
} }
public ArbitraryDataFileListMessage(int id, byte[] signature, List<byte[]> hashes) { public ArbitraryDataFileListMessage(int id, byte[] signature, String peerAddress, List<byte[]> hashes) {
super(id, MessageType.ARBITRARY_DATA_FILE_LIST); super(id, MessageType.ARBITRARY_DATA_FILE_LIST);
this.signature = signature; this.signature = signature;
this.peerAddress = peerAddress;
this.hashes = hashes; this.hashes = hashes;
} }
@ -42,10 +45,21 @@ public class ArbitraryDataFileListMessage extends Message {
return this.signature; return this.signature;
} }
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { public void setPeerAddress(String peerAddress) {
this.peerAddress = peerAddress;
}
public String getPeerAddress() {
return this.peerAddress;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException, TransformationException {
byte[] signature = new byte[SIGNATURE_LENGTH]; byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature); bytes.get(signature);
String peerAddress = Serialization.deserializeSizedString(bytes, 255);
int count = bytes.getInt(); int count = bytes.getInt();
if (bytes.remaining() != count * HASH_LENGTH) if (bytes.remaining() != count * HASH_LENGTH)
@ -59,7 +73,7 @@ public class ArbitraryDataFileListMessage extends Message {
hashes.add(hash); hashes.add(hash);
} }
return new ArbitraryDataFileListMessage(id, signature, hashes); return new ArbitraryDataFileListMessage(id, signature, peerAddress, hashes);
} }
@Override @Override
@ -69,6 +83,8 @@ public class ArbitraryDataFileListMessage extends Message {
bytes.write(this.signature); bytes.write(this.signature);
Serialization.serializeSizedString(bytes, this.peerAddress);
bytes.write(Ints.toByteArray(this.hashes.size())); bytes.write(Ints.toByteArray(this.hashes.size()));
for (byte[] hash : this.hashes) { for (byte[] hash : this.hashes) {
@ -82,7 +98,7 @@ public class ArbitraryDataFileListMessage extends Message {
} }
public ArbitraryDataFileListMessage cloneWithNewId(int newId) { public ArbitraryDataFileListMessage cloneWithNewId(int newId) {
ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.hashes); ArbitraryDataFileListMessage clone = new ArbitraryDataFileListMessage(this.signature, this.peerAddress, this.hashes);
clone.setId(newId); clone.setId(newId);
return clone; return clone;
} }

7
src/main/java/org/qortal/settings/Settings.java

@ -279,6 +279,9 @@ public class Settings {
/** Storage policy to indicate which data should be hosted */ /** Storage policy to indicate which data should be hosted */
private String storagePolicy = "FOLLOWED_AND_VIEWED"; private String storagePolicy = "FOLLOWED_AND_VIEWED";
/** Whether to allow data outside of the storage policy to be relayed between other peers */
private boolean relayModeEnabled = false;
/** Expiry time (ms) for (unencrypted) built/cached data */ /** Expiry time (ms) for (unencrypted) built/cached data */
private Long builtDataExpiryInterval = 30 * 24 * 60 * 60 * 1000L; // 30 days private Long builtDataExpiryInterval = 30 * 24 * 60 * 60 * 1000L; // 30 days
@ -828,6 +831,10 @@ public class Settings {
return StoragePolicy.valueOf(this.storagePolicy); return StoragePolicy.valueOf(this.storagePolicy);
} }
public boolean isRelayModeEnabled() {
return this.relayModeEnabled;
}
public Long getBuiltDataExpiryInterval() { public Long getBuiltDataExpiryInterval() {
return this.builtDataExpiryInterval; return this.builtDataExpiryInterval;
} }

Loading…
Cancel
Save