3
0
mirror of https://github.com/Qortal/qortal.git synced 2025-02-14 11:15:49 +00:00

Collate network PoW computes into a fixed-sized pool, with dead peer detection.

Also added Named/DaemonThreadFactory classes.

Network EPC now uses NamedThreadFactory for easier debugging.

Added settings field "networkPoWComputePoolSize", default 2, which
seems to work with both low-power ARM boards and high-power desktops.
This commit is contained in:
catbref 2020-07-03 09:31:46 +01:00
parent d8c5e557d8
commit e74a249388
5 changed files with 77 additions and 6 deletions

View File

@ -1,6 +1,8 @@
package org.qortal.network;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -13,7 +15,9 @@ import org.qortal.network.message.ChallengeMessage;
import org.qortal.network.message.HelloMessage;
import org.qortal.network.message.Message;
import org.qortal.network.message.Message.MessageType;
import org.qortal.settings.Settings;
import org.qortal.network.message.ResponseMessage;
import org.qortal.utils.DaemonThreadFactory;
import org.qortal.utils.NTP;
import com.google.common.primitives.Bytes;
@ -27,6 +31,7 @@ public enum Handshake {
@Override
public void action(Peer peer) {
/* Never called */
}
},
HELLO(MessageType.HELLO) {
@ -183,7 +188,12 @@ public enum Handshake {
final byte[] data = Crypto.digest(Bytes.concat(sharedSecret, peersChallenge));
// We do this in a new thread as it can take a while...
Thread responseThread = new Thread(() -> {
responseExecutor.execute(() -> {
// Are we still connected?
if (peer.isStopping())
// No point computing for dead peer
return;
Integer nonce = MemoryPoW.compute2(data, POW_BUFFER_SIZE, POW_DIFFICULTY);
Message responseMessage = new ResponseMessage(nonce, data);
@ -197,9 +207,6 @@ public enum Handshake {
Network.getInstance().onHandshakeCompleted(peer);
}
});
responseThread.setDaemon(true);
responseThread.start();
}
},
// Interim holding state while we compute RESPONSE to send to inbound peer
@ -237,6 +244,7 @@ public enum Handshake {
private static final int POW_BUFFER_SIZE = 8 * 1024 * 1024; // bytes
private static final int POW_DIFFICULTY = 8; // leading zero bits
private static final ExecutorService responseExecutor = Executors.newFixedThreadPool(Settings.getInstance().getNetworkPoWComputePoolSize(), new DaemonThreadFactory("Network-PoW"));
private static final byte[] ZERO_CHALLENGE = new byte[ChallengeMessage.CHALLENGE_LENGTH];

View File

@ -55,6 +55,7 @@ import org.qortal.utils.ExecuteProduceConsume;
// import org.qortal.utils.ExecutorDumper;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import org.qortal.utils.NTP;
import org.qortal.utils.NamedThreadFactory;
// For managing peers
public class Network {
@ -151,7 +152,8 @@ public class Network {
ExecutorService networkExecutor = new ThreadPoolExecutor(1,
Settings.getInstance().getMaxNetworkThreadPoolSize(),
NETWORK_EPC_KEEPALIVE, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
new SynchronousQueue<Runnable>(),
new NamedThreadFactory("Network-EPC"));
networkEPC = new NetworkProcessor(networkExecutor);
}
@ -355,7 +357,7 @@ public class Network {
private Task maybeProducePeerPingTask(Long now) {
// Ask connected peers whether they need a ping
for (Peer peer : getConnectedPeers()) {
for (Peer peer : getHandshakedPeers()) {
Task peerTask = peer.getPingTask(now);
if (peerTask != null)
return peerTask;

View File

@ -92,6 +92,8 @@ public class Settings {
private int maxPeers = 32;
/** Maximum number of threads for network engine. */
private int maxNetworkThreadPoolSize = 20;
/** Maximum number of threads for network proof-of-work compute, used during handshaking. */
private int networkPoWComputePoolSize = 2;
// Which blockchains this node is running
private String blockchainConfig = null; // use default from resources
@ -355,6 +357,10 @@ public class Settings {
return this.maxNetworkThreadPoolSize;
}
public int getNetworkPoWComputePoolSize() {
return this.networkPoWComputePoolSize;
}
public String getBlockchainConfig() {
return this.blockchainConfig;
}

View File

@ -0,0 +1,31 @@
package org.qortal.utils;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class DaemonThreadFactory implements ThreadFactory {
private final String name;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public DaemonThreadFactory(String name) {
this.name = name;
}
public DaemonThreadFactory() {
this(null);
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
if (this.name != null)
thread.setName(this.name + "-" + this.threadNumber.getAndIncrement());
return thread;
}
}

View File

@ -0,0 +1,24 @@
package org.qortal.utils;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private final String name;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public NamedThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setName(this.name + "-" + this.threadNumber.getAndIncrement());
return thread;
}
}