@ -20,6 +20,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
public int consumerCount = 0 ;
public int tasksProduced = 0 ;
public int tasksConsumed = 0 ;
public int spawnFailures = 0 ;
public StatsSnapshot ( ) {
}
@ -27,6 +28,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
private final String className ;
private final Logger logger ;
private final boolean isLoggerTraceEnabled ;
private ExecutorService executor ;
@ -39,12 +41,14 @@ public abstract class ExecuteProduceConsume implements Runnable {
private volatile int consumerCount = 0 ;
private volatile int tasksProduced = 0 ;
private volatile int tasksConsumed = 0 ;
private volatile int spawnFailures = 0 ;
private volatile boolean hasThreadPending = false ;
public ExecuteProduceConsume ( ExecutorService executor ) {
this . className = this . getClass ( ) . getSimpleName ( ) ;
this . logger = LogManager . getLogger ( this . getClass ( ) ) ;
this . isLoggerTraceEnabled = this . logger . isTraceEnabled ( ) ;
this . executor = executor ;
}
@ -75,6 +79,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
snapshot . consumerCount = this . consumerCount ;
snapshot . tasksProduced = this . tasksProduced ;
snapshot . tasksConsumed = this . tasksConsumed ;
snapshot . spawnFailures = this . spawnFailures ;
}
return snapshot ;
@ -96,6 +101,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
@Override
public void run ( ) {
if ( this . isLoggerTraceEnabled )
Thread . currentThread ( ) . setName ( this . className + "-" + Thread . currentThread ( ) . getId ( ) ) ;
boolean wasThreadPending ;
@ -131,10 +137,9 @@ public abstract class ExecuteProduceConsume implements Runnable {
this . logger . trace ( ( ) - > String . format ( "[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b..." ,
Thread . currentThread ( ) . getId ( ) , this . activeThreadCount , this . consumerCount , lambdaCanIdle ) ) ;
final long now = System . currentTimeMillis ( ) ;
final long beforeProduce = isLoggerTraceEnabled ? System . currentTimeMillis ( ) : 0 ;
task = produceTask ( canBlock ) ;
final long delay = System . currentTimeMillis ( ) - now ;
this . logger . trace ( ( ) - > String . format ( "[%d] producing took %dms" , Thread . currentThread ( ) . getId ( ) , delay ) ) ;
this . logger . trace ( ( ) - > String . format ( "[%d] producing took %dms" , Thread . currentThread ( ) . getId ( ) , System . currentTimeMillis ( ) - beforeProduce ) ) ;
}
if ( task = = null )
@ -172,6 +177,7 @@ public abstract class ExecuteProduceConsume implements Runnable {
try {
this . executor . execute ( this ) ; // Same object, different thread
} catch ( RejectedExecutionException e ) {
+ + this . spawnFailures ;
this . hasThreadPending = false ;
this . logger . trace ( ( ) - > String . format ( "[%d] failed to spawn another thread" , Thread . currentThread ( ) . getId ( ) ) ) ;
}
@ -198,7 +204,8 @@ public abstract class ExecuteProduceConsume implements Runnable {
} catch ( InterruptedException e ) {
// We're in shutdown situation so exit
} finally {
Thread . currentThread ( ) . setName ( this . className + "-dormant" ) ;
if ( this . isLoggerTraceEnabled )
Thread . currentThread ( ) . setName ( this . className ) ;
}
}