diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java index 604fae94..a0b4886b 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileListManager.java @@ -636,6 +636,9 @@ public class ArbitraryDataFileListManager { // We should only respond if we have at least one hash if (hashes.size() > 0) { + // Firstly we should keep track of the requesting peer, to allow for potential direct connections later + ArbitraryDataFileManager.getInstance().addRecentDataRequest(requestingPeer); + // We have all the chunks, so update requests map to reflect that we've sent it // There is no need to keep track of the request, as we can serve all the chunks if (allChunksExist) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java index 11e15414..2fc883dc 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileManager.java @@ -20,6 +20,7 @@ import org.qortal.utils.ArbitraryTransactionUtils; import org.qortal.utils.Base58; import org.qortal.utils.NTP; +import java.net.InetSocketAddress; import java.security.SecureRandom; import java.util.*; import java.util.concurrent.ExecutorService; @@ -54,6 +55,13 @@ public class ArbitraryDataFileManager extends Thread { */ private List directConnectionInfo = Collections.synchronizedList(new ArrayList<>()); + /** + * Map to keep track of peers requesting QDN data that we hold. + * Key = peer address string, value = time of last request. + * This allows for additional "burst" connections beyond existing limits. + */ + private Map recentDataRequests = Collections.synchronizedMap(new HashMap<>()); + public static int MAX_FILE_HASH_RESPONSES = 1000; @@ -108,6 +116,9 @@ public class ArbitraryDataFileManager extends Thread { final long directConnectionInfoMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT; directConnectionInfo.removeIf(entry -> entry.getTimestamp() < directConnectionInfoMinimumTimestamp); + + final long recentDataRequestMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RECENT_DATA_REQUESTS_TIMEOUT; + recentDataRequests.entrySet().removeIf(entry -> entry.getValue() < recentDataRequestMinimumTimestamp); } @@ -490,6 +501,36 @@ public class ArbitraryDataFileManager extends Thread { } + // Peers requesting QDN data from us + + /** + * Add an address string of a peer that is trying to request data from us. + * @param peerAddress + */ + public void addRecentDataRequest(String peerAddress) { + if (peerAddress == null) { + return; + } + + Long now = NTP.getTime(); + if (now == null) { + return; + } + + // Make sure to remove the port, since it isn't guaranteed to match next time + InetSocketAddress address = Peer.parsePeerAddress(peerAddress); + this.recentDataRequests.put(address.getHostString(), now); + } + + public boolean isPeerRequestingData(String peerAddressWithoutPort) { + return this.recentDataRequests.containsValue(peerAddressWithoutPort); + } + + public boolean hasPendingDataRequest() { + return !this.recentDataRequests.isEmpty(); + } + + // Network handlers public void onNetworkGetArbitraryDataFileMessage(Peer peer, Message message) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 4b6d3a28..6b3f0160 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -47,6 +47,9 @@ public class ArbitraryDataManager extends Thread { /** Maximum time to hold direct peer connection information */ public static final long ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT = 2 * 60 * 1000L; // ms + /** Maximum time to hold information about recent data requests that we can fulfil */ + public static final long ARBITRARY_RECENT_DATA_REQUESTS_TIMEOUT = 2 * 60 * 1000L; // ms + /** Maximum number of hops that an arbitrary signatures request is allowed to make */ private static int ARBITRARY_SIGNATURES_REQUEST_MAX_HOPS = 3; diff --git a/src/main/java/org/qortal/network/Network.java b/src/main/java/org/qortal/network/Network.java index a04509f1..9789e62a 100644 --- a/src/main/java/org/qortal/network/Network.java +++ b/src/main/java/org/qortal/network/Network.java @@ -8,6 +8,7 @@ import org.bouncycastle.crypto.params.Ed25519PublicKeyParameters; import org.qortal.block.BlockChain; import org.qortal.controller.Controller; import org.qortal.controller.arbitrary.ArbitraryDataFileListManager; +import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.crypto.Crypto; import org.qortal.data.block.BlockData; import org.qortal.data.network.PeerData; @@ -259,6 +260,18 @@ public class Network { return this.immutableConnectedPeers; } + public List getImmutableConnectedDataPeers() { + return this.getImmutableConnectedPeers().stream() + .filter(p -> p.isDataPeer()) + .collect(Collectors.toList()); + } + + public List getImmutableConnectedNonDataPeers() { + return this.getImmutableConnectedPeers().stream() + .filter(p -> !p.isDataPeer()) + .collect(Collectors.toList()); + } + public void addConnectedPeer(Peer peer) { this.connectedPeers.add(peer); // thread safe thanks to synchronized list this.immutableConnectedPeers = List.copyOf(this.connectedPeers); // also thread safe thanks to synchronized collection's toArray() being fed to List.of(array) diff --git a/src/main/java/org/qortal/network/Peer.java b/src/main/java/org/qortal/network/Peer.java index dbb03fda..7e51dc36 100644 --- a/src/main/java/org/qortal/network/Peer.java +++ b/src/main/java/org/qortal/network/Peer.java @@ -64,6 +64,11 @@ public class Peer { */ private boolean isLocal; + /** + * True if connected for the purposes of transfering specific QDN data + */ + private boolean isDataPeer; + private final UUID peerConnectionId = UUID.randomUUID(); private final Object byteBufferLock = new Object(); private ByteBuffer byteBuffer; @@ -194,6 +199,14 @@ public class Peer { return this.isOutbound; } + public boolean isDataPeer() { + return isDataPeer; + } + + public void setIsDataPeer(boolean isDataPeer) { + this.isDataPeer = isDataPeer; + } + public Handshake getHandshakeStatus() { synchronized (this.handshakingLock) { return this.handshakeStatus; @@ -211,6 +224,11 @@ public class Peer { } private void generateRandomMaxConnectionAge() { + if (this.maxConnectionAge > 0L) { + // Already generated, so we don't want to overwrite the existing value + return; + } + // Retrieve the min and max connection time from the settings, and calculate the range final int minPeerConnectionTime = Settings.getInstance().getMinPeerConnectionTime(); final int maxPeerConnectionTime = Settings.getInstance().getMaxPeerConnectionTime(); @@ -893,6 +911,10 @@ public class Peer { return maxConnectionAge; } + public void setMaxConnectionAge(long maxConnectionAge) { + this.maxConnectionAge = maxConnectionAge; + } + public boolean hasReachedMaxConnectionAge() { return this.getConnectionAge() > this.getMaxConnectionAge(); } diff --git a/src/main/java/org/qortal/network/task/ChannelAcceptTask.java b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java index 3e2a3033..13ba888c 100644 --- a/src/main/java/org/qortal/network/task/ChannelAcceptTask.java +++ b/src/main/java/org/qortal/network/task/ChannelAcceptTask.java @@ -2,6 +2,7 @@ package org.qortal.network.task; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.qortal.controller.arbitrary.ArbitraryDataFileManager; import org.qortal.network.Network; import org.qortal.network.Peer; import org.qortal.network.PeerAddress; @@ -65,6 +66,47 @@ public class ChannelAcceptTask implements Task { return; } + // We allow up to a maximum of maxPeers connected peers, of which... + // - maxDataPeers must be prearranged data connections (these are intentionally short-lived) + // - the remainder can be any regular peers + + // Firstly, determine the maximum limits + int maxPeers = Settings.getInstance().getMaxPeers(); + int maxDataPeers = Settings.getInstance().getMaxDataPeers(); + int maxRegularPeers = maxPeers - maxDataPeers; + + // Next, obtain the current state + int connectedDataPeerCount = Network.getInstance().getImmutableConnectedDataPeers().size(); + int connectedRegularPeerCount = Network.getInstance().getImmutableConnectedNonDataPeers().size(); + + // Check if the incoming connection should be considered a data or regular peer + boolean isDataPeer = ArbitraryDataFileManager.getInstance().isPeerRequestingData(address.getHost()); + + // Finally, decide if we have any capacity for this incoming peer + boolean connectionLimitReached; + if (isDataPeer) { + connectionLimitReached = (connectedDataPeerCount >= maxDataPeers); + } + else { + connectionLimitReached = (connectedRegularPeerCount >= maxRegularPeers); + } + + // Extra maxPeers check just to be safe + if (Network.getInstance().getImmutableConnectedPeers().size() >= maxPeers) { + connectionLimitReached = true; + } + + if (connectionLimitReached) { + try { + // We have enough peers + LOGGER.debug("Connection discarded from peer {} because the server is full", address); + socketChannel.close(); + } catch (IOException e) { + // IGNORE + } + return; + } + final Long now = NTP.getTime(); Peer newPeer; @@ -78,6 +120,9 @@ public class ChannelAcceptTask implements Task { LOGGER.debug("Connection accepted from peer {}", address); newPeer = new Peer(socketChannel); + if (isDataPeer) { + newPeer.setMaxConnectionAge(Settings.getInstance().getMaxDataPeerConnectionTime() * 1000L); + } network.addConnectedPeer(newPeer); } catch (IOException e) {