data monitor initial implementation

This commit is contained in:
kennycud 2025-02-19 17:18:05 -08:00
parent df37372180
commit 1f4ca6263f
9 changed files with 424 additions and 9 deletions

View File

@ -194,6 +194,7 @@ public class ApiService {
context.addServlet(AdminStatusWebSocket.class, "/websockets/admin/status");
context.addServlet(BlocksWebSocket.class, "/websockets/blocks");
context.addServlet(DataMonitorSocket.class, "/websockets/datamonitor");
context.addServlet(ActiveChatsWebSocket.class, "/websockets/chat/active/*");
context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages");
context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers");

View File

@ -0,0 +1,102 @@
package org.qortal.api.websocket;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.api.ApiError;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.DataMonitorInfo;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.Event;
import org.qortal.event.EventBus;
import org.qortal.event.Listener;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.Base58;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
@WebSocket
@SuppressWarnings("serial")
public class DataMonitorSocket extends ApiWebSocket implements Listener {
private static final Logger LOGGER = LogManager.getLogger(DataMonitorSocket.class);
@Override
public void configure(WebSocketServletFactory factory) {
LOGGER.info("configure");
factory.register(DataMonitorSocket.class);
EventBus.INSTANCE.addListener(this);
}
@Override
public void listen(Event event) {
if (!(event instanceof DataMonitorEvent))
return;
DataMonitorEvent dataMonitorEvent = (DataMonitorEvent) event;
for (Session session : getSessions())
sendDataEventSummary(session, buildInfo(dataMonitorEvent));
}
private DataMonitorInfo buildInfo(DataMonitorEvent dataMonitorEvent) {
return new DataMonitorInfo(
dataMonitorEvent.getTimestamp(),
dataMonitorEvent.getIdentifier(),
dataMonitorEvent.getName(),
dataMonitorEvent.getService(),
dataMonitorEvent.getDescription(),
dataMonitorEvent.getTransactionTimestamp(),
dataMonitorEvent.getLatestPutTimestamp()
);
}
@OnWebSocketConnect
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
}
@OnWebSocketClose
@Override
public void onWebSocketClose(Session session, int statusCode, String reason) {
super.onWebSocketClose(session, statusCode, reason);
}
@OnWebSocketError
public void onWebSocketError(Session session, Throwable throwable) {
/* We ignore errors for now, but method here to silence log spam */
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
LOGGER.info("onWebSocketMessage: message = " + message);
}
private void sendDataEventSummary(Session session, DataMonitorInfo dataMonitorInfo) {
StringWriter stringWriter = new StringWriter();
try {
marshall(stringWriter, dataMonitorInfo);
session.getRemote().sendStringByFuture(stringWriter.toString());
} catch (IOException | WebSocketException e) {
// No output this time
}
}
}

View File

@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger;
import org.qortal.api.resource.TransactionsResource.ConfirmationStatus;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.EventBus;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@ -23,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import static org.qortal.controller.arbitrary.ArbitraryDataStorageManager.DELETION_THRESHOLD;
@ -167,11 +170,24 @@ public class ArbitraryDataCleanupManager extends Thread {
LOGGER.info("Deleting transaction {} because we can't host its data",
Base58.encode(arbitraryTransactionData.getSignature()));
ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"can't store data, deleting",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
continue;
}
// Check to see if we have had a more recent PUT
boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
Optional<ArbitraryTransactionData> moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
boolean hasMoreRecentPutTransaction = moreRecentPutTransaction.isPresent();
if (hasMoreRecentPutTransaction) {
// There is a more recent PUT transaction than the one we are currently processing.
// When a PUT is issued, it replaces any layers that would have been there before.
@ -181,6 +197,17 @@ public class ArbitraryDataCleanupManager extends Thread {
arbitraryTransactionData.getName(), Base58.encode(signature)));
ArbitraryTransactionUtils.deleteCompleteFileAndChunks(arbitraryTransactionData);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"deleting data due to replacement",
arbitraryTransactionData.getTimestamp(),
moreRecentPutTransaction.get().getTimestamp()
)
);
continue;
}
@ -200,6 +227,17 @@ public class ArbitraryDataCleanupManager extends Thread {
Base58.encode(arbitraryTransactionData.getSignature())));
ArbitraryTransactionUtils.deleteCompleteFile(arbitraryTransactionData, now, STALE_FILE_TIMEOUT);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"deleting data due to age",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
continue;
}
@ -414,6 +452,18 @@ public class ArbitraryDataCleanupManager extends Thread {
// Relates to a different name - don't delete it
return false;
}
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"randomly selected for deletion",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
}
} catch (DataException e) {

View File

@ -5,6 +5,8 @@ import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.EventBus;
import org.qortal.network.Peer;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
@ -118,9 +120,33 @@ public class ArbitraryDataFileRequestThread implements Runnable {
return;
}
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetching file from peer",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetched file from peer",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
} catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
}

View File

@ -10,6 +10,8 @@ import org.qortal.arbitrary.misc.Service;
import org.qortal.controller.Controller;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.EventBus;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.repository.DataException;
@ -225,12 +227,35 @@ public class ArbitraryDataManager extends Thread {
// Skip transactions that we don't need to proactively store data for
if (!storageManager.shouldPreFetchData(repository, arbitraryTransactionData)) {
iterator.remove();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"don't need to proactively store, skipping",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
continue;
}
// Remove transactions that we already have local data for
if (hasLocalData(arbitraryTransaction)) {
iterator.remove();
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"already have local data, skipping",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
}
}
@ -248,8 +273,21 @@ public class ArbitraryDataManager extends Thread {
// Check to see if we have had a more recent PUT
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
if (hasMoreRecentPutTransaction) {
Optional<ArbitraryTransactionData> moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
if (moreRecentPutTransaction.isPresent()) {
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"not fetching old data",
arbitraryTransactionData.getTimestamp(),
moreRecentPutTransaction.get().getTimestamp()
)
);
// There is a more recent PUT transaction than the one we are currently processing.
// When a PUT is issued, it replaces any layers that would have been there before.
// Therefore any data relating to this older transaction is no longer needed and we
@ -257,10 +295,34 @@ public class ArbitraryDataManager extends Thread {
continue;
}
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetching data",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
// Ask our connected peers if they have files for this signature
// This process automatically then fetches the files themselves if a peer is found
fetchData(arbitraryTransactionData);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetched data",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
} catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
}
@ -330,8 +392,22 @@ public class ArbitraryDataManager extends Thread {
// Check to see if we have had a more recent PUT
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
boolean hasMoreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
if (hasMoreRecentPutTransaction) {
Optional<ArbitraryTransactionData> moreRecentPutTransaction = ArbitraryTransactionUtils.hasMoreRecentPutTransaction(repository, arbitraryTransactionData);
if (moreRecentPutTransaction.isPresent()) {
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"not fetching old metadata",
arbitraryTransactionData.getTimestamp(),
moreRecentPutTransaction.get().getTimestamp()
)
);
// There is a more recent PUT transaction than the one we are currently processing.
// When a PUT is issued, it replaces any layers that would have been there before.
// Therefore any data relating to this older transaction is no longer needed and we
@ -339,9 +415,32 @@ public class ArbitraryDataManager extends Thread {
continue;
}
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetching metadata",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
// Ask our connected peers if they have metadata for this signature
fetchMetadata(arbitraryTransactionData);
EventBus.INSTANCE.notify(
new DataMonitorEvent(
System.currentTimeMillis(),
arbitraryTransactionData.getIdentifier(),
arbitraryTransactionData.getName(),
arbitraryTransactionData.getService().name(),
"fetched metadata",
arbitraryTransactionData.getTimestamp(),
arbitraryTransactionData.getTimestamp()
)
);
} catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
}

View File

@ -0,0 +1,57 @@
package org.qortal.data.arbitrary;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@XmlAccessorType(XmlAccessType.FIELD)
public class DataMonitorInfo {
private long timestamp;
private String identifier;
private String name;
private String service;
private String description;
private long transactionTimestamp;
private long latestPutTimestamp;
public DataMonitorInfo() {
}
public DataMonitorInfo(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) {
this.timestamp = timestamp;
this.identifier = identifier;
this.name = name;
this.service = service;
this.description = description;
this.transactionTimestamp = transactionTimestamp;
this.latestPutTimestamp = latestPutTimestamp;
}
public long getTimestamp() {
return timestamp;
}
public String getIdentifier() {
return identifier;
}
public String getName() {
return name;
}
public String getService() {
return service;
}
public String getDescription() {
return description;
}
public long getTransactionTimestamp() {
return transactionTimestamp;
}
public long getLatestPutTimestamp() {
return latestPutTimestamp;
}
}

View File

@ -200,4 +200,26 @@ public class ArbitraryTransactionData extends TransactionData {
return this.payments;
}
@Override
public String toString() {
return "ArbitraryTransactionData{" +
"version=" + version +
", service=" + service +
", nonce=" + nonce +
", size=" + size +
", name='" + name + '\'' +
", identifier='" + identifier + '\'' +
", method=" + method +
", compression=" + compression +
", dataType=" + dataType +
", type=" + type +
", timestamp=" + timestamp +
", fee=" + fee +
", txGroupId=" + txGroupId +
", blockHeight=" + blockHeight +
", blockSequence=" + blockSequence +
", approvalStatus=" + approvalStatus +
", approvalHeight=" + approvalHeight +
'}';
}
}

View File

@ -0,0 +1,57 @@
package org.qortal.event;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@XmlAccessorType(XmlAccessType.FIELD)
public class DataMonitorEvent implements Event{
private long timestamp;
private String identifier;
private String name;
private String service;
private String description;
private long transactionTimestamp;
private long latestPutTimestamp;
public DataMonitorEvent() {
}
public DataMonitorEvent(long timestamp, String identifier, String name, String service, String description, long transactionTimestamp, long latestPutTimestamp) {
this.timestamp = timestamp;
this.identifier = identifier;
this.name = name;
this.service = service;
this.description = description;
this.transactionTimestamp = transactionTimestamp;
this.latestPutTimestamp = latestPutTimestamp;
}
public long getTimestamp() {
return timestamp;
}
public String getIdentifier() {
return identifier;
}
public String getName() {
return name;
}
public String getService() {
return service;
}
public String getDescription() {
return description;
}
public long getTransactionTimestamp() {
return transactionTimestamp;
}
public long getLatestPutTimestamp() {
return latestPutTimestamp;
}
}

View File

@ -24,6 +24,7 @@ import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@ -72,23 +73,23 @@ public class ArbitraryTransactionUtils {
return latestPut;
}
public static boolean hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) {
public static Optional<ArbitraryTransactionData> hasMoreRecentPutTransaction(Repository repository, ArbitraryTransactionData arbitraryTransactionData) {
byte[] signature = arbitraryTransactionData.getSignature();
if (signature == null) {
// We can't make a sensible decision without a signature
// so it's best to assume there is nothing newer
return false;
return Optional.empty();
}
ArbitraryTransactionData latestPut = ArbitraryTransactionUtils.fetchLatestPut(repository, arbitraryTransactionData);
if (latestPut == null) {
return false;
return Optional.empty();
}
// If the latest PUT transaction has a newer timestamp, it will override the existing transaction
// Any data relating to the older transaction is no longer needed
boolean hasNewerPut = (latestPut.getTimestamp() > arbitraryTransactionData.getTimestamp());
return hasNewerPut;
return hasNewerPut ? Optional.of(latestPut) : Optional.empty();
}
public static boolean completeFileExists(ArbitraryTransactionData transactionData) throws DataException {