Rework ApiWebSocket so it can manage sessions and in readiness to conversion from "notifiers" to event-bus

This commit is contained in:
catbref 2020-08-05 13:23:07 +01:00
parent ce5cf87094
commit d507383487
6 changed files with 73 additions and 51 deletions

View File

@ -10,7 +10,6 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.crypto.Crypto;
@ -22,7 +21,7 @@ import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class ActiveChatsWebSocket extends WebSocketServlet implements ApiWebSocket {
public class ActiveChatsWebSocket extends ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
@ -31,7 +30,7 @@ public class ActiveChatsWebSocket extends WebSocketServlet implements ApiWebSock
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
Map<String, String> pathParams = this.getPathParams(session, "/{address}");
Map<String, String> pathParams = getPathParams(session, "/{address}");
String address = pathParams.get("address");
if (address == null || !Crypto.isValidAddress(address)) {
@ -70,7 +69,7 @@ public class ActiveChatsWebSocket extends WebSocketServlet implements ApiWebSock
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, activeChats);
marshall(stringWriter, activeChats);
// Only output if something has changed
String output = stringWriter.toString();

View File

@ -9,7 +9,6 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.api.model.NodeStatus;
import org.qortal.controller.StatusNotifier;
@ -19,7 +18,7 @@ import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class AdminStatusWebSocket extends WebSocketServlet implements ApiWebSocket {
public class AdminStatusWebSocket extends ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
@ -51,7 +50,7 @@ public class AdminStatusWebSocket extends WebSocketServlet implements ApiWebSock
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, nodeStatus);
marshall(stringWriter, nodeStatus);
// Only output if something has changed
String output = stringWriter.toString();

View File

@ -3,7 +3,10 @@ package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.bind.JAXBContext;
@ -13,24 +16,28 @@ import javax.xml.bind.Marshaller;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.jaxb.MarshallerProperties;
import org.qortal.api.ApiError;
import org.qortal.api.ApiErrorRoot;
interface ApiWebSocket {
@SuppressWarnings("serial")
abstract class ApiWebSocket extends WebSocketServlet {
default String getPathInfo(Session session) {
private static final Map<Class<? extends ApiWebSocket>, List<Session>> SESSIONS_BY_CLASS = new HashMap<>();
protected static String getPathInfo(Session session) {
ServletUpgradeRequest upgradeRequest = (ServletUpgradeRequest) session.getUpgradeRequest();
return upgradeRequest.getHttpServletRequest().getPathInfo();
}
default Map<String, String> getPathParams(Session session, String pathSpec) {
protected static Map<String, String> getPathParams(Session session, String pathSpec) {
UriTemplatePathSpec uriTemplatePathSpec = new UriTemplatePathSpec(pathSpec);
return uriTemplatePathSpec.getPathParams(this.getPathInfo(session));
return uriTemplatePathSpec.getPathParams(getPathInfo(session));
}
default void sendError(Session session, ApiError apiError) {
protected static void sendError(Session session, ApiError apiError) {
ApiErrorRoot apiErrorRoot = new ApiErrorRoot();
apiErrorRoot.setApiError(apiError);
@ -43,7 +50,7 @@ interface ApiWebSocket {
}
}
default void marshall(Writer writer, Object object) throws IOException {
protected static void marshall(Writer writer, Object object) throws IOException {
Marshaller marshaller = createMarshaller(object.getClass());
try {
@ -53,7 +60,7 @@ interface ApiWebSocket {
}
}
default void marshall(Writer writer, Collection<?> collection) throws IOException {
protected static void marshall(Writer writer, Collection<?> collection) throws IOException {
// If collection is empty then we're returning "[]" anyway
if (collection.isEmpty()) {
writer.append("[]");
@ -92,4 +99,22 @@ interface ApiWebSocket {
}
}
public void onWebSocketConnect(Session session) {
synchronized (SESSIONS_BY_CLASS) {
SESSIONS_BY_CLASS.computeIfAbsent(this.getClass(), clazz -> new ArrayList<>()).add(session);
}
}
public void onWebSocketClose(Session session, int statusCode, String reason) {
synchronized (SESSIONS_BY_CLASS) {
SESSIONS_BY_CLASS.get(this.getClass()).remove(session);
}
}
protected List<Session> getSessions() {
synchronized (SESSIONS_BY_CLASS) {
return new ArrayList<>(SESSIONS_BY_CLASS.get(this.getClass()));
}
}
}

View File

@ -8,7 +8,6 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.api.ApiError;
import org.qortal.controller.BlockNotifier;
@ -20,7 +19,7 @@ import org.qortal.utils.Base58;
@WebSocket
@SuppressWarnings("serial")
public class BlocksWebSocket extends WebSocketServlet implements ApiWebSocket {
public class BlocksWebSocket extends ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
@ -98,7 +97,7 @@ public class BlocksWebSocket extends WebSocketServlet implements ApiWebSocket {
StringWriter stringWriter = new StringWriter();
try {
this.marshall(stringWriter, blockData);
marshall(stringWriter, blockData);
session.getRemote().sendString(stringWriter.toString());
} catch (IOException e) {

View File

@ -12,7 +12,6 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.data.chat.ChatMessage;
@ -23,7 +22,7 @@ import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class ChatMessagesWebSocket extends WebSocketServlet implements ApiWebSocket {
public class ChatMessagesWebSocket extends ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
@ -123,7 +122,7 @@ public class ChatMessagesWebSocket extends WebSocketServlet implements ApiWebSoc
StringWriter stringWriter = new StringWriter();
try {
this.marshall(stringWriter, chatMessages);
marshall(stringWriter, chatMessages);
session.getRemote().sendString(stringWriter.toString());
} catch (IOException e) {

View File

@ -13,7 +13,6 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.api.model.CrossChainOfferSummary;
import org.qortal.controller.BlockNotifier;
@ -28,7 +27,7 @@ import org.qortal.utils.NTP;
@WebSocket
@SuppressWarnings("serial")
public class TradeOffersWebSocket extends WebSocketServlet implements ApiWebSocket {
public class TradeOffersWebSocket extends ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
@ -116,46 +115,48 @@ public class TradeOffersWebSocket extends WebSocketServlet implements ApiWebSock
}
private void onNotify(Session session, BlockData blockData, final Map<String, BTCACCT.Mode> previousAtModes) {
List<CrossChainOfferSummary> crossChainOfferSummaries = null;
try (final Repository repository = RepositoryManager.getRepository()) {
// Find any new trade ATs since this block
final Boolean isFinished = null;
final Integer dataByteOffset = null;
final Long expectedValue = null;
final Integer minimumFinalHeight = blockData.getHeight();
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
if (atStates == null)
return;
crossChainOfferSummaries = produceSummaries(repository, atStates, blockData.getTimestamp());
} catch (DataException e) {
// No output this time
}
synchronized (previousAtModes) { //NOSONAR squid:S2445 suppressed because previousAtModes is final and curried in lambda
try (final Repository repository = RepositoryManager.getRepository()) {
// Find any new trade ATs since this block
final Boolean isFinished = null;
final Integer dataByteOffset = null;
final Long expectedValue = null;
final Integer minimumFinalHeight = blockData.getHeight();
// Remove any entries unchanged from last time
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
List<ATStateData> atStates = repository.getATRepository().getMatchingFinalATStates(BTCACCT.CODE_BYTES_HASH,
isFinished, dataByteOffset, expectedValue, minimumFinalHeight,
null, null, null);
// Don't send anything if no results
if (crossChainOfferSummaries.isEmpty())
return;
if (atStates == null)
return;
final boolean wasSent = sendOfferSummaries(session, crossChainOfferSummaries);
List<CrossChainOfferSummary> crossChainOfferSummaries = produceSummaries(repository, atStates, blockData.getTimestamp());
if (!wasSent)
return;
// Remove any entries unchanged from last time
crossChainOfferSummaries.removeIf(offerSummary -> previousAtModes.get(offerSummary.getQortalAtAddress()) == offerSummary.getMode());
// Don't send anything if no results
if (crossChainOfferSummaries.isEmpty())
return;
final boolean wasSent = sendOfferSummaries(session, crossChainOfferSummaries);
if (!wasSent)
return;
previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode)));
} catch (DataException e) {
// No output this time
}
previousAtModes.putAll(crossChainOfferSummaries.stream().collect(Collectors.toMap(CrossChainOfferSummary::getQortalAtAddress, CrossChainOfferSummary::getMode)));
}
}
private boolean sendOfferSummaries(Session session, List<CrossChainOfferSummary> crossChainOfferSummaries) {
try {
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, crossChainOfferSummaries);
marshall(stringWriter, crossChainOfferSummaries);
String output = stringWriter.toString();
session.getRemote().sendStringByFuture(output);