diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index 25966fa6..5baf2c5d 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -43,6 +43,7 @@ import org.qortal.api.websocket.ActiveChatsWebSocket; import org.qortal.api.websocket.AdminStatusWebSocket; import org.qortal.api.websocket.BlocksWebSocket; import org.qortal.api.websocket.ChatMessagesWebSocket; +import org.qortal.api.websocket.PresenceWebSocket; import org.qortal.api.websocket.TradeBotWebSocket; import org.qortal.api.websocket.TradeOffersWebSocket; import org.qortal.settings.Settings; @@ -200,6 +201,7 @@ public class ApiService { context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages"); context.addServlet(TradeOffersWebSocket.class, "/websockets/crosschain/tradeoffers"); context.addServlet(TradeBotWebSocket.class, "/websockets/crosschain/tradebot"); + context.addServlet(PresenceWebSocket.class, "/websockets/presence"); // Start server this.server.start(); diff --git a/src/main/java/org/qortal/api/websocket/PresenceWebSocket.java b/src/main/java/org/qortal/api/websocket/PresenceWebSocket.java new file mode 100644 index 00000000..fb1b2eb2 --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/PresenceWebSocket.java @@ -0,0 +1,190 @@ +package org.qortal.api.websocket; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.controller.Controller; +import org.qortal.data.transaction.PresenceTransactionData; +import org.qortal.data.transaction.TransactionData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; +import org.qortal.transaction.PresenceTransaction.PresenceType; +import org.qortal.transaction.Transaction.TransactionType; +import org.qortal.utils.Base58; + +@WebSocket +@SuppressWarnings("serial") +public class PresenceWebSocket extends ApiWebSocket implements Listener { + + @XmlAccessorType(XmlAccessType.FIELD) + @SuppressWarnings("unused") + private static class PresenceInfo { + public final PresenceType presenceType; + public final String publicKey; + public final long timestamp; + + protected PresenceInfo() { + this.presenceType = null; + this.publicKey = null; + this.timestamp = 0L; + } + + public PresenceInfo(PresenceType presenceType, String pubKey58, long timestamp) { + this.presenceType = presenceType; + this.publicKey = pubKey58; + this.timestamp = timestamp; + } + + public PresenceType getPresenceType() { + return this.presenceType; + } + + public String getPublicKey() { + return this.publicKey; + } + + public long getTimestamp() { + return this.timestamp; + } + } + + /** Outer map key is PresenceType (enum), inner map key is public key in base58, inner map value is timestamp */ + private static final Map> currentEntries = Collections.synchronizedMap(new EnumMap<>(PresenceType.class)); + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(PresenceWebSocket.class); + + try (final Repository repository = RepositoryManager.getRepository()) { + populateCurrentInfo(repository); + } catch (DataException e) { + // How to fail properly? + return; + } + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + if (!(event instanceof Controller.NewTransactionEvent)) + return; + + TransactionData transactionData = ((Controller.NewTransactionEvent) event).getTransactionData(); + + if (transactionData.getType() != TransactionType.PRESENCE) + return; + + PresenceTransactionData presenceData = (PresenceTransactionData) transactionData; + PresenceType presenceType = presenceData.getPresenceType(); + + // Put/replace for this publickey making sure we keep newest timestamp + String pubKey58 = Base58.encode(presenceData.getCreatorPublicKey()); + Long ourTimestamp = presenceData.getTimestamp(); + Long computedTimestamp = mergePresence(presenceType, pubKey58, ourTimestamp); + + if (!computedTimestamp.equals(ourTimestamp)) + // nothing changed + return; + + List presenceInfo = Collections.singletonList(new PresenceInfo(presenceType, pubKey58, computedTimestamp)); + + // Notify sessions + for (Session session : getSessions()) + sendPresenceInfo(session, presenceInfo); + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + List presenceInfo; + + synchronized (currentEntries) { + presenceInfo = currentEntries.entrySet().stream() + .flatMap(entry -> entry.getValue().entrySet().stream().map(innerEntry -> new PresenceInfo(entry.getKey(), innerEntry.getKey(), innerEntry.getValue()))) + .collect(Collectors.toList()); + } + + if (!sendPresenceInfo(session, presenceInfo)) { + session.close(4002, "websocket issue"); + return; + } + + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketError + public void onWebSocketError(Session session, Throwable throwable) { + /* ignored */ + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + /* ignored */ + } + + private boolean sendPresenceInfo(Session session, List presenceInfo) { + try { + StringWriter stringWriter = new StringWriter(); + marshall(stringWriter, presenceInfo); + + String output = stringWriter.toString(); + session.getRemote().sendStringByFuture(output); + } catch (IOException e) { + // No output this time? + return false; + } + + return true; + } + + private static void populateCurrentInfo(Repository repository) throws DataException { + // We want ALL PRESENCE transactions + + List presenceTransactionsData = repository.getTransactionRepository().getUnconfirmedTransactions(TransactionType.PRESENCE, null); + + for (TransactionData transactionData : presenceTransactionsData) { + PresenceTransactionData presenceData = (PresenceTransactionData) transactionData; + + PresenceType presenceType = presenceData.getPresenceType(); + + // Put/replace for this publickey making sure we keep newest timestamp + String pubKey58 = Base58.encode(presenceData.getCreatorPublicKey()); + Long ourTimestamp = presenceData.getTimestamp(); + + mergePresence(presenceType, pubKey58, ourTimestamp); + } + } + + private static Long mergePresence(PresenceType presenceType, String pubKey58, Long ourTimestamp) { + Map typedPubkeyTimestamps = currentEntries.computeIfAbsent(presenceType, someType -> Collections.synchronizedMap(new HashMap<>())); + return typedPubkeyTimestamps.compute(pubKey58, (somePubKey58, currentTimestamp) -> (currentTimestamp == null || currentTimestamp < ourTimestamp) ? ourTimestamp : currentTimestamp); + } + +}