diff --git a/src/main/java/org/qortal/api/websocket/AdminStatusWebSocket.java b/src/main/java/org/qortal/api/websocket/AdminStatusWebSocket.java index 173a3abf..6556d140 100644 --- a/src/main/java/org/qortal/api/websocket/AdminStatusWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/AdminStatusWebSocket.java @@ -13,59 +13,87 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.qortal.api.model.NodeStatus; -import org.qortal.controller.StatusNotifier; -import org.qortal.repository.DataException; -import org.qortal.repository.Repository; -import org.qortal.repository.RepositoryManager; +import org.qortal.controller.Controller; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; @WebSocket @SuppressWarnings("serial") -public class AdminStatusWebSocket extends ApiWebSocket { +public class AdminStatusWebSocket extends ApiWebSocket implements Listener { + + private static final AtomicReference previousOutput = new AtomicReference<>(null); @Override public void configure(WebSocketServletFactory factory) { factory.register(AdminStatusWebSocket.class); + + try { + previousOutput.set(buildStatusString()); + } catch (IOException e) { + // How to fail properly? + return; + } + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + if (!(event instanceof Controller.StatusChangeEvent)) + return; + + String newOutput; + try { + newOutput = buildStatusString(); + } catch (IOException e) { + // Ignore this time? + return; + } + + if (previousOutput.getAndUpdate(currentValue -> newOutput).equals(newOutput)) + // Output hasn't changed, so don't send anything + return; + + for (Session session : getSessions()) + this.sendStatus(session, newOutput); } @OnWebSocketConnect + @Override public void onWebSocketConnect(Session session) { - AtomicReference previousOutput = new AtomicReference<>(null); + this.sendStatus(session, previousOutput.get()); - StatusNotifier.Listener listener = timestamp -> onNotify(session, previousOutput); - StatusNotifier.getInstance().register(session, listener); - - this.onNotify(session, previousOutput); + super.onWebSocketConnect(session); } @OnWebSocketClose + @Override public void onWebSocketClose(Session session, int statusCode, String reason) { - StatusNotifier.getInstance().deregister(session); + super.onWebSocketClose(session, statusCode, reason); } @OnWebSocketError public void onWebSocketError(Session session, Throwable throwable) { + /* We ignore errors for now, but method here to silence log spam */ } @OnWebSocketMessage public void onWebSocketMessage(Session session, String message) { + /* ignored */ } - private void onNotify(Session session,AtomicReference previousOutput) { - try (final Repository repository = RepositoryManager.getRepository()) { - NodeStatus nodeStatus = new NodeStatus(); + private static String buildStatusString() throws IOException { + NodeStatus nodeStatus = new NodeStatus(); + StringWriter stringWriter = new StringWriter(); + marshall(stringWriter, nodeStatus); + return stringWriter.toString(); + } - StringWriter stringWriter = new StringWriter(); - - marshall(stringWriter, nodeStatus); - - // Only output if something has changed - String output = stringWriter.toString(); - if (output.equals(previousOutput.get())) - return; - - previousOutput.set(output); - session.getRemote().sendStringByFuture(output); - } catch (DataException | IOException | WebSocketException e) { + private void sendStatus(Session session, String status) { + try { + session.getRemote().sendStringByFuture(status); + } catch (WebSocketException e) { // No output this time? } } diff --git a/src/main/java/org/qortal/api/websocket/BlocksWebSocket.java b/src/main/java/org/qortal/api/websocket/BlocksWebSocket.java index 46a5fd84..6698dd8b 100644 --- a/src/main/java/org/qortal/api/websocket/BlocksWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/BlocksWebSocket.java @@ -14,7 +14,11 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.qortal.api.ApiError; import org.qortal.api.model.BlockInfo; -import org.qortal.controller.BlockNotifier; +import org.qortal.controller.Controller; +import org.qortal.data.block.BlockData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -22,26 +26,42 @@ import org.qortal.utils.Base58; @WebSocket @SuppressWarnings("serial") -public class BlocksWebSocket extends ApiWebSocket { +public class BlocksWebSocket extends ApiWebSocket implements Listener { @Override public void configure(WebSocketServletFactory factory) { factory.register(BlocksWebSocket.class); + + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + if (!(event instanceof Controller.NewBlockEvent)) + return; + + BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData(); + BlockInfo blockInfo = new BlockInfo(blockData); + + for (Session session : getSessions()) + sendBlockInfo(session, blockInfo); } @OnWebSocketConnect + @Override public void onWebSocketConnect(Session session) { - BlockNotifier.Listener listener = blockInfo -> onNotify(session, blockInfo); - BlockNotifier.getInstance().register(session, listener); + super.onWebSocketConnect(session); } @OnWebSocketClose + @Override public void onWebSocketClose(Session session, int statusCode, String reason) { - BlockNotifier.getInstance().deregister(session); + super.onWebSocketClose(session, statusCode, reason); } @OnWebSocketError public void onWebSocketError(Session session, Throwable throwable) { + /* We ignore errors for now, but method here to silence log spam */ } @OnWebSocketMessage @@ -71,7 +91,7 @@ public class BlocksWebSocket extends ApiWebSocket { return; } - onNotify(session, blockInfos.get(0)); + sendBlockInfo(session, blockInfos.get(0)); } catch (DataException e) { sendError(session, ApiError.REPOSITORY_ISSUE); } @@ -100,13 +120,13 @@ public class BlocksWebSocket extends ApiWebSocket { return; } - onNotify(session, blockInfos.get(0)); + sendBlockInfo(session, blockInfos.get(0)); } catch (DataException e) { sendError(session, ApiError.REPOSITORY_ISSUE); } } - private void onNotify(Session session, BlockInfo blockInfo) { + private void sendBlockInfo(Session session, BlockInfo blockInfo) { StringWriter stringWriter = new StringWriter(); try { diff --git a/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java b/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java index 2169dffa..78f2ce2a 100644 --- a/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java +++ b/src/main/java/org/qortal/api/websocket/TradeOffersWebSocket.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.eclipse.jetty.websocket.api.Session; @@ -14,12 +15,15 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import org.qortal.api.model.BlockInfo; import org.qortal.api.model.CrossChainOfferSummary; -import org.qortal.controller.BlockNotifier; +import org.qortal.controller.Controller; import org.qortal.crosschain.BTCACCT; import org.qortal.data.at.ATStateData; +import org.qortal.data.block.BlockData; import org.qortal.data.crosschain.CrossChainTradeData; +import org.qortal.event.Event; +import org.qortal.event.EventBus; +import org.qortal.event.Listener; import org.qortal.repository.DataException; import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; @@ -27,120 +31,56 @@ import org.qortal.utils.NTP; @WebSocket @SuppressWarnings("serial") -public class TradeOffersWebSocket extends ApiWebSocket { +public class TradeOffersWebSocket extends ApiWebSocket implements Listener { + + private static final Map previousAtModes = new HashMap<>(); + + // OFFERING + private static final List currentSummaries = new ArrayList<>(); + // REDEEMED/REFUNDED/CANCELLED + private static final List historicSummaries = new ArrayList<>(); + + private static final Predicate isCurrent = offerSummary + -> offerSummary.getMode() == BTCACCT.Mode.OFFERING; + + private static final Predicate isHistoric = offerSummary + -> offerSummary.getMode() == BTCACCT.Mode.REDEEMED + || offerSummary.getMode() == BTCACCT.Mode.REFUNDED + || offerSummary.getMode() == BTCACCT.Mode.CANCELLED; + @Override public void configure(WebSocketServletFactory factory) { factory.register(TradeOffersWebSocket.class); - } - - @OnWebSocketConnect - public void onWebSocketConnect(Session session) { - Map> queryParams = session.getUpgradeRequest().getParameterMap(); - - final boolean includeHistoric = queryParams.get("includeHistoric") != null; - final Map previousAtModes = new HashMap<>(); - List crossChainOfferSummaries; try (final Repository repository = RepositoryManager.getRepository()) { - List initialAtStates; - - // We want ALL OFFERING trades - Boolean isFinished = Boolean.FALSE; - Integer dataByteOffset = BTCACCT.MODE_BYTE_OFFSET; - Long expectedValue = (long) BTCACCT.Mode.OFFERING.value; - Integer minimumFinalHeight = null; - - initialAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH, - isFinished, dataByteOffset, expectedValue, minimumFinalHeight, - null, null, null); - - if (initialAtStates == null) { - session.close(4001, "repository issue fetching OFFERING trades"); - return; - } - - // Save initial AT modes - previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.OFFERING))); - - // Convert to offer summaries - crossChainOfferSummaries = produceSummaries(repository, initialAtStates, null); - - if (includeHistoric) { - // We also want REDEEMED/REFUNDED/CANCELLED trades over the last 24 hours - long timestamp = NTP.getTime() - 24 * 60 * 60 * 1000L; - minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(timestamp); - - if (minimumFinalHeight != 0) { - isFinished = Boolean.TRUE; - dataByteOffset = null; - expectedValue = null; - ++minimumFinalHeight; // because height is just *before* timestamp - - List historicAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH, - isFinished, dataByteOffset, expectedValue, minimumFinalHeight, - null, null, null); - - if (historicAtStates == null) { - session.close(4002, "repository issue fetching historic trades"); - return; - } - - for (ATStateData historicAtState : historicAtStates) { - CrossChainOfferSummary historicOfferSummary = produceSummary(repository, historicAtState, null); - - switch (historicOfferSummary.getMode()) { - case REDEEMED: - case REFUNDED: - case CANCELLED: - break; - - default: - continue; - } - - // Add summary to initial burst - crossChainOfferSummaries.add(historicOfferSummary); - - // Save initial AT mode - previousAtModes.put(historicAtState.getATAddress(), historicOfferSummary.getMode()); - } - } - } + populateCurrentSummaries(repository); + populateHistoricSummaries(repository); } catch (DataException e) { - session.close(4003, "generic repository issue"); + // How to fail properly? return; } - if (!sendOfferSummaries(session, crossChainOfferSummaries)) { - session.close(4004, "websocket issue"); + EventBus.INSTANCE.addListener(this::listen); + } + + @Override + public void listen(Event event) { + if (!(event instanceof Controller.NewBlockEvent)) return; - } - BlockNotifier.Listener listener = blockInfo -> onNotify(session, blockInfo, previousAtModes); - BlockNotifier.getInstance().register(session, listener); - } + BlockData blockData = ((Controller.NewBlockEvent) event).getBlockData(); - @OnWebSocketClose - public void onWebSocketClose(Session session, int statusCode, String reason) { - BlockNotifier.getInstance().deregister(session); - } - - @OnWebSocketMessage - public void onWebSocketMessage(Session session, String message) { - /* ignored */ - } - - private void onNotify(Session session, BlockInfo blockInfo, final Map previousAtModes) { - List crossChainOfferSummaries = null; + // Process any new info + List crossChainOfferSummaries; try (final Repository repository = RepositoryManager.getRepository()) { // Find any new trade ATs since this block final Boolean isFinished = null; final Integer dataByteOffset = null; final Long expectedValue = null; - final Integer minimumFinalHeight = blockInfo.getHeight(); + final Integer minimumFinalHeight = blockData.getHeight(); List atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH, isFinished, dataByteOffset, expectedValue, minimumFinalHeight, @@ -149,12 +89,13 @@ public class TradeOffersWebSocket extends ApiWebSocket { if (atStates == null) return; - crossChainOfferSummaries = produceSummaries(repository, atStates, blockInfo.getTimestamp()); + crossChainOfferSummaries = produceSummaries(repository, atStates, blockData.getTimestamp()); } catch (DataException e) { // No output this time + return; } - synchronized (previousAtModes) { //NOSONAR squid:S2445 suppressed because previousAtModes is final and curried in lambda + synchronized (previousAtModes) { // Remove any entries unchanged from last time crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode()); @@ -162,13 +103,63 @@ public class TradeOffersWebSocket extends ApiWebSocket { if (crossChainOfferSummaries.isEmpty()) return; - final boolean wasSent = sendOfferSummaries(session, crossChainOfferSummaries); - - if (!wasSent) - return; - + // Update previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode))); + + synchronized (currentSummaries) { + // Add any OFFERING to 'current' + currentSummaries.addAll(crossChainOfferSummaries.stream().filter(isCurrent).collect(Collectors.toList())); + } + + final long tooOldTimestamp = NTP.getTime() - 24 * 60 * 60 * 1000L; + synchronized (historicSummaries) { + // Add any REDEEMED/REFUNDED/CANCELLED + historicSummaries.addAll(crossChainOfferSummaries.stream().filter(isHistoric).collect(Collectors.toList())); + + // But also remove any that are over 24 hours old + historicSummaries.removeIf(offerSummary -> offerSummary.getTimestamp() < tooOldTimestamp); + } } + + // Notify sessions + for (Session session : getSessions()) + sendOfferSummaries(session, crossChainOfferSummaries); + } + + @OnWebSocketConnect + @Override + public void onWebSocketConnect(Session session) { + Map> queryParams = session.getUpgradeRequest().getParameterMap(); + final boolean includeHistoric = queryParams.get("includeHistoric") != null; + + List crossChainOfferSummaries; + + synchronized (currentSummaries) { + crossChainOfferSummaries = new ArrayList<>(currentSummaries); + } + + if (includeHistoric) + synchronized (historicSummaries) { + crossChainOfferSummaries.addAll(historicSummaries); + } + + if (!sendOfferSummaries(session, crossChainOfferSummaries)) { + session.close(4002, "websocket issue"); + return; + } + + super.onWebSocketConnect(session); + } + + @OnWebSocketClose + @Override + public void onWebSocketClose(Session session, int statusCode, String reason) { + super.onWebSocketClose(session, statusCode, reason); + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + /* ignored */ } private boolean sendOfferSummaries(Session session, List crossChainOfferSummaries) { @@ -186,6 +177,68 @@ public class TradeOffersWebSocket extends ApiWebSocket { return true; } + private static void populateCurrentSummaries(Repository repository) throws DataException { + // We want ALL OFFERING trades + Boolean isFinished = Boolean.FALSE; + Integer dataByteOffset = BTCACCT.MODE_BYTE_OFFSET; + Long expectedValue = (long) BTCACCT.Mode.OFFERING.value; + Integer minimumFinalHeight = null; + + List initialAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH, + isFinished, dataByteOffset, expectedValue, minimumFinalHeight, + null, null, null); + + if (initialAtStates == null) + throw new DataException("Couldn't fetch current trades from repository"); + + // Save initial AT modes + previousAtModes.putAll(initialAtStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, atState -> BTCACCT.Mode.OFFERING))); + + // Convert to offer summaries + currentSummaries.addAll(produceSummaries(repository, initialAtStates, null)); + } + + private static void populateHistoricSummaries(Repository repository) throws DataException { + // We want REDEEMED/REFUNDED/CANCELLED trades over the last 24 hours + long timestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L; + int minimumFinalHeight = repository.getBlockRepository().getHeightFromTimestamp(timestamp); + + if (minimumFinalHeight == 0) + throw new DataException("Couldn't fetch block timestamp from repository"); + + Boolean isFinished = Boolean.TRUE; + Integer dataByteOffset = null; + Long expectedValue = null; + ++minimumFinalHeight; // because height is just *before* timestamp + + List historicAtStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH, + isFinished, dataByteOffset, expectedValue, minimumFinalHeight, + null, null, null); + + if (historicAtStates == null) + throw new DataException("Couldn't fetch historic trades from repository"); + + for (ATStateData historicAtState : historicAtStates) { + CrossChainOfferSummary historicOfferSummary = produceSummary(repository, historicAtState, null); + + switch (historicOfferSummary.getMode()) { + case REDEEMED: + case REFUNDED: + case CANCELLED: + break; + + default: + continue; + } + + // Add summary to initial burst + historicSummaries.add(historicOfferSummary); + + // Save initial AT mode + previousAtModes.put(historicAtState.getATAddress(), historicOfferSummary.getMode()); + } + } + private static CrossChainOfferSummary produceSummary(Repository repository, ATStateData atState, Long timestamp) throws DataException { CrossChainTradeData crossChainTradeData = BTCACCT.populateTradeData(repository, atState); diff --git a/src/main/java/org/qortal/controller/BlockNotifier.java b/src/main/java/org/qortal/controller/BlockNotifier.java deleted file mode 100644 index ad742a82..00000000 --- a/src/main/java/org/qortal/controller/BlockNotifier.java +++ /dev/null @@ -1,61 +0,0 @@ -package org.qortal.controller; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.eclipse.jetty.websocket.api.Session; -import org.qortal.api.model.BlockInfo; -import org.qortal.data.block.BlockData; - -public class BlockNotifier { - - private static BlockNotifier instance; - - @FunctionalInterface - public interface Listener { - void notify(BlockInfo blockInfo); - } - - private Map listenersBySession = new HashMap<>(); - - private BlockNotifier() { - } - - public static synchronized BlockNotifier getInstance() { - if (instance == null) - instance = new BlockNotifier(); - - return instance; - } - - public void register(Session session, Listener listener) { - synchronized (this.listenersBySession) { - this.listenersBySession.put(session, listener); - } - } - - public void deregister(Session session) { - synchronized (this.listenersBySession) { - this.listenersBySession.remove(session); - } - } - - public void onNewBlock(BlockData blockData) { - // Convert BlockData to BlockInfo - BlockInfo blockInfo = new BlockInfo(blockData); - - for (Listener listener : getAllListeners()) - listener.notify(blockInfo); - } - - private Collection getAllListeners() { - // Make a copy of listeners to both avoid concurrent modification - // and reduce synchronization time - synchronized (this.listenersBySession) { - return new ArrayList<>(this.listenersBySession.values()); - } - } - -} diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 0e8d09d5..b796b1ff 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -50,6 +50,8 @@ import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.data.transaction.ArbitraryTransactionData.DataType; +import org.qortal.event.Event; +import org.qortal.event.EventBus; import org.qortal.data.transaction.ChatTransactionData; import org.qortal.globalization.Translator; import org.qortal.gui.Gui; @@ -629,6 +631,11 @@ public class Controller extends Thread { } } + public static class StatusChangeEvent implements Event { + public StatusChangeEvent() { + } + } + private void updateSysTray() { if (NTP.getTime() == null) { SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING_CLOCK")); @@ -656,7 +663,7 @@ public class Controller extends Thread { SysTray.getInstance().setToolTipText(tooltip); this.callbackExecutor.execute(() -> { - StatusNotifier.getInstance().onStatusChange(NTP.getTime()); + EventBus.INSTANCE.notify(new StatusChangeEvent()); }); } @@ -783,6 +790,18 @@ public class Controller extends Thread { requestSysTrayUpdate = true; } + public static class NewBlockEvent implements Event { + private final BlockData blockData; + + public NewBlockEvent(BlockData blockData) { + this.blockData = blockData; + } + + public BlockData getBlockData() { + return this.blockData; + } + } + public void onNewBlock(BlockData latestBlockData) { this.setChainTip(latestBlockData); requestSysTrayUpdate = true; @@ -792,7 +811,8 @@ public class Controller extends Thread { Network network = Network.getInstance(); network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData)); - BlockNotifier.getInstance().onNewBlock(latestBlockData); + // Notify listeners of new block + EventBus.INSTANCE.notify(new NewBlockEvent(latestBlockData)); if (this.notifyGroupMembershipChange) { this.notifyGroupMembershipChange = false; diff --git a/src/main/java/org/qortal/controller/StatusNotifier.java b/src/main/java/org/qortal/controller/StatusNotifier.java deleted file mode 100644 index c142d91c..00000000 --- a/src/main/java/org/qortal/controller/StatusNotifier.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.qortal.controller; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import org.eclipse.jetty.websocket.api.Session; - -public class StatusNotifier { - - private static StatusNotifier instance; - - @FunctionalInterface - public interface Listener { - void notify(long timestamp); - } - - private Map listenersBySession = new HashMap<>(); - - private StatusNotifier() { - } - - public static synchronized StatusNotifier getInstance() { - if (instance == null) - instance = new StatusNotifier(); - - return instance; - } - - public void register(Session session, Listener listener) { - synchronized (this.listenersBySession) { - this.listenersBySession.put(session, listener); - } - } - - public void deregister(Session session) { - synchronized (this.listenersBySession) { - this.listenersBySession.remove(session); - } - } - - public void onStatusChange(long now) { - for (Listener listener : getAllListeners()) - listener.notify(now); - } - - private Collection getAllListeners() { - // Make a copy of listeners to both avoid concurrent modification - // and reduce synchronization time - synchronized (this.listenersBySession) { - return new ArrayList<>(this.listenersBySession.values()); - } - } - -}