diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index fbef50d3..2cebe8e5 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -194,6 +194,7 @@ public class ApiService { context.addServlet(AdminStatusWebSocket.class, "/websockets/admin/status"); context.addServlet(BlocksWebSocket.class, "/websockets/blocks"); + context.addServlet(DataMonitorSocket.class, "/websockets/datamonitor"); context.addServlet(ActiveChatsWebSocket.class, "/websockets/chat/active/*"); context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); diff --git a/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java b/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java new file mode 100644 index 00000000..a93bf2ed --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/DataMonitorSocket.java @@ -0,0 +1,102 @@ +package org.qortal.api.websocket; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.api.ApiError; +import org.qortal.controller.Controller; +import org.qortal.data.arbitrary.DataMonitorInfo; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.utils.Base58; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.List; + +@WebSocket +@SuppressWarnings("serial") +public class DataMonitorSocket extends ApiWebSocket implements Listener { + + private static final Logger LOGGER = LogManager.getLogger(DataMonitorSocket.class); + + @Override + public void configure(WebSocketServletFactory factory) { + LOGGER.info("configure"); + + factory.register(DataMonitorSocket.class); + + EventBus.INSTANCE.addListener(this); + } + + @Override + public void listen(Event event) { + if (!(event instanceof DataMonitorEvent)) + return; + + DataMonitorEvent dataMonitorEvent = (DataMonitorEvent) event; + + for (Session session : getSessions()) + sendDataEventSummary(session, buildInfo(dataMonitorEvent)); + } + + private DataMonitorInfo buildInfo(DataMonitorEvent dataMonitorEvent) { + + return new DataMonitorInfo( + dataMonitorEvent.getTimestamp(), + dataMonitorEvent.getIdentifier(), + dataMonitorEvent.getName(), + dataMonitorEvent.getService(), + dataMonitorEvent.getDescription(), + dataMonitorEvent.getTransactionTimestamp(), + dataMonitorEvent.getLatestPutTimestamp() + ); + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* We ignore errors for now, but method here to silence log spam */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + LOGGER.info("onWebSocketMessage: message = " + message); + } + + private void sendDataEventSummary(Session session, DataMonitorInfo dataMonitorInfo) { + StringWriter stringWriter = new StringWriter(); + + try { + marshall(stringWriter, dataMonitorInfo); + + session.getRemote().sendStringByFuture(stringWriter.toString()); + } catch (IOException | WebSocketException e) { + // No output this time + } + } + +} diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java index aa29a7b8..99044988 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataCleanupManager.java @@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger; import org.qortal.api.resource.TransactionsResource.ConfirmationStatus; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -23,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import static org.qortal.controller.arbitrary.ArbitraryDataStorageManager.DELETION_THRESHOLD; @@ -167,11 +170,24 @@ public class ArbitraryDataCleanupManager extends Thread { LOGGER.info("Deleting transaction {} because we can't host its data", Base58.encode(arbitraryTransactionData.getSignature())); ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "can't store data, deleting", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } // Check to see if we have had a more recent PUT - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + boolean hasMoreRecentPutTransaction = moreRecentPutTransaction.isPresent(); if (hasMoreRecentPutTransaction) { // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. @@ -181,6 +197,17 @@ public class ArbitraryDataCleanupManager extends Thread { arbitraryTransactionData.getName(), Base58.encode(signature))); ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "deleting data due to replacement", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); continue; } @@ -200,6 +227,17 @@ public class ArbitraryDataCleanupManager extends Thread { Base58.encode(arbitraryTransactionData.getSignature()))); ArbitraryTransactionUtils.deleteCompleteFile(arbitraryTransactionData, now, STALE_FILE_TIMEOUT); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "deleting data due to age", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } @@ -414,6 +452,18 @@ public class ArbitraryDataCleanupManager extends Thread { // Relates to a different name - don't delete it return false; } + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "randomly selected for deletion", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } } catch (DataException e) { diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java index 8f734457..afde07c0 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataFileRequestThread.java @@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger; import org.qortal.controller.Controller; import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo; import org.qortal.data.transaction.ArbitraryTransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.network.Peer; import org.qortal.repository.DataException; import org.qortal.repository.Repository; @@ -118,9 +120,33 @@ public class ArbitraryDataFileRequestThread implements Runnable { return; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching file from peer", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer); arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash)); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched file from peer", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + } catch (DataException e) { LOGGER.debug("Unable to process file hashes: {}", e.getMessage()); } diff --git a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java index 6d64e20a..4f3dbb3c 100644 --- a/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java +++ b/src/main/java/org/qortal/controller/arbitrary/ArbitraryDataManager.java @@ -10,6 +10,8 @@ import org.qortal.arbitrary.misc.Service; import org.qortal.controller.Controller; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; +import org.qortal.event.DataMonitorEvent; +import org.qortal.event.EventBus; import org.qortal.network.Network; import org.qortal.network.Peer; import org.qortal.repository.DataException; @@ -225,12 +227,35 @@ public class ArbitraryDataManager extends Thread { // Skip transactions that we don't need to proactively store data for if (!storageManager.shouldPreFetchData(repository, arbitraryTransactionData)) { iterator.remove(); + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "don't need to proactively store, skipping", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); continue; } // Remove transactions that we already have local data for if (hasLocalData(arbitraryTransaction)) { iterator.remove(); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "already have local data, skipping", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } } @@ -248,8 +273,21 @@ public class ArbitraryDataManager extends Thread { // Check to see if we have had a more recent PUT ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - if (hasMoreRecentPutTransaction) { + + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + + if (moreRecentPutTransaction.isPresent()) { + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "not fetching old data", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. // Therefore any data relating to this older transaction is no longer needed and we @@ -257,10 +295,34 @@ public class ArbitraryDataManager extends Thread { continue; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching data", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + // Ask our connected peers if they have files for this signature // This process automatically then fetches the files themselves if a peer is found fetchData(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched data", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); } @@ -330,8 +392,22 @@ public class ArbitraryDataManager extends Thread { // Check to see if we have had a more recent PUT ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature); - boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); - if (hasMoreRecentPutTransaction) { + Optional moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData); + + if (moreRecentPutTransaction.isPresent()) { + + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "not fetching old metadata", + arbitraryTransactionData.getTimestamp(), + moreRecentPutTransaction.get().getTimestamp() + ) + ); + // There is a more recent PUT transaction than the one we are currently processing. // When a PUT is issued, it replaces any layers that would have been there before. // Therefore any data relating to this older transaction is no longer needed and we @@ -339,9 +415,32 @@ public class ArbitraryDataManager extends Thread { continue; } + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetching metadata", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); + // Ask our connected peers if they have metadata for this signature fetchMetadata(arbitraryTransactionData); + EventBus.INSTANCE.notify( + new DataMonitorEvent( + System.currentTimeMillis(), + arbitraryTransactionData.getIdentifier(), + arbitraryTransactionData.getName(), + arbitraryTransactionData.getService().name(), + "fetched metadata", + arbitraryTransactionData.getTimestamp(), + arbitraryTransactionData.getTimestamp() + ) + ); } catch (DataException e) { LOGGER.error("Repository issue when fetching arbitrary transaction data", e); } diff --git a/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java b/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java new file mode 100644 index 00000000..5ee76c29 --- /dev/null +++ b/src/main/java/org/qortal/data/arbitrary/DataMonitorInfo.java @@ -0,0 +1,57 @@ +package org.qortal.data.arbitrary; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +@XmlAccessorType(XmlAccessType.FIELD) +public class DataMonitorInfo { + private long timestamp; + private String identifier; + private String name; + private String service; + private String description; + private long transactionTimestamp; + private long latestPutTimestamp; + + public DataMonitorInfo() { + } + + public DataMonitorInfo(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) { + + this.timestamp = timestamp; + this.identifier = identifier; + this.name = name; + this.service = service; + this.description = description; + this.transactionTimestamp = transactionTimestamp; + this.latestPutTimestamp = latestPutTimestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public String getIdentifier() { + return identifier; + } + + public String getName() { + return name; + } + + public String getService() { + return service; + } + + public String getDescription() { + return description; + } + + public long getTransactionTimestamp() { + return transactionTimestamp; + } + + public long getLatestPutTimestamp() { + return latestPutTimestamp; + } +} diff --git a/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java b/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java index f3828de8..81cfaa68 100644 --- a/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java +++ b/src/main/java/org/qortal/data/transaction/ArbitraryTransactionData.java @@ -200,4 +200,26 @@ public class ArbitraryTransactionData extends TransactionData { return this.payments; } + @Override + public String toString() { + return "ArbitraryTransactionData{" + + "version=" + version + + ", service=" + service + + ", nonce=" + nonce + + ", size=" + size + + ", name='" + name + '\'' + + ", identifier='" + identifier + '\'' + + ", method=" + method + + ", compression=" + compression + + ", dataType=" + dataType + + ", type=" + type + + ", timestamp=" + timestamp + + ", fee=" + fee + + ", txGroupId=" + txGroupId + + ", blockHeight=" + blockHeight + + ", blockSequence=" + blockSequence + + ", approvalStatus=" + approvalStatus + + ", approvalHeight=" + approvalHeight + + '}'; + } } diff --git a/src/main/java/org/qortal/event/DataMonitorEvent.java b/src/main/java/org/qortal/event/DataMonitorEvent.java new file mode 100644 index 00000000..c62d9acf --- /dev/null +++ b/src/main/java/org/qortal/event/DataMonitorEvent.java @@ -0,0 +1,57 @@ +package org.qortal.event; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +@XmlAccessorType(XmlAccessType.FIELD) +public class DataMonitorEvent implements Event{ + private long timestamp; + private String identifier; + private String name; + private String service; + private String description; + private long transactionTimestamp; + private long latestPutTimestamp; + + public DataMonitorEvent() { + } + + public DataMonitorEvent(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) { + + this.timestamp = timestamp; + this.identifier = identifier; + this.name = name; + this.service = service; + this.description = description; + this.transactionTimestamp = transactionTimestamp; + this.latestPutTimestamp = latestPutTimestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public String getIdentifier() { + return identifier; + } + + public String getName() { + return name; + } + + public String getService() { + return service; + } + + public String getDescription() { + return description; + } + + public long getTransactionTimestamp() { + return transactionTimestamp; + } + + public long getLatestPutTimestamp() { + return latestPutTimestamp; + } +} diff --git a/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java b/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java index f641255f..3837e1cf 100644 --- a/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java +++ b/src/main/java/org/qortal/utils/ArbitraryTransactionUtils.java @@ -24,6 +24,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; @@ -72,23 +73,23 @@ public class ArbitraryTransactionUtils { return latestPut; } - public static boolean hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) { + public static Optional hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) { byte[] signature = arbitraryTransactionData.getSignature(); if (signature == null) { // We can't make a sensible decision without a signature // so it's best to assume there is nothing newer - return false; + return Optional.empty(); } ArbitraryTransactionData latestPut = ArbitraryTransactionUtils.fetchLatestPut(repository, arbitraryTransactionData); if (latestPut == null) { - return false; + return Optional.empty(); } // If the latest PUT transaction has a newer timestamp, it will override the existing transaction // Any data relating to the older transaction is no longer needed boolean hasNewerPut = (latestPut.getTimestamp() > arbitraryTransactionData.getTimestamp()); - return hasNewerPut; + return hasNewerPut ? Optional.of(latestPut) : Optional.empty(); } public static boolean completeFileExists(ArbitraryTransactionData transactionData) throws DataException {