diff --git a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java index 58e1af05..a103f6f7 100644 --- a/src/main/java/org/qortal/utils/ExecuteProduceConsume.java +++ b/src/main/java/org/qortal/utils/ExecuteProduceConsume.java @@ -9,7 +9,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +/** + * Class ExecuteProduceConsume + * + * @ThreadSafe + */ public abstract class ExecuteProduceConsume implements Runnable { @XmlAccessorType(XmlAccessType.FIELD) @@ -30,25 +37,25 @@ public abstract class ExecuteProduceConsume implements Runnable { protected ExecutorService executor; - // These are volatile to prevent thread-local caching of values - // but all are updated inside synchronized blocks - // so we don't need AtomicInteger/AtomicBoolean + // These are atomic to make this class thread-safe - private volatile int activeThreadCount = 0; - private volatile int greatestActiveThreadCount = 0; - private volatile int consumerCount = 0; - private volatile int tasksProduced = 0; - private volatile int tasksConsumed = 0; - private volatile int spawnFailures = 0; + private AtomicInteger activeThreadCount = new AtomicInteger(0); + private AtomicInteger greatestActiveThreadCount = new AtomicInteger(0); + private AtomicInteger consumerCount = new AtomicInteger(0); + private AtomicInteger tasksProduced = new AtomicInteger(0); + private AtomicInteger tasksConsumed = new AtomicInteger(0); + private AtomicInteger spawnFailures = new AtomicInteger(0); /** Whether a new thread has already been spawned and is waiting to start. Used to prevent spawning multiple new threads. */ - private volatile boolean hasThreadPending = false; + private AtomicBoolean hasThreadPending = new AtomicBoolean(false); public ExecuteProduceConsume(ExecutorService executor) { this.className = this.getClass().getSimpleName(); this.logger = LogManager.getLogger(this.getClass()); this.executor = executor; + + this.logger.info("Created Thread-Safe ExecuteProduceConsume"); } public ExecuteProduceConsume() { @@ -71,14 +78,12 @@ public abstract class ExecuteProduceConsume implements Runnable { public StatsSnapshot getStatsSnapshot() { StatsSnapshot snapshot = new StatsSnapshot(); - synchronized (this) { - snapshot.activeThreadCount = this.activeThreadCount; - snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount; - snapshot.consumerCount = this.consumerCount; - snapshot.tasksProduced = this.tasksProduced; - snapshot.tasksConsumed = this.tasksConsumed; - snapshot.spawnFailures = this.spawnFailures; - } + snapshot.activeThreadCount = this.activeThreadCount.get(); + snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount.get(); + snapshot.consumerCount = this.consumerCount.get(); + snapshot.tasksProduced = this.tasksProduced.get(); + snapshot.tasksConsumed = this.tasksConsumed.get(); + snapshot.spawnFailures = this.spawnFailures.get(); return snapshot; } @@ -93,6 +98,8 @@ public abstract class ExecuteProduceConsume implements Runnable { * @param canBlock * @return task to be performed, or null if no task pending. * @throws InterruptedException + * + * @ThreadSafe */ protected abstract Task produceTask(boolean canBlock) throws InterruptedException; @@ -105,117 +112,65 @@ public abstract class ExecuteProduceConsume implements Runnable { public void run() { Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId()); - boolean wasThreadPending; - synchronized (this) { - ++this.activeThreadCount; - if (this.activeThreadCount > this.greatestActiveThreadCount) - this.greatestActiveThreadCount = this.activeThreadCount; - - this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d", - Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount)); + this.activeThreadCount.incrementAndGet(); + if (this.activeThreadCount.get() > this.greatestActiveThreadCount.get()) + this.greatestActiveThreadCount.set( this.activeThreadCount.get() ); // Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce... - wasThreadPending = this.hasThreadPending; - } + boolean wasThreadPending = this.hasThreadPending.get(); try { while (!Thread.currentThread().isInterrupted()) { Task task = null; - String taskType; - this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); - - synchronized (this) { - if (wasThreadPending) { - // Clear thread-pending flag now that we about to produce. - this.hasThreadPending = false; - wasThreadPending = false; - } - - // If we're the only non-consuming thread - producer can afford to block this round - boolean canBlock = this.activeThreadCount - this.consumerCount <= 1; - - this.logger.trace(() -> String.format("[%d] producing... [activeThreadCount: %d, consumerCount: %d, canBlock: %b]", - Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, canBlock)); - - final long beforeProduce = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0; - - try { - task = produceTask(canBlock); - } catch (InterruptedException e) { - // We're in shutdown situation so exit - Thread.currentThread().interrupt(); - } catch (Exception e) { - this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e); - } - - if (this.logger.isDebugEnabled()) { - final long productionPeriod = System.currentTimeMillis() - beforeProduce; - taskType = task == null ? "no task" : task.getName(); - - this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]", - Thread.currentThread().getId(), - taskType, - productionPeriod, - canBlock - )); - } else { - taskType = null; - } + if (wasThreadPending) { + // Clear thread-pending flag now that we about to produce. + this.hasThreadPending.set( false ); + wasThreadPending = false; } - if (task == null) - synchronized (this) { - this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d", - Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount)); + // If we're the only non-consuming thread - producer can afford to block this round + boolean canBlock = this.activeThreadCount.get() - this.consumerCount.get() <= 1; - // If we have an excess of non-consuming threads then we can exit - if (this.activeThreadCount - this.consumerCount > 1) { - --this.activeThreadCount; + try { + task = produceTask(canBlock); + } catch (InterruptedException e) { + // We're in shutdown situation so exit + Thread.currentThread().interrupt(); + } catch (Exception e) { + this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e); + } - this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", - Thread.currentThread().getId(), this.activeThreadCount)); + if (task == null) { + // If we have an excess of non-consuming threads then we can exit + if (this.activeThreadCount.get() - this.consumerCount.get() > 1) { + this.activeThreadCount.decrementAndGet(); - return; - } - - continue; + return; } + continue; + } // We have a task - synchronized (this) { - ++this.tasksProduced; - ++this.consumerCount; + this.tasksProduced.incrementAndGet(); + this.consumerCount.incrementAndGet(); - this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d", - Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount)); + // If we have no thread pending and no excess of threads then we should spawn a fresh thread + if (!this.hasThreadPending.get() && this.activeThreadCount.get() == this.consumerCount.get()) { - // If we have no thread pending and no excess of threads then we should spawn a fresh thread - if (!this.hasThreadPending && this.activeThreadCount == this.consumerCount) { - this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); + this.hasThreadPending.set( true ); - this.hasThreadPending = true; + try { + this.executor.execute(this); // Same object, different thread + } catch (RejectedExecutionException e) { + this.spawnFailures.decrementAndGet(); + this.hasThreadPending.set( false ); - try { - this.executor.execute(this); // Same object, different thread - } catch (RejectedExecutionException e) { - ++this.spawnFailures; - this.hasThreadPending = false; - - this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId())); - - this.onSpawnFailure(); - } - } else { - this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId())); + this.onSpawnFailure(); } } - this.logger.trace(() -> String.format("[%d] consuming [%s] task...", Thread.currentThread().getId(), taskType)); - - final long beforePerform = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0; - try { task.perform(); // This can block for a while } catch (InterruptedException e) { @@ -225,23 +180,11 @@ public abstract class ExecuteProduceConsume implements Runnable { this.logger.warn(() -> String.format("[%d] exception while consuming task", Thread.currentThread().getId()), e); } - if (this.logger.isDebugEnabled()) { - final long productionPeriod = System.currentTimeMillis() - beforePerform; - - this.logger.debug(() -> String.format("[%d] consumed [%s] task in %dms", Thread.currentThread().getId(), taskType, productionPeriod)); - } - - synchronized (this) { - ++this.tasksConsumed; - --this.consumerCount; - - this.logger.trace(() -> String.format("[%d] consumerCount now: %d", - Thread.currentThread().getId(), this.consumerCount)); - } + this.tasksConsumed.incrementAndGet(); + this.consumerCount.decrementAndGet(); } } finally { Thread.currentThread().setName(this.className); } } - -} +} \ No newline at end of file