[swap] Remove RFQ Maker Balance Cache [LIT-908] (#175)

This commit is contained in:
Kyu
2023-03-14 12:42:03 -07:00
committed by GitHub
parent bd2e585c78
commit dfd51710fd
7 changed files with 3 additions and 579 deletions

View File

@@ -57,6 +57,6 @@ export { IdentityFillAdjustor } from './utils/market_operation_utils/identity_fi
export { GasPriceUtils } from './utils/gas_price_utils';
export { jsonifyFillData, ExtendedQuoteReport } from './utils/quote_report_generator';
export { QuoteRequestor } from './utils/quote_requestor';
export { ERC20BridgeSamplerContract, BalanceCheckerContract, FakeTakerContract } from '../wrappers';
export { ERC20BridgeSamplerContract, FakeTakerContract } from '../wrappers';
export { adjustOutput } from './utils/market_operation_utils/fills';

View File

@@ -283,11 +283,11 @@ export const SRA_PERSISTENT_ORDER_POSTING_WHITELISTED_API_KEYS: string[] =
);
// Whether or not prometheus metrics should be enabled.
export const ENABLE_PROMETHEUS_METRICS: boolean = _.isEmpty(process.env.ENABLE_PROMETHEUS_METRICS)
const ENABLE_PROMETHEUS_METRICS: boolean = _.isEmpty(process.env.ENABLE_PROMETHEUS_METRICS)
? false
: assertEnvVarType('ENABLE_PROMETHEUS_METRICS', process.env.ENABLE_PROMETHEUS_METRICS, EnvVarType.Boolean);
export const PROMETHEUS_PORT: number = _.isEmpty(process.env.PROMETHEUS_PORT)
const PROMETHEUS_PORT: number = _.isEmpty(process.env.PROMETHEUS_PORT)
? 8080
: assertEnvVarType('PROMETHEUS_PORT', process.env.PROMETHEUS_PORT, EnvVarType.Port);

View File

@@ -68,9 +68,6 @@ export const META_TRANSACTION_DOCS_URL = 'https://0x.org/docs/api#meta_transacti
export const DEFAULT_ZERO_EX_GAS_API_URL = 'https://gas.api.0x.org/source/median';
// RFQ Quote Validator expiration threshold
export const RFQ_FIRM_QUOTE_CACHE_EXPIRY = ONE_MINUTE_MS * 2;
export const RFQ_ALLOWANCE_TARGET = '0xdef1c0ded9bec7f1a1670819833240f027b25eff';
export const RFQ_DYNAMIC_BLACKLIST_TTL = ONE_SECOND_MS * 30;
// General cache control

View File

@@ -1,283 +0,0 @@
import { createMetricsRouter, MetricsService } from '@0x/api-utils';
import { BlockParamLiteral, SupportedProvider, Web3Wrapper } from '@0x/dev-utils';
import { BigNumber, logUtils } from '@0x/utils';
import * as delay from 'delay';
import * as express from 'express';
import * as _ from 'lodash';
import { Gauge, Summary } from 'prom-client';
import { Connection } from 'typeorm';
import { artifacts } from '../artifacts';
import { BalanceCheckerContract } from '../asset-swapper';
import * as defaultConfig from '../config';
import { METRICS_PATH, ONE_SECOND_MS, RFQ_ALLOWANCE_TARGET, RFQ_FIRM_QUOTE_CACHE_EXPIRY } from '../constants';
import { getDBConnectionOrThrow } from '../db_connection';
import { MakerBalanceChainCacheEntity } from '../entities';
import { logger } from '../logger';
import { providerUtils } from '../utils/provider_utils';
import { createResultCache, ResultCache } from '../utils/result_cache';
const DELAY_WHEN_NEW_BLOCK_FOUND = ONE_SECOND_MS * 5;
const DELAY_WHEN_NEW_BLOCK_NOT_FOUND = ONE_SECOND_MS;
const CACHE_MAKER_TOKENS_FOR_MS = Math.floor(RFQ_FIRM_QUOTE_CACHE_EXPIRY / 4);
// The eth_call will run out of gas if there are too many balance calls at once
const MAX_BALANCE_CHECKS_PER_CALL = 1000;
const BALANCE_CHECKER_GAS_LIMIT = 10000000;
// Maximum balances to save at once
const MAX_ROWS_TO_UPDATE = 1000;
const RANDOM_ADDRESS = '0xffffffffffffffffffffffffffffffffffffffff';
const MAX_REQUEST_ERRORS = 10;
const MAX_CACHE_RFQ_BALANCES_ERRORS = 10;
// Metric collection related fields
const LATEST_BLOCK_PROCESSED_GAUGE = new Gauge({
name: 'rfqtw_latest_block_processed',
help: 'Latest block processed by the RFQ worker process',
labelNames: ['workerId'],
});
const MAKER_BALANCE_CACHE_RESULT_COUNT = new Gauge({
name: 'maker_balance_cache_result_count',
help: 'Records the number of records being returned by the DB',
labelNames: ['workerId'],
});
const MAKER_BALANCE_CACHE_RETRIEVAL_TIME = new Summary({
name: 'maker_balance_cache_retrieval_time',
help: 'Records the amount of time needed to grab records',
labelNames: ['workerId'],
});
process.on('uncaughtException', (err) => {
logger.error(err);
process.exit(1);
});
process.on('unhandledRejection', (err) => {
if (err) {
logger.error(err);
}
});
interface BalancesCallInput {
addresses: string[];
tokens: string[];
}
if (require.main === module) {
(async () => {
logger.info('running RFQ balance cache runner');
const provider = providerUtils.createWeb3Provider(
defaultConfig.defaultHttpServiceConfig.ethereumRpcUrl,
defaultConfig.defaultHttpServiceConfig.rpcRequestTimeout,
defaultConfig.defaultHttpServiceConfig.shouldCompressRequest,
);
const web3Wrapper = new Web3Wrapper(provider);
const connection = await getDBConnectionOrThrow();
const balanceCheckerContractInterface = getBalanceCheckerContractInterface(RANDOM_ADDRESS, provider);
await runRfqBalanceCacheAsync(web3Wrapper, connection, balanceCheckerContractInterface);
})().catch((error) => {
logger.error(error);
process.exit(1);
});
}
async function runRfqBalanceCacheAsync(
web3Wrapper: Web3Wrapper,
connection: Connection,
balanceCheckerContractInterface: BalanceCheckerContract,
): Promise<void> {
if (defaultConfig.ENABLE_PROMETHEUS_METRICS) {
const app = express();
const metricsService = new MetricsService();
const metricsRouter = createMetricsRouter(metricsService);
app.use(METRICS_PATH, metricsRouter);
const server = app.listen(defaultConfig.PROMETHEUS_PORT, () => {
logger.info(`Metrics (HTTP) listening on port ${defaultConfig.PROMETHEUS_PORT}`);
});
server.on('error', (err) => {
logger.error(err);
});
}
let blockRequestErrors = 0;
let cacheRfqBalanceErrors = 0;
const workerId = _.uniqueId('rfqw_');
let lastBlockSeen = -1;
// eslint-disable-next-line no-constant-condition
while (true) {
if (blockRequestErrors >= MAX_REQUEST_ERRORS) {
throw new Error(`too many bad Web3 requests to fetch blocks (reached limit of ${MAX_REQUEST_ERRORS})`);
}
if (cacheRfqBalanceErrors >= MAX_CACHE_RFQ_BALANCES_ERRORS) {
throw new Error(
`too many errors from calling cacheRfqBalancesAsync (reached limit of ${MAX_CACHE_RFQ_BALANCES_ERRORS})`,
);
}
let newBlock: number;
try {
newBlock = await web3Wrapper.getBlockNumberAsync();
} catch (err) {
blockRequestErrors += 1;
logger.error(err);
continue;
}
if (lastBlockSeen < newBlock) {
logUtils.log(
{
block: newBlock,
workerId,
},
'Found new block',
);
try {
await cacheRfqBalancesAsync(connection, balanceCheckerContractInterface, true, workerId);
} catch (err) {
logger.error(err);
cacheRfqBalanceErrors += 1;
continue;
}
LATEST_BLOCK_PROCESSED_GAUGE.labels(workerId).set(newBlock);
lastBlockSeen = newBlock;
await delay(DELAY_WHEN_NEW_BLOCK_FOUND);
} else {
await delay(DELAY_WHEN_NEW_BLOCK_NOT_FOUND);
}
}
}
/**
* This function retrieves and caches ERC20 balances of RFQ market makers
*/
export async function cacheRfqBalancesAsync(
connection: Connection,
balanceCheckerContractInterface: BalanceCheckerContract,
codeOverride: boolean,
workerId: string,
): Promise<void> {
const makerTokens = await getMakerTokensAsync(connection, workerId);
const balancesCallInput = splitValues(makerTokens);
const updateTime = new Date();
const erc20Balances = await getErc20BalancesAsync(balanceCheckerContractInterface, balancesCallInput, codeOverride);
await updateErc20BalancesAsync(balancesCallInput, erc20Balances, connection, updateTime);
}
// NOTE: this only returns a partial entity class, just token address and maker address
// Cache the query results to reduce reads from the DB
let MAKER_TOKEN_CACHE: ResultCache<MakerBalanceChainCacheEntity[]>;
async function getMakerTokensAsync(connection: Connection, workerId: string): Promise<MakerBalanceChainCacheEntity[]> {
const start = new Date().getTime();
if (!MAKER_TOKEN_CACHE) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO: fix me!
MAKER_TOKEN_CACHE = createResultCache<any[]>(
() =>
connection
.getRepository(MakerBalanceChainCacheEntity)
.createQueryBuilder('maker_balance_chain_cache')
.select(['maker_balance_chain_cache.tokenAddress', 'maker_balance_chain_cache.makerAddress'])
.getMany(),
CACHE_MAKER_TOKENS_FOR_MS,
);
}
const results = (await MAKER_TOKEN_CACHE.getResultAsync()).result;
MAKER_BALANCE_CACHE_RESULT_COUNT.labels(workerId).set(results.length);
MAKER_BALANCE_CACHE_RETRIEVAL_TIME.labels(workerId).observe(new Date().getTime() - start);
return results;
}
function splitValues(makerTokens: MakerBalanceChainCacheEntity[]): BalancesCallInput {
const functionInputs: BalancesCallInput = { addresses: [], tokens: [] };
return makerTokens.reduce(({ addresses, tokens }, makerToken) => {
return {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- TODO: fix me!
addresses: addresses.concat(makerToken.makerAddress!),
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- TODO: fix me!
tokens: tokens.concat(makerToken.tokenAddress!),
};
}, functionInputs);
}
/**
* Returns the balaceChecker interface given a random address
*/
function getBalanceCheckerContractInterface(
contractAddress: string,
provider: SupportedProvider,
): BalanceCheckerContract {
return new BalanceCheckerContract(contractAddress, provider, { gas: BALANCE_CHECKER_GAS_LIMIT });
}
async function getErc20BalancesAsync(
balanceCheckerContractInterface: BalanceCheckerContract,
balancesCallInput: BalancesCallInput,
// HACK: allow for testing on ganache without override
codeOverride: boolean,
): Promise<string[]> {
// due to gas contraints limit the call to 1K balance checks
const addressesChunkedArray = _.chunk(balancesCallInput.addresses, MAX_BALANCE_CHECKS_PER_CALL);
const tokensChunkedArray = _.chunk(balancesCallInput.tokens, MAX_BALANCE_CHECKS_PER_CALL);
const balanceCheckerByteCode = _.get(artifacts.BalanceChecker, 'compilerOutput.evm.deployedBytecode.object');
const balances = await Promise.all(
_.zip(addressesChunkedArray, tokensChunkedArray).map(async ([addressesChunk, tokensChunk]) => {
const txOpts = codeOverride
? {
overrides: {
[RANDOM_ADDRESS]: {
code: balanceCheckerByteCode,
},
},
}
: {};
return (
balanceCheckerContractInterface
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- TODO: fix me!
.getMinOfBalancesOrAllowances(addressesChunk!, tokensChunk!, RFQ_ALLOWANCE_TARGET)
.callAsync(txOpts, BlockParamLiteral.Latest)
);
}),
);
const balancesFlattened = Array.prototype.concat.apply([], balances);
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO: fix me!
return balancesFlattened.map((bal: any) => bal.toString());
}
async function updateErc20BalancesAsync(
balancesCallInput: BalancesCallInput,
balances: string[],
connection: Connection,
updateTime: Date,
): Promise<void> {
const toSave = balancesCallInput.addresses.map((addr, i) => {
const dbEntity = new MakerBalanceChainCacheEntity();
dbEntity.makerAddress = addr;
dbEntity.tokenAddress = balancesCallInput.tokens[i];
dbEntity.balance = new BigNumber(balances[i]);
dbEntity.timeOfSample = updateTime;
return dbEntity;
});
await connection.getRepository(MakerBalanceChainCacheEntity).save(toSave, { chunk: MAX_ROWS_TO_UPDATE });
}

View File

@@ -1,41 +0,0 @@
import { TEN_MINUTES_MS } from '../constants';
import { logger } from '../logger';
interface CachedResult<T> {
timestamp: number;
result: T;
}
export interface ResultCache<T> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO: fix me!
getResultAsync: (args?: any) => Promise<CachedResult<T>>;
}
export const createResultCache = <T>(
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO: fix me!
fn: (fnArgs?: any) => Promise<T>,
cacheExpiryMs: number = TEN_MINUTES_MS,
): ResultCache<T> => {
const resultCache: { [key: string]: { timestamp: number; result: T } } = {};
return {
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- TODO: fix me!
getResultAsync: async (getArgs?: any): Promise<CachedResult<T>> => {
let timestamp = resultCache[getArgs] && resultCache[getArgs].timestamp;
let result = resultCache[getArgs] && resultCache[getArgs].result;
if (!result || !timestamp || timestamp < Date.now() - cacheExpiryMs) {
try {
result = await fn(getArgs);
timestamp = Date.now();
resultCache[getArgs] = { timestamp, result };
} catch (e) {
if (!result) {
// Throw if we've never received a result
throw e;
}
logger.warn(`Error performing cache miss update: ${e}`);
}
}
return { timestamp, result };
},
};
};

View File

@@ -1,129 +0,0 @@
import { artifacts as erc20Artifacts, DummyERC20TokenContract } from '@0x/contracts-erc20';
import { blockchainTests, constants, expect, web3Wrapper } from '@0x/contracts-test-utils';
import { BigNumber } from '@0x/utils';
import { artifacts } from '../../artifacts';
import { BalanceCheckerContract } from '../../wrappers';
const ETH_ADDRESS = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee';
blockchainTests.resets('BalanceChecker contract', (env) => {
let contract: BalanceCheckerContract;
before(async () => {
contract = await BalanceCheckerContract.deployFrom0xArtifactAsync(
artifacts.BalanceChecker,
env.provider,
env.txDefaults,
{},
);
});
describe('getBalances', () => {
it('returns the correct array for a successful call', async () => {
const makerToken = await DummyERC20TokenContract.deployFrom0xArtifactAsync(
erc20Artifacts.DummyERC20Token,
env.provider,
env.txDefaults,
artifacts,
constants.DUMMY_TOKEN_NAME,
constants.DUMMY_TOKEN_SYMBOL,
new BigNumber(18),
constants.DUMMY_TOKEN_TOTAL_SUPPLY,
);
const accounts = await web3Wrapper.getAvailableAddressesAsync();
const owner = accounts[0];
const owner2 = accounts[1];
await makerToken.mint(new BigNumber(100)).awaitTransactionSuccessAsync({ from: owner });
const testResults = await contract.balances([owner, owner2], [makerToken.address, ETH_ADDRESS]).callAsync();
expect(testResults).to.eql([new BigNumber(100), new BigNumber(1000000000000000000000)]);
});
it('it throws an error if the input arrays of different lengths', async () => {
const accounts = await web3Wrapper.getAvailableAddressesAsync();
const owner = accounts[0];
try {
await contract.balances([owner], [ETH_ADDRESS, ETH_ADDRESS]).callAsync();
expect.fail();
} catch (error) {
expect(error.message).to.eql('users array is a different length than the tokens array');
}
});
});
describe('getMinOfBalancesOrAllowances', () => {
it('returns the balance if the allowance can cover it', async () => {
const makerToken = await DummyERC20TokenContract.deployFrom0xArtifactAsync(
erc20Artifacts.DummyERC20Token,
env.provider,
env.txDefaults,
artifacts,
constants.DUMMY_TOKEN_NAME,
constants.DUMMY_TOKEN_SYMBOL,
new BigNumber(18),
constants.DUMMY_TOKEN_TOTAL_SUPPLY,
);
const accounts = await web3Wrapper.getAvailableAddressesAsync();
const owner = accounts[0];
const owner2 = accounts[1];
const allowanceTarget = '0xdef1c0ded9bec7f1a1670819833240f027b25eff';
await makerToken.mint(new BigNumber(100)).awaitTransactionSuccessAsync({ from: owner });
await makerToken.approve(allowanceTarget, new BigNumber(150)).awaitTransactionSuccessAsync({ from: owner });
await makerToken.mint(new BigNumber(150)).awaitTransactionSuccessAsync({ from: owner2 });
await makerToken
.approve(allowanceTarget, new BigNumber(200))
.awaitTransactionSuccessAsync({ from: owner2 });
const testResults = await contract
.getMinOfBalancesOrAllowances(
[owner, owner2],
[makerToken.address, makerToken.address],
allowanceTarget,
)
.callAsync();
expect(testResults).to.eql([new BigNumber(100), new BigNumber(150)]);
});
it('returns the allowance if the allowance < balance', async () => {
const makerToken = await DummyERC20TokenContract.deployFrom0xArtifactAsync(
erc20Artifacts.DummyERC20Token,
env.provider,
env.txDefaults,
artifacts,
constants.DUMMY_TOKEN_NAME,
constants.DUMMY_TOKEN_SYMBOL,
new BigNumber(18),
constants.DUMMY_TOKEN_TOTAL_SUPPLY,
);
const accounts = await web3Wrapper.getAvailableAddressesAsync();
const owner = accounts[0];
const owner2 = accounts[1];
const allowanceTarget = '0xdef1c0ded9bec7f1a1670819833240f027b25eff';
await makerToken.mint(new BigNumber(100)).awaitTransactionSuccessAsync({ from: owner });
await makerToken.approve(allowanceTarget, new BigNumber(50)).awaitTransactionSuccessAsync({ from: owner });
await makerToken.mint(new BigNumber(100)).awaitTransactionSuccessAsync({ from: owner2 });
await makerToken.approve(allowanceTarget, new BigNumber(75)).awaitTransactionSuccessAsync({ from: owner2 });
const testResults = await contract
.getMinOfBalancesOrAllowances(
[owner, owner2],
[makerToken.address, makerToken.address],
allowanceTarget,
)
.callAsync();
expect(testResults).to.eql([new BigNumber(50), new BigNumber(75)]);
});
});
});

View File

@@ -1,120 +0,0 @@
import { artifacts as erc20Artifacts, DummyERC20TokenContract } from '@0x/contracts-erc20';
import { expect } from '@0x/contracts-test-utils';
import { Web3ProviderEngine } from '@0x/dev-utils';
import { BigNumber } from '@0x/utils';
import { Web3Wrapper } from '@0x/web3-wrapper';
import 'mocha';
import { Connection, Repository } from 'typeorm';
import { artifacts } from '../src/artifacts';
import { BalanceCheckerContract } from '../src/asset-swapper';
import { RFQ_ALLOWANCE_TARGET } from '../src/constants';
import { MakerBalanceChainCacheEntity } from '../src/entities';
import { cacheRfqBalancesAsync } from '../src/runners/rfq_maker_balance_cache_runner';
import { getProvider } from './constants';
import { initDBConnectionAsync } from './utils/db_connection';
import { setupDependenciesAsync, teardownDependenciesAsync } from './utils/deployment';
const SUITE_NAME = 'RFQ Maker Balance Cache Tests';
describe(SUITE_NAME, () => {
let provider: Web3ProviderEngine;
let balanceCheckerContract: BalanceCheckerContract;
let dbConnection: Connection;
let zrx: DummyERC20TokenContract;
let balanceRepo: Repository<MakerBalanceChainCacheEntity>;
let web3Wrapper: Web3Wrapper;
let makerAddress1: string;
let makerAddress2: string;
before(async () => {
await setupDependenciesAsync(SUITE_NAME);
provider = getProvider();
web3Wrapper = new Web3Wrapper(provider);
const accounts = await web3Wrapper.getAvailableAddressesAsync();
const deployer = accounts[0];
makerAddress1 = accounts[1];
makerAddress2 = accounts[2];
zrx = await DummyERC20TokenContract.deployFrom0xArtifactAsync(
erc20Artifacts.DummyERC20Token,
provider,
{ from: deployer, gas: 10000000 },
{},
'0x Protocol Token',
'ZRX',
new BigNumber(18),
new BigNumber(1000000),
);
await zrx.mint(new BigNumber(100)).awaitTransactionSuccessAsync({ from: makerAddress1 });
await zrx
.approve(RFQ_ALLOWANCE_TARGET, new BigNumber(100))
.awaitTransactionSuccessAsync({ from: makerAddress1 });
await zrx.mint(new BigNumber(150)).awaitTransactionSuccessAsync({ from: makerAddress2 });
await zrx
.approve(RFQ_ALLOWANCE_TARGET, new BigNumber(125))
.awaitTransactionSuccessAsync({ from: makerAddress2 });
balanceCheckerContract = await BalanceCheckerContract.deployFrom0xArtifactAsync(
artifacts.BalanceChecker,
provider,
{ from: deployer, gas: 10000000 },
{},
);
dbConnection = await initDBConnectionAsync();
// save some balance cache entities
const maker1 = new MakerBalanceChainCacheEntity();
maker1.makerAddress = makerAddress1;
maker1.tokenAddress = zrx.address;
maker1.timeFirstSeen = new Date();
const maker2 = new MakerBalanceChainCacheEntity();
maker2.makerAddress = makerAddress2;
maker2.tokenAddress = zrx.address;
maker2.timeFirstSeen = new Date();
balanceRepo = dbConnection.getRepository(MakerBalanceChainCacheEntity);
await balanceRepo.save([maker1, maker2]);
});
after(async () => {
await teardownDependenciesAsync(SUITE_NAME);
});
describe('runRfqBalanceCheckerAsync', () => {
it('correctly updates maker addresses', async () => {
await cacheRfqBalancesAsync(dbConnection, balanceCheckerContract, false, '');
const maker1 = await dbConnection
.getRepository(MakerBalanceChainCacheEntity)
.createQueryBuilder('maker_balance_chain_cache')
.where(
'maker_balance_chain_cache.makerAddress = :address AND maker_balance_chain_cache.tokenAddress = :token',
{ address: makerAddress1, token: zrx.address },
)
.getOne();
const maker2 = await dbConnection
.getRepository(MakerBalanceChainCacheEntity)
.createQueryBuilder('maker_balance_chain_cache')
.where(
'maker_balance_chain_cache.makerAddress = :address AND maker_balance_chain_cache.tokenAddress = :token',
{ address: makerAddress2, token: zrx.address },
)
.getOne();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- TODO: fix me!
expect(maker1!.balance).to.be.deep.equal(new BigNumber(100));
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion -- TODO: fix me!
expect(maker2!.balance).to.be.deep.equal(new BigNumber(125));
});
});
});