Update ArbitraryDataCacheManager.java

* Used CopyOnWriteArrayList for the updateQueue to reduce synchronization complexity
* Reorganized methods for transaction and batch processing into reusable units
* Separated logic for cache rebuilding and status refreshing
This commit is contained in:
cwd.systems | 0KN 2024-11-27 22:12:23 +06:00 committed by GitHub
parent 8ffb0625a1
commit 10e21002ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,47 +15,53 @@ import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction; import org.qortal.transaction.Transaction;
import org.qortal.utils.Base58; import org.qortal.utils.Base58;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class ArbitraryDataCacheManager extends Thread { public class ArbitraryDataCacheManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataCacheManager.class); private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataCacheManager.class);
private static final long SLEEP_INTERVAL_MS = 500L;
private static final int BATCH_SIZE = 100;
private static ArbitraryDataCacheManager instance; private static ArbitraryDataCacheManager instance;
private volatile boolean isStopping = false; private volatile boolean isStopping = false;
/** Queue of arbitrary transactions that require cache updates */ /** Queue of arbitrary transactions that require cache updates */
private final List<ArbitraryTransactionData> updateQueue = Collections.synchronizedList(new ArrayList<>()); private final List<ArbitraryTransactionData> updateQueue = new CopyOnWriteArrayList<>();
/** Singleton instance access */
public static synchronized ArbitraryDataCacheManager getInstance() { public static synchronized ArbitraryDataCacheManager getInstance() {
if (instance == null) { if (instance == null) {
instance = new ArbitraryDataCacheManager(); instance = new ArbitraryDataCacheManager();
} }
return instance; return instance;
} }
private ArbitraryDataCacheManager() {
// Private constructor for singleton
}
@Override @Override
public void run() { public void run() {
Thread.currentThread().setName("Arbitrary Data Cache Manager"); Thread.currentThread().setName("Arbitrary Data Cache Manager");
Thread.currentThread().setPriority(NORM_PRIORITY); Thread.currentThread().setPriority(NORM_PRIORITY);
try { try {
while (!Controller.isStopping()) { while (!isStopping && !Controller.isStopping()) {
Thread.sleep(500L); Thread.sleep(SLEEP_INTERVAL_MS);
processUpdateQueue();
// Process queue
processResourceQueue();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Fall through to exit thread LOGGER.info("Cache Manager interrupted, preparing to stop.");
Thread.currentThread().interrupt(); // Restore interrupt flag
} finally {
// Ensure the queue is processed before shutting down
processUpdateQueue();
} }
// Clear queue before terminating thread
processResourceQueue();
} }
public void shutdown() { public void shutdown() {
@ -63,177 +69,155 @@ public class ArbitraryDataCacheManager extends Thread {
this.interrupt(); this.interrupt();
} }
private void processUpdateQueue() {
private void processResourceQueue() { if (updateQueue.isEmpty()) {
if (this.updateQueue.isEmpty()) {
// Nothing to do
return; return;
} }
try (final Repository repository = RepositoryManager.getRepository()) { try (Repository repository = RepositoryManager.getRepository()) {
// Take a snapshot of resourceQueue, so we don't need to lock it while processing // Snapshot the queue to avoid locking during processing
List<ArbitraryTransactionData> resourceQueueCopy = List.copyOf(this.updateQueue); List<ArbitraryTransactionData> snapshot = List.copyOf(updateQueue);
for (ArbitraryTransactionData transactionData : resourceQueueCopy) { for (ArbitraryTransactionData transactionData : snapshot) {
// Best not to return when controller is stopping, as ideally we need to finish processing processTransaction(repository, transactionData);
updateQueue.remove(transactionData); // Remove from queue after processing
LOGGER.debug(() -> String.format("Processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
// Remove from the queue regardless of outcome
this.updateQueue.remove(transactionData);
// Update arbitrary resource caches
try {
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
// Update status as separate commit, as this is more prone to failure
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
} catch (DataException e) {
repository.discardChanges();
LOGGER.error("Repository issue while updating arbitrary resource caches", e);
}
} }
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Repository issue while processing arbitrary resource cache updates", e); LOGGER.error("Repository issue while processing arbitrary resource cache updates", e);
} }
} }
public void addToUpdateQueue(ArbitraryTransactionData transactionData) { private void processTransaction(Repository repository, ArbitraryTransactionData transactionData) {
this.updateQueue.add(transactionData); try {
LOGGER.debug(() -> String.format("Transaction %.8s added to queue", Base58.encode(transactionData.getSignature()))); LOGGER.debug("Processing transaction {} in cache manager...", Base58.encode(transactionData.getSignature()));
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
// Update resource statuses separately
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
LOGGER.debug("Completed processing transaction {}", Base58.encode(transactionData.getSignature()));
} catch (DataException e) {
LOGGER.error("Error processing transaction {}: {}", Base58.encode(transactionData.getSignature()), e.getMessage(), e);
repository.discardChanges();
}
} }
public boolean needsArbitraryResourcesCacheRebuild(Repository repository) throws DataException { public void addToUpdateQueue(ArbitraryTransactionData transactionData) {
// Check if we have an entry in the cache for the oldest ARBITRARY transaction with a name updateQueue.add(transactionData);
List<ArbitraryTransactionData> oldestCacheableTransactions = repository.getArbitraryRepository().getArbitraryTransactions(true, 1, 0, false); LOGGER.debug("Transaction {} added to update queue", Base58.encode(transactionData.getSignature()));
if (oldestCacheableTransactions == null || oldestCacheableTransactions.isEmpty()) { }
// No relevant arbitrary transactions yet on this chain
LOGGER.debug("No relevant arbitrary transactions exist to build cache from"); public boolean needsCacheRebuild(Repository repository) throws DataException {
List<ArbitraryTransactionData> oldestTransactions = repository.getArbitraryRepository()
.getArbitraryTransactions(true, 1, 0, false);
if (oldestTransactions == null || oldestTransactions.isEmpty()) {
LOGGER.debug("No arbitrary transactions available for cache rebuild.");
return false; return false;
} }
// We have an arbitrary transaction, so check if it's in the cache
ArbitraryTransactionData txn = oldestCacheableTransactions.get(0); ArbitraryTransactionData oldestTransaction = oldestTransactions.get(0);
ArbitraryResourceData cachedResource = repository.getArbitraryRepository().getArbitraryResource(txn.getService(), txn.getName(), txn.getIdentifier()); ArbitraryResourceData cachedResource = repository.getArbitraryRepository()
.getArbitraryResource(oldestTransaction.getService(), oldestTransaction.getName(), oldestTransaction.getIdentifier());
if (cachedResource != null) { if (cachedResource != null) {
// Earliest resource exists in the cache, so assume it has been built. LOGGER.debug("Cache already built for arbitrary resources.");
// We avoid checkpointing and prevent the node from starting up in the case of a rebuild failure, so
// we shouldn't ever be left in a partially rebuilt state.
LOGGER.debug("Arbitrary resources cache already built");
return false; return false;
} }
return true; return true;
} }
public boolean buildArbitraryResourcesCache(Repository repository, boolean forceRebuild) throws DataException { public boolean buildCache(Repository repository, boolean forceRebuild) throws DataException {
if (Settings.getInstance().isLite()) { if (Settings.getInstance().isLite()) {
// Lite nodes have no blockchain LOGGER.warn("Lite nodes cannot build caches.");
return false; return false;
} }
if (!needsCacheRebuild(repository) && !forceRebuild) {
LOGGER.debug("Arbitrary resources cache already built.");
return false;
}
LOGGER.info("Building arbitrary resources cache...");
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait...");
try { try {
// Skip if already built processTransactionsInBatches(repository);
if (!needsArbitraryResourcesCacheRebuild(repository) && !forceRebuild) { refreshStatuses(repository);
LOGGER.debug("Arbitrary resources cache already built");
return false; LOGGER.info("Cache build completed successfully.");
return true;
} catch (DataException e) {
LOGGER.error("Error building cache: {}", e.getMessage(), e);
repository.discardChanges();
throw new DataException("Cache build failed.");
}
}
private void processTransactionsInBatches(Repository repository) throws DataException {
int offset = 0;
while (!Controller.isStopping()) {
LOGGER.info("Processing transactions {} - {}", offset, offset + BATCH_SIZE - 1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(
null, null, null,
List.of(Transaction.TransactionType.ARBITRARY),
null, null, null,
TransactionsResource.ConfirmationStatus.BOTH,
BATCH_SIZE, offset, false
);
if (signatures.isEmpty()) {
break; // No more transactions to process
} }
LOGGER.info("Building arbitrary resources cache..."); for (byte[] signature : signatures) {
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait..."); ArbitraryTransactionData transactionData = (ArbitraryTransactionData)
repository.getTransactionRepository().fromSignature(signature);
final int batchSize = 100; if (transactionData.getService() != null) {
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state
while (!Controller.isStopping()) {
LOGGER.info("Fetching arbitrary transactions {} - {}", offset, offset+batchSize-1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, List.of(Transaction.TransactionType.ARBITRARY), null, null, null, TransactionsResource.ConfirmationStatus.BOTH, batchSize, offset, false);
if (signatures.isEmpty()) {
// Complete
break;
}
// Expand signatures to transactions
for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository
.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() == null) {
// Unsupported service - ignore this resource
continue;
}
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository); arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository); arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges(); repository.saveChanges();
} }
offset += batchSize;
} }
// Now refresh all statuses offset += BATCH_SIZE;
refreshArbitraryStatuses(repository);
LOGGER.info("Completed build of arbitrary resources cache.");
return true;
}
catch (DataException e) {
LOGGER.info("Unable to build arbitrary resources cache: {}. The database may have been left in an inconsistent state.", e.getMessage());
// Throw an exception so that the node startup is halted, allowing for a retry next time.
repository.discardChanges();
throw new DataException("Build of arbitrary resources cache failed.");
} }
} }
private boolean refreshArbitraryStatuses(Repository repository) throws DataException { private void refreshStatuses(Repository repository) throws DataException {
try { LOGGER.info("Refreshing arbitrary resource statuses...");
LOGGER.info("Refreshing arbitrary resource statuses for locally hosted transactions..."); SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait...");
SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait...");
final int batchSize = 100; int offset = 0;
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state while (!Controller.isStopping()) {
while (!Controller.isStopping()) { List<ArbitraryTransactionData> hostedTransactions = ArbitraryDataStorageManager.getInstance()
LOGGER.info("Fetching hosted transactions {} - {}", offset, offset+batchSize-1); .listAllHostedTransactions(repository, BATCH_SIZE, offset);
List<ArbitraryTransactionData> hostedTransactions = ArbitraryDataStorageManager.getInstance().listAllHostedTransactions(repository, batchSize, offset); if (hostedTransactions.isEmpty()) {
if (hostedTransactions.isEmpty()) { break;
// Complete
break;
}
// Loop through hosted transactions
for (ArbitraryTransactionData transactionData : hostedTransactions) {
// Determine status and update cache
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
}
offset += batchSize;
} }
LOGGER.info("Completed refresh of arbitrary resource statuses."); for (ArbitraryTransactionData transactionData : hostedTransactions) {
return true; ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
} arbitraryTransaction.updateArbitraryResourceStatus(repository);
catch (DataException e) { repository.saveChanges();
LOGGER.info("Unable to refresh arbitrary resource statuses: {}. The database may have been left in an inconsistent state.", e.getMessage()); }
// Throw an exception so that the node startup is halted, allowing for a retry next time. offset += BATCH_SIZE;
repository.discardChanges();
throw new DataException("Refresh of arbitrary resource statuses failed.");
} }
LOGGER.info("Arbitrary resource statuses refreshed.");
} }
} }