Add initial implementation of expiration watcher

This commit is contained in:
Leonid Logvinov
2017-11-15 14:54:56 -06:00
parent bc61b92070
commit 88d020f9f2
4 changed files with 173 additions and 2 deletions

View File

@@ -0,0 +1,58 @@
import * as _ from 'lodash';
import {BigNumber} from 'bignumber.js';
import {utils} from '../utils/utils';
import {intervalUtils} from '../utils/interval_utils';
import {SignedOrder} from '../types';
import {Heap} from '../utils/heap';
import {ZeroEx} from '../0x';
// Order prunning is very fast
const DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS = 50;
/**
* This class includes all the functionality related to prunning expired orders
*/
export class ExpirationWatcher {
private orderHashHeapByExpiration: Heap<string>;
private expiration: {[orderHash: string]: BigNumber};
private callbackIfExists?: (orderHash: string) => void;
private orderExpirationCheckingIntervalMs: number;
private orderExpirationCheckingIntervalIdIfExists?: NodeJS.Timer;
constructor(orderExpirationCheckingIntervalMs?: number) {
this.orderExpirationCheckingIntervalMs = orderExpirationCheckingIntervalMs ||
DEFAULT_ORDER_EXPIRATION_CHECKING_INTERVAL_MS;
this.expiration = {};
const scoreFunction = ((orderHash: string) => {
return this.expiration[orderHash].toNumber();
}).bind(this);
this.orderHashHeapByExpiration = new Heap(scoreFunction);
}
public subscribe(callback: (orderHash: string) => void): void {
this.callbackIfExists = callback;
this.orderExpirationCheckingIntervalIdIfExists = intervalUtils.setAsyncExcludingInterval(
this.pruneExpiredOrders.bind(this), this.orderExpirationCheckingIntervalMs,
);
}
public unsubscribe(): void {
intervalUtils.clearAsyncExcludingInterval(this.orderExpirationCheckingIntervalIdIfExists as NodeJS.Timer);
delete this.callbackIfExists;
}
public addOrder(orderHash: string, expirationUnixTimestampSec: BigNumber): void {
this.expiration[orderHash] = expirationUnixTimestampSec;
// We don't remove hashes on order remove because it's slow (linear).
// We just skip them later if the order was already removed from the order watcher.
this.orderHashHeapByExpiration.push(orderHash);
}
private pruneExpiredOrders(): void {
const currentUnixTimestampSec = utils.getCurrentUnixTimestamp();
while (
this.orderHashHeapByExpiration.size() !== 0 &&
this.expiration[this.orderHashHeapByExpiration.head()].lessThan(currentUnixTimestampSec) &&
!_.isUndefined(this.callbackIfExists)
) {
const orderHash = this.orderHashHeapByExpiration.pop();
delete this.expiration[orderHash];
this.callbackIfExists(orderHash);
}
}
}

View File

@@ -6,6 +6,7 @@ import {assert} from '../utils/assert';
import {utils} from '../utils/utils';
import {artifacts} from '../artifacts';
import {AbiDecoder} from '../utils/abi_decoder';
import {intervalUtils} from '../utils/interval_utils';
import {OrderStateUtils} from '../utils/order_state_utils';
import {
LogEvent,
@@ -24,14 +25,14 @@ import {
ExchangeEvents,
TokenEvents,
ZeroExError,
ExchangeContractErrs,
} from '../types';
import {Web3Wrapper} from '../web3_wrapper';
import {TokenWrapper} from '../contract_wrappers/token_wrapper';
import {ExchangeWrapper} from '../contract_wrappers/exchange_wrapper';
import {OrderFilledCancelledLazyStore} from '../stores/order_filled_cancelled_lazy_store';
import {BalanceAndProxyAllowanceLazyStore} from '../stores/balance_proxy_allowance_lazy_store';
const DEFAULT_NUM_CONFIRMATIONS = 0;
import {ExpirationWatcher} from './expiration_watcher';
interface DependentOrderHashes {
[makerAddress: string]: {
@@ -56,6 +57,7 @@ export class OrderStateWatcher {
private _eventWatcher: EventWatcher;
private _web3Wrapper: Web3Wrapper;
private _abiDecoder: AbiDecoder;
private _expirationWatcher: ExpirationWatcher;
private _orderStateUtils: OrderStateUtils;
private _orderFilledCancelledLazyStore: OrderFilledCancelledLazyStore;
private _balanceAndProxyAllowanceLazyStore: BalanceAndProxyAllowanceLazyStore;
@@ -72,6 +74,10 @@ export class OrderStateWatcher {
this._orderStateUtils = new OrderStateUtils(
this._balanceAndProxyAllowanceLazyStore, this._orderFilledCancelledLazyStore,
);
const orderExpirationCheckingIntervalMs = _.isUndefined(config) ?
undefined :
config.orderExpirationCheckingIntervalMs;
this._expirationWatcher = new ExpirationWatcher(orderExpirationCheckingIntervalMs);
}
/**
* Add an order to the orderStateWatcher. Before the order is added, it's
@@ -84,6 +90,8 @@ export class OrderStateWatcher {
assert.isValidSignature(orderHash, signedOrder.ecSignature, signedOrder.maker);
this._orderByOrderHash[orderHash] = signedOrder;
this.addToDependentOrderHashes(signedOrder, orderHash);
// We don't remove orders from expirationWatcher because heap removal is linear. We just skip it later
this._expirationWatcher.addOrder(orderHash, signedOrder.expirationUnixTimestampSec);
}
/**
* Removes an order from the orderStateWatcher
@@ -111,6 +119,7 @@ export class OrderStateWatcher {
}
this._callbackIfExistsAsync = callback;
this._eventWatcher.subscribe(this._onEventWatcherCallbackAsync.bind(this));
this._expirationWatcher.subscribe(this._onOrderExpired.bind(this));
}
/**
* Ends an orderStateWatcher subscription.
@@ -123,6 +132,18 @@ export class OrderStateWatcher {
this._orderFilledCancelledLazyStore.deleteAll();
delete this._callbackIfExistsAsync;
this._eventWatcher.unsubscribe();
this._expirationWatcher.unsubscribe();
}
private _onOrderExpired(orderHash: string): void {
const orderState: OrderState = {
isValid: false,
orderHash,
error: ExchangeContractErrs.OrderFillExpired,
};
if (!_.isUndefined(this._orderByOrderHash[orderHash])) {
// We need this check because we never remove the orders from expiration watcher
(this._callbackIfExistsAsync as OnOrderStateChangeCallback)(orderState);
}
}
private async _onEventWatcherCallbackAsync(log: LogEvent): Promise<void> {
const maybeDecodedLog = this._abiDecoder.tryToDecodeLogOrNoop(log);

View File

@@ -397,9 +397,11 @@ export interface JSONRPCPayload {
}
/*
* orderExpirationCheckingIntervalMs: How often to check for expired orders
* eventPollingIntervalMs: How often to poll the Ethereum node for new events
*/
export interface OrderStateWatcherConfig {
orderExpirationCheckingIntervalMs?: number;
eventPollingIntervalMs?: number;
}

View File

@@ -0,0 +1,90 @@
// Based on Original JavaScript Code from Marijn Haverbeke (http://eloquentjavascript.net/1st_edition/appendix2.html)
export class Heap<T> {
private content: T[];
private scoreFunction: (x: T) => number;
constructor(scoreFunction: (x: T) => number) {
this.content = [];
this.scoreFunction = scoreFunction;
}
public push(element: T) {
this.content.push(element);
this.bubbleUp(this.content.length - 1);
}
public size(): number {
const size = this.content.length;
return size;
}
public head(): T {
const head = this.content[0];
return head;
}
public pop(): T {
const head = this.content[0];
const end = this.content.pop();
if (this.content.length > 0) {
this.content[0] = end as T;
this.sinkDown(0);
}
return head;
}
private bubbleUp(n: number) {
// Fetch the element that has to be moved.
const element = this.content[n];
const score = this.scoreFunction(element);
// When at 0, an element can not go up any further.
while (n > 0) {
// Compute the parent element's index, and fetch it.
const parentN = Math.floor((n + 1) / 2) - 1;
const parent = this.content[parentN];
// If the parent has a lesser score, things are in order and we
// are done.
if (score >= this.scoreFunction(parent)) {
break;
}
// Otherwise, swap the parent with the current element and
// continue.
this.content[parentN] = element;
this.content[n] = parent;
n = parentN;
}
}
private sinkDown(n: number) {
// Look up the target element and its score.
const length = this.content.length;
const element = this.content[n];
const elemScore = this.scoreFunction(element);
while (true) {
// Compute the indices of the child elements.
const child2N = (n + 1) * 2;
const child1N = child2N - 1;
// This is used to store the new position of the element, if any.
let swap = n;
let child1Score;
let child2Score;
// If the first child exists (is inside the array)...
if (child1N < length) {
// Look it up and compute its score.
const child1 = this.content[child1N];
child1Score = this.scoreFunction(child1);
// If the score is less than our element's, we need to swap.
if (child1Score < elemScore) {
swap = child1N;
}
// Do the same checks for the other child.
if (child2N < length) {
const child2 = this.content[child2N];
child2Score = this.scoreFunction(child2);
if (child2Score < (swap == null ? elemScore : child1Score)) {
swap = child2N;
}
}
}
this.content[n] = this.content[swap];
this.content[swap] = element;
n = swap;
}
}
}