diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java index d6b9303f..c30e190c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java @@ -6,6 +6,8 @@ import org.qortal.api.resource.TransactionsResource; import org.qortal.controller.Controller; import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.gui.SplashFrame; import org.qortal.repository.DataException; import org.qortal.repository.Repository; @@ -84,15 +86,63 @@ public class ArbitraryDataCacheManager extends Thread { // Update arbitrary resource caches try { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource cache, queue", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); arbitraryTransaction.updateArbitraryResourceCache(repository); arbitraryTransaction.updateArbitraryMetadataCache(repository); repository.saveChanges(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource cache", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource status, queue", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + // Update status as separate commit, as this is more prone to failure arbitraryTransaction.updateArbitraryResourceStatus(repository); repository.saveChanges(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature()))); } catch (DataException e) { @@ -103,6 +153,9 @@ public class ArbitraryDataCacheManager extends Thread { } catch (DataException e) { LOGGER.error("Repository issue while processing arbitrary resource cache updates", e); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } } public void addToUpdateQueue(ArbitraryTransactionData transactionData) { @@ -163,19 +216,49 @@ public class ArbitraryDataCacheManager extends Thread { // Expand signatures to transactions for (byte[] signature : signatures) { - ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository - .getTransactionRepository().fromSignature(signature); + try { + ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository + .getTransactionRepository().fromSignature(signature); - if (transactionData.getService() == null) { - // Unsupported service - ignore this resource - continue; + if (transactionData.getService() == null) { + // Unsupported service - ignore this resource + continue; + } + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource cache, build", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + // Update arbitrary resource caches + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceCache(repository); + arbitraryTransaction.updateArbitraryMetadataCache(repository); + repository.saveChanges(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource cache", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + } catch (DataException e) { + repository.discardChanges(); + + LOGGER.error(e.getMessage(), e); } - - // Update arbitrary resource caches - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceCache(repository); - arbitraryTransaction.updateArbitraryMetadataCache(repository); - repository.saveChanges(); } offset += batchSize; } @@ -193,6 +276,11 @@ public class ArbitraryDataCacheManager extends Thread { repository.discardChanges(); throw new DataException("Build of arbitrary resources cache failed."); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + + return false; + } } private boolean refreshArbitraryStatuses(Repository repository) throws DataException { @@ -216,10 +304,41 @@ public class ArbitraryDataCacheManager extends Thread { // Loop through hosted transactions for (ArbitraryTransactionData transactionData : hostedTransactions) { - // Determine status and update cache - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceStatus(repository); - repository.saveChanges(); + try { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + // Determine status and update cache + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceStatus(repository); + repository.saveChanges(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + } catch (DataException e) { + repository.discardChanges(); + + LOGGER.error(e.getMessage(), e); + } } offset += batchSize; } @@ -234,6 +353,11 @@ public class ArbitraryDataCacheManager extends Thread { repository.discardChanges(); throw new DataException("Refresh of arbitrary resource statuses failed."); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + + return false; + } } } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index afde07c0..b8285052 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -120,33 +120,9 @@ public class ArbitraryDataFileRequestThread implements Runnable { return; } - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "fetching file from peer", - arbitraryTransactionData.getTimestamp(), - arbitraryTransactionData.getTimestamp() - ) - ); - LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "fetched file from peer", - arbitraryTransactionData.getTimestamp(), - arbitraryTransactionData.getTimestamp() - ) - ); - } catch (DataException e) { LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 4f3dbb3c..47a25a03 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -30,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; +import java.util.stream.Collectors; public class ArbitraryDataManager extends Thread { @@ -336,6 +337,20 @@ public class ArbitraryDataManager extends Thread { final int limit = 100; int offset = 0; + List allArbitraryTransactionsInDescendingOrder; + + try (final Repository repository = RepositoryManager.getRepository()) { + allArbitraryTransactionsInDescendingOrder + = repository.getArbitraryRepository() + .getLatestArbitraryTransactions(Settings.getInstance().getDataFetchLimit()); + } catch( Exception e) { + LOGGER.error(e.getMessage(), e); + allArbitraryTransactionsInDescendingOrder = new ArrayList<>(0); + } + + // collect processed transactions in a set to ensure outdated data transactions do not get fetched + Set processedTransactions = new HashSet<>(); + while (!isStopping) { final int minSeconds = 3; final int maxSeconds = 10; @@ -344,8 +359,8 @@ public class ArbitraryDataManager extends Thread { // Any arbitrary transactions we want to fetch data for? try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, null, ConfirmationStatus.BOTH, limit, offset, true); - // LOGGER.trace("Found {} arbitrary transactions at offset: {}, limit: {}", signatures.size(), offset, limit); + List signatures = processTransactionsForSignatures(limit, offset, allArbitraryTransactionsInDescendingOrder, processedTransactions); + if (signatures == null || signatures.isEmpty()) { offset = 0; break; @@ -390,30 +405,10 @@ public class ArbitraryDataManager extends Thread { continue; } - // Check to see if we have had a more recent PUT + // No longer need to see if we have had a more recent PUT since we compared the transactions to process + // to the transactions previously processed, so we can fetch the transactiondata, notify the event bus, + // fetch the metadata and notify the event bus again ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - - if (moreRecentPutTransaction.isPresent()) { - - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "not fetching old metadata", - arbitraryTransactionData.getTimestamp(), - moreRecentPutTransaction.get().getTimestamp() - ) - ); - - // There is a more recent PUT transaction than the one we are currently processing. - // When a PUT is issued, it replaces any layers that would have been there before. - // Therefore any data relating to this older transaction is no longer needed and we - // shouldn't fetch it from the network. - continue; - } EventBus.INSTANCE.notify( new DataMonitorEvent( @@ -443,10 +438,49 @@ public class ArbitraryDataManager extends Thread { ); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } } + private static List processTransactionsForSignatures(int limit, int offset, List allArbitraryTransactionsInDescendingOrder, Set processedTransactions) { + // these transactions are in descending order, latest transactions come first + List transactions + = allArbitraryTransactionsInDescendingOrder.stream() + .skip(offset) + .limit(limit) + .collect(Collectors.toList()); + + // wrap the transactions, so they can be used for hashing and comparing + // Class ArbitraryTransactionDataHashWrapper supports hashCode() and equals(...) for this purpose + List wrappedTransactions + = transactions.stream() + .map(transaction -> new ArbitraryTransactionDataHashWrapper(transaction)) + .collect(Collectors.toList()); + + // create a set of wrappers and populate it first to last, so that all outdated transactions get rejected + Set transactionsToProcess = new HashSet<>(wrappedTransactions.size()); + for(ArbitraryTransactionDataHashWrapper wrappedTransaction : wrappedTransactions) { + transactionsToProcess.add(wrappedTransaction); + } + + // remove the matches for previously processed transactions, + // because these transactions have had updates that have already been processed + transactionsToProcess.removeAll(processedTransactions); + + // add to processed transactions to compare and remove matches from future processing iterations + processedTransactions.addAll(transactionsToProcess); + + List signatures + = transactionsToProcess.stream() + .map(transactionToProcess -> transactionToProcess.getData() + .getSignature()) + .collect(Collectors.toList()); + + return signatures; + } + private ArbitraryTransaction fetchTransaction(final Repository repository, byte[] signature) { try { TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java new file mode 100644 index 00000000..0f64652c --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java @@ -0,0 +1,42 @@ +package org.qortal.controller.arbitrary; + +import org.qortal.arbitrary.misc.Service; +import org.qortal.data.transaction.ArbitraryTransactionData; + +import java.util.Objects; + +public class ArbitraryTransactionDataHashWrapper { + + private final ArbitraryTransactionData data; + + private int service; + + private String name; + + private String identifier; + + public ArbitraryTransactionDataHashWrapper(ArbitraryTransactionData data) { + this.data = data; + + this.service = data.getService().value; + this.name = data.getName(); + this.identifier = data.getIdentifier(); + } + + public ArbitraryTransactionData getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ArbitraryTransactionDataHashWrapper that = (ArbitraryTransactionDataHashWrapper) o; + return service == that.service && name.equals(that.name) && Objects.equals(identifier, that.identifier); + } + + @Override + public int hashCode() { + return Objects.hash(service, name, identifier); + } +} diff --git a/src/main/java/org/qortal/repository/ArbitraryRepository.java b/src/main/java/org/qortal/repository/ArbitraryRepository.java index 1c0e84e2..ce4ef75e 100644 --- a/src/main/java/org/qortal/repository/ArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/ArbitraryRepository.java @@ -27,6 +27,8 @@ public interface ArbitraryRepository { public List getArbitraryTransactions(String name, Service service, String identifier, long since) throws DataException; + List getLatestArbitraryTransactions(int limit) throws DataException; + public ArbitraryTransactionData getInitialTransaction(String name, Service service, Method method, String identifier) throws DataException; public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method, String identifier) throws DataException; diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java index 049e98aa..d3a49ddd 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java @@ -7,7 +7,6 @@ import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata; import org.qortal.arbitrary.misc.Category; import org.qortal.arbitrary.misc.Service; -import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.data.arbitrary.ArbitraryResourceCache; import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.arbitrary.ArbitraryResourceMetadata; @@ -227,6 +226,76 @@ public class HSQLDBArbitraryRepository implements ArbitraryRepository { } } + @Override + public List getLatestArbitraryTransactions(int limit) throws DataException { + String sql = "SELECT type, reference, signature, creator, created_when, fee, " + + "tx_group_id, block_height, approval_status, approval_height, " + + "version, nonce, service, size, is_data_raw, data, metadata_hash, " + + "name, identifier, update_method, secret, compression FROM ArbitraryTransactions " + + "JOIN Transactions USING (signature) " + + "WHERE name IS NOT NULL " + + "ORDER BY created_when DESC " + + "LIMIT ?"; + List arbitraryTransactionData = new ArrayList<>(); + + try (ResultSet resultSet = this.repository.checkedExecute(sql, limit)) { + if (resultSet == null) + return new ArrayList<>(0); + + do { + byte[] reference = resultSet.getBytes(2); + byte[] signature = resultSet.getBytes(3); + byte[] creatorPublicKey = resultSet.getBytes(4); + long timestamp = resultSet.getLong(5); + + Long fee = resultSet.getLong(6); + if (fee == 0 && resultSet.wasNull()) + fee = null; + + int txGroupId = resultSet.getInt(7); + + Integer blockHeight = resultSet.getInt(8); + if (blockHeight == 0 && resultSet.wasNull()) + blockHeight = null; + + ApprovalStatus approvalStatus = ApprovalStatus.valueOf(resultSet.getInt(9)); + Integer approvalHeight = resultSet.getInt(10); + if (approvalHeight == 0 && resultSet.wasNull()) + approvalHeight = null; + + BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, approvalStatus, blockHeight, approvalHeight, signature); + + int version = resultSet.getInt(11); + int nonce = resultSet.getInt(12); + int serviceInt = resultSet.getInt(13); + int size = resultSet.getInt(14); + boolean isDataRaw = resultSet.getBoolean(15); // NOT NULL, so no null to false + DataType dataType = isDataRaw ? DataType.RAW_DATA : DataType.DATA_HASH; + byte[] data = resultSet.getBytes(16); + byte[] metadataHash = resultSet.getBytes(17); + String nameResult = resultSet.getString(18); + String identifierResult = resultSet.getString(19); + Method method = Method.valueOf(resultSet.getInt(20)); + byte[] secret = resultSet.getBytes(21); + Compression compression = Compression.valueOf(resultSet.getInt(22)); + // FUTURE: get payments from signature if needed. Avoiding for now to reduce database calls. + + ArbitraryTransactionData transactionData = new ArbitraryTransactionData(baseTransactionData, + version, serviceInt, nonce, size, nameResult, identifierResult, method, secret, + compression, data, dataType, metadataHash, null); + + arbitraryTransactionData.add(transactionData); + } while (resultSet.next()); + + return arbitraryTransactionData; + } catch (SQLException e) { + throw new DataException("Unable to fetch arbitrary transactions from repository", e); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + return new ArrayList<>(0); + } + } + private ArbitraryTransactionData getSingleTransaction(String name, Service service, Method method, String identifier, boolean firstNotLast) throws DataException { if (name == null || service == null) { // Required fields diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index 3a0d17bb..eede9756 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -508,6 +508,8 @@ public class Settings { */ private boolean connectionPoolMonitorEnabled = false; + private int dataFetchLimit = 1_000_000; + // Domain mapping public static class ThreadLimit { private String messageType; @@ -1333,4 +1335,8 @@ public class Settings { public boolean isConnectionPoolMonitorEnabled() { return connectionPoolMonitorEnabled; } + + public int getDataFetchLimit() { + return dataFetchLimit; + } }