Merge 10e21002ec95fbe535fa535a2ecb27d8102e49b7 into 8ffb0625a1edcf0b3d1ec2498b15a31ec38ade3c

This commit is contained in:
cwd.systems | 0KN 2024-11-27 22:12:33 +06:00 committed by GitHub
commit a3a13b3811
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,47 +15,53 @@ import org.qortal.transaction.ArbitraryTransaction;
import org.qortal.transaction.Transaction;
import org.qortal.utils.Base58;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class ArbitraryDataCacheManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataCacheManager.class);
private static final long SLEEP_INTERVAL_MS = 500L;
private static final int BATCH_SIZE = 100;
private static ArbitraryDataCacheManager instance;
private volatile boolean isStopping = false;
/** Queue of arbitrary transactions that require cache updates */
private final List<ArbitraryTransactionData> updateQueue = Collections.synchronizedList(new ArrayList<>());
private final List<ArbitraryTransactionData> updateQueue = new CopyOnWriteArrayList<>();
/** Singleton instance access */
public static synchronized ArbitraryDataCacheManager getInstance() {
if (instance == null) {
instance = new ArbitraryDataCacheManager();
}
return instance;
}
private ArbitraryDataCacheManager() {
// Private constructor for singleton
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data Cache Manager");
Thread.currentThread().setPriority(NORM_PRIORITY);
try {
while (!Controller.isStopping()) {
Thread.sleep(500L);
// Process queue
processResourceQueue();
while (!isStopping && !Controller.isStopping()) {
Thread.sleep(SLEEP_INTERVAL_MS);
processUpdateQueue();
}
} catch (InterruptedException e) {
// Fall through to exit thread
LOGGER.info("Cache Manager interrupted, preparing to stop.");
Thread.currentThread().interrupt(); // Restore interrupt flag
} finally {
// Ensure the queue is processed before shutting down
processUpdateQueue();
}
// Clear queue before terminating thread
processResourceQueue();
}
public void shutdown() {
@ -63,177 +69,155 @@ public class ArbitraryDataCacheManager extends Thread {
this.interrupt();
}
private void processResourceQueue() {
if (this.updateQueue.isEmpty()) {
// Nothing to do
private void processUpdateQueue() {
if (updateQueue.isEmpty()) {
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
// Take a snapshot of resourceQueue, so we don't need to lock it while processing
List<ArbitraryTransactionData> resourceQueueCopy = List.copyOf(this.updateQueue);
try (Repository repository = RepositoryManager.getRepository()) {
// Snapshot the queue to avoid locking during processing
List<ArbitraryTransactionData> snapshot = List.copyOf(updateQueue);
for (ArbitraryTransactionData transactionData : resourceQueueCopy) {
// Best not to return when controller is stopping, as ideally we need to finish processing
LOGGER.debug(() -> String.format("Processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
// Remove from the queue regardless of outcome
this.updateQueue.remove(transactionData);
// Update arbitrary resource caches
try {
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
// Update status as separate commit, as this is more prone to failure
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
} catch (DataException e) {
repository.discardChanges();
LOGGER.error("Repository issue while updating arbitrary resource caches", e);
}
for (ArbitraryTransactionData transactionData : snapshot) {
processTransaction(repository, transactionData);
updateQueue.remove(transactionData); // Remove from queue after processing
}
} catch (DataException e) {
LOGGER.error("Repository issue while processing arbitrary resource cache updates", e);
}
}
public void addToUpdateQueue(ArbitraryTransactionData transactionData) {
this.updateQueue.add(transactionData);
LOGGER.debug(() -> String.format("Transaction %.8s added to queue", Base58.encode(transactionData.getSignature())));
}
public boolean needsArbitraryResourcesCacheRebuild(Repository repository) throws DataException {
// Check if we have an entry in the cache for the oldest ARBITRARY transaction with a name
List<ArbitraryTransactionData> oldestCacheableTransactions = repository.getArbitraryRepository().getArbitraryTransactions(true, 1, 0, false);
if (oldestCacheableTransactions == null || oldestCacheableTransactions.isEmpty()) {
// No relevant arbitrary transactions yet on this chain
LOGGER.debug("No relevant arbitrary transactions exist to build cache from");
return false;
}
// We have an arbitrary transaction, so check if it's in the cache
ArbitraryTransactionData txn = oldestCacheableTransactions.get(0);
ArbitraryResourceData cachedResource = repository.getArbitraryRepository().getArbitraryResource(txn.getService(), txn.getName(), txn.getIdentifier());
if (cachedResource != null) {
// Earliest resource exists in the cache, so assume it has been built.
// We avoid checkpointing and prevent the node from starting up in the case of a rebuild failure, so
// we shouldn't ever be left in a partially rebuilt state.
LOGGER.debug("Arbitrary resources cache already built");
return false;
}
return true;
}
public boolean buildArbitraryResourcesCache(Repository repository, boolean forceRebuild) throws DataException {
if (Settings.getInstance().isLite()) {
// Lite nodes have no blockchain
return false;
}
private void processTransaction(Repository repository, ArbitraryTransactionData transactionData) {
try {
// Skip if already built
if (!needsArbitraryResourcesCacheRebuild(repository) && !forceRebuild) {
LOGGER.debug("Arbitrary resources cache already built");
return false;
}
LOGGER.info("Building arbitrary resources cache...");
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait...");
final int batchSize = 100;
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state
while (!Controller.isStopping()) {
LOGGER.info("Fetching arbitrary transactions {} - {}", offset, offset+batchSize-1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, List.of(Transaction.TransactionType.ARBITRARY), null, null, null, TransactionsResource.ConfirmationStatus.BOTH, batchSize, offset, false);
if (signatures.isEmpty()) {
// Complete
break;
}
// Expand signatures to transactions
for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository
.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() == null) {
// Unsupported service - ignore this resource
continue;
}
LOGGER.debug("Processing transaction {} in cache manager...", Base58.encode(transactionData.getSignature()));
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
// Update resource statuses separately
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
LOGGER.debug("Completed processing transaction {}", Base58.encode(transactionData.getSignature()));
} catch (DataException e) {
LOGGER.error("Error processing transaction {}: {}", Base58.encode(transactionData.getSignature()), e.getMessage(), e);
repository.discardChanges();
}
offset += batchSize;
}
// Now refresh all statuses
refreshArbitraryStatuses(repository);
public void addToUpdateQueue(ArbitraryTransactionData transactionData) {
updateQueue.add(transactionData);
LOGGER.debug("Transaction {} added to update queue", Base58.encode(transactionData.getSignature()));
}
public boolean needsCacheRebuild(Repository repository) throws DataException {
List<ArbitraryTransactionData> oldestTransactions = repository.getArbitraryRepository()
.getArbitraryTransactions(true, 1, 0, false);
if (oldestTransactions == null || oldestTransactions.isEmpty()) {
LOGGER.debug("No arbitrary transactions available for cache rebuild.");
return false;
}
ArbitraryTransactionData oldestTransaction = oldestTransactions.get(0);
ArbitraryResourceData cachedResource = repository.getArbitraryRepository()
.getArbitraryResource(oldestTransaction.getService(), oldestTransaction.getName(), oldestTransaction.getIdentifier());
if (cachedResource != null) {
LOGGER.debug("Cache already built for arbitrary resources.");
return false;
}
LOGGER.info("Completed build of arbitrary resources cache.");
return true;
}
catch (DataException e) {
LOGGER.info("Unable to build arbitrary resources cache: {}. The database may have been left in an inconsistent state.", e.getMessage());
// Throw an exception so that the node startup is halted, allowing for a retry next time.
repository.discardChanges();
throw new DataException("Build of arbitrary resources cache failed.");
}
public boolean buildCache(Repository repository, boolean forceRebuild) throws DataException {
if (Settings.getInstance().isLite()) {
LOGGER.warn("Lite nodes cannot build caches.");
return false;
}
private boolean refreshArbitraryStatuses(Repository repository) throws DataException {
if (!needsCacheRebuild(repository) && !forceRebuild) {
LOGGER.debug("Arbitrary resources cache already built.");
return false;
}
LOGGER.info("Building arbitrary resources cache...");
SplashFrame.getInstance().updateStatus("Building QDN cache - please wait...");
try {
LOGGER.info("Refreshing arbitrary resource statuses for locally hosted transactions...");
SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait...");
processTransactionsInBatches(repository);
refreshStatuses(repository);
final int batchSize = 100;
LOGGER.info("Cache build completed successfully.");
return true;
} catch (DataException e) {
LOGGER.error("Error building cache: {}", e.getMessage(), e);
repository.discardChanges();
throw new DataException("Cache build failed.");
}
}
private void processTransactionsInBatches(Repository repository) throws DataException {
int offset = 0;
// Loop through all ARBITRARY transactions, and determine latest state
while (!Controller.isStopping()) {
LOGGER.info("Fetching hosted transactions {} - {}", offset, offset+batchSize-1);
LOGGER.info("Processing transactions {} - {}", offset, offset + BATCH_SIZE - 1);
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(
null, null, null,
List.of(Transaction.TransactionType.ARBITRARY),
null, null, null,
TransactionsResource.ConfirmationStatus.BOTH,
BATCH_SIZE, offset, false
);
if (signatures.isEmpty()) {
break; // No more transactions to process
}
for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData)
repository.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() != null) {
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
}
}
offset += BATCH_SIZE;
}
}
private void refreshStatuses(Repository repository) throws DataException {
LOGGER.info("Refreshing arbitrary resource statuses...");
SplashFrame.getInstance().updateStatus("Refreshing statuses - please wait...");
int offset = 0;
while (!Controller.isStopping()) {
List<ArbitraryTransactionData> hostedTransactions = ArbitraryDataStorageManager.getInstance()
.listAllHostedTransactions(repository, BATCH_SIZE, offset);
List<ArbitraryTransactionData> hostedTransactions = ArbitraryDataStorageManager.getInstance().listAllHostedTransactions(repository, batchSize, offset);
if (hostedTransactions.isEmpty()) {
// Complete
break;
}
// Loop through hosted transactions
for (ArbitraryTransactionData transactionData : hostedTransactions) {
// Determine status and update cache
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
}
offset += batchSize;
offset += BATCH_SIZE;
}
LOGGER.info("Completed refresh of arbitrary resource statuses.");
return true;
}
catch (DataException e) {
LOGGER.info("Unable to refresh arbitrary resource statuses: {}. The database may have been left in an inconsistent state.", e.getMessage());
// Throw an exception so that the node startup is halted, allowing for a retry next time.
repository.discardChanges();
throw new DataException("Refresh of arbitrary resource statuses failed.");
LOGGER.info("Arbitrary resource statuses refreshed.");
}
}
}