|
|
@ -107,6 +107,7 @@ public abstract class ExecuteProduceConsume implements Runnable { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
try { |
|
|
|
|
|
|
|
// It's possible this might need to become a class instance private volatile
|
|
|
|
boolean canBlock = false; |
|
|
|
boolean canBlock = false; |
|
|
|
|
|
|
|
|
|
|
|
while (true) { |
|
|
|
while (true) { |
|
|
@ -155,7 +156,11 @@ public abstract class ExecuteProduceConsume implements Runnable { |
|
|
|
++this.tasksProduced; |
|
|
|
++this.tasksProduced; |
|
|
|
++this.consumerCount; |
|
|
|
++this.consumerCount; |
|
|
|
|
|
|
|
|
|
|
|
if (!this.hasThreadPending) { |
|
|
|
this.logger.trace(() -> String.format("[%d] hasThreadPending: %b, activeThreadCount: %d, consumerCount now: %d", |
|
|
|
|
|
|
|
Thread.currentThread().getId(), this.hasThreadPending, this.activeThreadCount, this.consumerCount)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If we have no thread pending and no excess of threads then we should spawn a fresh thread
|
|
|
|
|
|
|
|
if (!this.hasThreadPending && this.activeThreadCount <= this.consumerCount + 1) { |
|
|
|
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); |
|
|
|
this.logger.trace(() -> String.format("[%d] spawning another thread", Thread.currentThread().getId())); |
|
|
|
this.hasThreadPending = true; |
|
|
|
this.hasThreadPending = true; |
|
|
|
|
|
|
|
|
|
|
@ -165,6 +170,8 @@ public abstract class ExecuteProduceConsume implements Runnable { |
|
|
|
this.hasThreadPending = false; |
|
|
|
this.hasThreadPending = false; |
|
|
|
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId())); |
|
|
|
this.logger.trace(() -> String.format("[%d] failed to spawn another thread", Thread.currentThread().getId())); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
this.logger.trace(() -> String.format("[%d] NOT spawning another thread", Thread.currentThread().getId())); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -176,6 +183,9 @@ public abstract class ExecuteProduceConsume implements Runnable { |
|
|
|
++this.tasksConsumed; |
|
|
|
++this.tasksConsumed; |
|
|
|
--this.consumerCount; |
|
|
|
--this.consumerCount; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this.logger.trace(() -> String.format("[%d] consumerCount now: %d", |
|
|
|
|
|
|
|
Thread.currentThread().getId(), this.consumerCount)); |
|
|
|
|
|
|
|
|
|
|
|
// Quicker, non-blocking produce next round
|
|
|
|
// Quicker, non-blocking produce next round
|
|
|
|
canBlock = false; |
|
|
|
canBlock = false; |
|
|
|
} |
|
|
|
} |
|
|
|