Added defensive try-catch around network engine calls (actually ExecuteProduceConsume)

This commit is contained in:
catbref 2021-01-17 15:34:50 +00:00
parent 5b2b2bab46
commit e5bb3e2f0a

View File

@ -125,8 +125,8 @@ public abstract class ExecuteProduceConsume implements Runnable {
// It's possible this might need to become a class instance private volatile // It's possible this might need to become a class instance private volatile
boolean canBlock = false; boolean canBlock = false;
while (true) { while (!Thread.currentThread().isInterrupted()) {
final Task task; Task task = null;
this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] waiting to produce...", Thread.currentThread().getId()));
@ -142,7 +142,16 @@ public abstract class ExecuteProduceConsume implements Runnable {
Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, lambdaCanIdle)); Thread.currentThread().getId(), this.activeThreadCount, this.consumerCount, lambdaCanIdle));
final long beforeProduce = isLoggerTraceEnabled ? System.currentTimeMillis() : 0; final long beforeProduce = isLoggerTraceEnabled ? System.currentTimeMillis() : 0;
try {
task = produceTask(canBlock); task = produceTask(canBlock);
} catch (InterruptedException e) {
// We're in shutdown situation so exit
Thread.currentThread().interrupt();
} catch (Exception e) {
this.logger.warn(() -> String.format("[%d] exception while trying to produce task", Thread.currentThread().getId()), e);
}
this.logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), System.currentTimeMillis() - beforeProduce)); this.logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), System.currentTimeMillis() - beforeProduce));
} }
@ -155,7 +164,8 @@ public abstract class ExecuteProduceConsume implements Runnable {
--this.activeThreadCount; --this.activeThreadCount;
this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d", this.logger.trace(() -> String.format("[%d] ending, activeThreadCount now: %d",
Thread.currentThread().getId(), this.activeThreadCount)); Thread.currentThread().getId(), this.activeThreadCount));
break;
return;
} }
// We're the last surviving thread - producer can afford to block next round // We're the last surviving thread - producer can afford to block next round
@ -192,7 +202,16 @@ public abstract class ExecuteProduceConsume implements Runnable {
} }
this.logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] performing task...", Thread.currentThread().getId()));
try {
task.perform(); // This can block for a while task.perform(); // This can block for a while
} catch (InterruptedException e) {
// We're in shutdown situation so exit
Thread.currentThread().interrupt();
} catch (Exception e) {
this.logger.warn(() -> String.format("[%d] exception while performing task", Thread.currentThread().getId()), e);
}
this.logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId())); this.logger.trace(() -> String.format("[%d] finished task", Thread.currentThread().getId()));
synchronized (this) { synchronized (this) {
@ -206,8 +225,6 @@ public abstract class ExecuteProduceConsume implements Runnable {
canBlock = false; canBlock = false;
} }
} }
} catch (InterruptedException e) {
// We're in shutdown situation so exit
} finally { } finally {
if (this.isLoggerTraceEnabled) if (this.isLoggerTraceEnabled)
Thread.currentThread().setName(this.className); Thread.currentThread().setName(this.className);