optimized arbitrary metadata fetching, added arbitrary data cache manager notifications, removed redundant notifications, added method to arbitrary repository and a setting to support the optimization

This commit is contained in:
kennycud 2025-02-24 16:36:13 -08:00
parent 1f4ca6263f
commit 676885ea2d
7 changed files with 318 additions and 65 deletions

View File

@ -6,6 +6,8 @@ import org.qortal.api.resource.TransactionsResource;
import org.qortal.controller.Controller; import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.arbitrary.ArbitraryResourceData;
import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.EventBus;
import org.qortal.gui.SplashFrame; import org.qortal.gui.SplashFrame;
import org.qortal.repository.DataException; import org.qortal.repository.DataException;
import org.qortal.repository.Repository; import org.qortal.repository.Repository;
@ -84,15 +86,63 @@ public class ArbitraryDataCacheManager extends Thread {
// Update arbitrary resource caches // Update arbitrary resource caches
try { try {
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updating resource cache, queue",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository); arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository); arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges(); repository.saveChanges();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updated resource cache",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updating resource status, queue",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
// Update status as separate commit, as this is more prone to failure // Update status as separate commit, as this is more prone to failure
arbitraryTransaction.updateArbitraryResourceStatus(repository); arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges(); repository.saveChanges();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updated resource status",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature()))); LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature())));
} catch (DataException e) { } catch (DataException e) {
@ -103,6 +153,9 @@ public class ArbitraryDataCacheManager extends Thread {
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Repository issue while processing arbitrary resource cache updates", e); LOGGER.error("Repository issue while processing arbitrary resource cache updates", e);
} }
catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
} }
public void addToUpdateQueue(ArbitraryTransactionData transactionData) { public void addToUpdateQueue(ArbitraryTransactionData transactionData) {
@ -163,19 +216,49 @@ public class ArbitraryDataCacheManager extends Thread {
// Expand signatures to transactions // Expand signatures to transactions
for (byte[] signature : signatures) { for (byte[] signature : signatures) {
ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository try {
.getTransactionRepository().fromSignature(signature); ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository
.getTransactionRepository().fromSignature(signature);
if (transactionData.getService() == null) { if (transactionData.getService() == null) {
// Unsupported service - ignore this resource // Unsupported service - ignore this resource
continue; continue;
}
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updating resource cache, build",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updated resource cache",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
} catch (DataException e) {
repository.discardChanges();
LOGGER.error(e.getMessage(), e);
} }
// Update arbitrary resource caches
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceCache(repository);
arbitraryTransaction.updateArbitraryMetadataCache(repository);
repository.saveChanges();
} }
offset += batchSize; offset += batchSize;
} }
@ -193,6 +276,11 @@ public class ArbitraryDataCacheManager extends Thread {
repository.discardChanges(); repository.discardChanges();
throw new DataException("Build of arbitrary resources cache failed."); throw new DataException("Build of arbitrary resources cache failed.");
} }
catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return false;
}
} }
private boolean refreshArbitraryStatuses(Repository repository) throws DataException { private boolean refreshArbitraryStatuses(Repository repository) throws DataException {
@ -216,10 +304,41 @@ public class ArbitraryDataCacheManager extends Thread {
// Loop through hosted transactions // Loop through hosted transactions
for (ArbitraryTransactionData transactionData : hostedTransactions) { for (ArbitraryTransactionData transactionData : hostedTransactions) {
// Determine status and update cache try {
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); EventBus.INSTANCE.notify(
arbitraryTransaction.updateArbitraryResourceStatus(repository); new DataMonitorEvent(
repository.saveChanges(); System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updating resource status",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
// Determine status and update cache
ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData);
arbitraryTransaction.updateArbitraryResourceStatus(repository);
repository.saveChanges();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
transactionData.getIdentifier(),
transactionData.getName(),
transactionData.getService().name(),
"updated resource status",
transactionData.getTimestamp(),
transactionData.getTimestamp()
)
);
} catch (DataException e) {
repository.discardChanges();
LOGGER.error(e.getMessage(), e);
}
} }
offset += batchSize; offset += batchSize;
} }
@ -234,6 +353,11 @@ public class ArbitraryDataCacheManager extends Thread {
repository.discardChanges(); repository.discardChanges();
throw new DataException("Refresh of arbitrary resource statuses failed."); throw new DataException("Refresh of arbitrary resource statuses failed.");
} }
catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return false;
}
} }
} }

View File

@ -120,33 +120,9 @@ public class ArbitraryDataFileRequestThread implements Runnable {
return; return;
} }
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetching file from peer",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetched file from peer",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
} catch (DataException e) { } catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
} }

View File

@ -30,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
public class ArbitraryDataManager extends Thread { public class ArbitraryDataManager extends Thread {
@ -336,6 +337,20 @@ public class ArbitraryDataManager extends Thread {
final int limit = 100; final int limit = 100;
int offset = 0; int offset = 0;
List<ArbitraryTransactionData> allArbitraryTransactionsInDescendingOrder;
try (final Repository repository = RepositoryManager.getRepository()) {
allArbitraryTransactionsInDescendingOrder
= repository.getArbitraryRepository()
.getLatestArbitraryTransactions(Settings.getInstance().getDataFetchLimit());
} catch( Exception e) {
LOGGER.error(e.getMessage(), e);
allArbitraryTransactionsInDescendingOrder = new ArrayList<>(0);
}
// collect processed transactions in a set to ensure outdated data transactions do not get fetched
Set<ArbitraryTransactionDataHashWrapper> processedTransactions = new HashSet<>();
while (!isStopping) { while (!isStopping) {
final int minSeconds = 3; final int minSeconds = 3;
final int maxSeconds = 10; final int maxSeconds = 10;
@ -344,8 +359,8 @@ public class ArbitraryDataManager extends Thread {
// Any arbitrary transactions we want to fetch data for? // Any arbitrary transactions we want to fetch data for?
try (final Repository repository = RepositoryManager.getRepository()) { try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, null, ConfirmationStatus.BOTH, limit, offset, true); List<byte[]> signatures = processTransactionsForSignatures(limit, offset, allArbitraryTransactionsInDescendingOrder, processedTransactions);
// LOGGER.trace("Found {} arbitrary transactions at offset: {}, limit: {}", signatures.size(), offset, limit);
if (signatures == null || signatures.isEmpty()) { if (signatures == null || signatures.isEmpty()) {
offset = 0; offset = 0;
break; break;
@ -390,30 +405,10 @@ public class ArbitraryDataManager extends Thread {
continue; continue;
} }
// Check to see if we have had a more recent PUT // No longer need to see if we have had a more recent PUT since we compared the transactions to process
// to the transactions previously processed, so we can fetch the transactiondata, notify the event bus,
// fetch the metadata and notify the event bus again
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
Optional<ArbitraryTransactionData> moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
if (moreRecentPutTransaction.isPresent()) {
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"not fetching old metadata",
arbitraryTransactionData.getTimestamp(),
moreRecentPutTransaction.get().getTimestamp()
)
);
// There is a more recent PUT transaction than the one we are currently processing.
// When a PUT is issued, it replaces any layers that would have been there before.
// Therefore any data relating to this older transaction is no longer needed and we
// shouldn't fetch it from the network.
continue;
}
EventBus.INSTANCE.notify( EventBus.INSTANCE.notify(
new DataMonitorEvent( new DataMonitorEvent(
@ -443,10 +438,49 @@ public class ArbitraryDataManager extends Thread {
); );
} catch (DataException e) { } catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e); LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} }
} }
} }
private static List<byte[]> processTransactionsForSignatures(int limit, int offset, List<ArbitraryTransactionData> allArbitraryTransactionsInDescendingOrder, Set<ArbitraryTransactionDataHashWrapper> processedTransactions) {
// these transactions are in descending order, latest transactions come first
List<ArbitraryTransactionData> transactions
= allArbitraryTransactionsInDescendingOrder.stream()
.skip(offset)
.limit(limit)
.collect(Collectors.toList());
// wrap the transactions, so they can be used for hashing and comparing
// Class ArbitraryTransactionDataHashWrapper supports hashCode() and equals(...) for this purpose
List<ArbitraryTransactionDataHashWrapper> wrappedTransactions
= transactions.stream()
.map(transaction -> new ArbitraryTransactionDataHashWrapper(transaction))
.collect(Collectors.toList());
// create a set of wrappers and populate it first to last, so that all outdated transactions get rejected
Set<ArbitraryTransactionDataHashWrapper> transactionsToProcess = new HashSet<>(wrappedTransactions.size());
for(ArbitraryTransactionDataHashWrapper wrappedTransaction : wrappedTransactions) {
transactionsToProcess.add(wrappedTransaction);
}
// remove the matches for previously processed transactions,
// because these transactions have had updates that have already been processed
transactionsToProcess.removeAll(processedTransactions);
// add to processed transactions to compare and remove matches from future processing iterations
processedTransactions.addAll(transactionsToProcess);
List<byte[]> signatures
= transactionsToProcess.stream()
.map(transactionToProcess -> transactionToProcess.getData()
.getSignature())
.collect(Collectors.toList());
return signatures;
}
private ArbitraryTransaction fetchTransaction(final Repository repository, byte[] signature) { private ArbitraryTransaction fetchTransaction(final Repository repository, byte[] signature) {
try { try {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);

View File

@ -0,0 +1,42 @@
package org.qortal.controller.arbitrary;
import org.qortal.arbitrary.misc.Service;
import org.qortal.data.transaction.ArbitraryTransactionData;
import java.util.Objects;
public class ArbitraryTransactionDataHashWrapper {
private final ArbitraryTransactionData data;
private int service;
private String name;
private String identifier;
public ArbitraryTransactionDataHashWrapper(ArbitraryTransactionData data) {
this.data = data;
this.service = data.getService().value;
this.name = data.getName();
this.identifier = data.getIdentifier();
}
public ArbitraryTransactionData getData() {
return data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ArbitraryTransactionDataHashWrapper that = (ArbitraryTransactionDataHashWrapper) o;
return service == that.service && name.equals(that.name) && Objects.equals(identifier, that.identifier);
}
@Override
public int hashCode() {
return Objects.hash(service, name, identifier);
}
}

View File

@ -27,6 +27,8 @@ public interface ArbitraryRepository {
public List<ArbitraryTransactionData> getArbitraryTransactions(String name, Service service, String identifier, long since) throws DataException; public List<ArbitraryTransactionData> getArbitraryTransactions(String name, Service service, String identifier, long since) throws DataException;
List<ArbitraryTransactionData> getLatestArbitraryTransactions(int limit) throws DataException;
public ArbitraryTransactionData getInitialTransaction(String name, Service service, Method method, String identifier) throws DataException; public ArbitraryTransactionData getInitialTransaction(String name, Service service, Method method, String identifier) throws DataException;
public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method, String identifier) throws DataException; public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method, String identifier) throws DataException;

View File

@ -7,7 +7,6 @@ import org.qortal.arbitrary.ArbitraryDataFile;
import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata; import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata;
import org.qortal.arbitrary.misc.Category; import org.qortal.arbitrary.misc.Category;
import org.qortal.arbitrary.misc.Service; import org.qortal.arbitrary.misc.Service;
import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.data.arbitrary.ArbitraryResourceCache; import org.qortal.data.arbitrary.ArbitraryResourceCache;
import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.arbitrary.ArbitraryResourceData;
import org.qortal.data.arbitrary.ArbitraryResourceMetadata; import org.qortal.data.arbitrary.ArbitraryResourceMetadata;
@ -227,6 +226,76 @@ public class HSQLDBArbitraryRepository implements ArbitraryRepository {
} }
} }
@Override
public List<ArbitraryTransactionData> getLatestArbitraryTransactions(int limit) throws DataException {
String sql = "SELECT type, reference, signature, creator, created_when, fee, " +
"tx_group_id, block_height, approval_status, approval_height, " +
"version, nonce, service, size, is_data_raw, data, metadata_hash, " +
"name, identifier, update_method, secret, compression FROM ArbitraryTransactions " +
"JOIN Transactions USING (signature) " +
"WHERE name IS NOT NULL " +
"ORDER BY created_when DESC " +
"LIMIT ?";
List<ArbitraryTransactionData> arbitraryTransactionData = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute(sql, limit)) {
if (resultSet == null)
return new ArrayList<>(0);
do {
byte[] reference = resultSet.getBytes(2);
byte[] signature = resultSet.getBytes(3);
byte[] creatorPublicKey = resultSet.getBytes(4);
long timestamp = resultSet.getLong(5);
Long fee = resultSet.getLong(6);
if (fee == 0 && resultSet.wasNull())
fee = null;
int txGroupId = resultSet.getInt(7);
Integer blockHeight = resultSet.getInt(8);
if (blockHeight == 0 && resultSet.wasNull())
blockHeight = null;
ApprovalStatus approvalStatus = ApprovalStatus.valueOf(resultSet.getInt(9));
Integer approvalHeight = resultSet.getInt(10);
if (approvalHeight == 0 && resultSet.wasNull())
approvalHeight = null;
BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, approvalStatus, blockHeight, approvalHeight, signature);
int version = resultSet.getInt(11);
int nonce = resultSet.getInt(12);
int serviceInt = resultSet.getInt(13);
int size = resultSet.getInt(14);
boolean isDataRaw = resultSet.getBoolean(15); // NOT NULL, so no null to false
DataType dataType = isDataRaw ? DataType.RAW_DATA : DataType.DATA_HASH;
byte[] data = resultSet.getBytes(16);
byte[] metadataHash = resultSet.getBytes(17);
String nameResult = resultSet.getString(18);
String identifierResult = resultSet.getString(19);
Method method = Method.valueOf(resultSet.getInt(20));
byte[] secret = resultSet.getBytes(21);
Compression compression = Compression.valueOf(resultSet.getInt(22));
// FUTURE: get payments from signature if needed. Avoiding for now to reduce database calls.
ArbitraryTransactionData transactionData = new ArbitraryTransactionData(baseTransactionData,
version, serviceInt, nonce, size, nameResult, identifierResult, method, secret,
compression, data, dataType, metadataHash, null);
arbitraryTransactionData.add(transactionData);
} while (resultSet.next());
return arbitraryTransactionData;
} catch (SQLException e) {
throw new DataException("Unable to fetch arbitrary transactions from repository", e);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return new ArrayList<>(0);
}
}
private ArbitraryTransactionData getSingleTransaction(String name, Service service, Method method, String identifier, boolean firstNotLast) throws DataException { private ArbitraryTransactionData getSingleTransaction(String name, Service service, Method method, String identifier, boolean firstNotLast) throws DataException {
if (name == null || service == null) { if (name == null || service == null) {
// Required fields // Required fields

View File

@ -508,6 +508,8 @@ public class Settings {
*/ */
private boolean connectionPoolMonitorEnabled = false; private boolean connectionPoolMonitorEnabled = false;
private int dataFetchLimit = 1_000_000;
// Domain mapping // Domain mapping
public static class ThreadLimit { public static class ThreadLimit {
private String messageType; private String messageType;
@ -1333,4 +1335,8 @@ public class Settings {
public boolean isConnectionPoolMonitorEnabled() { public boolean isConnectionPoolMonitorEnabled() {
return connectionPoolMonitorEnabled; return connectionPoolMonitorEnabled;
} }
public int getDataFetchLimit() {
return dataFetchLimit;
}
} }