From df373721802e2f4701996cdf63e7e9e1124e2acd Mon Sep 17 00:00:00 2001 From: kennycud Date: Tue, 11 Feb 2025 18:45:57 -0800 Subject: [PATCH 1/3] trade ledger export implementation, completed trades bug fix --- .../api/model/CrossChainTradeLedgerEntry.java | 72 +++++++++ .../api/resource/CrossChainResource.java | 141 +++++++++++++++++- .../qortal/api/resource/CrossChainUtils.java | 139 +++++++++++++++++ .../java/org/qortal/crosschain/Bitcoiny.java | 1 + .../qortal/crosschain/ForeignBlockchain.java | 2 + .../org/qortal/repository/ATRepository.java | 6 +- .../repository/hsqldb/HSQLDBATRepository.java | 29 ++-- .../qortal/test/api/CrossChainUtilsTests.java | 54 +++++++ 8 files changed, 422 insertions(+), 22 deletions(-) create mode 100644 src/main/java/org/qortal/api/model/CrossChainTradeLedgerEntry.java diff --git a/src/main/java/org/qortal/api/model/CrossChainTradeLedgerEntry.java b/src/main/java/org/qortal/api/model/CrossChainTradeLedgerEntry.java new file mode 100644 index 00000000..34f8fc57 --- /dev/null +++ b/src/main/java/org/qortal/api/model/CrossChainTradeLedgerEntry.java @@ -0,0 +1,72 @@ +package org.qortal.api.model; + +import io.swagger.v3.oas.annotations.media.Schema; +import org.qortal.data.crosschain.CrossChainTradeData; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +// All properties to be converted to JSON via JAXB +@XmlAccessorType(XmlAccessType.FIELD) +public class CrossChainTradeLedgerEntry { + + private String market; + + private String currency; + + @XmlJavaTypeAdapter(value = org.qortal.api.AmountTypeAdapter.class) + private long quantity; + + @XmlJavaTypeAdapter(value = org.qortal.api.AmountTypeAdapter.class) + private long feeAmount; + + private String feeCurrency; + + @XmlJavaTypeAdapter(value = org.qortal.api.AmountTypeAdapter.class) + private long totalPrice; + + private long tradeTimestamp; + + protected CrossChainTradeLedgerEntry() { + /* For JAXB */ + } + + public CrossChainTradeLedgerEntry(String market, String currency, long quantity, long feeAmount, String feeCurrency, long totalPrice, long tradeTimestamp) { + this.market = market; + this.currency = currency; + this.quantity = quantity; + this.feeAmount = feeAmount; + this.feeCurrency = feeCurrency; + this.totalPrice = totalPrice; + this.tradeTimestamp = tradeTimestamp; + } + + public String getMarket() { + return market; + } + + public String getCurrency() { + return currency; + } + + public long getQuantity() { + return quantity; + } + + public long getFeeAmount() { + return feeAmount; + } + + public String getFeeCurrency() { + return feeCurrency; + } + + public long getTotalPrice() { + return totalPrice; + } + + public long getTradeTimestamp() { + return tradeTimestamp; + } +} \ No newline at end of file diff --git a/src/main/java/org/qortal/api/resource/CrossChainResource.java b/src/main/java/org/qortal/api/resource/CrossChainResource.java index 748dcbe4..3f7acf68 100644 --- a/src/main/java/org/qortal/api/resource/CrossChainResource.java +++ b/src/main/java/org/qortal/api/resource/CrossChainResource.java @@ -10,11 +10,13 @@ import io.swagger.v3.oas.annotations.parameters.RequestBody; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; +import org.glassfish.jersey.media.multipart.ContentDisposition; import org.qortal.api.ApiError; import org.qortal.api.ApiErrors; import org.qortal.api.ApiExceptionFactory; import org.qortal.api.Security; import org.qortal.api.model.CrossChainCancelRequest; +import org.qortal.api.model.CrossChainTradeLedgerEntry; import org.qortal.api.model.CrossChainTradeSummary; import org.qortal.controller.tradebot.TradeBot; import org.qortal.crosschain.ACCT; @@ -44,10 +46,14 @@ import org.qortal.utils.Base58; import org.qortal.utils.ByteArray; import org.qortal.utils.NTP; +import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.*; import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import java.io.IOException; import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -61,6 +67,13 @@ public class CrossChainResource { @Context HttpServletRequest request; + @Context + HttpServletResponse response; + + @Context + ServletContext context; + + @GET @Path("/tradeoffers") @Operation( @@ -258,11 +271,11 @@ public class CrossChainResource { example = "1597310000000" ) @QueryParam("minimumTimestamp") Long minimumTimestamp, @Parameter( - description = "Optionally filter by buyer Qortal address" - ) @QueryParam("buyerAddress") String buyerAddress, + description = "Optionally filter by buyer Qortal public key" + ) @QueryParam("buyerPublicKey") String buyerPublicKey58, @Parameter( - description = "Optionally filter by seller Qortal address" - ) @QueryParam("sellerAddress") String sellerAddress, + description = "Optionally filter by seller Qortal public key" + ) @QueryParam("sellerPublicKey") String sellerPublicKey58, @Parameter( ref = "limit") @QueryParam("limit") Integer limit, @Parameter( ref = "offset" ) @QueryParam("offset") Integer offset, @Parameter( ref = "reverse" ) @QueryParam("reverse") Boolean reverse) { @@ -274,6 +287,10 @@ public class CrossChainResource { if (minimumTimestamp != null && minimumTimestamp <= 0) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_CRITERIA); + // Decode public keys + byte[] buyerPublicKey = decodePublicKey(buyerPublicKey58); + byte[] sellerPublicKey = decodePublicKey(sellerPublicKey58); + final Boolean isFinished = Boolean.TRUE; try (final Repository repository = RepositoryManager.getRepository()) { @@ -304,7 +321,7 @@ public class CrossChainResource { byte[] codeHash = acctInfo.getKey().value; ACCT acct = acctInfo.getValue().get(); - List atStates = repository.getATRepository().getMatchingFinalATStates(codeHash, buyerAddress, sellerAddress, + List atStates = repository.getATRepository().getMatchingFinalATStates(codeHash, buyerPublicKey, sellerPublicKey, isFinished, acct.getModeByteOffset(), (long) AcctMode.REDEEMED.value, minimumFinalHeight, limit, offset, reverse); @@ -343,6 +360,120 @@ public class CrossChainResource { } } + /** + * Decode Public Key + * + * @param publicKey58 the public key in a string + * + * @return the public key in bytes + */ + private byte[] decodePublicKey(String publicKey58) { + + if( publicKey58 == null ) return null; + if( publicKey58.isEmpty() ) return new byte[0]; + + byte[] publicKey; + try { + publicKey = Base58.decode(publicKey58); + } catch (NumberFormatException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_PUBLIC_KEY, e); + } + + // Correct size for public key? + if (publicKey.length != Transformer.PUBLIC_KEY_LENGTH) + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_PUBLIC_KEY); + + return publicKey; + } + + @GET + @Path("/ledger/{publicKey}") + @Operation( + summary = "Accounting entries for all trades.", + description = "Returns accounting entries for all completed cross-chain trades", + responses = { + @ApiResponse( + content = @Content( + schema = @Schema( + type = "string", + format = "byte" + ) + ) + ) + } + ) + @ApiErrors({ApiError.INVALID_CRITERIA, ApiError.REPOSITORY_ISSUE}) + public HttpServletResponse getLedgerEntries( + @PathParam("publicKey") String publicKey58, + @Parameter( + description = "Only return trades that completed on/after this timestamp (milliseconds since epoch)", + example = "1597310000000" + ) @QueryParam("minimumTimestamp") Long minimumTimestamp) { + + byte[] publicKey = decodePublicKey(publicKey58); + + // minimumTimestamp (if given) needs to be positive + if (minimumTimestamp != null && minimumTimestamp <= 0) + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_CRITERIA); + + try (final Repository repository = RepositoryManager.getRepository()) { + Integer minimumFinalHeight = null; + + if (minimumTimestamp != null) { + minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(minimumTimestamp); + // If not found in the block repository it will return either 0 or 1 + if (minimumFinalHeight == 0 || minimumFinalHeight == 1) { + // Try the archive + minimumFinalHeight = repository.getBlockArchiveRepository().getHeightFromTimestamp(minimumTimestamp); + } + + if (minimumFinalHeight == 0) + // We don't have any blocks since minimumTimestamp, let alone trades, so nothing to return + return response; + + // height returned from repository is for block BEFORE timestamp + // but we want trades AFTER timestamp so bump height accordingly + minimumFinalHeight++; + } + + List crossChainTradeLedgerEntries = new ArrayList<>(); + + Map> acctsByCodeHash = SupportedBlockchain.getAcctMap(); + + // collect ledger entries for each ACCT + for (Map.Entry> acctInfo : acctsByCodeHash.entrySet()) { + byte[] codeHash = acctInfo.getKey().value; + ACCT acct = acctInfo.getValue().get(); + + // collect buys and sells + CrossChainUtils.collectLedgerEntries(publicKey, repository, minimumFinalHeight, crossChainTradeLedgerEntries, codeHash, acct, true); + CrossChainUtils.collectLedgerEntries(publicKey, repository, minimumFinalHeight, crossChainTradeLedgerEntries, codeHash, acct, false); + } + + crossChainTradeLedgerEntries.sort((a, b) -> Longs.compare(a.getTradeTimestamp(), b.getTradeTimestamp())); + + response.setStatus(HttpServletResponse.SC_OK); + response.setContentType("text/csv"); + response.setHeader( + HttpHeaders.CONTENT_DISPOSITION, + ContentDisposition + .type("attachment") + .fileName(CrossChainUtils.createLedgerFileName(Crypto.toAddress(publicKey))) + .build() + .toString() + ); + + CrossChainUtils.writeToLedger( response.getWriter(), crossChainTradeLedgerEntries); + + return response; + } catch (DataException e) { + throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e); + } catch (IOException e) { + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + return response; + } + } + @GET @Path("/price/{blockchain}") @Operation( diff --git a/src/main/java/org/qortal/api/resource/CrossChainUtils.java b/src/main/java/org/qortal/api/resource/CrossChainUtils.java index 802faca1..ddd1d2d6 100644 --- a/src/main/java/org/qortal/api/resource/CrossChainUtils.java +++ b/src/main/java/org/qortal/api/resource/CrossChainUtils.java @@ -10,21 +10,36 @@ import org.bitcoinj.script.ScriptBuilder; import org.bouncycastle.util.Strings; import org.json.simple.JSONObject; +import org.qortal.api.model.CrossChainTradeLedgerEntry; import org.qortal.api.model.crosschain.BitcoinyTBDRequest; import org.qortal.crosschain.*; import org.qortal.data.at.ATData; +import org.qortal.data.at.ATStateData; import org.qortal.data.crosschain.*; import org.qortal.repository.DataException; import org.qortal.repository.Repository; +import org.qortal.utils.Amounts; import org.qortal.utils.BitTwiddling; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Writer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.*; import java.util.stream.Collectors; public class CrossChainUtils { + public static final String QORT_CURRENCY_CODE = "QORT"; private static final Logger LOGGER = LogManager.getLogger(CrossChainUtils.class); public static final String CORE_API_CALL = "Core API Call"; + public static final String QORTAL_EXCHANGE_LABEL = "Qortal"; public static ServerConfigurationInfo buildServerConfigurationInfo(Bitcoiny blockchain) { @@ -632,4 +647,128 @@ public class CrossChainUtils { byte[] lockTimeABytes = BitTwiddling.toBEByteArray((long) lockTimeA); return Bytes.concat(partnerBitcoinPKH, hashOfSecretA, lockTimeABytes); } + + /** + * Write To Ledger + * + * @param writer the writer to the ledger + * @param entries the entries to write to the ledger + * + * @throws IOException + */ + public static void writeToLedger(Writer writer, List entries) throws IOException { + + BufferedWriter bufferedWriter = new BufferedWriter(writer); + + StringJoiner header = new StringJoiner(","); + header.add("Market"); + header.add("Currency"); + header.add("Quantity"); + header.add("Commission Paid"); + header.add("Commission Currency"); + header.add("Total Price"); + header.add("Date Time"); + header.add("Exchange"); + + bufferedWriter.append(header.toString()); + + DateFormat dateFormatter = new SimpleDateFormat("yyyyMMdd HH:mm"); + dateFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); + + for( CrossChainTradeLedgerEntry entry : entries ) { + StringJoiner joiner = new StringJoiner(","); + + joiner.add(entry.getMarket()); + joiner.add(entry.getCurrency()); + joiner.add(String.valueOf(Amounts.prettyAmount(entry.getQuantity()))); + joiner.add(String.valueOf(Amounts.prettyAmount(entry.getFeeAmount()))); + joiner.add(entry.getFeeCurrency()); + joiner.add(String.valueOf(Amounts.prettyAmount(entry.getTotalPrice()))); + joiner.add(dateFormatter.format(new Date(entry.getTradeTimestamp()))); + joiner.add(QORTAL_EXCHANGE_LABEL); + + bufferedWriter.newLine(); + bufferedWriter.append(joiner.toString()); + } + + bufferedWriter.newLine(); + bufferedWriter.flush(); + } + + /** + * Create Ledger File Name + * + * Create a file name the includes timestamp and address. + * + * @param address the address + * + * @return the file name created + */ + public static String createLedgerFileName(String address) { + DateFormat dateFormatter = new SimpleDateFormat("yyyyMMddHHmmss"); + String fileName = "ledger-" + address + "-" + dateFormatter.format(new Date()); + return fileName; + } + + /** + * Collect Ledger Entries + * + * @param publicKey the public key for the ledger entries, buy and sell + * @param repository the data repository + * @param minimumFinalHeight the minimum block height for entries to be collected + * @param entries the ledger entries to add to + * @param codeHash code hash for the entry blockchain + * @param acct the ACCT for the entry blockchain + * @param isBuy true collecting entries for a buy, otherwise false + * + * @throws DataException + */ + public static void collectLedgerEntries( + byte[] publicKey, + Repository repository, + Integer minimumFinalHeight, + List entries, + byte[] codeHash, + ACCT acct, + boolean isBuy) throws DataException { + + // get all the final AT states for the code hash (foreign coin) + List atStates + = repository.getATRepository().getMatchingFinalATStates( + codeHash, + isBuy ? publicKey : null, + !isBuy ? publicKey : null, + Boolean.TRUE, acct.getModeByteOffset(), + (long) AcctMode.REDEEMED.value, + minimumFinalHeight, + null, null, false + ); + + String foreignBlockchainCurrencyCode = acct.getBlockchain().getCurrencyCode(); + + // for each trade, build ledger entry, collect ledger entry + for (ATStateData atState : atStates) { + CrossChainTradeData crossChainTradeData = acct.populateTradeData(repository, atState); + + // We also need block timestamp for use as trade timestamp + long localTimestamp = repository.getBlockRepository().getTimestampFromHeight(atState.getHeight()); + + if (localTimestamp == 0) { + // Try the archive + localTimestamp = repository.getBlockArchiveRepository().getTimestampFromHeight(atState.getHeight()); + } + + CrossChainTradeLedgerEntry ledgerEntry + = new CrossChainTradeLedgerEntry( + isBuy ? QORT_CURRENCY_CODE : foreignBlockchainCurrencyCode, + isBuy ? foreignBlockchainCurrencyCode : QORT_CURRENCY_CODE, + isBuy ? crossChainTradeData.qortAmount : crossChainTradeData.expectedForeignAmount, + 0, + foreignBlockchainCurrencyCode, + isBuy ? crossChainTradeData.expectedForeignAmount : crossChainTradeData.qortAmount, + localTimestamp); + + entries.add(ledgerEntry); + } + } } \ No newline at end of file diff --git a/src/main/java/org/qortal/crosschain/Bitcoiny.java b/src/main/java/org/qortal/crosschain/Bitcoiny.java index a4f5a2af..d93fa65f 100644 --- a/src/main/java/org/qortal/crosschain/Bitcoiny.java +++ b/src/main/java/org/qortal/crosschain/Bitcoiny.java @@ -83,6 +83,7 @@ public abstract class Bitcoiny implements ForeignBlockchain { return this.bitcoinjContext; } + @Override public String getCurrencyCode() { return this.currencyCode; } diff --git a/src/main/java/org/qortal/crosschain/ForeignBlockchain.java b/src/main/java/org/qortal/crosschain/ForeignBlockchain.java index fe64ab83..c66f2719 100644 --- a/src/main/java/org/qortal/crosschain/ForeignBlockchain.java +++ b/src/main/java/org/qortal/crosschain/ForeignBlockchain.java @@ -2,6 +2,8 @@ package org.qortal.crosschain; public interface ForeignBlockchain { + public String getCurrencyCode(); + public boolean isValidAddress(String address); public boolean isValidWalletKey(String walletKey); diff --git a/src/main/java/org/qortal/repository/ATRepository.java b/src/main/java/org/qortal/repository/ATRepository.java index fe001137..2b653ab5 100644 --- a/src/main/java/org/qortal/repository/ATRepository.java +++ b/src/main/java/org/qortal/repository/ATRepository.java @@ -76,9 +76,9 @@ public interface ATRepository { * Although expectedValue, if provided, is natively an unsigned long, * the data segment comparison is done via unsigned hex string. */ - public List getMatchingFinalATStates(byte[] codeHash, String buyerAddress, String sellerAddress, Boolean isFinished, - Integer dataByteOffset, Long expectedValue, Integer minimumFinalHeight, - Integer limit, Integer offset, Boolean reverse) throws DataException; + public List getMatchingFinalATStates(byte[] codeHash, byte[] buyerPublicKey, byte[] sellerPublicKey, Boolean isFinished, + Integer dataByteOffset, Long expectedValue, Integer minimumFinalHeight, + Integer limit, Integer offset, Boolean reverse) throws DataException; /** * Returns final ATStateData for ATs matching codeHash (required) diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java index 71a95428..6310ec02 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBATRepository.java @@ -5,6 +5,7 @@ import com.google.common.primitives.Longs; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.controller.Controller; +import org.qortal.crypto.Crypto; import org.qortal.data.at.ATData; import org.qortal.data.at.ATStateData; import org.qortal.repository.ATRepository; @@ -403,9 +404,9 @@ public class HSQLDBATRepository implements ATRepository { } @Override - public List getMatchingFinalATStates(byte[] codeHash, String buyerAddress, String sellerAddress, Boolean isFinished, - Integer dataByteOffset, Long expectedValue, Integer minimumFinalHeight, - Integer limit, Integer offset, Boolean reverse) throws DataException { + public List getMatchingFinalATStates(byte[] codeHash, byte[] buyerPublicKey, byte[] sellerPublicKey, Boolean isFinished, + Integer dataByteOffset, Long expectedValue, Integer minimumFinalHeight, + Integer limit, Integer offset, Boolean reverse) throws DataException { StringBuilder sql = new StringBuilder(1024); List bindParams = new ArrayList<>(); @@ -426,9 +427,9 @@ public class HSQLDBATRepository implements ATRepository { // Both must be the same direction (DESC) also sql.append("ORDER BY ATStates.height DESC LIMIT 1) AS FinalATStates "); - // Optional LEFT JOIN with ATTRANSACTIONS for buyerAddress - if (buyerAddress != null && !buyerAddress.isEmpty()) { - sql.append("LEFT JOIN ATTRANSACTIONS tx ON tx.at_address = ATs.AT_address "); + // Optional JOIN with ATTRANSACTIONS for buyerAddress + if (buyerPublicKey != null && buyerPublicKey.length > 0) { + sql.append("JOIN ATTRANSACTIONS tx ON tx.at_address = ATs.AT_address "); } sql.append("WHERE ATs.code_hash = ? "); @@ -450,18 +451,18 @@ public class HSQLDBATRepository implements ATRepository { bindParams.add(rawExpectedValue); } - if (buyerAddress != null && !buyerAddress.isEmpty()) { - sql.append("AND tx.recipient = ? "); - bindParams.add(buyerAddress); + if (buyerPublicKey != null && buyerPublicKey.length > 0 ) { + // the buyer must be the recipient of the transaction and not the creator of the AT + sql.append("AND tx.recipient = ? AND ATs.creator != ? "); + + bindParams.add(Crypto.toAddress(buyerPublicKey)); + bindParams.add(buyerPublicKey); } - if (sellerAddress != null && !sellerAddress.isEmpty()) { - // Convert sellerAddress to publicKey (method depends on your implementation) - AccountData accountData = this.repository.getAccountRepository().getAccount(sellerAddress); - byte[] publicKey = accountData.getPublicKey(); + if (sellerPublicKey != null && sellerPublicKey.length > 0) { sql.append("AND ATs.creator = ? "); - bindParams.add(publicKey); + bindParams.add(sellerPublicKey); } sql.append(" ORDER BY FinalATStates.height "); diff --git a/src/test/java/org/qortal/test/api/CrossChainUtilsTests.java b/src/test/java/org/qortal/test/api/CrossChainUtilsTests.java index 0e4a6f07..5c67267f 100644 --- a/src/test/java/org/qortal/test/api/CrossChainUtilsTests.java +++ b/src/test/java/org/qortal/test/api/CrossChainUtilsTests.java @@ -3,10 +3,15 @@ package org.qortal.test.api; import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; +import org.qortal.api.model.CrossChainTradeLedgerEntry; import org.qortal.api.resource.CrossChainUtils; import org.qortal.test.common.ApiCommon; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class CrossChainUtilsTests extends ApiCommon { @@ -137,4 +142,53 @@ public class CrossChainUtilsTests extends ApiCommon { Assert.assertEquals(5, versionDecimal, 0.001); Assert.assertFalse(thrown); } + + @Test + public void testWriteToLedgerHeaderOnly() throws IOException { + CrossChainUtils.writeToLedger(new PrintWriter(System.out), new ArrayList<>()); + } + + @Test + public void testWriteToLedgerOneRow() throws IOException { + CrossChainUtils.writeToLedger( + new PrintWriter(System.out), + List.of( + new CrossChainTradeLedgerEntry( + "QORT", + "LTC", + 1000, + 0, + "LTC", + 1, + System.currentTimeMillis()) + ) + ); + } + + @Test + public void testWriteToLedgerTwoRows() throws IOException { + CrossChainUtils.writeToLedger( + new PrintWriter(System.out), + List.of( + new CrossChainTradeLedgerEntry( + "QORT", + "LTC", + 1000, + 0, + "LTC", + 1, + System.currentTimeMillis() + ), + new CrossChainTradeLedgerEntry( + "LTC", + "QORT", + 1, + 0, + "LTC", + 1000, + System.currentTimeMillis() + ) + ) + ); + } } From 1f4ca6263fecdc6a9630a7f0936efbe8fd8e5b36 Mon Sep 17 00:00:00 2001 From: kennycud Date: Wed, 19 Feb 2025 17:18:05 -0800 Subject: [PATCH 2/3] data monitor initial implementation --- src/main/java/org/qortal/api/ApiService.java | 1 + .../api/websocket/DataMonitorSocket.java | 102 +++++++++++++++++ .../ArbitraryDataCleanupManager.java | 52 ++++++++- .../ArbitraryDataFileRequestThread.java | 26 +++++ .../arbitrary/ArbitraryDataManager.java | 107 +++++++++++++++++- .../data/arbitrary/DataMonitorInfo.java | 57 ++++++++++ .../transaction/ArbitraryTransactionData.java | 22 ++++ .../org/qortal/event/DataMonitorEvent.java | 57 ++++++++++ .../utils/ArbitraryTransactionUtils.java | 9 +- 9 files changed, 424 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/qortal/api/websocket/DataMonitorSocket.java create mode 100644 src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java create mode 100644 src/main/java/org/qortal/event/DataMonitorEvent.java diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index fbef50d3..2cebe8e5 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -194,6 +194,7 @@ public class ApiService { context.addServlet(AdminStatusWebSocket.class, "/websockets/admin/status"); context.addServlet(BlocksWebSocket.class, "/websockets/blocks"); + context.addServlet(DataMonitorSocket.class, "/websockets/datamonitor"); context.addServlet(ActiveChatsWebSocket.class, "/websockets/chat/active/*"); context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); diff --git a/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java b/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java new file mode 100644 index 00000000..a93bf2ed --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java @@ -0,0 +1,102 @@ +package org.qortal.api.websocket; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.api.ApiError; +import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.DataMonitorInfo; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.Base58; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.List; + +@WebSocket +@SuppressWarnings("serial") +public class DataMonitorSocket extends ApiWebSocket implements Listener { + + private static final Logger LOGGER = LogManager.getLogger(DataMonitorSocket.class); + + @Override + public void configure(WebSocketServletFactory factory) { + LOGGER.info("configure"); + + factory.register(DataMonitorSocket.class); + + EventBus.INSTANCE.addListener(this); + } + + @Override + public void listen(Event event) { + if (!(event instanceof DataMonitorEvent)) + return; + + DataMonitorEvent dataMonitorEvent = (DataMonitorEvent) event; + + for (Session session : getSessions()) + sendDataEventSummary(session, buildInfo(dataMonitorEvent)); + } + + private DataMonitorInfo buildInfo(DataMonitorEvent dataMonitorEvent) { + + return new DataMonitorInfo( + dataMonitorEvent.getTimestamp(), + dataMonitorEvent.getIdentifier(), + dataMonitorEvent.getName(), + dataMonitorEvent.getService(), + dataMonitorEvent.getDescription(), + dataMonitorEvent.getTransactionTimestamp(), + dataMonitorEvent.getLatestPutTimestamp() + ); + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* We ignore errors for now, but method here to silence log spam */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + LOGGER.info("onWebSocketMessage: message = " + message); + } + + private void sendDataEventSummary(Session session, DataMonitorInfo dataMonitorInfo) { + StringWriter stringWriter = new StringWriter(); + + try { + marshall(stringWriter, dataMonitorInfo); + + session.getRemote().sendStringByFuture(stringWriter.toString()); + } catch (IOException | WebSocketException e) { + // No output this time + } + } + +} diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java index aa29a7b8..99044988 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java @@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -23,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import static org.qortal.controller.arbitrary.ArbitraryDataStorageManager.DELETION_THRESHOLD; @@ -167,11 +170,24 @@ public class ArbitraryDataCleanupManager extends Thread { LOGGER.info("Deleting transaction {} because we can't host its data", Base58.encode(arbitraryTransactionData.getSignature())); ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "can't store data, deleting", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } // Check to see if we have had a more recent PUT - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + boolean hasMoreRecentPutTransaction = moreRecentPutTransaction.isPresent(); if (hasMoreRecentPutTransaction) { // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. @@ -181,6 +197,17 @@ public class ArbitraryDataCleanupManager extends Thread { arbitraryTransactionData.getName(), Base58.encode(signature))); ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "deleting data due to replacement", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); continue; } @@ -200,6 +227,17 @@ public class ArbitraryDataCleanupManager extends Thread { Base58.encode(arbitraryTransactionData.getSignature()))); ArbitraryTransactionUtils.deleteCompleteFile(arbitraryTransactionData, now, STALE_FILE_TIMEOUT); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "deleting data due to age", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } @@ -414,6 +452,18 @@ public class ArbitraryDataCleanupManager extends Thread { // Relates to a different name - don't delete it return false; } + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "randomly selected for deletion", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } } catch (DataException e) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index 8f734457..afde07c0 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger; import org.qortal.controller.Controller; import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo; import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.network.Peer; import org.qortal.repository.DataException; import org.qortal.repository.Repository; @@ -118,9 +120,33 @@ public class ArbitraryDataFileRequestThread implements Runnable { return; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching file from peer", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched file from peer", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + } catch (DataException e) { LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 6d64e20a..4f3dbb3c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -10,6 +10,8 @@ import org.qortal.arbitrary.misc.Service; import org.qortal.controller.Controller; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.network.Network; import org.qortal.network.Peer; import org.qortal.repository.DataException; @@ -225,12 +227,35 @@ public class ArbitraryDataManager extends Thread { // Skip transactions that we don't need to proactively store data for if (!storageManager.shouldPreFetchData(repository, arbitraryTransactionData)) { iterator.remove(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "don't need to proactively store, skipping", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } // Remove transactions that we already have local data for if (hasLocalData(arbitraryTransaction)) { iterator.remove(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "already have local data, skipping", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } } @@ -248,8 +273,21 @@ public class ArbitraryDataManager extends Thread { // Check to see if we have had a more recent PUT ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - if (hasMoreRecentPutTransaction) { + + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + + if (moreRecentPutTransaction.isPresent()) { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "not fetching old data", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. // Therefore any data relating to this older transaction is no longer needed and we @@ -257,10 +295,34 @@ public class ArbitraryDataManager extends Thread { continue; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching data", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + // Ask our connected peers if they have files for this signature // This process automatically then fetches the files themselves if a peer is found fetchData(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched data", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); } @@ -330,8 +392,22 @@ public class ArbitraryDataManager extends Thread { // Check to see if we have had a more recent PUT ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - if (hasMoreRecentPutTransaction) { + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + + if (moreRecentPutTransaction.isPresent()) { + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "not fetching old metadata", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); + // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. // Therefore any data relating to this older transaction is no longer needed and we @@ -339,9 +415,32 @@ public class ArbitraryDataManager extends Thread { continue; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching metadata", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + // Ask our connected peers if they have metadata for this signature fetchMetadata(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched metadata", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); } diff --git a/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java b/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java new file mode 100644 index 00000000..5ee76c29 --- /dev/null +++ b/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java @@ -0,0 +1,57 @@ +package org.qortal.data.arbitrary; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +@XmlAccessorType(XmlAccessType.FIELD) +public class DataMonitorInfo { + private long timestamp; + private String identifier; + private String name; + private String service; + private String description; + private long transactionTimestamp; + private long latestPutTimestamp; + + public DataMonitorInfo() { + } + + public DataMonitorInfo(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) { + + this.timestamp = timestamp; + this.identifier = identifier; + this.name = name; + this.service = service; + this.description = description; + this.transactionTimestamp = transactionTimestamp; + this.latestPutTimestamp = latestPutTimestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public String getIdentifier() { + return identifier; + } + + public String getName() { + return name; + } + + public String getService() { + return service; + } + + public String getDescription() { + return description; + } + + public long getTransactionTimestamp() { + return transactionTimestamp; + } + + public long getLatestPutTimestamp() { + return latestPutTimestamp; + } +} diff --git a/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java b/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java index f3828de8..81cfaa68 100644 --- a/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java +++ b/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java @@ -200,4 +200,26 @@ public class ArbitraryTransactionData extends TransactionData { return this.payments; } + @Override + public String toString() { + return "ArbitraryTransactionData{" + + "version=" + version + + ", service=" + service + + ", nonce=" + nonce + + ", size=" + size + + ", name='" + name + '\'' + + ", identifier='" + identifier + '\'' + + ", method=" + method + + ", compression=" + compression + + ", dataType=" + dataType + + ", type=" + type + + ", timestamp=" + timestamp + + ", fee=" + fee + + ", txGroupId=" + txGroupId + + ", blockHeight=" + blockHeight + + ", blockSequence=" + blockSequence + + ", approvalStatus=" + approvalStatus + + ", approvalHeight=" + approvalHeight + + '}'; + } } diff --git a/src/main/java/org/qortal/event/DataMonitorEvent.java b/src/main/java/org/qortal/event/DataMonitorEvent.java new file mode 100644 index 00000000..c62d9acf --- /dev/null +++ b/src/main/java/org/qortal/event/DataMonitorEvent.java @@ -0,0 +1,57 @@ +package org.qortal.event; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +@XmlAccessorType(XmlAccessType.FIELD) +public class DataMonitorEvent implements Event{ + private long timestamp; + private String identifier; + private String name; + private String service; + private String description; + private long transactionTimestamp; + private long latestPutTimestamp; + + public DataMonitorEvent() { + } + + public DataMonitorEvent(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) { + + this.timestamp = timestamp; + this.identifier = identifier; + this.name = name; + this.service = service; + this.description = description; + this.transactionTimestamp = transactionTimestamp; + this.latestPutTimestamp = latestPutTimestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public String getIdentifier() { + return identifier; + } + + public String getName() { + return name; + } + + public String getService() { + return service; + } + + public String getDescription() { + return description; + } + + public long getTransactionTimestamp() { + return transactionTimestamp; + } + + public long getLatestPutTimestamp() { + return latestPutTimestamp; + } +} diff --git a/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java b/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java index f641255f..3837e1cf 100644 --- a/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java +++ b/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java @@ -24,6 +24,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; @@ -72,23 +73,23 @@ public class ArbitraryTransactionUtils { return latestPut; } - public static boolean hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) { + public static Optional hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) { byte[] signature = arbitraryTransactionData.getSignature(); if (signature == null) { // We can't make a sensible decision without a signature // so it's best to assume there is nothing newer - return false; + return Optional.empty(); } ArbitraryTransactionData latestPut = ArbitraryTransactionUtils.fetchLatestPut(repository, arbitraryTransactionData); if (latestPut == null) { - return false; + return Optional.empty(); } // If the latest PUT transaction has a newer timestamp, it will override the existing transaction // Any data relating to the older transaction is no longer needed boolean hasNewerPut = (latestPut.getTimestamp() > arbitraryTransactionData.getTimestamp()); - return hasNewerPut; + return hasNewerPut ? Optional.of(latestPut) : Optional.empty(); } public static boolean completeFileExists(ArbitraryTransactionData transactionData) throws DataException { From 676885ea2d3cefb56f8436bbdbec04cab24b2c35 Mon Sep 17 00:00:00 2001 From: kennycud Date: Mon, 24 Feb 2025 16:36:13 -0800 Subject: [PATCH 3/3] optimized arbitrary metadata fetching, added arbitrary data cache manager notifications, removed redundant notifications, added method to arbitrary repository and a setting to support the optimization --- .../arbitrary/ArbitraryDataCacheManager.java | 154 ++++++++++++++++-- .../ArbitraryDataFileRequestThread.java | 24 --- .../arbitrary/ArbitraryDataManager.java | 84 +++++++--- .../ArbitraryTransactionDataHashWrapper.java | 42 +++++ .../repository/ArbitraryRepository.java | 2 + .../hsqldb/HSQLDBArbitraryRepository.java | 71 +++++++- .../java/org/qortal/settings/Settings.java | 6 + 7 files changed, 318 insertions(+), 65 deletions(-) create mode 100644 src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java index d6b9303f..c30e190c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCacheManager.java @@ -6,6 +6,8 @@ import org.qortal.api.resource.TransactionsResource; import org.qortal.controller.Controller; import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.gui.SplashFrame; import org.qortal.repository.DataException; import org.qortal.repository.Repository; @@ -84,15 +86,63 @@ public class ArbitraryDataCacheManager extends Thread { // Update arbitrary resource caches try { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource cache, queue", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); arbitraryTransaction.updateArbitraryResourceCache(repository); arbitraryTransaction.updateArbitraryMetadataCache(repository); repository.saveChanges(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource cache", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource status, queue", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + // Update status as separate commit, as this is more prone to failure arbitraryTransaction.updateArbitraryResourceStatus(repository); repository.saveChanges(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + LOGGER.debug(() -> String.format("Finished processing transaction %.8s in arbitrary resource queue...", Base58.encode(transactionData.getSignature()))); } catch (DataException e) { @@ -103,6 +153,9 @@ public class ArbitraryDataCacheManager extends Thread { } catch (DataException e) { LOGGER.error("Repository issue while processing arbitrary resource cache updates", e); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } } public void addToUpdateQueue(ArbitraryTransactionData transactionData) { @@ -163,19 +216,49 @@ public class ArbitraryDataCacheManager extends Thread { // Expand signatures to transactions for (byte[] signature : signatures) { - ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository - .getTransactionRepository().fromSignature(signature); + try { + ArbitraryTransactionData transactionData = (ArbitraryTransactionData) repository + .getTransactionRepository().fromSignature(signature); - if (transactionData.getService() == null) { - // Unsupported service - ignore this resource - continue; + if (transactionData.getService() == null) { + // Unsupported service - ignore this resource + continue; + } + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource cache, build", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + // Update arbitrary resource caches + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceCache(repository); + arbitraryTransaction.updateArbitraryMetadataCache(repository); + repository.saveChanges(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource cache", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + } catch (DataException e) { + repository.discardChanges(); + + LOGGER.error(e.getMessage(), e); } - - // Update arbitrary resource caches - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceCache(repository); - arbitraryTransaction.updateArbitraryMetadataCache(repository); - repository.saveChanges(); } offset += batchSize; } @@ -193,6 +276,11 @@ public class ArbitraryDataCacheManager extends Thread { repository.discardChanges(); throw new DataException("Build of arbitrary resources cache failed."); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + + return false; + } } private boolean refreshArbitraryStatuses(Repository repository) throws DataException { @@ -216,10 +304,41 @@ public class ArbitraryDataCacheManager extends Thread { // Loop through hosted transactions for (ArbitraryTransactionData transactionData : hostedTransactions) { - // Determine status and update cache - ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); - arbitraryTransaction.updateArbitraryResourceStatus(repository); - repository.saveChanges(); + try { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updating resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + // Determine status and update cache + ArbitraryTransaction arbitraryTransaction = new ArbitraryTransaction(repository, transactionData); + arbitraryTransaction.updateArbitraryResourceStatus(repository); + repository.saveChanges(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + transactionData.getIdentifier(), + transactionData.getName(), + transactionData.getService().name(), + "updated resource status", + transactionData.getTimestamp(), + transactionData.getTimestamp() + ) + ); + + } catch (DataException e) { + repository.discardChanges(); + + LOGGER.error(e.getMessage(), e); + } } offset += batchSize; } @@ -234,6 +353,11 @@ public class ArbitraryDataCacheManager extends Thread { repository.discardChanges(); throw new DataException("Refresh of arbitrary resource statuses failed."); } + catch (Exception e) { + LOGGER.error(e.getMessage(), e); + + return false; + } } } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index afde07c0..b8285052 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -120,33 +120,9 @@ public class ArbitraryDataFileRequestThread implements Runnable { return; } - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "fetching file from peer", - arbitraryTransactionData.getTimestamp(), - arbitraryTransactionData.getTimestamp() - ) - ); - LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "fetched file from peer", - arbitraryTransactionData.getTimestamp(), - arbitraryTransactionData.getTimestamp() - ) - ); - } catch (DataException e) { LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 4f3dbb3c..47a25a03 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -30,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.*; +import java.util.stream.Collectors; public class ArbitraryDataManager extends Thread { @@ -336,6 +337,20 @@ public class ArbitraryDataManager extends Thread { final int limit = 100; int offset = 0; + List allArbitraryTransactionsInDescendingOrder; + + try (final Repository repository = RepositoryManager.getRepository()) { + allArbitraryTransactionsInDescendingOrder + = repository.getArbitraryRepository() + .getLatestArbitraryTransactions(Settings.getInstance().getDataFetchLimit()); + } catch( Exception e) { + LOGGER.error(e.getMessage(), e); + allArbitraryTransactionsInDescendingOrder = new ArrayList<>(0); + } + + // collect processed transactions in a set to ensure outdated data transactions do not get fetched + Set processedTransactions = new HashSet<>(); + while (!isStopping) { final int minSeconds = 3; final int maxSeconds = 10; @@ -344,8 +359,8 @@ public class ArbitraryDataManager extends Thread { // Any arbitrary transactions we want to fetch data for? try (final Repository repository = RepositoryManager.getRepository()) { - List signatures = repository.getTransactionRepository().getSignaturesMatchingCriteria(null, null, null, ARBITRARY_TX_TYPE, null, null, null, ConfirmationStatus.BOTH, limit, offset, true); - // LOGGER.trace("Found {} arbitrary transactions at offset: {}, limit: {}", signatures.size(), offset, limit); + List signatures = processTransactionsForSignatures(limit, offset, allArbitraryTransactionsInDescendingOrder, processedTransactions); + if (signatures == null || signatures.isEmpty()) { offset = 0; break; @@ -390,30 +405,10 @@ public class ArbitraryDataManager extends Thread { continue; } - // Check to see if we have had a more recent PUT + // No longer need to see if we have had a more recent PUT since we compared the transactions to process + // to the transactions previously processed, so we can fetch the transactiondata, notify the event bus, + // fetch the metadata and notify the event bus again ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - - if (moreRecentPutTransaction.isPresent()) { - - EventBus.INSTANCE.notify( - new DataMonitorEvent( - System.currentTimeMillis(), - arbitraryTransactionData.getIdentifier(), - arbitraryTransactionData.getName(), - arbitraryTransactionData.getService().name(), - "not fetching old metadata", - arbitraryTransactionData.getTimestamp(), - moreRecentPutTransaction.get().getTimestamp() - ) - ); - - // There is a more recent PUT transaction than the one we are currently processing. - // When a PUT is issued, it replaces any layers that would have been there before. - // Therefore any data relating to this older transaction is no longer needed and we - // shouldn't fetch it from the network. - continue; - } EventBus.INSTANCE.notify( new DataMonitorEvent( @@ -443,10 +438,49 @@ public class ArbitraryDataManager extends Thread { ); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } } + private static List processTransactionsForSignatures(int limit, int offset, List allArbitraryTransactionsInDescendingOrder, Set processedTransactions) { + // these transactions are in descending order, latest transactions come first + List transactions + = allArbitraryTransactionsInDescendingOrder.stream() + .skip(offset) + .limit(limit) + .collect(Collectors.toList()); + + // wrap the transactions, so they can be used for hashing and comparing + // Class ArbitraryTransactionDataHashWrapper supports hashCode() and equals(...) for this purpose + List wrappedTransactions + = transactions.stream() + .map(transaction -> new ArbitraryTransactionDataHashWrapper(transaction)) + .collect(Collectors.toList()); + + // create a set of wrappers and populate it first to last, so that all outdated transactions get rejected + Set transactionsToProcess = new HashSet<>(wrappedTransactions.size()); + for(ArbitraryTransactionDataHashWrapper wrappedTransaction : wrappedTransactions) { + transactionsToProcess.add(wrappedTransaction); + } + + // remove the matches for previously processed transactions, + // because these transactions have had updates that have already been processed + transactionsToProcess.removeAll(processedTransactions); + + // add to processed transactions to compare and remove matches from future processing iterations + processedTransactions.addAll(transactionsToProcess); + + List signatures + = transactionsToProcess.stream() + .map(transactionToProcess -> transactionToProcess.getData() + .getSignature()) + .collect(Collectors.toList()); + + return signatures; + } + private ArbitraryTransaction fetchTransaction(final Repository repository, byte[] signature) { try { TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature); diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java new file mode 100644 index 00000000..0f64652c --- /dev/null +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryTransactionDataHashWrapper.java @@ -0,0 +1,42 @@ +package org.qortal.controller.arbitrary; + +import org.qortal.arbitrary.misc.Service; +import org.qortal.data.transaction.ArbitraryTransactionData; + +import java.util.Objects; + +public class ArbitraryTransactionDataHashWrapper { + + private final ArbitraryTransactionData data; + + private int service; + + private String name; + + private String identifier; + + public ArbitraryTransactionDataHashWrapper(ArbitraryTransactionData data) { + this.data = data; + + this.service = data.getService().value; + this.name = data.getName(); + this.identifier = data.getIdentifier(); + } + + public ArbitraryTransactionData getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ArbitraryTransactionDataHashWrapper that = (ArbitraryTransactionDataHashWrapper) o; + return service == that.service && name.equals(that.name) && Objects.equals(identifier, that.identifier); + } + + @Override + public int hashCode() { + return Objects.hash(service, name, identifier); + } +} diff --git a/src/main/java/org/qortal/repository/ArbitraryRepository.java b/src/main/java/org/qortal/repository/ArbitraryRepository.java index 1c0e84e2..ce4ef75e 100644 --- a/src/main/java/org/qortal/repository/ArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/ArbitraryRepository.java @@ -27,6 +27,8 @@ public interface ArbitraryRepository { public List getArbitraryTransactions(String name, Service service, String identifier, long since) throws DataException; + List getLatestArbitraryTransactions(int limit) throws DataException; + public ArbitraryTransactionData getInitialTransaction(String name, Service service, Method method, String identifier) throws DataException; public ArbitraryTransactionData getLatestTransaction(String name, Service service, Method method, String identifier) throws DataException; diff --git a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java index 049e98aa..d3a49ddd 100644 --- a/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java +++ b/src/main/java/org/qortal/repository/hsqldb/HSQLDBArbitraryRepository.java @@ -7,7 +7,6 @@ import org.qortal.arbitrary.ArbitraryDataFile; import org.qortal.arbitrary.metadata.ArbitraryDataTransactionMetadata; import org.qortal.arbitrary.misc.Category; import org.qortal.arbitrary.misc.Service; -import org.qortal.controller.arbitrary.ArbitraryDataManager; import org.qortal.data.arbitrary.ArbitraryResourceCache; import org.qortal.data.arbitrary.ArbitraryResourceData; import org.qortal.data.arbitrary.ArbitraryResourceMetadata; @@ -227,6 +226,76 @@ public class HSQLDBArbitraryRepository implements ArbitraryRepository { } } + @Override + public List getLatestArbitraryTransactions(int limit) throws DataException { + String sql = "SELECT type, reference, signature, creator, created_when, fee, " + + "tx_group_id, block_height, approval_status, approval_height, " + + "version, nonce, service, size, is_data_raw, data, metadata_hash, " + + "name, identifier, update_method, secret, compression FROM ArbitraryTransactions " + + "JOIN Transactions USING (signature) " + + "WHERE name IS NOT NULL " + + "ORDER BY created_when DESC " + + "LIMIT ?"; + List arbitraryTransactionData = new ArrayList<>(); + + try (ResultSet resultSet = this.repository.checkedExecute(sql, limit)) { + if (resultSet == null) + return new ArrayList<>(0); + + do { + byte[] reference = resultSet.getBytes(2); + byte[] signature = resultSet.getBytes(3); + byte[] creatorPublicKey = resultSet.getBytes(4); + long timestamp = resultSet.getLong(5); + + Long fee = resultSet.getLong(6); + if (fee == 0 && resultSet.wasNull()) + fee = null; + + int txGroupId = resultSet.getInt(7); + + Integer blockHeight = resultSet.getInt(8); + if (blockHeight == 0 && resultSet.wasNull()) + blockHeight = null; + + ApprovalStatus approvalStatus = ApprovalStatus.valueOf(resultSet.getInt(9)); + Integer approvalHeight = resultSet.getInt(10); + if (approvalHeight == 0 && resultSet.wasNull()) + approvalHeight = null; + + BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, approvalStatus, blockHeight, approvalHeight, signature); + + int version = resultSet.getInt(11); + int nonce = resultSet.getInt(12); + int serviceInt = resultSet.getInt(13); + int size = resultSet.getInt(14); + boolean isDataRaw = resultSet.getBoolean(15); // NOT NULL, so no null to false + DataType dataType = isDataRaw ? DataType.RAW_DATA : DataType.DATA_HASH; + byte[] data = resultSet.getBytes(16); + byte[] metadataHash = resultSet.getBytes(17); + String nameResult = resultSet.getString(18); + String identifierResult = resultSet.getString(19); + Method method = Method.valueOf(resultSet.getInt(20)); + byte[] secret = resultSet.getBytes(21); + Compression compression = Compression.valueOf(resultSet.getInt(22)); + // FUTURE: get payments from signature if needed. Avoiding for now to reduce database calls. + + ArbitraryTransactionData transactionData = new ArbitraryTransactionData(baseTransactionData, + version, serviceInt, nonce, size, nameResult, identifierResult, method, secret, + compression, data, dataType, metadataHash, null); + + arbitraryTransactionData.add(transactionData); + } while (resultSet.next()); + + return arbitraryTransactionData; + } catch (SQLException e) { + throw new DataException("Unable to fetch arbitrary transactions from repository", e); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + return new ArrayList<>(0); + } + } + private ArbitraryTransactionData getSingleTransaction(String name, Service service, Method method, String identifier, boolean firstNotLast) throws DataException { if (name == null || service == null) { // Required fields diff --git a/src/main/java/org/qortal/settings/Settings.java b/src/main/java/org/qortal/settings/Settings.java index 3a0d17bb..eede9756 100644 --- a/src/main/java/org/qortal/settings/Settings.java +++ b/src/main/java/org/qortal/settings/Settings.java @@ -508,6 +508,8 @@ public class Settings { */ private boolean connectionPoolMonitorEnabled = false; + private int dataFetchLimit = 1_000_000; + // Domain mapping public static class ThreadLimit { private String messageType; @@ -1333,4 +1335,8 @@ public class Settings { public boolean isConnectionPoolMonitorEnabled() { return connectionPoolMonitorEnabled; } + + public int getDataFetchLimit() { + return dataFetchLimit; + } }