diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index 35edd20c..3344f004 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -162,8 +162,8 @@ public class Network extends Thread { mergePeersLock = new ReentrantLock(); - // We'll use a cached thread pool, but with more aggressive 10 second timeout. - ExecutorService networkExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + // We'll use a cached thread pool, max 10 threads, but with more aggressive 10 second timeout. + ExecutorService networkExecutor = new ThreadPoolExecutor(1, 10, 10L, TimeUnit.SECONDS, new SynchronousQueue()); networkEPC = new NetworkProcessor(networkExecutor); diff --git a/src/main/java/org/qora/utils/ExecuteProduceConsume.java b/src/main/java/org/qora/utils/ExecuteProduceConsume.java index 2d4320e9..5722752e 100644 --- a/src/main/java/org/qora/utils/ExecuteProduceConsume.java +++ b/src/main/java/org/qora/utils/ExecuteProduceConsume.java @@ -14,15 +14,22 @@ public abstract class ExecuteProduceConsume implements Runnable { private final Logger logger; private ExecutorService executor; - private int activeThreadCount = 0; - private int greatestActiveThreadCount = 0; - private int consumerCount = 0; - private boolean hasThreadPending = false; + // These are volatile to prevent thread-local caching of values + // but all are updated inside synchronized blocks + // so we don't need AtomicInteger/AtomicBoolean + + private volatile int activeThreadCount = 0; + private volatile int greatestActiveThreadCount = 0; + private volatile int consumerCount = 0; + private volatile int tasksProduced = 0; + private volatile int tasksConsumed = 0; + + private volatile boolean hasThreadPending = false; public ExecuteProduceConsume(ExecutorService executor) { - className = this.getClass().getSimpleName(); - logger = LogManager.getLogger(this.getClass()); + this.className = this.getClass().getSimpleName(); + this.logger = LogManager.getLogger(this.getClass()); this.executor = executor; } @@ -32,27 +39,39 @@ public abstract class ExecuteProduceConsume implements Runnable { } public void start() { - executor.execute(this); + this.executor.execute(this); } public void shutdown() { - executor.shutdownNow(); + this.executor.shutdownNow(); } public boolean shutdown(long timeout) throws InterruptedException { - executor.shutdownNow(); - return executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); + this.executor.shutdownNow(); + return this.executor.awaitTermination(timeout, TimeUnit.MILLISECONDS); } public int getActiveThreadCount() { synchronized (this) { - return activeThreadCount; + return this.activeThreadCount; } } public int getGreatestActiveThreadCount() { synchronized (this) { - return greatestActiveThreadCount; + return this.greatestActiveThreadCount; + } + } + + public int getTasksProduced() { + synchronized (this) { + return this.tasksProduced; + } + } + + public int getTasksConsumed() { + synchronized (this) { + return this.tasksConsumed; } } @@ -72,19 +91,19 @@ public abstract class ExecuteProduceConsume implements Runnable { @Override public void run() { - Thread.currentThread().setName(className + "-" + Thread.currentThread().getId()); + Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId()); boolean wasThreadPending; synchronized (this) { - ++activeThreadCount; - if (activeThreadCount > greatestActiveThreadCount) - greatestActiveThreadCount = activeThreadCount; + ++this.activeThreadCount; + if (this.activeThreadCount > this.greatestActiveThreadCount) + this.greatestActiveThreadCount = this.activeThreadCount; - logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d", - Thread.currentThread().getId(), hasThreadPending, activeThreadCount)); + this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d", + Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount)); // Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce... - wasThreadPending = hasThreadPending; + wasThreadPending = this.hasThreadPending; } try { @@ -93,33 +112,34 @@ public abstract class ExecuteProduceConsume implements Runnable { while (true) { final Task task; - logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); + this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); synchronized (this) { if (wasThreadPending) { // Clear thread-pending flag now that we about to produce. - hasThreadPending = false; + this.hasThreadPending = false; wasThreadPending = false; } final boolean lambdaCanIdle = canBlock; - logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...", - Thread.currentThread().getId(), activeThreadCount, consumerCount, lambdaCanIdle)); + this.logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...", + Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, lambdaCanIdle)); final long now = System.currentTimeMillis(); task = produceTask(canBlock); final long delay = System.currentTimeMillis() - now; - logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay)); + this.logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay)); } if (task == null) synchronized (this) { - logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d", - Thread.currentThread().getId(), activeThreadCount, consumerCount)); + this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d", + Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount)); - if (activeThreadCount > consumerCount + 1) { - --activeThreadCount; - logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", Thread.currentThread().getId(), activeThreadCount)); + if (this.activeThreadCount > this.consumerCount + 1) { + --this.activeThreadCount; + this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", + Thread.currentThread().getId(), this.activeThreadCount)); break; } @@ -132,30 +152,38 @@ public abstract class ExecuteProduceConsume implements Runnable { // We have a task synchronized (this) { - ++consumerCount; + ++this.tasksProduced; + ++this.consumerCount; - if (!hasThreadPending) { - logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); - hasThreadPending = true; - executor.execute(this); // Same object, different thread + if (!this.hasThreadPending) { + this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); + this.hasThreadPending = true; + + try { + this.executor.execute(this); // Same object, different thread + } catch (RejectedExecutionException e) { + this.hasThreadPending = false; + this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId())); + } } } - logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId())); + this.logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId())); task.perform(); // This can block for a while - logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId())); + this.logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId())); synchronized (this) { - --consumerCount; + ++this.tasksConsumed; + --this.consumerCount; // Quicker, non-blocking produce next round canBlock = false; } } - } catch (InterruptedException | RejectedExecutionException e) { + } catch (InterruptedException e) { // We're in shutdown situation so exit } finally { - Thread.currentThread().setName(className + "-dormant"); + Thread.currentThread().setName(this.className + "-dormant"); } } diff --git a/src/test/java/org/qora/test/ThreadTests.java b/src/test/java/org/qora/test/EPCTests.java similarity index 62% rename from src/test/java/org/qora/test/ThreadTests.java rename to src/test/java/org/qora/test/EPCTests.java index a37a90f7..1ca8ae45 100644 --- a/src/test/java/org/qora/test/ThreadTests.java +++ b/src/test/java/org/qora/test/EPCTests.java @@ -4,11 +4,48 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.Test; import org.qora.utils.ExecuteProduceConsume; -public class ThreadTests { +public class EPCTests { + + class RandomEPC extends ExecuteProduceConsume { + private final int TASK_PERCENT; + private final int PAUSE_PERCENT; + + public RandomEPC(ExecutorService executor, int taskPercent, int pausePercent) { + super(executor); + + this.TASK_PERCENT = taskPercent; + this.PAUSE_PERCENT = pausePercent; + } + + @Override + protected Task produceTask(boolean canIdle) throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + + Random random = new Random(); + + final int percent = random.nextInt(100); + + // Sometimes produce a task + if (percent < TASK_PERCENT) { + return () -> { + Thread.sleep(random.nextInt(500) + 100); + }; + } else { + // If we don't produce a task, then maybe simulate a pause until work arrives + if (canIdle && percent < PAUSE_PERCENT) + Thread.sleep(random.nextInt(100)); + + return null; + } + } + } private void testEPC(ExecuteProduceConsume testEPC) throws InterruptedException { testEPC.start(); @@ -16,7 +53,10 @@ public class ThreadTests { // Let it run for a minute for (int s = 1; s <= 60; ++s) { Thread.sleep(1000); - System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d", s, (s != 1 ? "s" : "") , testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount())); + System.out.println(String.format("After %d second%s, active threads: %d, greatest thread count: %d, tasks produced: %d, tasks consumed: %d", + s, (s != 1 ? "s" : ""), + testEPC.getActiveThreadCount(), testEPC.getGreatestActiveThreadCount(), + testEPC.getTasksProduced(), testEPC.getTasksConsumed())); } final long before = System.currentTimeMillis(); @@ -25,6 +65,9 @@ public class ThreadTests { System.out.println(String.format("Shutdown took %d milliseconds", after - before)); System.out.println(String.format("Greatest thread count: %d", testEPC.getGreatestActiveThreadCount())); + + System.out.println(String.format("Tasks produced: %d", testEPC.getTasksProduced())); + System.out.println(String.format("Tasks consumed: %d", testEPC.getTasksConsumed())); } @Test @@ -32,32 +75,20 @@ public class ThreadTests { final int TASK_PERCENT = 25; // Produce a task this % of the time final int PAUSE_PERCENT = 80; // Pause for new work this % of the time - class RandomEPC extends ExecuteProduceConsume { - @Override - protected Task produceTask(boolean canIdle) throws InterruptedException { - Random random = new Random(); + final ExecutorService executor = Executors.newCachedThreadPool(); - final int percent = random.nextInt(100); + testEPC(new RandomEPC(executor, TASK_PERCENT, PAUSE_PERCENT)); + } - // Sometimes produce a task - if (percent < TASK_PERCENT) { - return new Task() { - @Override - public void perform() throws InterruptedException { - Thread.sleep(random.nextInt(500) + 100); - } - }; - } else { - // If we don't produce a task, then maybe simulate a pause until work arrives - if (canIdle && percent < PAUSE_PERCENT) - Thread.sleep(random.nextInt(100)); + @Test + public void testRandomFixedPoolEPC() throws InterruptedException { + final int TASK_PERCENT = 25; // Produce a task this % of the time + final int PAUSE_PERCENT = 80; // Pause for new work this % of the time + final int MAX_THREADS = 3; - return null; - } - } - } + final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS); - testEPC(new RandomEPC()); + testEPC(new RandomEPC(executor, TASK_PERCENT, PAUSE_PERCENT)); } /**