Merge kenny's changes to ExecuteProduceConsume making it threadsafe and changing the methods utilized.

This commit is contained in:
crowetic 2024-11-04 13:11:40 -08:00
commit fd5ba48611

View File

@ -9,7 +9,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Class ExecuteProduceConsume
*
* @ThreadSafe
*/
public abstract class ExecuteProduceConsume implements Runnable { public abstract class ExecuteProduceConsume implements Runnable {
@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@ -30,25 +37,25 @@ public abstract class ExecuteProduceConsume implements Runnable {
protected ExecutorService executor; protected ExecutorService executor;
// These are volatile to prevent thread-local caching of values // These are atomic to make this class thread-safe
// but all are updated inside synchronized blocks
// so we don't need AtomicInteger/AtomicBoolean
private volatile int activeThreadCount = 0; private AtomicInteger activeThreadCount = new AtomicInteger(0);
private volatile int greatestActiveThreadCount = 0; private AtomicInteger greatestActiveThreadCount = new AtomicInteger(0);
private volatile int consumerCount = 0; private AtomicInteger consumerCount = new AtomicInteger(0);
private volatile int tasksProduced = 0; private AtomicInteger tasksProduced = new AtomicInteger(0);
private volatile int tasksConsumed = 0; private AtomicInteger tasksConsumed = new AtomicInteger(0);
private volatile int spawnFailures = 0; private AtomicInteger spawnFailures = new AtomicInteger(0);
/** Whether a new thread has already been spawned and is waiting to start. Used to prevent spawning multiple new threads. */ /** Whether a new thread has already been spawned and is waiting to start. Used to prevent spawning multiple new threads. */
private volatile boolean hasThreadPending = false; private AtomicBoolean hasThreadPending = new AtomicBoolean(false);
public ExecuteProduceConsume(ExecutorService executor) { public ExecuteProduceConsume(ExecutorService executor) {
this.className = this.getClass().getSimpleName(); this.className = this.getClass().getSimpleName();
this.logger = LogManager.getLogger(this.getClass()); this.logger = LogManager.getLogger(this.getClass());
this.executor = executor; this.executor = executor;
this.logger.info("Created Thread-Safe ExecuteProduceConsume");
} }
public ExecuteProduceConsume() { public ExecuteProduceConsume() {
@ -71,14 +78,12 @@ public abstract class ExecuteProduceConsume implements Runnable {
public StatsSnapshot getStatsSnapshot() { public StatsSnapshot getStatsSnapshot() {
StatsSnapshot snapshot = new StatsSnapshot(); StatsSnapshot snapshot = new StatsSnapshot();
synchronized (this) { snapshot.activeThreadCount = this.activeThreadCount.get();
snapshot.activeThreadCount = this.activeThreadCount; snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount.get();
snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount; snapshot.consumerCount = this.consumerCount.get();
snapshot.consumerCount = this.consumerCount; snapshot.tasksProduced = this.tasksProduced.get();
snapshot.tasksProduced = this.tasksProduced; snapshot.tasksConsumed = this.tasksConsumed.get();
snapshot.tasksConsumed = this.tasksConsumed; snapshot.spawnFailures = this.spawnFailures.get();
snapshot.spawnFailures = this.spawnFailures;
}
return snapshot; return snapshot;
} }
@ -93,6 +98,8 @@ public abstract class ExecuteProduceConsume implements Runnable {
* @param canBlock * @param canBlock
* @return task to be performed, or null if no task pending. * @return task to be performed, or null if no task pending.
* @throws InterruptedException * @throws InterruptedException
*
* @ThreadSafe
*/ */
protected abstract Task produceTask(boolean canBlock) throws InterruptedException; protected abstract Task produceTask(boolean canBlock) throws InterruptedException;
@ -105,40 +112,25 @@ public abstract class ExecuteProduceConsume implements Runnable {
public void run() { public void run() {
Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId()); Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId());
boolean wasThreadPending; this.activeThreadCount.incrementAndGet();
synchronized (this) { if (this.activeThreadCount.get() > this.greatestActiveThreadCount.get())
++this.activeThreadCount; this.greatestActiveThreadCount.set( this.activeThreadCount.get() );
if (this.activeThreadCount > this.greatestActiveThreadCount)
this.greatestActiveThreadCount = this.activeThreadCount;
this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount));
// Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce... // Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce...
wasThreadPending = this.hasThreadPending; boolean wasThreadPending = this.hasThreadPending.get();
}
try { try {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
Task task = null; Task task = null;
String taskType;
this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
synchronized (this) {
if (wasThreadPending) { if (wasThreadPending) {
// Clear thread-pending flag now that we about to produce. // Clear thread-pending flag now that we about to produce.
this.hasThreadPending = false; this.hasThreadPending.set( false );
wasThreadPending = false; wasThreadPending = false;
} }
// If we're the only non-consuming thread - producer can afford to block this round // If we're the only non-consuming thread - producer can afford to block this round
boolean canBlock = this.activeThreadCount - this.consumerCount <= 1; boolean canBlock = this.activeThreadCount.get() - this.consumerCount.get() <= 1;
this.logger.trace(() -> String.format("[%d] producing... [activeThreadCount: %d, consumerCount: %d, canBlock: %b]",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, canBlock));
final long beforeProduce = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try { try {
task = produceTask(canBlock); task = produceTask(canBlock);
@ -149,72 +141,35 @@ public abstract class ExecuteProduceConsume implements Runnable {
this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e); this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e);
} }
if (this.logger.isDebugEnabled()) { if (task == null) {
final long productionPeriod = System.currentTimeMillis() - beforeProduce;
taskType = task == null ? "no task" : task.getName();
this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]",
Thread.currentThread().getId(),
taskType,
productionPeriod,
canBlock
));
} else {
taskType = null;
}
}
if (task == null)
synchronized (this) {
this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount));
// If we have an excess of non-consuming threads then we can exit // If we have an excess of non-consuming threads then we can exit
if (this.activeThreadCount - this.consumerCount > 1) { if (this.activeThreadCount.get() - this.consumerCount.get() > 1) {
--this.activeThreadCount; this.activeThreadCount.decrementAndGet();
this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount));
return; return;
} }
continue; continue;
} }
// We have a task // We have a task
synchronized (this) { this.tasksProduced.incrementAndGet();
++this.tasksProduced; this.consumerCount.incrementAndGet();
++this.consumerCount;
this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount));
// If we have no thread pending and no excess of threads then we should spawn a fresh thread // If we have no thread pending and no excess of threads then we should spawn a fresh thread
if (!this.hasThreadPending && this.activeThreadCount == this.consumerCount) { if (!this.hasThreadPending.get() && this.activeThreadCount.get() == this.consumerCount.get()) {
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
this.hasThreadPending = true; this.hasThreadPending.set( true );
try { try {
this.executor.execute(this); // Same object, different thread this.executor.execute(this); // Same object, different thread
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
++this.spawnFailures; this.spawnFailures.decrementAndGet();
this.hasThreadPending = false; this.hasThreadPending.set( false );
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
this.onSpawnFailure(); this.onSpawnFailure();
} }
} else {
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));
} }
}
this.logger.trace(() -> String.format("[%d] consuming [%s] task...", Thread.currentThread().getId(), taskType));
final long beforePerform = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try { try {
task.perform(); // This can block for a while task.perform(); // This can block for a while
@ -225,23 +180,11 @@ public abstract class ExecuteProduceConsume implements Runnable {
this.logger.warn(() -> String.format("[%d] exception while consuming task", Thread.currentThread().getId()), e); this.logger.warn(() -> String.format("[%d] exception while consuming task", Thread.currentThread().getId()), e);
} }
if (this.logger.isDebugEnabled()) { this.tasksConsumed.incrementAndGet();
final long productionPeriod = System.currentTimeMillis() - beforePerform; this.consumerCount.decrementAndGet();
this.logger.debug(() -> String.format("[%d] consumed [%s] task in %dms", Thread.currentThread().getId(), taskType, productionPeriod));
}
synchronized (this) {
++this.tasksConsumed;
--this.consumerCount;
this.logger.trace(() -> String.format("[%d] consumerCount now: %d",
Thread.currentThread().getId(), this.consumerCount));
}
} }
} finally { } finally {
Thread.currentThread().setName(this.className); Thread.currentThread().setName(this.className);
} }
} }
} }