From 10e21002ec95fbe535fa535a2ecb27d8102e49b7 Mon Sep 17 00:00:00 2001 From: "cwd.systems | 0KN" <101006007+infinitydaemon@users.noreply.github.com> Date: Wed, 27 Nov 2024 22:12:23 +0600 Subject: [PATCH] Update ArbitraryDataCacheManager.java * Used CopyOnWriteArrayList for the updateQueue to reduce synchronization complexity * Reorganized methods for transaction and batch processing into reusable units * Separated logic for cache rebuilding and status refreshing --- .../arbitrary/ArbitraryDataCacheManager.java | 262 ++++++++---------- 1 file changed, 123 insertions(+), 139 deletions(-) diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java index d6b9303f..3246cc95 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java @@ -15,47 +15,53 @@ import org.qortal.transaction.ArbitraryTransaction; import org.qortal.transaction.Transaction; import org.qortal.utils.Base58; -import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; public class ArbitraryDataCacheManager extends Thread { private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataCacheManager.class); + private static final long SLEEP_INTERVAL_MS = 500L; + private static final int BATCH_SIZE = 100; + private static ArbitraryDataCacheManager instance; + private volatile boolean isStopping = false; /** Queue of arbitrary transactions that require cache updates */ - private final List updateQueue = Collections.synchronizedList(new ArrayList<>()); - + private final List updateQueue = new CopyOnWriteArrayList<>(); + /** Singleton instance access */ public static synchronized ArbitraryDataCacheManager getInstance() { if (instance == null) { instance = new ArbitraryDataCacheManager(); } - return instance; } + private ArbitraryDataCacheManager() { + // Private constructor for singleton + } + @Override public void run() { Thread.currentThread().setName("Arbitrary Data Cache Manager"); Thread.currentThread().setPriority(NORM_PRIORITY); try { - while (!Controller.isStopping()) { - Thread.sleep(500L); - - // Process queue - processResourceQueue(); + while (!isStopping && !Controller.isStopping()) { + Thread.sleep(SLEEP_INTERVAL_MS); + processUpdateQueue(); } } catch (InterruptedException e) { - // Fall through to exit thread + LOGGER.info("Cache Manager interrupted, preparing to stop."); + Thread.currentThread().interrupt(); // Restore interrupt flag + } finally { + // Ensure the queue is processed before shutting down + processUpdateQueue(); } - - // Clear queue before terminating thread - processResourceQueue(); } public void shutdown() { @@ -63,177 +69,155 @@ public class ArbitraryDataCacheManager extends Thread { this.interrupt(); } - - private void processResourceQueue() { - if (this.updateQueue.isEmpty()) { - // Nothing to do + private void processUpdateQueue() { + if (updateQueue.isEmpty()) { return; } - try (final Repository repository = RepositoryManager.getRepository()) { - // Take a snapshot of resourceQueue, so we don't need to lock it while processing - List resourceQueueCopy = List.copyOf(this.updateQueue); + try (Repository repository = RepositoryManager.getRepository()) { + // Snapshot the queue to avoid locking during processing + List snapshot = List.copyOf(updateQueue); - for (ArbitraryTransactionData transactionData : resourceQueueCopy) { - // Best not to return when controller is stopping, as ideally we need to finish processing - - LOGGER.debug(() -> String.format("Processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature()))); - - // Remove from the queue regardless of outcome - this.updateQueue.remove(transactionData); - - // Update arbitrary resource caches - try { - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceCache(repository); - arbitraryTransaction.updateArbitraryMetadataCache(repository); - repository.saveChanges(); - - // Update status as separate commit, as this is more prone to failure - arbitraryTransaction.updateArbitraryResourceStatus(repository); - repository.saveChanges(); - - LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature()))); - - } catch (DataException e) { - repository.discardChanges(); - LOGGER.error("Repository issue while updating arbitrary resource caches", e); - } + for (ArbitraryTransactionData transactionData : snapshot) { + processTransaction(repository, transactionData); + updateQueue.remove(transactionData); // Remove from queue after processing } } catch (DataException e) { LOGGER.error("Repository issue while processing arbitrary resource cache updates", e); } } - public void addToUpdateQueue(ArbitraryTransactionData transactionData) { - this.updateQueue.add(transactionData); - LOGGER.debug(() -> String.format("Transaction %.8s added to queue", Base58.encode(transactionData.getSignature()))); + private void processTransaction(Repository repository, ArbitraryTransactionData transactionData) { + try { + LOGGER.debug("Processing transaction {} in cache manager...", Base58.encode(transactionData.getSignature())); + + // Update arbitrary resource caches + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceCache(repository); + arbitraryTransaction.updateArbitraryMetadataCache(repository); + repository.saveChanges(); + + // Update resource statuses separately + arbitraryTransaction.updateArbitraryResourceStatus(repository); + repository.saveChanges(); + + LOGGER.debug("Completed processing transaction {}", Base58.encode(transactionData.getSignature())); + } catch (DataException e) { + LOGGER.error("Error processing transaction {}: {}", Base58.encode(transactionData.getSignature()), e.getMessage(), e); + repository.discardChanges(); + } } - public boolean needsArbitraryResourcesCacheRebuild(Repository repository) throws DataException { - // Check if we have an entry in the cache for the oldest ARBITRARY transaction with a name - List oldestCacheableTransactions = repository.getArbitraryRepository().getArbitraryTransactions(true, 1, 0, false); - if (oldestCacheableTransactions == null || oldestCacheableTransactions.isEmpty()) { - // No relevant arbitrary transactions yet on this chain - LOGGER.debug("No relevant arbitrary transactions exist to build cache from"); + public void addToUpdateQueue(ArbitraryTransactionData transactionData) { + updateQueue.add(transactionData); + LOGGER.debug("Transaction {} added to update queue", Base58.encode(transactionData.getSignature())); + } + + public boolean needsCacheRebuild(Repository repository) throws DataException { + List oldestTransactions = repository.getArbitraryRepository() + .getArbitraryTransactions(true, 1, 0, false); + + if (oldestTransactions == null || oldestTransactions.isEmpty()) { + LOGGER.debug("No arbitrary transactions available for cache rebuild."); return false; } - // We have an arbitrary transaction, so check if it's in the cache - ArbitraryTransactionData txn = oldestCacheableTransactions.get(0); - ArbitraryResourceData cachedResource = repository.getArbitraryRepository().getArbitraryResource(txn.getService(), txn.getName(), txn.getIdentifier()); + + ArbitraryTransactionData oldestTransaction = oldestTransactions.get(0); + ArbitraryResourceData cachedResource = repository.getArbitraryRepository() + .getArbitraryResource(oldestTransaction.getService(), oldestTransaction.getName(), oldestTransaction.getIdentifier()); + if (cachedResource != null) { - // Earliest resource exists in the cache, so assume it has been built. - // We avoid checkpointing and prevent the node from starting up in the case of a rebuild failure, so - // we shouldn't ever be left in a partially rebuilt state. - LOGGER.debug("Arbitrary resources cache already built"); + LOGGER.debug("Cache already built for arbitrary resources."); return false; } return true; } - public boolean buildArbitraryResourcesCache(Repository repository, boolean forceRebuild) throws DataException { + public boolean buildCache(Repository repository, boolean forceRebuild) throws DataException { if (Settings.getInstance().isLite()) { - // Lite nodes have no blockchain + LOGGER.warn("Lite nodes cannot build caches."); return false; } + if (!needsCacheRebuild(repository) && !forceRebuild) { + LOGGER.debug("Arbitrary resources cache already built."); + return false; + } + + LOGGER.info("Building arbitrary resources cache..."); + SplashFrame.getInstance().updateStatus("Building QDN cache - please wait..."); + try { - // Skip if already built - if (!needsArbitraryResourcesCacheRebuild(repository) && !forceRebuild) { - LOGGER.debug("Arbitrary resources cache already built"); - return false; + processTransactionsInBatches(repository); + refreshStatuses(repository); + + LOGGER.info("Cache build completed successfully."); + return true; + } catch (DataException e) { + LOGGER.error("Error building cache: {}", e.getMessage(), e); + repository.discardChanges(); + throw new DataException("Cache build failed."); + } + } + + private void processTransactionsInBatches(Repository repository) throws DataException { + int offset = 0; + + while (!Controller.isStopping()) { + LOGGER.info("Processing transactions {} - {}", offset, offset + BATCH_SIZE - 1); + + List signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria( + null, null, null, + List.of(Transaction.TransactionType.ARBITRARY), + null, null, null, + TransactionsResource.ConfirmationStatus.BOTH, + BATCH_SIZE, offset, false + ); + + if (signatures.isEmpty()) { + break; // No more transactions to process } - LOGGER.info("Building arbitrary resources cache..."); - SplashFrame.getInstance().updateStatus("Building QDN cache - please wait..."); + for (byte[] signature : signatures) { + ArbitraryTransactionData transactionData = (ArbitraryTransactionData) + repository.getTransactionRepository().fromSignature(signature); - final int batchSize = 100; - int offset = 0; - - // Loop through all ARBITRARY transactions, and determine latest state - while (!Controller.isStopping()) { - LOGGER.info("Fetching arbitrary transactions {} - {}", offset, offset+batchSize-1); - - List signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, List.of(Transaction.TransactionType.ARBITRARY), null, null, null, TransactionsResource.ConfirmationStatus.BOTH, batchSize, offset, false); - if (signatures.isEmpty()) { - // Complete - break; - } - - // Expand signatures to transactions - for (byte[] signature : signatures) { - ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository - .getTransactionRepository().fromSignature(signature); - - if (transactionData.getService() == null) { - // Unsupported service - ignore this resource - continue; - } - - // Update arbitrary resource caches + if (transactionData.getService() != null) { ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); arbitraryTransaction.updateArbitraryResourceCache(repository); arbitraryTransaction.updateArbitraryMetadataCache(repository); repository.saveChanges(); } - offset += batchSize; } - // Now refresh all statuses - refreshArbitraryStatuses(repository); - - LOGGER.info("Completed build of arbitrary resources cache."); - return true; - } - catch (DataException e) { - LOGGER.info("Unable to build arbitrary resources cache: {}. The database may have been left in an inconsistent state.", e.getMessage()); - - // Throw an exception so that the node startup is halted, allowing for a retry next time. - repository.discardChanges(); - throw new DataException("Build of arbitrary resources cache failed."); + offset += BATCH_SIZE; } } - private boolean refreshArbitraryStatuses(Repository repository) throws DataException { - try { - LOGGER.info("Refreshing arbitrary resource statuses for locally hosted transactions..."); - SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait..."); + private void refreshStatuses(Repository repository) throws DataException { + LOGGER.info("Refreshing arbitrary resource statuses..."); + SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait..."); - final int batchSize = 100; - int offset = 0; + int offset = 0; - // Loop through all ARBITRARY transactions, and determine latest state - while (!Controller.isStopping()) { - LOGGER.info("Fetching hosted transactions {} - {}", offset, offset+batchSize-1); + while (!Controller.isStopping()) { + List hostedTransactions = ArbitraryDataStorageManager.getInstance() + .listAllHostedTransactions(repository, BATCH_SIZE, offset); - List hostedTransactions = ArbitraryDataStorageManager.getInstance().listAllHostedTransactions(repository, batchSize, offset); - if (hostedTransactions.isEmpty()) { - // Complete - break; - } - - // Loop through hosted transactions - for (ArbitraryTransactionData transactionData : hostedTransactions) { - - // Determine status and update cache - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceStatus(repository); - repository.saveChanges(); - } - offset += batchSize; + if (hostedTransactions.isEmpty()) { + break; } - LOGGER.info("Completed refresh of arbitrary resource statuses."); - return true; - } - catch (DataException e) { - LOGGER.info("Unable to refresh arbitrary resource statuses: {}. The database may have been left in an inconsistent state.", e.getMessage()); + for (ArbitraryTransactionData transactionData : hostedTransactions) { + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceStatus(repository); + repository.saveChanges(); + } - // Throw an exception so that the node startup is halted, allowing for a retry next time. - repository.discardChanges(); - throw new DataException("Refresh of arbitrary resource statuses failed."); + offset += BATCH_SIZE; } + + LOGGER.info("Arbitrary resource statuses refreshed."); } - }