Merge pull request #825 from 0xProject/fix-order-watcher

OrderWatcher Fixes
This commit is contained in:
Fabio Berger
2018-07-06 13:06:12 +02:00
committed by GitHub
7 changed files with 96 additions and 181 deletions

View File

@@ -3,7 +3,12 @@
"version": "0.0.7",
"changes": [
{
"note": "Dependencies updated"
"note": "Switch out simple getLogs polling with ethereumjs-blockstream",
"pr": 825
},
{
"note": "Do not stop subscription if error is encountered",
"pr": 825
}
]
},

View File

@@ -87,6 +87,7 @@
"@0xproject/typescript-typings": "^0.4.1",
"@0xproject/utils": "^0.7.2",
"@0xproject/web3-wrapper": "^0.7.1",
"ethereumjs-blockstream": "5.0.0",
"ethereum-types": "^0.0.2",
"bintrees": "^1.0.2",
"ethers": "3.0.22",

View File

@@ -1,6 +1,7 @@
import { BlockParamLiteral, LogEntry } from '@0xproject/types';
import { intervalUtils } from '@0xproject/utils';
import { intervalUtils, logUtils } from '@0xproject/utils';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
import * as _ from 'lodash';
import { EventWatcherCallback, OrderWatcherError } from '../types';
@@ -19,81 +20,115 @@ enum LogEventState {
*/
export class EventWatcher {
private _web3Wrapper: Web3Wrapper;
private _blockAndLogStreamerIfExists: BlockAndLogStreamer<Block, Log> | undefined;
private _blockAndLogStreamIntervalIfExists?: NodeJS.Timer;
private _onLogAddedSubscriptionToken: string | undefined;
private _onLogRemovedSubscriptionToken: string | undefined;
private _pollingIntervalMs: number;
private _intervalIdIfExists?: NodeJS.Timer;
private _lastEvents: LogEntry[] = [];
private _stateLayer: BlockParamLiteral;
private _isVerbose: boolean;
constructor(
web3Wrapper: Web3Wrapper,
pollingIntervalIfExistsMs: undefined | number,
stateLayer: BlockParamLiteral = BlockParamLiteral.Latest,
isVerbose: boolean,
) {
this._isVerbose = isVerbose;
this._web3Wrapper = web3Wrapper;
this._stateLayer = stateLayer;
this._pollingIntervalMs = _.isUndefined(pollingIntervalIfExistsMs)
? DEFAULT_EVENT_POLLING_INTERVAL_MS
: pollingIntervalIfExistsMs;
this._blockAndLogStreamerIfExists = undefined;
this._blockAndLogStreamIntervalIfExists = undefined;
this._onLogAddedSubscriptionToken = undefined;
this._onLogRemovedSubscriptionToken = undefined;
}
public subscribe(callback: EventWatcherCallback): void {
assert.isFunction('callback', callback);
if (!_.isUndefined(this._intervalIdIfExists)) {
if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
this._intervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
this._pollForBlockchainEventsAsync.bind(this, callback),
this._pollingIntervalMs,
(err: Error) => {
this.unsubscribe();
callback(err);
},
);
this._startBlockAndLogStream(callback);
}
public unsubscribe(): void {
this._lastEvents = [];
if (!_.isUndefined(this._intervalIdIfExists)) {
intervalUtils.clearAsyncExcludingInterval(this._intervalIdIfExists);
delete this._intervalIdIfExists;
if (_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
throw new Error(OrderWatcherError.SubscriptionNotFound);
}
this._stopBlockAndLogStream();
}
private async _pollForBlockchainEventsAsync(callback: EventWatcherCallback): Promise<void> {
const pendingEvents = await this._getEventsAsync();
if (_.isUndefined(pendingEvents)) {
// HACK: This should never happen, but happens frequently on CI due to a ganache bug
return;
private _startBlockAndLogStream(callback: EventWatcherCallback): void {
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(OrderWatcherError.SubscriptionAlreadyPresent);
}
if (pendingEvents.length === 0) {
// HACK: Sometimes when node rebuilds the pending block we get back the empty result.
// We don't want to emit a lot of removal events and bring them back after a couple of miliseconds,
// that's why we just ignore those cases.
return;
}
const removedEvents = _.differenceBy(this._lastEvents, pendingEvents, JSON.stringify);
const newEvents = _.differenceBy(pendingEvents, this._lastEvents, JSON.stringify);
await this._emitDifferencesAsync(removedEvents, LogEventState.Removed, callback);
await this._emitDifferencesAsync(newEvents, LogEventState.Added, callback);
this._lastEvents = pendingEvents;
}
private async _getEventsAsync(): Promise<LogEntry[]> {
const eventFilter = {
fromBlock: this._stateLayer,
toBlock: this._stateLayer,
};
const events = await this._web3Wrapper.getLogsAsync(eventFilter);
return events;
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper, this._stateLayer),
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper, eventFilter),
this._onBlockAndLogStreamerError.bind(this),
);
const catchAllLogFilter = {};
this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
this._reconcileBlockAsync.bind(this),
this._pollingIntervalMs,
this._onBlockAndLogStreamerError.bind(this),
);
let isRemoved = false;
this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
this._onLogStateChangedAsync.bind(this, callback, isRemoved),
);
isRemoved = true;
this._onLogRemovedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogRemoved(
this._onLogStateChangedAsync.bind(this, callback, isRemoved),
);
}
private _stopBlockAndLogStream(): void {
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
throw new Error(OrderWatcherError.SubscriptionNotFound);
}
this._blockAndLogStreamerIfExists.unsubscribeFromOnLogAdded(this._onLogAddedSubscriptionToken as string);
this._blockAndLogStreamerIfExists.unsubscribeFromOnLogRemoved(this._onLogRemovedSubscriptionToken as string);
intervalUtils.clearAsyncExcludingInterval(this._blockAndLogStreamIntervalIfExists as NodeJS.Timer);
delete this._blockAndLogStreamerIfExists;
delete this._blockAndLogStreamIntervalIfExists;
}
private async _onLogStateChangedAsync(
callback: EventWatcherCallback,
isRemoved: boolean,
log: LogEntry,
): Promise<void> {
await this._emitDifferencesAsync(log, isRemoved ? LogEventState.Removed : LogEventState.Added, callback);
}
private async _reconcileBlockAsync(): Promise<void> {
const latestBlock = await this._web3Wrapper.getBlockAsync(BlockParamLiteral.Latest);
// We need to coerce to Block type cause Web3.Block includes types for mempool blocks
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
// If we clear the interval while fetching the block - this._blockAndLogStreamer will be undefined
await this._blockAndLogStreamerIfExists.reconcileNewBlock((latestBlock as any) as Block);
}
}
private async _emitDifferencesAsync(
logs: LogEntry[],
log: LogEntry,
logEventState: LogEventState,
callback: EventWatcherCallback,
): Promise<void> {
for (const log of logs) {
const logEvent = {
removed: logEventState === LogEventState.Removed,
...log,
};
if (!_.isUndefined(this._intervalIdIfExists)) {
callback(null, logEvent);
}
const logEvent = {
removed: logEventState === LogEventState.Removed,
...log,
};
if (!_.isUndefined(this._blockAndLogStreamIntervalIfExists)) {
callback(null, logEvent);
}
}
private _onBlockAndLogStreamerError(err: Error): void {
// Since Blockstream errors are all recoverable, we simply log them if the verbose
// config is passed in.
if (this._isVerbose) {
logUtils.warn(err);
}
}
}

View File

@@ -93,7 +93,8 @@ export class OrderWatcher {
const pollingIntervalIfExistsMs = _.isUndefined(config) ? undefined : config.eventPollingIntervalMs;
const stateLayer =
_.isUndefined(config) || _.isUndefined(config.stateLayer) ? BlockParamLiteral.Latest : config.stateLayer;
this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer);
const isVerbose = !_.isUndefined(config) && !_.isUndefined(config.isVerbose) ? config.isVerbose : false;
this._eventWatcher = new EventWatcher(this._web3Wrapper, pollingIntervalIfExistsMs, stateLayer, isVerbose);
this._balanceAndProxyAllowanceLazyStore = new BalanceAndProxyAllowanceLazyStore(
this._contractWrappers.token,
stateLayer,
@@ -236,7 +237,6 @@ export class OrderWatcher {
if (!_.isNull(err)) {
if (!_.isUndefined(this._callbackIfExists)) {
this._callbackIfExists(err);
this.unsubscribe();
}
return;
}

View File

@@ -16,11 +16,12 @@ export type EventWatcherCallback = (err: null | Error, log?: LogEntryEvent) => v
* stateLayer: Optional blockchain state layer OrderWatcher will monitor for new events. Default=latest.
*/
export interface OrderWatcherConfig {
stateLayer: BlockParamLiteral;
orderExpirationCheckingIntervalMs?: number;
eventPollingIntervalMs?: number;
expirationMarginMs?: number;
cleanupJobIntervalMs?: number;
stateLayer: BlockParamLiteral;
isVerbose?: boolean;
}
export type OnOrderStateChangeCallback = (err: Error | null, orderState?: OrderState) => void;

View File

@@ -1,126 +0,0 @@
import { callbackErrorReporter } from '@0xproject/dev-utils';
import { DoneCallback, LogEntry, LogEntryEvent } from '@0xproject/types';
import { Web3Wrapper } from '@0xproject/web3-wrapper';
import * as chai from 'chai';
import * as _ from 'lodash';
import 'mocha';
import * as Sinon from 'sinon';
import { EventWatcher } from '../src/order_watcher/event_watcher';
import { chaiSetup } from './utils/chai_setup';
import { provider } from './utils/web3_wrapper';
chaiSetup.configure();
const expect = chai.expect;
describe('EventWatcher', () => {
let stubs: Sinon.SinonStub[] = [];
let eventWatcher: EventWatcher;
let web3Wrapper: Web3Wrapper;
const logA: LogEntry = {
address: '0x71d271f8b14adef568f8f28f1587ce7271ac4ca5',
blockHash: null,
blockNumber: null,
data: '',
logIndex: null,
topics: [],
transactionHash: '0x004881d38cd4a8f72f1a0d68c8b9b8124504706041ff37019c1d1ed6bfda8e17',
transactionIndex: 0,
};
const logB: LogEntry = {
address: '0x8d12a197cb00d4747a1fe03395095ce2a5cc6819',
blockHash: null,
blockNumber: null,
data: '',
logIndex: null,
topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
transactionIndex: 0,
};
const logC: LogEntry = {
address: '0x1d271f8b174adef58f1587ce68f8f27271ac4ca5',
blockHash: null,
blockNumber: null,
data: '',
logIndex: null,
topics: ['0xf341246adaac6f497bc2a656f546ab9e182111d630394f0c57c710a59a2cb567'],
transactionHash: '0x01ef3c048b18d9b09ea195b4ed94cf8dd5f3d857a1905ff886b152cfb1166f25',
transactionIndex: 0,
};
before(async () => {
const pollingIntervalMs = 10;
web3Wrapper = new Web3Wrapper(provider);
eventWatcher = new EventWatcher(web3Wrapper, pollingIntervalMs);
});
afterEach(() => {
// clean up any stubs after the test has completed
_.each(stubs, s => s.restore());
stubs = [];
eventWatcher.unsubscribe();
});
it('correctly emits initial log events', (done: DoneCallback) => {
const logs: LogEntry[] = [logA, logB];
const expectedLogEvents = [
{
removed: false,
...logA,
},
{
removed: false,
...logB,
},
];
const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
getLogsStub.onCall(0).returns(logs);
stubs.push(getLogsStub);
const expectedToBeCalledOnce = false;
const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
(event: LogEntryEvent) => {
const expectedLogEvent = expectedLogEvents.shift();
expect(event).to.be.deep.equal(expectedLogEvent);
if (_.isEmpty(expectedLogEvents)) {
done();
}
},
);
eventWatcher.subscribe(callback);
});
it('correctly computes the difference and emits only changes', (done: DoneCallback) => {
const initialLogs: LogEntry[] = [logA, logB];
const changedLogs: LogEntry[] = [logA, logC];
const expectedLogEvents = [
{
removed: false,
...logA,
},
{
removed: false,
...logB,
},
{
removed: true,
...logB,
},
{
removed: false,
...logC,
},
];
const getLogsStub = Sinon.stub(web3Wrapper, 'getLogsAsync');
getLogsStub.onCall(0).returns(initialLogs);
getLogsStub.onCall(1).returns(changedLogs);
stubs.push(getLogsStub);
const expectedToBeCalledOnce = false;
const callback = callbackErrorReporter.reportNodeCallbackErrors(done, expectedToBeCalledOnce)(
(event: LogEntryEvent) => {
const expectedLogEvent = expectedLogEvents.shift();
expect(event).to.be.deep.equal(expectedLogEvent);
if (_.isEmpty(expectedLogEvents)) {
done();
}
},
);
eventWatcher.subscribe(callback);
});
});

View File

@@ -46,16 +46,15 @@ describe('OrderWatcher', () => {
let taker: string;
let signedOrder: SignedOrder;
let orderWatcher: OrderWatcher;
const config = {
networkId: constants.TESTRPC_NETWORK_ID,
};
const decimals = constants.ZRX_DECIMALS;
const fillableAmount = Web3Wrapper.toBaseUnitAmount(new BigNumber(5), decimals);
before(async () => {
contractWrappers = new ContractWrappers(provider, config);
// tslint:disable-next-line:no-unused-variable
const networkId = await web3Wrapper.getNetworkIdAsync();
orderWatcher = new OrderWatcher(provider, constants.TESTRPC_NETWORK_ID);
const config = {
networkId,
};
contractWrappers = new ContractWrappers(provider, config);
orderWatcher = new OrderWatcher(provider, networkId);
exchangeContractAddress = contractWrappers.exchange.getContractAddress();
userAddresses = await web3Wrapper.getAvailableAddressesAsync();
[, maker, taker] = userAddresses;