3
0
mirror of https://github.com/Qortal/qortal.git synced 2025-02-11 17:55:50 +00:00

Made networking engine thread-safe and removed redundant locking.

This commit is contained in:
kennycud 2024-11-04 12:59:27 -08:00
parent ab78f22b5e
commit 5e315de213

View File

@ -9,7 +9,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Class ExecuteProduceConsume
*
* @ThreadSafe
*/
public abstract class ExecuteProduceConsume implements Runnable {
@XmlAccessorType(XmlAccessType.FIELD)
@ -30,25 +37,25 @@ public abstract class ExecuteProduceConsume implements Runnable {
protected ExecutorService executor;
// These are volatile to prevent thread-local caching of values
// but all are updated inside synchronized blocks
// so we don't need AtomicInteger/AtomicBoolean
// These are atomic to make this class thread-safe
private volatile int activeThreadCount = 0;
private volatile int greatestActiveThreadCount = 0;
private volatile int consumerCount = 0;
private volatile int tasksProduced = 0;
private volatile int tasksConsumed = 0;
private volatile int spawnFailures = 0;
private AtomicInteger activeThreadCount = new AtomicInteger(0);
private AtomicInteger greatestActiveThreadCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger tasksProduced = new AtomicInteger(0);
private AtomicInteger tasksConsumed = new AtomicInteger(0);
private AtomicInteger spawnFailures = new AtomicInteger(0);
/** Whether a new thread has already been spawned and is waiting to start. Used to prevent spawning multiple new threads. */
private volatile boolean hasThreadPending = false;
private AtomicBoolean hasThreadPending = new AtomicBoolean(false);
public ExecuteProduceConsume(ExecutorService executor) {
this.className = this.getClass().getSimpleName();
this.logger = LogManager.getLogger(this.getClass());
this.executor = executor;
this.logger.info("Created Thread-Safe ExecuteProduceConsume");
}
public ExecuteProduceConsume() {
@ -71,14 +78,12 @@ public abstract class ExecuteProduceConsume implements Runnable {
public StatsSnapshot getStatsSnapshot() {
StatsSnapshot snapshot = new StatsSnapshot();
synchronized (this) {
snapshot.activeThreadCount = this.activeThreadCount;
snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount;
snapshot.consumerCount = this.consumerCount;
snapshot.tasksProduced = this.tasksProduced;
snapshot.tasksConsumed = this.tasksConsumed;
snapshot.spawnFailures = this.spawnFailures;
}
snapshot.activeThreadCount = this.activeThreadCount.get();
snapshot.greatestActiveThreadCount = this.greatestActiveThreadCount.get();
snapshot.consumerCount = this.consumerCount.get();
snapshot.tasksProduced = this.tasksProduced.get();
snapshot.tasksConsumed = this.tasksConsumed.get();
snapshot.spawnFailures = this.spawnFailures.get();
return snapshot;
}
@ -93,6 +98,8 @@ public abstract class ExecuteProduceConsume implements Runnable {
* @param canBlock
* @return task to be performed, or null if no task pending.
* @throws InterruptedException
*
* @ThreadSafe
*/
protected abstract Task produceTask(boolean canBlock) throws InterruptedException;
@ -105,117 +112,65 @@ public abstract class ExecuteProduceConsume implements Runnable {
public void run() {
Thread.currentThread().setName(this.className + "-" + Thread.currentThread().getId());
boolean wasThreadPending;
synchronized (this) {
++this.activeThreadCount;
if (this.activeThreadCount > this.greatestActiveThreadCount)
this.greatestActiveThreadCount = this.activeThreadCount;
this.logger.trace(() -> String.format("[%d] started, hasThreadPending was: %b, activeThreadCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount));
this.activeThreadCount.incrementAndGet();
if (this.activeThreadCount.get() > this.greatestActiveThreadCount.get())
this.greatestActiveThreadCount.set( this.activeThreadCount.get() );
// Defer clearing hasThreadPending to prevent unnecessary threads waiting to produce...
wasThreadPending = this.hasThreadPending;
}
boolean wasThreadPending = this.hasThreadPending.get();
try {
while (!Thread.currentThread().isInterrupted()) {
Task task = null;
String taskType;
this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
synchronized (this) {
if (wasThreadPending) {
// Clear thread-pending flag now that we about to produce.
this.hasThreadPending = false;
wasThreadPending = false;
}
// If we're the only non-consuming thread - producer can afford to block this round
boolean canBlock = this.activeThreadCount - this.consumerCount <= 1;
this.logger.trace(() -> String.format("[%d] producing... [activeThreadCount: %d, consumerCount: %d, canBlock: %b]",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, canBlock));
final long beforeProduce = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try {
task = produceTask(canBlock);
} catch (InterruptedException e) {
// We're in shutdown situation so exit
Thread.currentThread().interrupt();
} catch (Exception e) {
this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e);
}
if (this.logger.isDebugEnabled()) {
final long productionPeriod = System.currentTimeMillis() - beforeProduce;
taskType = task == null ? "no task" : task.getName();
this.logger.debug(() -> String.format("[%d] produced [%s] in %dms [canBlock: %b]",
Thread.currentThread().getId(),
taskType,
productionPeriod,
canBlock
));
} else {
taskType = null;
}
if (wasThreadPending) {
// Clear thread-pending flag now that we about to produce.
this.hasThreadPending.set( false );
wasThreadPending = false;
}
if (task == null)
synchronized (this) {
this.logger.trace(() -> String.format("[%d] no task, activeThreadCount: %d, consumerCount: %d",
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount));
// If we're the only non-consuming thread - producer can afford to block this round
boolean canBlock = this.activeThreadCount.get() - this.consumerCount.get() <= 1;
// If we have an excess of non-consuming threads then we can exit
if (this.activeThreadCount - this.consumerCount > 1) {
--this.activeThreadCount;
try {
task = produceTask(canBlock);
} catch (InterruptedException e) {
// We're in shutdown situation so exit
Thread.currentThread().interrupt();
} catch (Exception e) {
this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e);
}
this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount));
if (task == null) {
// If we have an excess of non-consuming threads then we can exit
if (this.activeThreadCount.get() - this.consumerCount.get() > 1) {
this.activeThreadCount.decrementAndGet();
return;
}
continue;
return;
}
continue;
}
// We have a task
synchronized (this) {
++this.tasksProduced;
++this.consumerCount;
this.tasksProduced.incrementAndGet();
this.consumerCount.incrementAndGet();
this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d",
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount));
// If we have no thread pending and no excess of threads then we should spawn a fresh thread
if (!this.hasThreadPending.get() && this.activeThreadCount.get() == this.consumerCount.get()) {
// If we have no thread pending and no excess of threads then we should spawn a fresh thread
if (!this.hasThreadPending && this.activeThreadCount == this.consumerCount) {
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId()));
this.hasThreadPending.set( true );
this.hasThreadPending = true;
try {
this.executor.execute(this); // Same object, different thread
} catch (RejectedExecutionException e) {
this.spawnFailures.decrementAndGet();
this.hasThreadPending.set( false );
try {
this.executor.execute(this); // Same object, different thread
} catch (RejectedExecutionException e) {
++this.spawnFailures;
this.hasThreadPending = false;
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId()));
this.onSpawnFailure();
}
} else {
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId()));
this.onSpawnFailure();
}
}
this.logger.trace(() -> String.format("[%d] consuming [%s] task...", Thread.currentThread().getId(), taskType));
final long beforePerform = this.logger.isDebugEnabled() ? System.currentTimeMillis() : 0;
try {
task.perform(); // This can block for a while
} catch (InterruptedException e) {
@ -225,23 +180,11 @@ public abstract class ExecuteProduceConsume implements Runnable {
this.logger.warn(() -> String.format("[%d] exception while consuming task", Thread.currentThread().getId()), e);
}
if (this.logger.isDebugEnabled()) {
final long productionPeriod = System.currentTimeMillis() - beforePerform;
this.logger.debug(() -> String.format("[%d] consumed [%s] task in %dms", Thread.currentThread().getId(), taskType, productionPeriod));
}
synchronized (this) {
++this.tasksConsumed;
--this.consumerCount;
this.logger.trace(() -> String.format("[%d] consumerCount now: %d",
Thread.currentThread().getId(), this.consumerCount));
}
this.tasksConsumed.incrementAndGet();
this.consumerCount.decrementAndGet();
}
} finally {
Thread.currentThread().setName(this.className);
}
}
}
}