Stop subscriptions from unsubscribing on recoverable network issues
This commit is contained in:
parent
03a6a088c5
commit
b750ce8be6
@ -1,5 +1,5 @@
|
|||||||
import { ContractArtifact } from '@0xproject/sol-compiler';
|
import { ContractArtifact } from '@0xproject/sol-compiler';
|
||||||
import { AbiDecoder, intervalUtils } from '@0xproject/utils';
|
import { AbiDecoder, intervalUtils, logUtils } from '@0xproject/utils';
|
||||||
import { Web3Wrapper } from '@0xproject/web3-wrapper';
|
import { Web3Wrapper } from '@0xproject/web3-wrapper';
|
||||||
import { BlockParamLiteral, ContractAbi, FilterObject, LogEntry, LogWithDecodedArgs, RawLog } from 'ethereum-types';
|
import { BlockParamLiteral, ContractAbi, FilterObject, LogEntry, LogWithDecodedArgs, RawLog } from 'ethereum-types';
|
||||||
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
|
import { Block, BlockAndLogStreamer, Log } from 'ethereumjs-blockstream';
|
||||||
@ -41,6 +41,13 @@ export abstract class ContractWrapper {
|
|||||||
};
|
};
|
||||||
private _onLogAddedSubscriptionToken: string | undefined;
|
private _onLogAddedSubscriptionToken: string | undefined;
|
||||||
private _onLogRemovedSubscriptionToken: string | undefined;
|
private _onLogRemovedSubscriptionToken: string | undefined;
|
||||||
|
private static _onBlockAndLogStreamerError(isVerbose: boolean, err: Error): void {
|
||||||
|
// Since Blockstream errors are all recoverable, we simply log them if the verbose
|
||||||
|
// config is passed in.
|
||||||
|
if (isVerbose) {
|
||||||
|
logUtils.warn(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
constructor(web3Wrapper: Web3Wrapper, networkId: number, blockPollingIntervalMs?: number) {
|
constructor(web3Wrapper: Web3Wrapper, networkId: number, blockPollingIntervalMs?: number) {
|
||||||
this._web3Wrapper = web3Wrapper;
|
this._web3Wrapper = web3Wrapper;
|
||||||
this._networkId = networkId;
|
this._networkId = networkId;
|
||||||
@ -79,10 +86,11 @@ export abstract class ContractWrapper {
|
|||||||
indexFilterValues: IndexedFilterValues,
|
indexFilterValues: IndexedFilterValues,
|
||||||
abi: ContractAbi,
|
abi: ContractAbi,
|
||||||
callback: EventCallback<ArgsType>,
|
callback: EventCallback<ArgsType>,
|
||||||
|
isVerbose: boolean = false,
|
||||||
): string {
|
): string {
|
||||||
const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi);
|
const filter = filterUtils.getFilter(address, eventName, indexFilterValues, abi);
|
||||||
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
if (_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
||||||
this._startBlockAndLogStream();
|
this._startBlockAndLogStream(isVerbose);
|
||||||
}
|
}
|
||||||
const filterToken = filterUtils.generateUUID();
|
const filterToken = filterUtils.generateUUID();
|
||||||
this._filters[filterToken] = filter;
|
this._filters[filterToken] = filter;
|
||||||
@ -151,21 +159,21 @@ export abstract class ContractWrapper {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
private _startBlockAndLogStream(): void {
|
private _startBlockAndLogStream(isVerbose: boolean): void {
|
||||||
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
if (!_.isUndefined(this._blockAndLogStreamerIfExists)) {
|
||||||
throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
|
throw new Error(ContractWrappersError.SubscriptionAlreadyPresent);
|
||||||
}
|
}
|
||||||
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
|
this._blockAndLogStreamerIfExists = new BlockAndLogStreamer(
|
||||||
this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
|
this._web3Wrapper.getBlockAsync.bind(this._web3Wrapper),
|
||||||
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
|
this._web3Wrapper.getLogsAsync.bind(this._web3Wrapper),
|
||||||
this._onBlockAndLogStreamerError.bind(this),
|
ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
|
||||||
);
|
);
|
||||||
const catchAllLogFilter = {};
|
const catchAllLogFilter = {};
|
||||||
this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
|
this._blockAndLogStreamerIfExists.addLogFilter(catchAllLogFilter);
|
||||||
this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
|
this._blockAndLogStreamIntervalIfExists = intervalUtils.setAsyncExcludingInterval(
|
||||||
this._reconcileBlockAsync.bind(this),
|
this._reconcileBlockAsync.bind(this),
|
||||||
this._blockPollingIntervalMs,
|
this._blockPollingIntervalMs,
|
||||||
this._onReconcileBlockError.bind(this),
|
ContractWrapper._onBlockAndLogStreamerError.bind(this, isVerbose),
|
||||||
);
|
);
|
||||||
let isRemoved = false;
|
let isRemoved = false;
|
||||||
this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
|
this._onLogAddedSubscriptionToken = this._blockAndLogStreamerIfExists.subscribeToOnLogAdded(
|
||||||
@ -176,20 +184,10 @@ export abstract class ContractWrapper {
|
|||||||
this._onLogStateChanged.bind(this, isRemoved),
|
this._onLogStateChanged.bind(this, isRemoved),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
private _onBlockAndLogStreamerError(err: Error): void {
|
// HACK: This should be a package-scoped method (which doesn't exist in TS)
|
||||||
// Propogate all Blockstream subscriber errors to all
|
// We don't want this method available in the public interface for all classes
|
||||||
// top-level subscriptions
|
// who inherit from ContractWrapper, and it is only used by the internal implementation
|
||||||
const filterCallbacks = _.values(this._filterCallbacks);
|
// of those higher classes.
|
||||||
_.each(filterCallbacks, filterCallback => {
|
|
||||||
filterCallback(err);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
private _onReconcileBlockError(err: Error): void {
|
|
||||||
const filterTokens = _.keys(this._filterCallbacks);
|
|
||||||
_.each(filterTokens, filterToken => {
|
|
||||||
this._unsubscribe(filterToken, err);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// tslint:disable-next-line:no-unused-variable
|
// tslint:disable-next-line:no-unused-variable
|
||||||
private _setNetworkId(networkId: number): void {
|
private _setNetworkId(networkId: number): void {
|
||||||
this._networkId = networkId;
|
this._networkId = networkId;
|
||||||
|
@ -347,6 +347,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
|
|||||||
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
||||||
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
||||||
* @param callback Callback that gets called when a log is added/removed
|
* @param callback Callback that gets called when a log is added/removed
|
||||||
|
* @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
|
||||||
* @return Subscription token used later to unsubscribe
|
* @return Subscription token used later to unsubscribe
|
||||||
*/
|
*/
|
||||||
public subscribe<ArgsType extends ERC20TokenEventArgs>(
|
public subscribe<ArgsType extends ERC20TokenEventArgs>(
|
||||||
@ -354,6 +355,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
|
|||||||
eventName: ERC20TokenEvents,
|
eventName: ERC20TokenEvents,
|
||||||
indexFilterValues: IndexedFilterValues,
|
indexFilterValues: IndexedFilterValues,
|
||||||
callback: EventCallback<ArgsType>,
|
callback: EventCallback<ArgsType>,
|
||||||
|
isVerbose: boolean = false,
|
||||||
): string {
|
): string {
|
||||||
assert.isETHAddressHex('tokenAddress', tokenAddress);
|
assert.isETHAddressHex('tokenAddress', tokenAddress);
|
||||||
assert.doesBelongToStringEnum('eventName', eventName, ERC20TokenEvents);
|
assert.doesBelongToStringEnum('eventName', eventName, ERC20TokenEvents);
|
||||||
@ -366,6 +368,7 @@ export class ERC20TokenWrapper extends ContractWrapper {
|
|||||||
indexFilterValues,
|
indexFilterValues,
|
||||||
artifacts.ERC20Token.compilerOutput.abi,
|
artifacts.ERC20Token.compilerOutput.abi,
|
||||||
callback,
|
callback,
|
||||||
|
isVerbose,
|
||||||
);
|
);
|
||||||
return subscriptionToken;
|
return subscriptionToken;
|
||||||
}
|
}
|
||||||
|
@ -372,6 +372,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
|
|||||||
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
||||||
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
||||||
* @param callback Callback that gets called when a log is added/removed
|
* @param callback Callback that gets called when a log is added/removed
|
||||||
|
* @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
|
||||||
* @return Subscription token used later to unsubscribe
|
* @return Subscription token used later to unsubscribe
|
||||||
*/
|
*/
|
||||||
public subscribe<ArgsType extends ERC721TokenEventArgs>(
|
public subscribe<ArgsType extends ERC721TokenEventArgs>(
|
||||||
@ -379,6 +380,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
|
|||||||
eventName: ERC721TokenEvents,
|
eventName: ERC721TokenEvents,
|
||||||
indexFilterValues: IndexedFilterValues,
|
indexFilterValues: IndexedFilterValues,
|
||||||
callback: EventCallback<ArgsType>,
|
callback: EventCallback<ArgsType>,
|
||||||
|
isVerbose: boolean = false,
|
||||||
): string {
|
): string {
|
||||||
assert.isETHAddressHex('tokenAddress', tokenAddress);
|
assert.isETHAddressHex('tokenAddress', tokenAddress);
|
||||||
assert.doesBelongToStringEnum('eventName', eventName, ERC721TokenEvents);
|
assert.doesBelongToStringEnum('eventName', eventName, ERC721TokenEvents);
|
||||||
@ -391,6 +393,7 @@ export class ERC721TokenWrapper extends ContractWrapper {
|
|||||||
indexFilterValues,
|
indexFilterValues,
|
||||||
artifacts.ERC721Token.compilerOutput.abi,
|
artifacts.ERC721Token.compilerOutput.abi,
|
||||||
callback,
|
callback,
|
||||||
|
isVerbose,
|
||||||
);
|
);
|
||||||
return subscriptionToken;
|
return subscriptionToken;
|
||||||
}
|
}
|
||||||
|
@ -146,6 +146,7 @@ export class EtherTokenWrapper extends ContractWrapper {
|
|||||||
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
||||||
* the value is the value you are interested in. E.g `{_owner: aUserAddressHex}`
|
* the value is the value you are interested in. E.g `{_owner: aUserAddressHex}`
|
||||||
* @param callback Callback that gets called when a log is added/removed
|
* @param callback Callback that gets called when a log is added/removed
|
||||||
|
* @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
|
||||||
* @return Subscription token used later to unsubscribe
|
* @return Subscription token used later to unsubscribe
|
||||||
*/
|
*/
|
||||||
public subscribe<ArgsType extends WETH9EventArgs>(
|
public subscribe<ArgsType extends WETH9EventArgs>(
|
||||||
@ -153,6 +154,7 @@ export class EtherTokenWrapper extends ContractWrapper {
|
|||||||
eventName: WETH9Events,
|
eventName: WETH9Events,
|
||||||
indexFilterValues: IndexedFilterValues,
|
indexFilterValues: IndexedFilterValues,
|
||||||
callback: EventCallback<ArgsType>,
|
callback: EventCallback<ArgsType>,
|
||||||
|
isVerbose: boolean = false,
|
||||||
): string {
|
): string {
|
||||||
assert.isETHAddressHex('etherTokenAddress', etherTokenAddress);
|
assert.isETHAddressHex('etherTokenAddress', etherTokenAddress);
|
||||||
const normalizedEtherTokenAddress = etherTokenAddress.toLowerCase();
|
const normalizedEtherTokenAddress = etherTokenAddress.toLowerCase();
|
||||||
@ -165,6 +167,7 @@ export class EtherTokenWrapper extends ContractWrapper {
|
|||||||
indexFilterValues,
|
indexFilterValues,
|
||||||
artifacts.EtherToken.compilerOutput.abi,
|
artifacts.EtherToken.compilerOutput.abi,
|
||||||
callback,
|
callback,
|
||||||
|
isVerbose,
|
||||||
);
|
);
|
||||||
return subscriptionToken;
|
return subscriptionToken;
|
||||||
}
|
}
|
||||||
|
@ -988,12 +988,14 @@ export class ExchangeWrapper extends ContractWrapper {
|
|||||||
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
* @param indexFilterValues An object where the keys are indexed args returned by the event and
|
||||||
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
* the value is the value you are interested in. E.g `{maker: aUserAddressHex}`
|
||||||
* @param callback Callback that gets called when a log is added/removed
|
* @param callback Callback that gets called when a log is added/removed
|
||||||
|
* @param isVerbose Enable verbose subscription warnings (e.g recoverable network issues encountered)
|
||||||
* @return Subscription token used later to unsubscribe
|
* @return Subscription token used later to unsubscribe
|
||||||
*/
|
*/
|
||||||
public subscribe<ArgsType extends ExchangeEventArgs>(
|
public subscribe<ArgsType extends ExchangeEventArgs>(
|
||||||
eventName: ExchangeEvents,
|
eventName: ExchangeEvents,
|
||||||
indexFilterValues: IndexedFilterValues,
|
indexFilterValues: IndexedFilterValues,
|
||||||
callback: EventCallback<ArgsType>,
|
callback: EventCallback<ArgsType>,
|
||||||
|
isVerbose: boolean = false,
|
||||||
): string {
|
): string {
|
||||||
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
|
assert.doesBelongToStringEnum('eventName', eventName, ExchangeEvents);
|
||||||
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
|
assert.doesConformToSchema('indexFilterValues', indexFilterValues, schemas.indexFilterValuesSchema);
|
||||||
@ -1005,6 +1007,7 @@ export class ExchangeWrapper extends ContractWrapper {
|
|||||||
indexFilterValues,
|
indexFilterValues,
|
||||||
artifacts.Exchange.compilerOutput.abi,
|
artifacts.Exchange.compilerOutput.abi,
|
||||||
callback,
|
callback,
|
||||||
|
isVerbose,
|
||||||
);
|
);
|
||||||
return subscriptionToken;
|
return subscriptionToken;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user