Add script for parsing competing dex trades from Bloxy (#1355)

This commit is contained in:
Alex Browne
2018-11-29 11:40:09 -08:00
parent 3d211c415b
commit 7198b441e0
9 changed files with 498 additions and 4 deletions

View File

@@ -0,0 +1,41 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
const dexTrades = new Table({
name: 'raw.dex_trades',
columns: [
{ name: 'source_url', type: 'varchar', isPrimary: true },
{ name: 'tx_hash', type: 'varchar', isPrimary: true },
{ name: 'tx_timestamp', type: 'bigint' },
{ name: 'tx_date', type: 'varchar' },
{ name: 'tx_sender', type: 'varchar(42)' },
{ name: 'smart_contract_id', type: 'bigint' },
{ name: 'smart_contract_address', type: 'varchar(42)' },
{ name: 'contract_type', type: 'varchar' },
{ name: 'maker', type: 'varchar(42)' },
{ name: 'taker', type: 'varchar(42)' },
{ name: 'amount_buy', type: 'numeric' },
{ name: 'maker_fee_amount', type: 'numeric' },
{ name: 'buy_currency_id', type: 'bigint' },
{ name: 'buy_symbol', type: 'varchar' },
{ name: 'amount_sell', type: 'numeric' },
{ name: 'taker_fee_amount', type: 'numeric' },
{ name: 'sell_currency_id', type: 'bigint' },
{ name: 'sell_symbol', type: 'varchar' },
{ name: 'maker_annotation', type: 'varchar' },
{ name: 'taker_annotation', type: 'varchar' },
{ name: 'protocol', type: 'varchar' },
{ name: 'buy_address', type: 'varchar(42)', isNullable: true },
{ name: 'sell_address', type: 'varchar(42)', isNullable: true },
],
});
export class CreateDexTrades1543446690436 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.createTable(dexTrades);
}
public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.dropTable(dexTrades);
}
}

View File

@@ -0,0 +1,133 @@
import axios from 'axios';
import * as R from 'ramda';
// URL to use for getting dex trades from Bloxy.
export const BLOXY_DEX_TRADES_URL = 'https://bloxy.info/api/dex/trades';
// Number of trades to get at once. Must be less than or equal to MAX_OFFSET.
const TRADES_PER_QUERY = 10000;
// Maximum offset supported by the Bloxy API.
const MAX_OFFSET = 100000;
// Buffer to subtract from offset. This means we will request some trades twice
// but we have less chance on missing out on any data.
const OFFSET_BUFFER = 1000;
// Maximum number of days supported by the Bloxy API.
const MAX_DAYS = 30;
// Buffer used for comparing the last seen timestamp to the last returned
// timestamp. Increasing this reduces chances of data loss but also creates more
// redundancy and can impact performance.
// tslint:disable-next-line:custom-no-magic-numbers
const LAST_SEEN_TIMESTAMP_BUFFER_MS = 1000 * 60 * 30; // 30 minutes
// tslint:disable-next-line:custom-no-magic-numbers
const millisecondsPerDay = 1000 * 60 * 60 * 24; // ms/d = ms/s * s/m * m/h * h/d
export interface BloxyTrade {
tx_hash: string;
tx_time: string;
tx_date: string;
tx_sender: string;
smart_contract_id: number;
smart_contract_address: string;
contract_type: string;
maker: string;
taker: string;
amountBuy: number;
makerFee: number;
buyCurrencyId: number;
buySymbol: string;
amountSell: number;
takerFee: number;
sellCurrencyId: number;
sellSymbol: string;
maker_annotation: string;
taker_annotation: string;
protocol: string;
buyAddress: string | null;
sellAddress: string | null;
}
interface BloxyError {
error: string;
}
type BloxyResponse<T> = T | BloxyError;
type BloxyTradeResponse = BloxyResponse<BloxyTrade[]>;
function isError<T>(response: BloxyResponse<T>): response is BloxyError {
return (response as BloxyError).error !== undefined;
}
export class BloxySource {
private readonly _apiKey: string;
constructor(apiKey: string) {
this._apiKey = apiKey;
}
/**
* Gets all latest trades between the lastSeenTimestamp (minus some buffer)
* and the current time. Note that because the Bloxy API has some hard
* limits it might not always be possible to get *all* the trades in the
* desired time range.
* @param lastSeenTimestamp The latest timestamp for trades that have
* already been seen.
*/
public async getDexTradesAsync(lastSeenTimestamp: number): Promise<BloxyTrade[]> {
let allTrades: BloxyTrade[] = [];
// Clamp numberOfDays so that it is always between 1 and MAX_DAYS (inclusive)
const numberOfDays = R.clamp(1, MAX_DAYS, getDaysSinceTimestamp(lastSeenTimestamp));
// Keep getting trades until we hit one of the following conditions:
//
// 1. Offset hits MAX_OFFSET (we can't go back any further).
// 2. There are no more trades in the response.
// 3. We see a tx_time equal to or earlier than lastSeenTimestamp (plus
// some buffer).
//
for (let offset = 0; offset <= MAX_OFFSET; offset += TRADES_PER_QUERY - OFFSET_BUFFER) {
const trades = await this._getTradesWithOffsetAsync(numberOfDays, offset);
if (trades.length === 0) {
// There are no more trades left for the days we are querying.
// This means we are done.
return filterDuplicateTrades(allTrades);
}
const sortedTrades = R.reverse(R.sortBy(trade => trade.tx_time, trades));
allTrades = allTrades.concat(sortedTrades);
// Check if lastReturnedTimestamp < lastSeenTimestamp
const lastReturnedTimestamp = new Date(sortedTrades[0].tx_time).getTime();
if (lastReturnedTimestamp < lastSeenTimestamp - LAST_SEEN_TIMESTAMP_BUFFER_MS) {
// We are at the point where we have already seen trades for the
// timestamp range that is being returned. We're done.
return filterDuplicateTrades(allTrades);
}
}
return filterDuplicateTrades(allTrades);
}
private async _getTradesWithOffsetAsync(numberOfDays: number, offset: number): Promise<BloxyTrade[]> {
const resp = await axios.get<BloxyTradeResponse>(BLOXY_DEX_TRADES_URL, {
params: {
key: this._apiKey,
days: numberOfDays,
limit: TRADES_PER_QUERY,
offset,
},
});
if (isError(resp.data)) {
throw new Error('Error in Bloxy API response: ' + resp.data.error);
}
return resp.data;
}
}
// Computes the number of days between the given timestamp and the current
// timestamp (rounded up).
function getDaysSinceTimestamp(timestamp: number): number {
const msSinceTimestamp = Date.now() - timestamp;
const daysSinceTimestamp = msSinceTimestamp / millisecondsPerDay;
return Math.ceil(daysSinceTimestamp);
}
const filterDuplicateTrades = R.uniqBy((trade: BloxyTrade) => trade.tx_hash);

View File

@@ -0,0 +1,54 @@
import { BigNumber } from '@0x/utils';
import { Column, Entity, PrimaryColumn } from 'typeorm';
import { bigNumberTransformer, numberToBigIntTransformer } from '../utils';
@Entity({ name: 'dex_trades', schema: 'raw' })
export class DexTrade {
@PrimaryColumn({ name: 'source_url' })
public sourceUrl!: string;
@PrimaryColumn({ name: 'tx_hash' })
public txHash!: string;
@Column({ name: 'tx_timestamp', type: 'bigint', transformer: numberToBigIntTransformer })
public txTimestamp!: number;
@Column({ name: 'tx_date' })
public txDate!: string;
@Column({ name: 'tx_sender' })
public txSender!: string;
@Column({ name: 'smart_contract_id', type: 'bigint', transformer: numberToBigIntTransformer })
public smartContractId!: number;
@Column({ name: 'smart_contract_address' })
public smartContractAddress!: string;
@Column({ name: 'contract_type' })
public contractType!: string;
@Column({ type: 'varchar' })
public maker!: string;
@Column({ type: 'varchar' })
public taker!: string;
@Column({ name: 'amount_buy', type: 'numeric', transformer: bigNumberTransformer })
public amountBuy!: BigNumber;
@Column({ name: 'maker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
public makerFeeAmount!: BigNumber;
@Column({ name: 'buy_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
public buyCurrencyId!: number;
@Column({ name: 'buy_symbol' })
public buySymbol!: string;
@Column({ name: 'amount_sell', type: 'numeric', transformer: bigNumberTransformer })
public amountSell!: BigNumber;
@Column({ name: 'taker_fee_amount', type: 'numeric', transformer: bigNumberTransformer })
public takerFeeAmount!: BigNumber;
@Column({ name: 'sell_currency_id', type: 'bigint', transformer: numberToBigIntTransformer })
public sellCurrencyId!: number;
@Column({ name: 'sell_symbol' })
public sellSymbol!: string;
@Column({ name: 'maker_annotation' })
public makerAnnotation!: string;
@Column({ name: 'taker_annotation' })
public takerAnnotation!: string;
@Column() public protocol!: string;
@Column({ name: 'buy_address', type: 'varchar', nullable: true })
public buyAddress!: string | null;
@Column({ name: 'sell_address', type: 'varchar', nullable: true })
public sellAddress!: string | null;
}

View File

@@ -3,14 +3,15 @@ import { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
import { ExchangeFillEvent } from './exchange_fill_event';
export { Block } from './block';
export { DexTrade } from './dex_trade';
export { ExchangeCancelEvent } from './exchange_cancel_event';
export { ExchangeCancelUpToEvent } from './exchange_cancel_up_to_event';
export { ExchangeFillEvent } from './exchange_fill_event';
export { OHLCVExternal } from './ohlcv_external';
export { Relayer } from './relayer';
export { SraOrder } from './sra_order';
export { Transaction } from './transaction';
export { TokenMetadata } from './token_metadata';
export { SraOrdersObservedTimeStamp, createObservedTimestampForOrder } from './sra_order_observed_timestamp';
export { OHLCVExternal } from './ohlcv_external';
export { TokenMetadata } from './token_metadata';
export { Transaction } from './transaction';
export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;

View File

@@ -2,6 +2,7 @@ import { ConnectionOptions } from 'typeorm';
import {
Block,
DexTrade,
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
@@ -14,6 +15,7 @@ import {
const entities = [
Block,
DexTrade,
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
ExchangeFillEvent,
@@ -28,7 +30,7 @@ const config: ConnectionOptions = {
type: 'postgres',
url: process.env.ZEROEX_DATA_PIPELINE_DB_URL,
synchronize: false,
// logging: ['error'],
logging: ['error'],
entities,
migrations: ['./lib/migrations/**/*.js'],
};

View File

@@ -0,0 +1,53 @@
import { BigNumber } from '@0x/utils';
import * as R from 'ramda';
import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../data_sources/bloxy';
import { DexTrade } from '../../entities';
/**
* Parses a raw trades response from the Bloxy Dex API and returns an array of
* DexTrade entities.
* @param rawTrades A raw order response from an SRA endpoint.
*/
export function parseBloxyTrades(rawTrades: BloxyTrade[]): DexTrade[] {
return R.map(_parseBloxyTrade, rawTrades);
}
/**
* Converts a single Bloxy trade into a DexTrade entity.
* @param rawTrade A single trade from the response from the Bloxy API.
*/
export function _parseBloxyTrade(rawTrade: BloxyTrade): DexTrade {
const dexTrade = new DexTrade();
dexTrade.sourceUrl = BLOXY_DEX_TRADES_URL;
dexTrade.txHash = rawTrade.tx_hash;
dexTrade.txTimestamp = new Date(rawTrade.tx_time).getTime();
dexTrade.txDate = rawTrade.tx_date;
dexTrade.txSender = rawTrade.tx_sender;
dexTrade.smartContractId = rawTrade.smart_contract_id;
dexTrade.smartContractAddress = rawTrade.smart_contract_address;
dexTrade.contractType = rawTrade.contract_type;
dexTrade.maker = rawTrade.maker;
dexTrade.taker = rawTrade.taker;
// TODO(albrow): The Bloxy API returns amounts and fees as a `number` type
// but some of their values have too many significant digits to be
// represented that way. Ideally they will switch to using strings and then
// we can update this code.
dexTrade.amountBuy = new BigNumber(rawTrade.amountBuy.toString());
dexTrade.makerFeeAmount = new BigNumber(rawTrade.makerFee.toString());
dexTrade.buyCurrencyId = rawTrade.buyCurrencyId;
dexTrade.buySymbol = filterNullCharacters(rawTrade.buySymbol);
dexTrade.amountSell = new BigNumber(rawTrade.amountSell.toString());
dexTrade.takerFeeAmount = new BigNumber(rawTrade.takerFee.toString());
dexTrade.sellCurrencyId = rawTrade.sellCurrencyId;
dexTrade.sellSymbol = filterNullCharacters(rawTrade.sellSymbol);
dexTrade.makerAnnotation = rawTrade.maker_annotation;
dexTrade.takerAnnotation = rawTrade.taker_annotation;
dexTrade.protocol = rawTrade.protocol;
dexTrade.buyAddress = rawTrade.buyAddress;
dexTrade.sellAddress = rawTrade.sellAddress;
return dexTrade;
}
// Works with any form of escaoed null character (e.g., '\0' and '\u0000').
const filterNullCharacters = R.replace(/\0/g, '');

View File

@@ -0,0 +1,51 @@
// tslint:disable:no-console
import 'reflect-metadata';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
import { BloxySource } from '../data_sources/bloxy';
import { DexTrade } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseBloxyTrades } from '../parsers/bloxy';
import { handleError } from '../utils';
// Number of trades to save at once.
const BATCH_SAVE_SIZE = 1000;
let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
await getAndSaveTrades();
process.exit(0);
})().catch(handleError);
async function getAndSaveTrades(): Promise<void> {
const apiKey = process.env.BLOXY_API_KEY;
if (apiKey === undefined) {
throw new Error('Missing required env var: BLOXY_API_KEY');
}
const bloxySource = new BloxySource(apiKey);
const tradesRepository = connection.getRepository(DexTrade);
const lastSeenTimestamp = await getLastSeenTimestampAsync(tradesRepository);
console.log(`Last seen timestamp: ${lastSeenTimestamp === 0 ? 'none' : lastSeenTimestamp}`);
console.log('Getting latest dex trades...');
const rawTrades = await bloxySource.getDexTradesAsync(lastSeenTimestamp);
console.log(`Parsing ${rawTrades.length} trades...`);
const trades = parseBloxyTrades(rawTrades);
console.log(`Saving ${trades.length} trades...`);
await tradesRepository.save(trades, { chunk: Math.ceil(trades.length / BATCH_SAVE_SIZE) });
console.log('Done saving trades.');
}
async function getLastSeenTimestampAsync(tradesRepository: Repository<DexTrade>): Promise<number> {
if ((await tradesRepository.count()) === 0) {
return 0;
}
const response = (await connection.query(
'SELECT tx_timestamp FROM raw.dex_trades ORDER BY tx_timestamp DESC LIMIT 1',
)) as Array<{ tx_timestamp: number }>;
if (response.length === 0) {
return 0;
}
return response[0].tx_timestamp;
}

View File

@@ -0,0 +1,60 @@
import { BigNumber } from '@0x/utils';
import 'mocha';
import * as R from 'ramda';
import 'reflect-metadata';
import { DexTrade } from '../../src/entities';
import { createDbConnectionOnceAsync } from '../db_setup';
import { chaiSetup } from '../utils/chai_setup';
import { testSaveAndFindEntityAsync } from './util';
chaiSetup.configure();
const baseTrade = {
sourceUrl: 'https://bloxy.info/api/dex/trades',
txTimestamp: 1543447585938,
txDate: '2018-11-21',
txSender: '0x00923b9a074762b93650716333b3e1473a15048e',
smartContractId: 7091917,
smartContractAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
contractType: 'DEX/Kyber Network Proxy',
maker: '0xbf2179859fc6d5bee9bf9158632dc51678a4100c',
taker: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
amountBuy: new BigNumber('1.011943163078103'),
makerFeeAmount: new BigNumber(0),
buyCurrencyId: 1,
buySymbol: 'ETH',
amountSell: new BigNumber('941.4997928436911'),
takerFeeAmount: new BigNumber(0),
sellCurrencyId: 16610,
sellSymbol: 'ELF',
makerAnnotation: '',
takerAnnotation: '',
protocol: 'Kyber Network Proxy',
sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
};
const tradeWithNullAddresses: DexTrade = R.merge(baseTrade, {
txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100f',
});
const tradeWithNonNullAddresses: DexTrade = R.merge(baseTrade, {
txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0be',
buyAddress: null,
sellAddress: null,
});
// tslint:disable:custom-no-magic-numbers
describe('DexTrade entity', () => {
it('save/find', async () => {
const connection = await createDbConnectionOnceAsync();
const trades = [tradeWithNullAddresses, tradeWithNonNullAddresses];
const tradesRepository = connection.getRepository(DexTrade);
for (const trade of trades) {
await testSaveAndFindEntityAsync(tradesRepository, trade);
}
});
});

View File

@@ -0,0 +1,99 @@
// tslint:disable:custom-no-magic-numbers
import { BigNumber } from '@0x/utils';
import * as chai from 'chai';
import 'mocha';
import * as R from 'ramda';
import { BLOXY_DEX_TRADES_URL, BloxyTrade } from '../../../src/data_sources/bloxy';
import { DexTrade } from '../../../src/entities';
import { _parseBloxyTrade } from '../../../src/parsers/bloxy';
import { _convertToExchangeFillEvent } from '../../../src/parsers/events';
import { chaiSetup } from '../../utils/chai_setup';
chaiSetup.configure();
const expect = chai.expect;
const baseInput: BloxyTrade = {
tx_hash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
tx_time: '2018-11-21T09:06:28.000+00:00',
tx_date: '2018-11-21',
tx_sender: '0x00923b9a074762b93650716333b3e1473a15048e',
smart_contract_id: 7091917,
smart_contract_address: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
contract_type: 'DEX/Kyber Network Proxy',
maker: '0x0000000000000000000000000000000000000001',
taker: '0x0000000000000000000000000000000000000002',
amountBuy: 1.011943163078103,
makerFee: 38.912083,
buyCurrencyId: 1,
buySymbol: 'ETH',
amountSell: 941.4997928436911,
takerFee: 100.39,
sellCurrencyId: 16610,
sellSymbol: 'ELF',
maker_annotation: 'random annotation',
taker_annotation: 'random other annotation',
protocol: 'Kyber Network Proxy',
buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
};
const baseExpected: DexTrade = {
sourceUrl: BLOXY_DEX_TRADES_URL,
txHash: '0xb93a7faf92efbbb5405c9a73cd4efd99702fe27c03ff22baee1f1b1e37b3a0bf',
txTimestamp: 1542791188000,
txDate: '2018-11-21',
txSender: '0x00923b9a074762b93650716333b3e1473a15048e',
smartContractId: 7091917,
smartContractAddress: '0x818e6fecd516ecc3849daf6845e3ec868087b755',
contractType: 'DEX/Kyber Network Proxy',
maker: '0x0000000000000000000000000000000000000001',
taker: '0x0000000000000000000000000000000000000002',
amountBuy: new BigNumber('1.011943163078103'),
makerFeeAmount: new BigNumber('38.912083'),
buyCurrencyId: 1,
buySymbol: 'ETH',
amountSell: new BigNumber('941.4997928436911'),
takerFeeAmount: new BigNumber('100.39'),
sellCurrencyId: 16610,
sellSymbol: 'ELF',
makerAnnotation: 'random annotation',
takerAnnotation: 'random other annotation',
protocol: 'Kyber Network Proxy',
buyAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100d',
sellAddress: '0xbf2179859fc6d5bee9bf9158632dc51678a4100e',
};
interface TestCase {
input: BloxyTrade;
expected: DexTrade;
}
const testCases: TestCase[] = [
{
input: baseInput,
expected: baseExpected,
},
{
input: R.merge(baseInput, { buyAddress: null, sellAddress: null }),
expected: R.merge(baseExpected, { buyAddress: null, sellAddress: null }),
},
{
input: R.merge(baseInput, {
buySymbol:
'RING\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000',
}),
expected: R.merge(baseExpected, { buySymbol: 'RING' }),
},
];
describe('bloxy', () => {
describe('_parseBloxyTrade', () => {
for (const [i, testCase] of testCases.entries()) {
it(`converts BloxyTrade to DexTrade entity (${i + 1}/${testCases.length})`, () => {
const actual = _parseBloxyTrade(testCase.input);
expect(actual).deep.equal(testCase.expected);
});
}
});
});