Merge pull request #1612 from 0xProject/feature/pipeline/add-radar

[pipeline] Add Radar orders and maker_address column to token_orderbook_snapshots
This commit is contained in:
Francesco Agosti
2019-02-20 14:17:42 -08:00
committed by GitHub
22 changed files with 499 additions and 68 deletions

View File

@@ -2,7 +2,7 @@ import { logUtils } from '@0x/utils';
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import { DDEX_SOURCE, DdexMarket, DdexSource } from '../data_sources/ddex';
import { DdexMarket, DdexSource } from '../data_sources/ddex';
import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseDdexOrders } from '../parsers/ddex_orders';
@@ -43,7 +43,7 @@ async function getAndSaveMarketOrderbookAsync(ddexSource: DdexSource, market: Dd
const observedTimestamp = Date.now();
logUtils.log(`${market.id}: Parsing orders.`);
const orders = parseDdexOrders(orderBook, market, observedTimestamp, DDEX_SOURCE);
const orders = parseDdexOrders(orderBook, market, observedTimestamp);
if (orders.length > 0) {
logUtils.log(`${market.id}: Saving ${orders.length} orders.`);

View File

@@ -2,7 +2,7 @@ import { logUtils } from '@0x/utils';
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import { IDEX_SOURCE, IdexSource } from '../data_sources/idex';
import { IdexSource } from '../data_sources/idex';
import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseIdexOrders } from '../parsers/idex_orders';
@@ -51,7 +51,7 @@ async function getAndSaveMarketOrderbookAsync(idexSource: IdexSource, marketId:
}
logUtils.log(`${marketId}: Parsing orders.`);
const orders = parseIdexOrders(orderBook, observedTimestamp, IDEX_SOURCE);
const orders = parseIdexOrders(orderBook, observedTimestamp);
if (orders.length > 0) {
logUtils.log(`${marketId}: Saving ${orders.length} orders.`);

View File

@@ -2,7 +2,7 @@ import { logUtils } from '@0x/utils';
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import { OASIS_SOURCE, OasisMarket, OasisSource } from '../data_sources/oasis';
import { OasisMarket, OasisSource } from '../data_sources/oasis';
import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseOasisOrders } from '../parsers/oasis_orders';
@@ -46,7 +46,7 @@ async function getAndSaveMarketOrderbookAsync(oasisSource: OasisSource, market:
const observedTimestamp = Date.now();
logUtils.log(`${market.id}: Parsing orders.`);
const orders = parseOasisOrders(orderBook, market, observedTimestamp, OASIS_SOURCE);
const orders = parseOasisOrders(orderBook, market, observedTimestamp);
if (orders.length > 0) {
logUtils.log(`${market.id}: Saving ${orders.length} orders.`);

View File

@@ -2,7 +2,6 @@ import { logUtils } from '@0x/utils';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import {
PARADEX_SOURCE,
ParadexActiveMarketsResponse,
ParadexMarket,
ParadexSource,
@@ -75,7 +74,7 @@ async function getAndSaveMarketOrderbookAsync(paradexSource: ParadexSource, mark
const observedTimestamp = Date.now();
logUtils.log(`${market.symbol}: Parsing orders.`);
const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp, PARADEX_SOURCE);
const orders = parseParadexOrders(paradexOrderbookResponse, market, observedTimestamp);
if (orders.length > 0) {
logUtils.log(`${market.symbol}: Saving ${orders.length} orders.`);

View File

@@ -0,0 +1,56 @@
import { logUtils } from '@0x/utils';
import { RadarMarket } from '@radarrelay/types';
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection } from 'typeorm';
import { RadarSource } from '../data_sources/radar';
import { TokenOrderbookSnapshot as TokenOrder } from '../entities';
import * as ormConfig from '../ormconfig';
import { parseRadarOrders } from '../parsers/radar_orders';
import { handleError } from '../utils';
// Number of orders to save at once.
const BATCH_SAVE_SIZE = 1000;
// Number of markets to retrieve orderbooks for at once.
const MARKET_ORDERBOOK_REQUEST_BATCH_SIZE = 50;
// Delay between market orderbook requests.
const MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY = 5000;
let connection: Connection;
(async () => {
connection = await createConnection(ormConfig as ConnectionOptions);
const radarSource = new RadarSource();
const markets = await radarSource.getActiveMarketsAsync();
for (const marketsChunk of R.splitEvery(MARKET_ORDERBOOK_REQUEST_BATCH_SIZE, markets)) {
await Promise.all(
marketsChunk.map(async (market: RadarMarket) => getAndSaveMarketOrderbookAsync(radarSource, market)),
);
await new Promise<void>(resolve => setTimeout(resolve, MILLISEC_MARKET_ORDERBOOK_REQUEST_DELAY));
}
process.exit(0);
})().catch(handleError);
/**
* Retrieve orderbook from radar API for a given market. Parse orders and insert
* them into our database.
* @param radarSource Data source which can query radar API.
* @param market Object from radar API containing market data.
*/
async function getAndSaveMarketOrderbookAsync(radarSource: RadarSource, market: RadarMarket): Promise<void> {
const orderBook = await radarSource.getMarketOrderbookAsync(market.id);
const observedTimestamp = Date.now();
logUtils.log(`${market.id}: Parsing orders.`);
const orders = parseRadarOrders(orderBook, market, observedTimestamp);
if (orders.length > 0) {
logUtils.log(`${market.id}: Saving ${orders.length} orders.`);
const TokenOrderRepository = connection.getRepository(TokenOrder);
await TokenOrderRepository.save(orders, { chunk: Math.ceil(orders.length / BATCH_SAVE_SIZE) });
} else {
logUtils.log(`${market.id}: 0 orders to save.`);
}
}