fix: async pool cache (#226)

This commit is contained in:
Jacob Evans 2021-05-05 21:01:51 +10:00 committed by GitHub
parent c73097e688
commit 6ee0108565
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 63 deletions

View File

@ -33,6 +33,10 @@
{
"note": "Add Balancer V2 integration",
"pr": 206
},
{
"note": "Re-work the PoolCache for Balancer et al",
"pr": 226
}
]
},

View File

@ -46,7 +46,6 @@ import {
OptimizerResult,
OptimizerResultWithReport,
OrderDomain,
SourcesWithPoolsCache,
} from './types';
// tslint:disable:boolean-naming
@ -101,15 +100,6 @@ export class MarketOperationUtils {
const quoteSourceFilters = this._sellSources.merge(requestFilters);
const feeSourceFilters = this._feeSources.exclude(_opts.excludedFeeSources);
// Can't sample Balancer or Cream on-chain without the pools cache
const sourcesWithStaleCaches: SourcesWithPoolsCache[] = (Object.keys(
this._sampler.poolsCaches,
) as SourcesWithPoolsCache[]).filter(s => !this._sampler.poolsCaches[s].isFresh(takerToken, makerToken));
// tslint:disable-next-line:promise-function-async
const cacheRefreshPromises: Array<Promise<any[]>> = sourcesWithStaleCaches.map(s =>
this._sampler.poolsCaches[s].getFreshPoolsForPairAsync(takerToken, makerToken),
);
// Used to determine whether the tx origin is an EOA or a contract
const txOrigin = (_opts.rfqt && _opts.rfqt.txOrigin) || NULL_ADDRESS;
@ -142,6 +132,10 @@ export class MarketOperationUtils {
),
this._sampler.isAddressContract(txOrigin),
);
// Refresh the cached pools asynchronously if required
void this._refreshPoolCacheIfRequiredAsync(takerToken, makerToken);
const [
[
tokenDecimals,
@ -152,7 +146,7 @@ export class MarketOperationUtils {
rawTwoHopQuotes,
isTxOriginContract,
],
] = await Promise.all([samplerPromise, Promise.all(cacheRefreshPromises)]);
] = await Promise.all([samplerPromise]);
// Filter out any invalid two hop quotes where we couldn't find a route
const twoHopQuotes = rawTwoHopQuotes.filter(
@ -207,15 +201,6 @@ export class MarketOperationUtils {
const quoteSourceFilters = this._buySources.merge(requestFilters);
const feeSourceFilters = this._feeSources.exclude(_opts.excludedFeeSources);
// Can't sample Balancer or Cream on-chain without the pools cache
const sourcesWithStaleCaches: SourcesWithPoolsCache[] = (Object.keys(
this._sampler.poolsCaches,
) as SourcesWithPoolsCache[]).filter(s => !this._sampler.poolsCaches[s].isFresh(takerToken, makerToken));
// tslint:disable-next-line:promise-function-async
const cacheRefreshPromises: Array<Promise<any[]>> = sourcesWithStaleCaches.map(s =>
this._sampler.poolsCaches[s].getFreshPoolsForPairAsync(takerToken, makerToken),
);
// Used to determine whether the tx origin is an EOA or a contract
const txOrigin = (_opts.rfqt && _opts.rfqt.txOrigin) || NULL_ADDRESS;
@ -249,6 +234,9 @@ export class MarketOperationUtils {
this._sampler.isAddressContract(txOrigin),
);
// Refresh the cached pools asynchronously if required
void this._refreshPoolCacheIfRequiredAsync(takerToken, makerToken);
const [
[
tokenDecimals,
@ -259,7 +247,7 @@ export class MarketOperationUtils {
rawTwoHopQuotes,
isTxOriginContract,
],
] = await Promise.all([samplerPromise, Promise.all(cacheRefreshPromises)]);
] = await Promise.all([samplerPromise]);
// Filter out any invalid two hop quotes where we couldn't find a route
const twoHopQuotes = rawTwoHopQuotes.filter(
@ -691,6 +679,17 @@ export class MarketOperationUtils {
}
return { ...optimizerResult, quoteReport };
}
private async _refreshPoolCacheIfRequiredAsync(takerToken: string, makerToken: string): Promise<void> {
void Promise.all(
Object.values(this._sampler.poolsCaches).map(async cache => {
if (cache.isFresh(takerToken, makerToken)) {
return Promise.resolve([]);
}
return cache.getFreshPoolsForPairAsync(takerToken, makerToken);
}),
);
}
}
// tslint:disable: max-file-line-count

View File

@ -16,6 +16,7 @@ interface BalancerPoolResponse {
tokensList: string[];
totalWeight: string;
}
export class BalancerPoolsCache extends PoolsCache {
constructor(
private readonly _subgraphUrl: string = BALANCER_SUBGRAPH_URL,
@ -63,7 +64,8 @@ export class BalancerPoolsCache extends PoolsCache {
const poolData = parsePoolData([pool], from, to);
fromToPools[from][to].push(poolData[0]);
// Cache this as we progress through
this._cachePoolsForPair(from, to, fromToPools[from][to]);
const expiresAt = Date.now() + this._cacheTimeMs;
this._cachePoolsForPair(from, to, fromToPools[from][to], expiresAt);
} catch {
// soldier on
}

View File

@ -58,6 +58,7 @@ export class BalancerV2PoolsCache extends PoolsCache {
}
}
`;
try {
const { pools } = await request(this.subgraphUrl, query);
return pools.map((pool: any) => {
const tToken = pool.tokens.find((t: any) => t.address === takerToken);
@ -78,5 +79,8 @@ export class BalancerV2PoolsCache extends PoolsCache {
spotPrice,
};
});
} catch (e) {
return [];
}
}
}

View File

@ -1,18 +1,26 @@
import { Pool } from '@balancer-labs/sor/dist/types';
import { ONE_HOUR_IN_SECONDS, ONE_SECOND_MS } from '../constants';
export { Pool };
export interface CacheValue {
timestamp: number;
expiresAt: number;
pools: Pool[];
}
// tslint:disable:custom-no-magic-numbers
const FIVE_SECONDS_MS = 5 * 1000;
const ONE_DAY_MS = 24 * 60 * 60 * 1000;
// Cache results for 30mins
const DEFAULT_CACHE_TIME_MS = (ONE_HOUR_IN_SECONDS / 2) * ONE_SECOND_MS;
const DEFAULT_TIMEOUT_MS = 1000;
// tslint:enable:custom-no-magic-numbers
export abstract class PoolsCache {
constructor(protected readonly _cache: { [key: string]: CacheValue }) {}
protected static _isExpired(value: CacheValue): boolean {
return Date.now() >= value.expiresAt;
}
constructor(
protected readonly _cache: { [key: string]: CacheValue },
protected readonly _cacheTimeMs: number = DEFAULT_CACHE_TIME_MS,
) {}
public async getFreshPoolsForPairAsync(
takerToken: string,
@ -26,46 +34,43 @@ export abstract class PoolsCache {
public getCachedPoolAddressesForPair(
takerToken: string,
makerToken: string,
cacheExpiryMs?: number,
ignoreExpired: boolean = true,
): string[] | undefined {
const key = JSON.stringify([takerToken, makerToken]);
const value = this._cache[key];
if (cacheExpiryMs === undefined) {
if (ignoreExpired) {
return value === undefined ? [] : value.pools.map(pool => pool.id);
}
const minTimestamp = Date.now() - cacheExpiryMs;
if (value === undefined || value.timestamp < minTimestamp) {
if (!value) {
return undefined;
} else {
return value.pools.map(pool => pool.id);
}
if (PoolsCache._isExpired(value)) {
return undefined;
}
return (value || []).pools.map(pool => pool.id);
}
public isFresh(takerToken: string, makerToken: string): boolean {
const cached = this.getCachedPoolAddressesForPair(takerToken, makerToken, ONE_DAY_MS);
return cached !== undefined && cached.length > 0;
const cached = this.getCachedPoolAddressesForPair(takerToken, makerToken, false);
return cached !== undefined;
}
protected async _getAndSaveFreshPoolsForPairAsync(
takerToken: string,
makerToken: string,
cacheExpiryMs: number = FIVE_SECONDS_MS,
): Promise<Pool[]> {
protected async _getAndSaveFreshPoolsForPairAsync(takerToken: string, makerToken: string): Promise<Pool[]> {
const key = JSON.stringify([takerToken, makerToken]);
const value = this._cache[key];
const minTimestamp = Date.now() - cacheExpiryMs;
if (value === undefined || value.timestamp < minTimestamp) {
if (value === undefined || value.expiresAt >= Date.now()) {
const pools = await this._fetchPoolsForPairAsync(takerToken, makerToken);
this._cachePoolsForPair(takerToken, makerToken, pools);
const expiresAt = Date.now() + this._cacheTimeMs;
this._cachePoolsForPair(takerToken, makerToken, pools, expiresAt);
}
return this._cache[key].pools;
}
protected _cachePoolsForPair(takerToken: string, makerToken: string, pools: Pool[]): void {
protected _cachePoolsForPair(takerToken: string, makerToken: string, pools: Pool[], expiresAt: number): void {
const key = JSON.stringify([takerToken, makerToken]);
this._cache[key] = {
pools,
timestamp: Date.now(),
expiresAt,
};
}