forked from Qortal/qortal
Merge branch 'qdn-direct-connections'
This commit is contained in:
commit
122539596d
@ -283,8 +283,8 @@ public class ArbitraryDataFileListManager {
|
||||
|
||||
LOGGER.debug(String.format("Sending data file list request for signature %s with %d hashes to %d peers...", signature58, hashCount, handshakedPeers.size()));
|
||||
|
||||
// FUTURE: send our address as requestingPeer once enough peers have switched to the new protocol
|
||||
String requestingPeer = null; // Network.getInstance().getOurExternalIpAddressAndPort();
|
||||
// Send our address as requestingPeer, to allow for potential direct connections with seeds/peers
|
||||
String requestingPeer = Network.getInstance().getOurExternalIpAddressAndPort();
|
||||
|
||||
// Build request
|
||||
Message getArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature, missingHashes, now, 0, requestingPeer);
|
||||
@ -636,6 +636,9 @@ public class ArbitraryDataFileListManager {
|
||||
// We should only respond if we have at least one hash
|
||||
if (hashes.size() > 0) {
|
||||
|
||||
// Firstly we should keep track of the requesting peer, to allow for potential direct connections later
|
||||
ArbitraryDataFileManager.getInstance().addRecentDataRequest(requestingPeer);
|
||||
|
||||
// We have all the chunks, so update requests map to reflect that we've sent it
|
||||
// There is no need to keep track of the request, as we can serve all the chunks
|
||||
if (allChunksExist) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package org.qortal.controller.arbitrary;
|
||||
|
||||
import com.google.common.net.InetAddresses;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.arbitrary.ArbitraryDataFile;
|
||||
@ -54,6 +55,13 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
*/
|
||||
private List<ArbitraryDirectConnectionInfo> directConnectionInfo = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
/**
|
||||
* Map to keep track of peers requesting QDN data that we hold.
|
||||
* Key = peer address string, value = time of last request.
|
||||
* This allows for additional "burst" connections beyond existing limits.
|
||||
*/
|
||||
private Map<String, Long> recentDataRequests = Collections.synchronizedMap(new HashMap<>());
|
||||
|
||||
|
||||
public static int MAX_FILE_HASH_RESPONSES = 1000;
|
||||
|
||||
@ -108,6 +116,9 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
|
||||
final long directConnectionInfoMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT;
|
||||
directConnectionInfo.removeIf(entry -> entry.getTimestamp() < directConnectionInfoMinimumTimestamp);
|
||||
|
||||
final long recentDataRequestMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RECENT_DATA_REQUESTS_TIMEOUT;
|
||||
recentDataRequests.entrySet().removeIf(entry -> entry.getValue() < recentDataRequestMinimumTimestamp);
|
||||
}
|
||||
|
||||
|
||||
@ -490,6 +501,45 @@ public class ArbitraryDataFileManager extends Thread {
|
||||
}
|
||||
|
||||
|
||||
// Peers requesting QDN data from us
|
||||
|
||||
/**
|
||||
* Add an address string of a peer that is trying to request data from us.
|
||||
* @param peerAddress
|
||||
*/
|
||||
public void addRecentDataRequest(String peerAddress) {
|
||||
if (peerAddress == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Long now = NTP.getTime();
|
||||
if (now == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Make sure to remove the port, since it isn't guaranteed to match next time
|
||||
String[] parts = peerAddress.split(":");
|
||||
if (parts.length == 0) {
|
||||
return;
|
||||
}
|
||||
String host = parts[0];
|
||||
if (!InetAddresses.isInetAddress(host)) {
|
||||
// Invalid host
|
||||
return;
|
||||
}
|
||||
|
||||
this.recentDataRequests.put(host, now);
|
||||
}
|
||||
|
||||
public boolean isPeerRequestingData(String peerAddressWithoutPort) {
|
||||
return this.recentDataRequests.containsKey(peerAddressWithoutPort);
|
||||
}
|
||||
|
||||
public boolean hasPendingDataRequest() {
|
||||
return !this.recentDataRequests.isEmpty();
|
||||
}
|
||||
|
||||
|
||||
// Network handlers
|
||||
|
||||
public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) {
|
||||
|
@ -47,6 +47,9 @@ public class ArbitraryDataManager extends Thread {
|
||||
/** Maximum time to hold direct peer connection information */
|
||||
public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms
|
||||
|
||||
/** Maximum time to hold information about recent data requests that we can fulfil */
|
||||
public static final long ARBITRARY_RECENT_DATA_REQUESTS_TIMEOUT = 2 * 60 * 1000L; // ms
|
||||
|
||||
/** Maximum number of hops that an arbitrary signatures request is allowed to make */
|
||||
private static int ARBITRARY_SIGNATURES_REQUEST_MAX_HOPS = 3;
|
||||
|
||||
|
@ -8,6 +8,7 @@ import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters;
|
||||
import org.qortal.block.BlockChain;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.controller.arbitrary.ArbitraryDataFileListManager;
|
||||
import org.qortal.controller.arbitrary.ArbitraryDataManager;
|
||||
import org.qortal.crypto.Crypto;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.data.network.PeerData;
|
||||
@ -259,6 +260,18 @@ public class Network {
|
||||
return this.immutableConnectedPeers;
|
||||
}
|
||||
|
||||
public List<Peer> getImmutableConnectedDataPeers() {
|
||||
return this.getImmutableConnectedPeers().stream()
|
||||
.filter(p -> p.isDataPeer())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<Peer> getImmutableConnectedNonDataPeers() {
|
||||
return this.getImmutableConnectedPeers().stream()
|
||||
.filter(p -> !p.isDataPeer())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public void addConnectedPeer(Peer peer) {
|
||||
this.connectedPeers.add(peer); // thread safe thanks to synchronized list
|
||||
this.immutableConnectedPeers = List.copyOf(this.connectedPeers); // also thread safe thanks to synchronized collection's toArray() being fed to List.of(array)
|
||||
@ -325,6 +338,7 @@ public class Network {
|
||||
// Add this signature to the list of pending requests for this peer
|
||||
LOGGER.info("Making connection to peer {} to request files for signature {}...", peerAddressString, Base58.encode(signature));
|
||||
Peer peer = new Peer(peerData);
|
||||
peer.setIsDataPeer(true);
|
||||
peer.addPendingSignatureRequest(signature);
|
||||
return this.connectPeer(peer);
|
||||
// If connection (and handshake) is successful, data will automatically be requested
|
||||
@ -685,6 +699,7 @@ public class Network {
|
||||
// Pick candidate
|
||||
PeerData peerData = peers.get(peerIndex);
|
||||
Peer newPeer = new Peer(peerData);
|
||||
newPeer.setIsDataPeer(false);
|
||||
|
||||
// Update connection attempt info
|
||||
peerData.setLastAttempted(now);
|
||||
|
@ -64,6 +64,11 @@ public class Peer {
|
||||
*/
|
||||
private boolean isLocal;
|
||||
|
||||
/**
|
||||
* True if connected for the purposes of transfering specific QDN data
|
||||
*/
|
||||
private boolean isDataPeer;
|
||||
|
||||
private final UUID peerConnectionId = UUID.randomUUID();
|
||||
private final Object byteBufferLock = new Object();
|
||||
private ByteBuffer byteBuffer;
|
||||
@ -194,6 +199,14 @@ public class Peer {
|
||||
return this.isOutbound;
|
||||
}
|
||||
|
||||
public boolean isDataPeer() {
|
||||
return isDataPeer;
|
||||
}
|
||||
|
||||
public void setIsDataPeer(boolean isDataPeer) {
|
||||
this.isDataPeer = isDataPeer;
|
||||
}
|
||||
|
||||
public Handshake getHandshakeStatus() {
|
||||
synchronized (this.handshakingLock) {
|
||||
return this.handshakeStatus;
|
||||
@ -211,6 +224,11 @@ public class Peer {
|
||||
}
|
||||
|
||||
private void generateRandomMaxConnectionAge() {
|
||||
if (this.maxConnectionAge > 0L) {
|
||||
// Already generated, so we don't want to overwrite the existing value
|
||||
return;
|
||||
}
|
||||
|
||||
// Retrieve the min and max connection time from the settings, and calculate the range
|
||||
final int minPeerConnectionTime = Settings.getInstance().getMinPeerConnectionTime();
|
||||
final int maxPeerConnectionTime = Settings.getInstance().getMaxPeerConnectionTime();
|
||||
@ -893,6 +911,10 @@ public class Peer {
|
||||
return maxConnectionAge;
|
||||
}
|
||||
|
||||
public void setMaxConnectionAge(long maxConnectionAge) {
|
||||
this.maxConnectionAge = maxConnectionAge;
|
||||
}
|
||||
|
||||
public boolean hasReachedMaxConnectionAge() {
|
||||
return this.getConnectionAge() > this.getMaxConnectionAge();
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package org.qortal.network.task;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.arbitrary.ArbitraryDataFileManager;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.PeerAddress;
|
||||
@ -65,6 +66,47 @@ public class ChannelAcceptTask implements Task {
|
||||
return;
|
||||
}
|
||||
|
||||
// We allow up to a maximum of maxPeers connected peers, of which...
|
||||
// - maxDataPeers must be prearranged data connections (these are intentionally short-lived)
|
||||
// - the remainder can be any regular peers
|
||||
|
||||
// Firstly, determine the maximum limits
|
||||
int maxPeers = Settings.getInstance().getMaxPeers();
|
||||
int maxDataPeers = Settings.getInstance().getMaxDataPeers();
|
||||
int maxRegularPeers = maxPeers - maxDataPeers;
|
||||
|
||||
// Next, obtain the current state
|
||||
int connectedDataPeerCount = Network.getInstance().getImmutableConnectedDataPeers().size();
|
||||
int connectedRegularPeerCount = Network.getInstance().getImmutableConnectedNonDataPeers().size();
|
||||
|
||||
// Check if the incoming connection should be considered a data or regular peer
|
||||
boolean isDataPeer = ArbitraryDataFileManager.getInstance().isPeerRequestingData(address.getHost());
|
||||
|
||||
// Finally, decide if we have any capacity for this incoming peer
|
||||
boolean connectionLimitReached;
|
||||
if (isDataPeer) {
|
||||
connectionLimitReached = (connectedDataPeerCount >= maxDataPeers);
|
||||
}
|
||||
else {
|
||||
connectionLimitReached = (connectedRegularPeerCount >= maxRegularPeers);
|
||||
}
|
||||
|
||||
// Extra maxPeers check just to be safe
|
||||
if (Network.getInstance().getImmutableConnectedPeers().size() >= maxPeers) {
|
||||
connectionLimitReached = true;
|
||||
}
|
||||
|
||||
if (connectionLimitReached) {
|
||||
try {
|
||||
// We have enough peers
|
||||
LOGGER.debug("Connection discarded from peer {} because the server is full", address);
|
||||
socketChannel.close();
|
||||
} catch (IOException e) {
|
||||
// IGNORE
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
final Long now = NTP.getTime();
|
||||
Peer newPeer;
|
||||
|
||||
@ -78,6 +120,10 @@ public class ChannelAcceptTask implements Task {
|
||||
LOGGER.debug("Connection accepted from peer {}", address);
|
||||
|
||||
newPeer = new Peer(socketChannel);
|
||||
if (isDataPeer) {
|
||||
newPeer.setMaxConnectionAge(Settings.getInstance().getMaxDataPeerConnectionTime() * 1000L);
|
||||
}
|
||||
newPeer.setIsDataPeer(isDataPeer);
|
||||
network.addConnectedPeer(newPeer);
|
||||
|
||||
} catch (IOException e) {
|
||||
|
@ -190,7 +190,9 @@ public class Settings {
|
||||
/** Target number of outbound connections to peers we should make. */
|
||||
private int minOutboundPeers = 16;
|
||||
/** Maximum number of peer connections we allow. */
|
||||
private int maxPeers = 32;
|
||||
private int maxPeers = 36;
|
||||
/** Number of slots to reserve for short-lived QDN data transfers */
|
||||
private int maxDataPeers = 4;
|
||||
/** Maximum number of threads for network engine. */
|
||||
private int maxNetworkThreadPoolSize = 32;
|
||||
/** Maximum number of threads for network proof-of-work compute, used during handshaking. */
|
||||
@ -209,6 +211,8 @@ public class Settings {
|
||||
private int minPeerConnectionTime = 5 * 60; // seconds
|
||||
/** Maximum time (in seconds) that we should attempt to remain connected to a peer for */
|
||||
private int maxPeerConnectionTime = 60 * 60; // seconds
|
||||
/** Maximum time (in seconds) that a peer should remain connected when requesting QDN data */
|
||||
private int maxDataPeerConnectionTime = 2 * 60; // seconds
|
||||
|
||||
/** Whether to sync multiple blocks at once in normal operation */
|
||||
private boolean fastSyncEnabled = true;
|
||||
@ -650,6 +654,10 @@ public class Settings {
|
||||
return this.maxPeers;
|
||||
}
|
||||
|
||||
public int getMaxDataPeers() {
|
||||
return this.maxDataPeers;
|
||||
}
|
||||
|
||||
public int getMaxNetworkThreadPoolSize() {
|
||||
return this.maxNetworkThreadPoolSize;
|
||||
}
|
||||
@ -668,6 +676,10 @@ public class Settings {
|
||||
|
||||
public int getMaxPeerConnectionTime() { return this.maxPeerConnectionTime; }
|
||||
|
||||
public int getMaxDataPeerConnectionTime() {
|
||||
return this.maxDataPeerConnectionTime;
|
||||
}
|
||||
|
||||
public String getBlockchainConfig() {
|
||||
return this.blockchainConfig;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user