diff --git a/src/main/java/org/hsqldb/jdbc/HSQLDBPool.java b/src/main/java/org/hsqldb/jdbc/HSQLDBPool.java new file mode 100644 index 00000000..ce130eee --- /dev/null +++ b/src/main/java/org/hsqldb/jdbc/HSQLDBPool.java @@ -0,0 +1,46 @@ +package org.hsqldb.jdbc; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.hsqldb.jdbc.JDBCPool; +import org.hsqldb.jdbc.pool.JDBCPooledConnection; + +public class HSQLDBPool extends JDBCPool { + + public HSQLDBPool(int poolSize) { + super(poolSize); + } + + /** + * Tries to retrieve a new connection using the properties that have already been + * set. + * + * @return a connection to the data source, or null if no spare connections in pool + * @exception SQLException if a database access error occurs + */ + public Connection tryConnection() throws SQLException { + for (int i = 0; i < states.length(); i++) { + if (states.compareAndSet(i, RefState.available, RefState.allocated)) { + return connections[i].getConnection(); + } + + if (states.compareAndSet(i, RefState.empty, RefState.allocated)) { + try { + JDBCPooledConnection connection = (JDBCPooledConnection) source.getPooledConnection(); + + connection.addConnectionEventListener(this); + connection.addStatementEventListener(this); + connections[i] = connection; + + return connections[i].getConnection(); + } catch (SQLException e) { + states.set(i, RefState.empty); + } + } + } + + return null; + } + +} diff --git a/src/main/java/org/qora/api/resource/AdminResource.java b/src/main/java/org/qora/api/resource/AdminResource.java index 5445c0c2..4350afec 100644 --- a/src/main/java/org/qora/api/resource/AdminResource.java +++ b/src/main/java/org/qora/api/resource/AdminResource.java @@ -47,7 +47,6 @@ import org.qora.api.model.ActivitySummary; import org.qora.api.model.NodeInfo; import org.qora.block.BlockChain; import org.qora.controller.Controller; -import org.qora.controller.Synchronizer; import org.qora.controller.Synchronizer.SynchronizationResult; import org.qora.repository.DataException; import org.qora.repository.Repository; @@ -455,7 +454,7 @@ public class AdminResource { SynchronizationResult syncResult; try { do { - syncResult = Synchronizer.getInstance().synchronize(targetPeer, true); + syncResult = Controller.getInstance().actuallySynchronize(targetPeer, true); } while (syncResult == SynchronizationResult.OK); } finally { blockchainLock.unlock(); diff --git a/src/main/java/org/qora/block/BlockChain.java b/src/main/java/org/qora/block/BlockChain.java index bf6c6b95..db45fa6b 100644 --- a/src/main/java/org/qora/block/BlockChain.java +++ b/src/main/java/org/qora/block/BlockChain.java @@ -416,6 +416,9 @@ public class BlockChain { repository.saveChanges(); } + BlockData lastBlockData = repository.getBlockRepository().getLastBlock(); + Controller.getInstance().setChainTip(lastBlockData); + return true; } } finally { diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 2ee592a6..b63fed58 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -102,6 +103,8 @@ public class Controller extends Thread { private final String buildVersion; private final long buildTimestamp; // seconds + private AtomicReference chainTip = new AtomicReference<>(); + private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; // ms private long ntpCheckTimestamp = startTime; // ms private long deleteExpiredTimestamp = startTime + DELETE_EXPIRED_INTERVAL; // ms @@ -183,22 +186,20 @@ public class Controller extends Thread { /** Returns current blockchain height, or 0 if there's a repository issue */ public int getChainHeight() { - try (final Repository repository = RepositoryManager.getRepository()) { - return repository.getBlockRepository().getBlockchainHeight(); - } catch (DataException e) { - LOGGER.error("Repository issue when fetching blockchain height", e); + BlockData blockData = this.chainTip.get(); + if (blockData == null) return 0; - } + + return blockData.getHeight(); } /** Returns highest block, or null if there's a repository issue */ public BlockData getChainTip() { - try (final Repository repository = RepositoryManager.getRepository()) { - return repository.getBlockRepository().getLastBlock(); - } catch (DataException e) { - LOGGER.error("Repository issue when fetching blockchain tip", e); - return null; - } + return this.chainTip.get(); + } + + public void setChainTip(BlockData blockData) { + this.chainTip.set(blockData); } public ReentrantLock getBlockchainLock() { @@ -238,7 +239,14 @@ public class Controller extends Thread { LOGGER.info("Validating blockchain"); try { BlockChain.validate(); - LOGGER.info(String.format("Our chain height at start-up: %d", getInstance().getChainHeight())); + + // Set initial chain height/tip + try (final Repository repository = RepositoryManager.getRepository()) { + BlockData blockData = repository.getBlockRepository().getLastBlock(); + + Controller.getInstance().setChainTip(blockData); + LOGGER.info(String.format("Our chain height at start-up: %d", blockData.getHeight())); + } } catch (DataException e) { LOGGER.error("Couldn't validate blockchain", e); System.exit(2); @@ -403,67 +411,84 @@ public class Controller extends Thread { int index = new SecureRandom().nextInt(peers.size()); Peer peer = peers.get(index); - noSyncOurBlockSignature = null; - noSyncPeerBlockSignature = null; - - SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, false); - switch (syncResult) { - case GENESIS_ONLY: - case NO_COMMON_BLOCK: - case TOO_FAR_BEHIND: - case TOO_DIVERGENT: - case INVALID_DATA: - // These are more serious results that warrant a cool-off - LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name())); - - // Don't use this peer again for a while - PeerData peerData = peer.getPeerData(); - peerData.setLastMisbehaved(NTP.getTime()); - - // Only save to repository if outbound peer - if (peer.isOutbound()) - try (final Repository repository = RepositoryManager.getRepository()) { - repository.getNetworkRepository().save(peerData); - repository.saveChanges(); - } catch (DataException e) { - LOGGER.warn("Repository issue while updating peer synchronization info", e); - } - break; - - case INFERIOR_CHAIN: - noSyncOurBlockSignature = latestBlockData.getSignature(); - noSyncPeerBlockSignature = peer.getLastBlockSignature(); - // These are minor failure results so fine to try again - LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name())); - break; - - case NO_REPLY: - case NO_BLOCKCHAIN_LOCK: - case REPOSITORY_ISSUE: - // These are minor failure results so fine to try again - LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name())); - break; - - case OK: - requestSysTrayUpdate = true; - // fall-through... - case NOTHING_TO_DO: - noSyncOurBlockSignature = latestBlockData.getSignature(); - noSyncPeerBlockSignature = peer.getLastBlockSignature(); - LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name())); - break; - } - - // Broadcast our new chain tip (if changed) - BlockData newLatestBlockData = getChainTip(); - if (!Arrays.equals(newLatestBlockData.getSignature(), latestBlockData.getSignature())) - Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newLatestBlockData)); + actuallySynchronize(peer, false); } } + public SynchronizationResult actuallySynchronize(Peer peer, boolean force) throws InterruptedException { + BlockData latestBlockData = getChainTip(); + + noSyncOurBlockSignature = null; + noSyncPeerBlockSignature = null; + + SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, force); + switch (syncResult) { + case GENESIS_ONLY: + case NO_COMMON_BLOCK: + case TOO_FAR_BEHIND: + case TOO_DIVERGENT: + case INVALID_DATA: + // These are more serious results that warrant a cool-off + LOGGER.info(String.format("Failed to synchronize with peer %s (%s) - cooling off", peer, syncResult.name())); + + // Don't use this peer again for a while + PeerData peerData = peer.getPeerData(); + peerData.setLastMisbehaved(NTP.getTime()); + + // Only save to repository if outbound peer + if (peer.isOutbound()) + try (final Repository repository = RepositoryManager.getRepository()) { + repository.getNetworkRepository().save(peerData); + repository.saveChanges(); + } catch (DataException e) { + LOGGER.warn("Repository issue while updating peer synchronization info", e); + } + break; + + case INFERIOR_CHAIN: + noSyncOurBlockSignature = latestBlockData.getSignature(); + noSyncPeerBlockSignature = peer.getLastBlockSignature(); + // These are minor failure results so fine to try again + LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name())); + break; + + case NO_REPLY: + case NO_BLOCKCHAIN_LOCK: + case REPOSITORY_ISSUE: + // These are minor failure results so fine to try again + LOGGER.debug(() -> String.format("Failed to synchronize with peer %s (%s)", peer, syncResult.name())); + break; + + case OK: + requestSysTrayUpdate = true; + // fall-through... + case NOTHING_TO_DO: + noSyncOurBlockSignature = latestBlockData.getSignature(); + noSyncPeerBlockSignature = peer.getLastBlockSignature(); + LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name())); + break; + } + + // Broadcast our new chain tip (if changed) + BlockData newLatestBlockData; + + try (final Repository repository = RepositoryManager.getRepository()) { + newLatestBlockData = repository.getBlockRepository().getLastBlock(); + this.setChainTip(newLatestBlockData); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to fetch post-synchronization chain tip: %s", e.getMessage())); + return syncResult; + } + + if (!Arrays.equals(newLatestBlockData.getSignature(), latestBlockData.getSignature())) + Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newLatestBlockData)); + + return syncResult; + } + private void updateSysTray() { if (NTP.getTime() == null) { - SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING CLOCK")); + SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK")); return; } @@ -480,11 +505,19 @@ public class Controller extends Thread { } public void deleteExpiredTransactions() { - try (final Repository repository = RepositoryManager.getRepository()) { + final Long now = NTP.getTime(); + if (now == null) + return; + + // This isn't critical so don't block for repository instance. + try (final Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) + return; + List transactions = repository.getTransactionRepository().getUnconfirmedTransactions(); for (TransactionData transactionData : transactions) - if (transactionData.getTimestamp() >= Transaction.getDeadline(transactionData)) { + if (now >= Transaction.getDeadline(transactionData)) { LOGGER.info(String.format("Deleting expired, unconfirmed transaction %s", Base58.encode(transactionData.getSignature()))); repository.getTransactionRepository().delete(transactionData); } @@ -574,7 +607,15 @@ public class Controller extends Thread { public void onGeneratedBlock() { // Broadcast our new height info - BlockData latestBlockData = getChainTip(); + BlockData latestBlockData; + + try (final Repository repository = RepositoryManager.getRepository()) { + latestBlockData = repository.getBlockRepository().getLastBlock(); + this.setChainTip(latestBlockData); + } catch (DataException e) { + LOGGER.warn(String.format("Repository issue when trying to fetch post-generation chain tip: %s", e.getMessage())); + return; + } Network network = Network.getInstance(); network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData)); diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index c7b0b694..d653f284 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -22,6 +22,8 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -160,8 +162,13 @@ public class Network extends Thread { mergePeersLock = new ReentrantLock(); + // We'll use a cached thread pool, but with more aggressive 10 second timeout. + ExecutorService networkExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 10L, TimeUnit.SECONDS, + new SynchronousQueue()); + networkEPC = new NetworkProcessor(networkExecutor); + // Start up first networking thread - networkEPC = new NetworkProcessor(); networkEPC.start(); } @@ -272,10 +279,16 @@ public class Network extends Thread { // Main thread class NetworkProcessor extends ExecuteProduceConsume { + + public NetworkProcessor(ExecutorService executor) { + super(executor); + } + @Override protected Task produceTask(boolean canBlock) throws InterruptedException { Task task; + // Only this method can block to reduce CPU spin task = maybeProduceChannelTask(canBlock); if (task != null) return task; @@ -465,14 +478,14 @@ public class Network extends Thread { try { if (now == null) { - LOGGER.trace(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress())); + LOGGER.debug(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress())); return; } synchronized (this.connectedPeers) { if (connectedPeers.size() >= maxPeers) { // We have enough peers - LOGGER.trace(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress())); + LOGGER.debug(String.format("Connection discarded from peer %s", socketChannel.getRemoteAddress())); return; } @@ -522,7 +535,11 @@ public class Network extends Thread { peer.disconnect(String.format("handshake timeout at %s", peer.getHandshakeStatus().name())); // Prune 'old' peers from repository... - try (final Repository repository = RepositoryManager.getRepository()) { + // Pruning peers isn't critical so no need to block for a repository instance. + try (final Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) + return; + // Fetch all known peers List peers = repository.getNetworkRepository().getAllPeers(); @@ -564,7 +581,11 @@ public class Network extends Thread { private Peer getConnectablePeer() throws InterruptedException { final long now = NTP.getTime(); - try (final Repository repository = RepositoryManager.getRepository()) { + // We can't block here so use tryRepository(). We don't NEED to connect a new peer. + try (final Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) + return null; + // Find an address to connect to List peers = repository.getNetworkRepository().getAllPeers(); @@ -626,7 +647,7 @@ public class Network extends Thread { return newPeer; } catch (DataException e) { - LOGGER.warn(String.format("Repository issue while finding a connectable peer: %s", e.getMessage())); + LOGGER.error("Repository issue while finding a connectable peer", e); return null; } } @@ -687,7 +708,7 @@ public class Network extends Thread { repository.getNetworkRepository().delete(peer.getPeerData().getAddress()); repository.saveChanges(); } catch (DataException e) { - LOGGER.warn(String.format("Repository issue while trying to delete inbound peer %s", peer)); + LOGGER.error(String.format("Repository issue while trying to delete inbound peer %s", peer), e); } } @@ -855,7 +876,7 @@ public class Network extends Thread { repository.getNetworkRepository().save(peer.getPeerData()); repository.saveChanges(); } catch (DataException e) { - LOGGER.warn(String.format("Repository issue while trying to update outbound peer %s", peer)); + LOGGER.error(String.format("Repository issue while trying to update outbound peer %s", peer), e); } // Start regular pings @@ -1053,7 +1074,11 @@ public class Network extends Thread { return; try { - try (final Repository repository = RepositoryManager.getRepository()) { + // Merging peers isn't critical so don't block for a repository instance. + try (final Repository repository = RepositoryManager.tryRepository()) { + if (repository == null) + return; + List knownPeers = repository.getNetworkRepository().getAllPeers(); // Filter out duplicates @@ -1139,18 +1164,18 @@ public class Network extends Thread { // Stop processing threads try { if (!this.networkEPC.shutdown(5000)) - LOGGER.debug("Network threads failed to terminate"); + LOGGER.warn("Network threads failed to terminate"); } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for networking threads to terminate"); + LOGGER.warn("Interrupted while waiting for networking threads to terminate"); } // Stop broadcasts this.broadcastExecutor.shutdownNow(); try { if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) - LOGGER.debug("Broadcast threads failed to terminate"); + LOGGER.warn("Broadcast threads failed to terminate"); } catch (InterruptedException e) { - LOGGER.debug("Interrupted while waiting for broadcast threads failed to terminate"); + LOGGER.warn("Interrupted while waiting for broadcast threads failed to terminate"); } // Close all peer connections diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 8ddce741..dec55735 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -51,7 +51,7 @@ public class Peer { *

* Just under every 30s is usually ideal to keep NAT mappings refreshed. */ - private static final int PING_INTERVAL = 8000; // ms + private static final int PING_INTERVAL = 20_000; // ms private volatile boolean isStopping = false; diff --git a/src/main/java/org/qora/repository/RepositoryFactory.java b/src/main/java/org/qora/repository/RepositoryFactory.java index 2ac7165c..1bf374e0 100644 --- a/src/main/java/org/qora/repository/RepositoryFactory.java +++ b/src/main/java/org/qora/repository/RepositoryFactory.java @@ -4,6 +4,8 @@ public interface RepositoryFactory { public Repository getRepository() throws DataException; + public Repository tryRepository() throws DataException; + public void close() throws DataException; } diff --git a/src/main/java/org/qora/repository/RepositoryManager.java b/src/main/java/org/qora/repository/RepositoryManager.java index e8c3aa5b..de4f4d43 100644 --- a/src/main/java/org/qora/repository/RepositoryManager.java +++ b/src/main/java/org/qora/repository/RepositoryManager.java @@ -15,6 +15,13 @@ public abstract class RepositoryManager { return repositoryFactory.getRepository(); } + public static Repository tryRepository() throws DataException { + if (repositoryFactory == null) + throw new DataException("No repository available"); + + return repositoryFactory.tryRepository(); + } + public static void closeRepositoryFactory() throws DataException { repositoryFactory.close(); repositoryFactory = null; diff --git a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepositoryFactory.java b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepositoryFactory.java index 6845b8c9..00040812 100644 --- a/src/main/java/org/qora/repository/hsqldb/HSQLDBRepositoryFactory.java +++ b/src/main/java/org/qora/repository/hsqldb/HSQLDBRepositoryFactory.java @@ -5,17 +5,25 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hsqldb.HsqlException; import org.hsqldb.error.ErrorCode; -import org.hsqldb.jdbc.JDBCPool; +import org.hsqldb.jdbc.HSQLDBPool; import org.qora.repository.DataException; import org.qora.repository.Repository; import org.qora.repository.RepositoryFactory; public class HSQLDBRepositoryFactory implements RepositoryFactory { + private static final Logger LOGGER = LogManager.getLogger(HSQLDBRepositoryFactory.class); + private static final int POOL_SIZE = 100; + + /** Log getConnection() calls that take longer than this. (ms) */ + private static final long SLOW_CONNECTION_THRESHOLD = 1000L; + private String connectionUrl; - private JDBCPool connectionPool; + private HSQLDBPool connectionPool; public HSQLDBRepositoryFactory(String connectionUrl) throws DataException { // one-time initialization goes in here @@ -36,7 +44,7 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory { HSQLDBRepository.attemptRecovery(connectionUrl); } - this.connectionPool = new JDBCPool(); + this.connectionPool = new HSQLDBPool(POOL_SIZE); this.connectionPool.setUrl(this.connectionUrl); Properties properties = new Properties(); @@ -60,14 +68,41 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory { } } - private Connection getConnection() throws SQLException { - Connection connection = this.connectionPool.getConnection(); + @Override + public Repository tryRepository() throws DataException { + try { + return new HSQLDBRepository(this.tryConnection()); + } catch (SQLException e) { + throw new DataException("Repository instantiation error", e); + } + } + private Connection getConnection() throws SQLException { + final long before = System.currentTimeMillis(); + Connection connection = this.connectionPool.getConnection(); + final long delay = System.currentTimeMillis() - before; + + if (delay > SLOW_CONNECTION_THRESHOLD) + // This could be an indication of excessive repository use, or insufficient pool size + LOGGER.warn(String.format("Fetching repository connection from pool took %dms (threshold: %dms)"), delay, SLOW_CONNECTION_THRESHOLD); + + setupConnection(connection); + return connection; + } + + private Connection tryConnection() throws SQLException { + Connection connection = this.connectionPool.tryConnection(); + if (connection == null) + return null; + + setupConnection(connection); + return connection; + } + + private void setupConnection(Connection connection) throws SQLException { // Set transaction level connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); connection.setAutoCommit(false); - - return connection; } @Override diff --git a/src/main/java/org/qora/transaction/Transaction.java b/src/main/java/org/qora/transaction/Transaction.java index 2e5594ca..d30e53ee 100644 --- a/src/main/java/org/qora/transaction/Transaction.java +++ b/src/main/java/org/qora/transaction/Transaction.java @@ -517,16 +517,20 @@ public abstract class Transaction { * @throws DataException */ public ValidationResult isValidUnconfirmed() throws DataException { + final Long now = NTP.getTime(); + if (now == null) + return ValidationResult.CLOCK_NOT_SYNCED; + + // Expired already? + if (now >= this.getDeadline()) + return ValidationResult.TIMESTAMP_TOO_OLD; + // Transactions with a timestamp prior to latest block's timestamp are too old BlockData latestBlock = repository.getBlockRepository().getLastBlock(); if (this.getDeadline() <= latestBlock.getTimestamp()) return ValidationResult.TIMESTAMP_TOO_OLD; // Transactions with a timestamp too far into future are too new - final Long now = NTP.getTime(); - if (now == null) - return ValidationResult.CLOCK_NOT_SYNCED; - long maxTimestamp = now + Settings.getInstance().getMaxTransactionTimestampFuture(); if (this.transactionData.getTimestamp() > maxTimestamp) return ValidationResult.TIMESTAMP_TOO_NEW; diff --git a/src/main/java/org/qora/utils/ExecuteProduceConsume.java b/src/main/java/org/qora/utils/ExecuteProduceConsume.java index ccba247a..2d4320e9 100644 --- a/src/main/java/org/qora/utils/ExecuteProduceConsume.java +++ b/src/main/java/org/qora/utils/ExecuteProduceConsume.java @@ -103,8 +103,13 @@ public abstract class ExecuteProduceConsume implements Runnable { } final boolean lambdaCanIdle = canBlock; - logger.trace(() -> String.format("[%d] producing, canBlock is %b...", Thread.currentThread().getId(), lambdaCanIdle)); + logger.trace(() -> String.format("[%d] producing, activeThreadCount: %d, consumerCount: %d, canBlock is %b...", + Thread.currentThread().getId(), activeThreadCount, consumerCount, lambdaCanIdle)); + + final long now = System.currentTimeMillis(); task = produceTask(canBlock); + final long delay = System.currentTimeMillis() - now; + logger.trace(() -> String.format("[%d] producing took %dms", Thread.currentThread().getId(), delay)); } if (task == null)