Implement web browser socket

This commit is contained in:
Brandon Millman 2018-05-16 11:15:02 -07:00
parent 8fcc7aefa7
commit 16ddd1edfc
12 changed files with 259 additions and 33 deletions

View File

@ -68,7 +68,7 @@
"@types/lodash": "4.14.104", "@types/lodash": "4.14.104",
"@types/mocha": "^2.2.42", "@types/mocha": "^2.2.42",
"@types/query-string": "^5.0.1", "@types/query-string": "^5.0.1",
"@types/websocket": "^0.0.34", "@types/websocket": "^0.0.39",
"async-child-process": "^1.1.1", "async-child-process": "^1.1.1",
"chai": "^4.0.1", "chai": "^4.0.1",
"chai-as-promised": "^7.1.0", "chai-as-promised": "^7.1.0",

View File

@ -0,0 +1,140 @@
import * as _ from 'lodash';
import * as WebSocket from 'websocket';
import {
OrderbookChannel,
OrderbookChannelHandler,
OrderbookChannelMessageTypes,
OrderbookChannelSubscriptionOpts,
WebsocketClientEventType,
WebsocketConnectionEventType,
} from './types';
import { assert } from './utils/assert';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
interface Subscription {
subscriptionOpts: OrderbookChannelSubscriptionOpts;
handler: OrderbookChannelHandler;
}
/**
* This class includes all the functionality related to interacting with a websocket endpoint
* that implements the standard relayer API v0 in a browser environment
*/
export class BrowserWebSocketOrderbookChannel implements OrderbookChannel {
private _apiEndpointUrl: string;
private _clientIfExists?: WebSocket.w3cwebsocket;
private _subscriptions: Subscription[] = [];
/**
* Instantiates a new WebSocketOrderbookChannel instance
* @param url The relayer API base WS url you would like to interact with
* @return An instance of WebSocketOrderbookChannel
*/
constructor(url: string) {
assert.isUri('url', url);
this._apiEndpointUrl = url;
}
/**
* Subscribe to orderbook snapshots and updates from the websocket
* @param subscriptionOpts An OrderbookChannelSubscriptionOpts instance describing which
* token pair to subscribe to
* @param handler An OrderbookChannelHandler instance that responds to various
* channel updates
*/
public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
assert.isOrderbookChannelHandler('handler', handler);
const newSubscription: Subscription = {
subscriptionOpts,
handler,
};
this._subscriptions.push(newSubscription);
const subscribeMessage = {
type: 'subscribe',
channel: 'orderbook',
requestId: this._subscriptions.length - 1,
payload: subscriptionOpts,
};
if (_.isUndefined(this._clientIfExists)) {
this._clientIfExists = new WebSocket.w3cwebsocket(this._apiEndpointUrl);
this._clientIfExists.onopen = () => {
this._sendMessage(subscribeMessage);
};
this._clientIfExists.onerror = error => {
this._alertAllHandlersToError(error);
};
this._clientIfExists.onclose = () => {
_.forEach(this._subscriptions, subscription => {
subscription.handler.onClose(this, subscription.subscriptionOpts);
});
};
this._clientIfExists.onmessage = message => {
this._handleWebSocketMessage(message);
};
} else {
this._sendMessage(subscribeMessage);
}
}
/**
* Close the websocket and stop receiving updates
*/
public close(): void {
if (!_.isUndefined(this._clientIfExists)) {
this._clientIfExists.close();
}
}
/**
* Send a message to the client if it has been instantiated and it is open
*/
private _sendMessage(message: any): void {
if (!_.isUndefined(this._clientIfExists) && this._clientIfExists.readyState === WebSocket.w3cwebsocket.OPEN) {
this._clientIfExists.send(JSON.stringify(message));
}
}
/**
* For use in cases where we need to alert all handlers of an error
*/
private _alertAllHandlersToError(error: Error): void {
_.forEach(this._subscriptions, subscription => {
subscription.handler.onError(this, subscription.subscriptionOpts, error);
});
}
private _handleWebSocketMessage(message: any): void {
// if we get a message with no data, alert all handlers and return
if (_.isUndefined(message.data)) {
this._alertAllHandlersToError(new Error(`Message does not contain utf8Data`));
return;
}
// try to parse the message data and route it to the correct handler
try {
const utf8Data = message.data;
const parserResult = orderbookChannelMessageParser.parse(utf8Data);
const subscription = this._subscriptions[parserResult.requestId];
if (_.isUndefined(subscription)) {
this._alertAllHandlersToError(new Error(`Message has unknown requestId: ${utf8Data}`));
return;
}
const handler = subscription.handler;
const subscriptionOpts = subscription.subscriptionOpts;
switch (parserResult.type) {
case OrderbookChannelMessageTypes.Snapshot: {
handler.onSnapshot(this, subscriptionOpts, parserResult.payload);
break;
}
case OrderbookChannelMessageTypes.Update: {
handler.onUpdate(this, subscriptionOpts, parserResult.payload);
break;
}
default: {
handler.onError(
this,
subscriptionOpts,
new Error(`Message has unknown type parameter: ${utf8Data}`),
);
}
}
} catch (error) {
this._alertAllHandlersToError(error);
}
}
}

View File

@ -1,9 +1,11 @@
export { HttpClient } from './http_client'; export { HttpClient } from './http_client';
export { WebSocketOrderbookChannel } from './ws_orderbook_channel'; export { BrowserWebSocketOrderbookChannel } from './browser_ws_orderbook_channel';
export { NodeWebSocketOrderbookChannel } from './node_ws_orderbook_channel';
export { export {
Client, Client,
FeesRequest, FeesRequest,
FeesResponse, FeesResponse,
NodeWebSocketOrderbookChannelConfig,
OrderbookChannel, OrderbookChannel,
OrderbookChannelHandler, OrderbookChannelHandler,
OrderbookChannelSubscriptionOpts, OrderbookChannelSubscriptionOpts,
@ -14,7 +16,6 @@ export {
TokenPairsItem, TokenPairsItem,
TokenPairsRequestOpts, TokenPairsRequestOpts,
TokenTradeInfo, TokenTradeInfo,
WebSocketOrderbookChannelConfig,
} from './types'; } from './types';
export { Order, SignedOrder } from '@0xproject/types'; export { Order, SignedOrder } from '@0xproject/types';

View File

@ -1,18 +1,17 @@
import { assert } from '@0xproject/assert';
import { schemas } from '@0xproject/json-schemas';
import * as _ from 'lodash'; import * as _ from 'lodash';
import * as WebSocket from 'websocket'; import * as WebSocket from 'websocket';
import { schemas as clientSchemas } from './schemas/schemas'; import { schemas as clientSchemas } from './schemas/schemas';
import { import {
NodeWebSocketOrderbookChannelConfig,
OrderbookChannel, OrderbookChannel,
OrderbookChannelHandler, OrderbookChannelHandler,
OrderbookChannelMessageTypes, OrderbookChannelMessageTypes,
OrderbookChannelSubscriptionOpts, OrderbookChannelSubscriptionOpts,
WebsocketClientEventType, WebsocketClientEventType,
WebsocketConnectionEventType, WebsocketConnectionEventType,
WebSocketOrderbookChannelConfig,
} from './types'; } from './types';
import { assert } from './utils/assert';
import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser'; import { orderbookChannelMessageParser } from './utils/orderbook_channel_message_parser';
const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000; const DEFAULT_HEARTBEAT_INTERVAL_MS = 15000;
@ -20,9 +19,9 @@ const MINIMUM_HEARTBEAT_INTERVAL_MS = 10;
/** /**
* This class includes all the functionality related to interacting with a websocket endpoint * This class includes all the functionality related to interacting with a websocket endpoint
* that implements the standard relayer API v0 * that implements the standard relayer API v0 in a node environment
*/ */
export class WebSocketOrderbookChannel implements OrderbookChannel { export class NodeWebSocketOrderbookChannel implements OrderbookChannel {
private _apiEndpointUrl: string; private _apiEndpointUrl: string;
private _client: WebSocket.client; private _client: WebSocket.client;
private _connectionIfExists?: WebSocket.connection; private _connectionIfExists?: WebSocket.connection;
@ -30,15 +29,15 @@ export class WebSocketOrderbookChannel implements OrderbookChannel {
private _subscriptionCounter = 0; private _subscriptionCounter = 0;
private _heartbeatIntervalMs: number; private _heartbeatIntervalMs: number;
/** /**
* Instantiates a new WebSocketOrderbookChannel instance * Instantiates a new NodeWebSocketOrderbookChannelConfig instance
* @param url The relayer API base WS url you would like to interact with * @param url The relayer API base WS url you would like to interact with
* @param config The configuration object. Look up the type for the description. * @param config The configuration object. Look up the type for the description.
* @return An instance of WebSocketOrderbookChannel * @return An instance of NodeWebSocketOrderbookChannelConfig
*/ */
constructor(url: string, config?: WebSocketOrderbookChannelConfig) { constructor(url: string, config?: NodeWebSocketOrderbookChannelConfig) {
assert.isUri('url', url); assert.isUri('url', url);
if (!_.isUndefined(config)) { if (!_.isUndefined(config)) {
assert.doesConformToSchema('config', config, clientSchemas.webSocketOrderbookChannelConfigSchema); assert.doesConformToSchema('config', config, clientSchemas.nodeWebSocketOrderbookChannelConfigSchema);
} }
this._apiEndpointUrl = url; this._apiEndpointUrl = url;
this._heartbeatIntervalMs = this._heartbeatIntervalMs =
@ -55,15 +54,8 @@ export class WebSocketOrderbookChannel implements OrderbookChannel {
* channel updates * channel updates
*/ */
public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void { public subscribe(subscriptionOpts: OrderbookChannelSubscriptionOpts, handler: OrderbookChannelHandler): void {
assert.doesConformToSchema( assert.isOrderbookChannelSubscriptionOpts('subscriptionOpts', subscriptionOpts);
'subscriptionOpts', assert.isOrderbookChannelHandler('handler', handler);
subscriptionOpts,
schemas.relayerApiOrderbookChannelSubscribePayload,
);
assert.isFunction('handler.onSnapshot', _.get(handler, 'onSnapshot'));
assert.isFunction('handler.onUpdate', _.get(handler, 'onUpdate'));
assert.isFunction('handler.onError', _.get(handler, 'onError'));
assert.isFunction('handler.onClose', _.get(handler, 'onClose'));
this._subscriptionCounter += 1; this._subscriptionCounter += 1;
const subscribeMessage = { const subscribeMessage = {
type: 'subscribe', type: 'subscribe',

View File

@ -1,5 +1,5 @@
export const webSocketOrderbookChannelConfigSchema = { export const nodeWebSocketOrderbookChannelConfigSchema = {
id: '/WebSocketOrderbookChannelConfig', id: '/NodeWebSocketOrderbookChannelConfig',
type: 'object', type: 'object',
properties: { properties: {
heartbeatIntervalMs: { heartbeatIntervalMs: {

View File

@ -1,15 +1,15 @@
import { feesRequestSchema } from './fees_request_schema'; import { feesRequestSchema } from './fees_request_schema';
import { nodeWebSocketOrderbookChannelConfigSchema } from './node_websocket_orderbook_channel_config_schema';
import { orderBookRequestSchema } from './orderbook_request_schema'; import { orderBookRequestSchema } from './orderbook_request_schema';
import { ordersRequestOptsSchema } from './orders_request_opts_schema'; import { ordersRequestOptsSchema } from './orders_request_opts_schema';
import { pagedRequestOptsSchema } from './paged_request_opts_schema'; import { pagedRequestOptsSchema } from './paged_request_opts_schema';
import { tokenPairsRequestOptsSchema } from './token_pairs_request_opts_schema'; import { tokenPairsRequestOptsSchema } from './token_pairs_request_opts_schema';
import { webSocketOrderbookChannelConfigSchema } from './websocket_orderbook_channel_config_schema';
export const schemas = { export const schemas = {
feesRequestSchema, feesRequestSchema,
nodeWebSocketOrderbookChannelConfigSchema,
orderBookRequestSchema, orderBookRequestSchema,
ordersRequestOptsSchema, ordersRequestOptsSchema,
pagedRequestOptsSchema, pagedRequestOptsSchema,
tokenPairsRequestOptsSchema, tokenPairsRequestOptsSchema,
webSocketOrderbookChannelConfigSchema,
}; };

View File

@ -18,7 +18,7 @@ export interface OrderbookChannel {
/** /**
* heartbeatInterval: Interval in milliseconds that the orderbook channel should ping the underlying websocket. Default: 15000 * heartbeatInterval: Interval in milliseconds that the orderbook channel should ping the underlying websocket. Default: 15000
*/ */
export interface WebSocketOrderbookChannelConfig { export interface NodeWebSocketOrderbookChannelConfig {
heartbeatIntervalMs?: number; heartbeatIntervalMs?: number;
} }

View File

@ -0,0 +1,25 @@
import { assert as sharedAssert } from '@0xproject/assert';
// We need those two unused imports because they're actually used by sharedAssert which gets injected here
// tslint:disable-next-line:no-unused-variable
import { Schema, schemas } from '@0xproject/json-schemas';
// tslint:disable-next-line:no-unused-variable
import { ECSignature } from '@0xproject/types';
import { BigNumber } from '@0xproject/utils';
import * as _ from 'lodash';
export const assert = {
...sharedAssert,
isOrderbookChannelSubscriptionOpts(variableName: string, subscriptionOpts: any): void {
sharedAssert.doesConformToSchema(
'subscriptionOpts',
subscriptionOpts,
schemas.relayerApiOrderbookChannelSubscribePayload,
);
},
isOrderbookChannelHandler(variableName: string, handler: any): void {
sharedAssert.isFunction(`${variableName}.onSnapshot`, _.get(handler, 'onSnapshot'));
sharedAssert.isFunction(`${variableName}.onUpdate`, _.get(handler, 'onUpdate'));
sharedAssert.isFunction(`${variableName}.onError`, _.get(handler, 'onError'));
sharedAssert.isFunction(`${variableName}.onClose`, _.get(handler, 'onClose'));
},
};

View File

@ -8,10 +8,16 @@ import { relayerResponseJsonParsers } from './relayer_response_json_parsers';
export const orderbookChannelMessageParser = { export const orderbookChannelMessageParser = {
parse(utf8Data: string): OrderbookChannelMessage { parse(utf8Data: string): OrderbookChannelMessage {
// parse the message
const messageObj = JSON.parse(utf8Data); const messageObj = JSON.parse(utf8Data);
// ensure we have a type parameter to switch on
const type: string = _.get(messageObj, 'type'); const type: string = _.get(messageObj, 'type');
assert.assert(!_.isUndefined(type), `Message is missing a type parameter: ${utf8Data}`); assert.assert(!_.isUndefined(type), `Message is missing a type parameter: ${utf8Data}`);
assert.isString('type', type); assert.isString('type', type);
// ensure we have a request id for the resulting message
const requestId: number = _.get(messageObj, 'requestId');
assert.assert(!_.isUndefined(requestId), `Message is missing a requestId parameter: ${utf8Data}`);
assert.isNumber('requestId', requestId);
switch (type) { switch (type) {
case OrderbookChannelMessageTypes.Snapshot: { case OrderbookChannelMessageTypes.Snapshot: {
assert.doesConformToSchema('message', messageObj, schemas.relayerApiOrderbookChannelSnapshotSchema); assert.doesConformToSchema('message', messageObj, schemas.relayerApiOrderbookChannelSnapshotSchema);
@ -28,7 +34,7 @@ export const orderbookChannelMessageParser = {
default: { default: {
return { return {
type: OrderbookChannelMessageTypes.Unknown, type: OrderbookChannelMessageTypes.Unknown,
requestId: 0, requestId,
payload: undefined, payload: undefined,
}; };
} }

View File

@ -0,0 +1,61 @@
import * as chai from 'chai';
import * as dirtyChai from 'dirty-chai';
import * as _ from 'lodash';
import 'mocha';
import { BrowserWebSocketOrderbookChannel } from '../src/browser_ws_orderbook_channel';
chai.config.includeStack = true;
chai.use(dirtyChai);
const expect = chai.expect;
describe('BrowserWebSocketOrderbookChannel', () => {
const websocketUrl = 'ws://localhost:8080';
const orderbookChannel = new BrowserWebSocketOrderbookChannel(websocketUrl);
const subscriptionOpts = {
baseTokenAddress: '0x323b5d4c32345ced77393b3530b1eed0f346429d',
quoteTokenAddress: '0xef7fff64389b814a946f3e92105513705ca6b990',
snapshot: true,
limit: 100,
};
const emptyOrderbookChannelHandler = {
onSnapshot: () => {
_.noop();
},
onUpdate: () => {
_.noop();
},
onError: () => {
_.noop();
},
onClose: () => {
_.noop();
},
};
describe('#subscribe', () => {
it('throws when subscriptionOpts does not conform to schema', () => {
const badSubscribeCall = orderbookChannel.subscribe.bind(
orderbookChannel,
{},
emptyOrderbookChannelHandler,
);
expect(badSubscribeCall).throws(
'Expected subscriptionOpts to conform to schema /RelayerApiOrderbookChannelSubscribePayload\nEncountered: {}\nValidation errors: instance requires property "baseTokenAddress", instance requires property "quoteTokenAddress"',
);
});
it('throws when handler has the incorrect members', () => {
const badSubscribeCall = orderbookChannel.subscribe.bind(orderbookChannel, subscriptionOpts, {});
expect(badSubscribeCall).throws(
'Expected handler.onSnapshot to be of type function, encountered: undefined',
);
});
it('does not throw when inputs are of correct types', () => {
const goodSubscribeCall = orderbookChannel.subscribe.bind(
orderbookChannel,
subscriptionOpts,
emptyOrderbookChannelHandler,
);
expect(goodSubscribeCall).to.not.throw();
});
});
});

View File

@ -3,15 +3,15 @@ import * as dirtyChai from 'dirty-chai';
import * as _ from 'lodash'; import * as _ from 'lodash';
import 'mocha'; import 'mocha';
import { WebSocketOrderbookChannel } from '../src/ws_orderbook_channel'; import { NodeWebSocketOrderbookChannel } from '../src/node_ws_orderbook_channel';
chai.config.includeStack = true; chai.config.includeStack = true;
chai.use(dirtyChai); chai.use(dirtyChai);
const expect = chai.expect; const expect = chai.expect;
describe('WebSocketOrderbookChannel', () => { describe('NodeWebSocketOrderbookChannel', () => {
const websocketUrl = 'ws://localhost:8080'; const websocketUrl = 'ws://localhost:8080';
const orderbookChannel = new WebSocketOrderbookChannel(websocketUrl); const orderbookChannel = new NodeWebSocketOrderbookChannel(websocketUrl);
const subscriptionOpts = { const subscriptionOpts = {
baseTokenAddress: '0x323b5d4c32345ced77393b3530b1eed0f346429d', baseTokenAddress: '0x323b5d4c32345ced77393b3530b1eed0f346429d',
quoteTokenAddress: '0xef7fff64389b814a946f3e92105513705ca6b990', quoteTokenAddress: '0xef7fff64389b814a946f3e92105513705ca6b990',

View File

@ -548,10 +548,11 @@
version "1.0.2" version "1.0.2"
resolved "https://registry.yarnpkg.com/@types/valid-url/-/valid-url-1.0.2.tgz#60fa435ce24bfd5ba107b8d2a80796aeaf3a8f45" resolved "https://registry.yarnpkg.com/@types/valid-url/-/valid-url-1.0.2.tgz#60fa435ce24bfd5ba107b8d2a80796aeaf3a8f45"
"@types/websocket@^0.0.34": "@types/websocket@^0.0.39":
version "0.0.34" version "0.0.39"
resolved "https://registry.yarnpkg.com/@types/websocket/-/websocket-0.0.34.tgz#25596764cec885eda070fdb6d19cd76fe582747c" resolved "https://registry.yarnpkg.com/@types/websocket/-/websocket-0.0.39.tgz#aa971e24f9c1455fe2a57ee3e69c7d395016b12a"
dependencies: dependencies:
"@types/events" "*"
"@types/node" "*" "@types/node" "*"
"@types/yargs@^10.0.0": "@types/yargs@^10.0.0":