Rebase pipeline branch off development

This commit is contained in:
Alex Browne
2018-09-17 11:27:38 -07:00
parent c43ba6b3c7
commit 57e7119c0d
26 changed files with 2667 additions and 68 deletions

View File

@@ -0,0 +1,258 @@
import * as commandLineArgs from 'command-line-args';
import { postgresClient } from '../postgres';
import { formatters } from '../utils';
const tableQueries: any = {
events_full: `CREATE TABLE IF NOT EXISTS events_full (
timestamp TIMESTAMP WITH TIME ZONE,
event_type VARCHAR,
error_id VARCHAR,
order_hash CHAR(66),
maker CHAR(42),
maker_amount NUMERIC(78),
maker_fee NUMERIC(78),
maker_token CHAR(42),
taker CHAR(42),
taker_amount NUMERIC(78),
taker_fee NUMERIC(78),
taker_token CHAR(42),
txn_hash CHAR(66),
gas_used NUMERIC(78),
gas_price NUMERIC(78),
fee_recipient CHAR(42),
method_id CHAR(10),
salt VARCHAR,
block_number BIGINT,
log_index BIGINT,
taker_symbol VARCHAR,
taker_name VARCHAR,
taker_decimals BIGINT,
taker_usd_price NUMERIC(78),
taker_txn_usd_value NUMERIC(78),
maker_symbol VARCHAR,
maker_name VARCHAR,
maker_decimals BIGINT,
maker_usd_price NUMERIC(78),
maker_txn_usd_value NUMERIC(78),
PRIMARY KEY (txn_hash, order_hash, log_index)
)`,
events: `CREATE TABLE IF NOT EXISTS events (
timestamp TIMESTAMP WITH TIME ZONE,
event_type VARCHAR,
error_id VARCHAR,
order_hash CHAR(66),
maker CHAR(42),
maker_amount NUMERIC(78),
maker_fee NUMERIC(78),
maker_token CHAR(42),
taker CHAR(42),
taker_amount NUMERIC(78),
taker_fee NUMERIC(78),
taker_token CHAR(42),
txn_hash CHAR(66),
gas_used NUMERIC(78),
gas_price NUMERIC(78),
fee_recipient CHAR(42),
method_id CHAR(10),
salt VARCHAR,
block_number BIGINT,
log_index BIGINT,
PRIMARY KEY (txn_hash, order_hash, log_index)
)`,
events_staging: `CREATE TABLE IF NOT EXISTS events_staging (
timestamp TIMESTAMP WITH TIME ZONE,
event_type VARCHAR,
error_id VARCHAR,
order_hash CHAR(66),
maker CHAR(42),
maker_amount NUMERIC(78),
maker_fee NUMERIC(78),
maker_token CHAR(42),
taker CHAR(42),
taker_amount NUMERIC(78),
taker_fee NUMERIC(78),
taker_token CHAR(42),
txn_hash CHAR(66),
fee_recipient CHAR(42),
block_number BIGINT,
log_index BIGINT,
PRIMARY KEY (txn_hash, order_hash, log_index)
)`,
events_raw: `CREATE TABLE IF NOT EXISTS events_raw (
event_type VARCHAR,
error_id VARCHAR,
order_hash CHAR(66),
maker CHAR(42),
maker_amount NUMERIC(78),
maker_fee NUMERIC(78),
maker_token CHAR(42),
taker CHAR(42),
taker_amount NUMERIC(78),
taker_fee NUMERIC(78),
taker_token CHAR(42),
txn_hash CHAR(66),
fee_recipient CHAR(42),
block_number BIGINT,
log_index BIGINT,
PRIMARY KEY (txn_hash, order_hash, log_index)
)`,
blocks: `CREATE TABLE IF NOT EXISTS blocks (
timestamp TIMESTAMP WITH TIME ZONE,
block_hash CHAR(66) UNIQUE,
block_number BIGINT,
PRIMARY KEY (block_hash)
)`,
transactions: `CREATE TABLE IF NOT EXISTS transactions (
txn_hash CHAR(66) UNIQUE,
block_hash CHAR(66),
block_number BIGINT,
gas_used NUMERIC(78),
gas_price NUMERIC(78),
method_id CHAR(10),
salt VARCHAR,
PRIMARY KEY (txn_hash)
)`,
tokens: `CREATE TABLE IF NOT EXISTS tokens (
address CHAR(42) UNIQUE,
name VARCHAR,
symbol VARCHAR,
decimals INT,
PRIMARY KEY (address)
)`,
prices: `CREATE TABLE IF NOT EXISTS prices (
address CHAR(42) UNIQUE,
timestamp TIMESTAMP WITH TIME ZONE,
price NUMERIC(78, 18),
PRIMARY KEY (address, timestamp)
)`,
relayers: `CREATE TABLE IF NOT EXISTS relayers (
name VARCHAR UNIQUE,
url VARCHAR DEFAULT '',
sra_http_endpoint VARCHAR DEFAULT '',
sra_ws_endpoint VARCHAR DEFAULT '',
fee_recipient_addresses CHAR(42)[] DEFAULT '{}',
taker_addresses CHAR(42)[] DEFAULT '{}',
PRIMARY KEY(name)`,
historical_prices: `CREATE TABLE IF NOT EXISTS historical_prices (
token VARCHAR,
base VARCHAR,
timestamp TIMESTAMP WITH TIME ZONE,
close NUMERIC(78, 18),
high NUMERIC(78, 18),
low NUMERIC(78, 18),
open NUMERIC(78, 18),
volume_from NUMERIC(78, 18),
volume_to NUMERIC(78, 18),
PRIMARY KEY (token, base, timestamp)
)`,
orders: `CREATE TABLE IF NOT EXISTS orders (
relayer_id VARCHAR,
exchange_contract_address CHAR(42),
maker CHAR(42),
maker_amount NUMERIC(78),
maker_fee NUMERIC(78),
maker_token CHAR(42),
taker CHAR(42),
taker_amount NUMERIC(78),
taker_fee NUMERIC(78),
taker_token CHAR(42),
fee_recipient CHAR(42),
expiration_unix_timestamp_sec NUMERIC(78),
salt VARCHAR,
order_hash CHAR(66),
PRIMARY KEY (relayer_id, order_hash)
)`,
};
function _safeQuery(query: string): any {
return new Promise((resolve, reject) => {
postgresClient
.query(query)
.then((data: any) => {
resolve(data);
})
.catch((err: any) => {
reject(err);
});
});
}
export const tableScripts = {
createTable(query: string): any {
return _safeQuery(query);
},
createAllTables(): any {
for (const tableName of tableQueries) {
_safeQuery(tableQueries[tableName]);
}
},
};
export const insertDataScripts = {
insertSingleRow(table: string, object: any): any {
return new Promise((resolve, reject) => {
const columns = Object.keys(object);
const safeArray: any = [];
for (const key of columns) {
if (key in object) {
if (key === 'timestamp') {
safeArray.push('to_timestamp(' + object[key] + ')');
} else if (typeof object[key] === 'string' || object[key] instanceof String) {
safeArray.push(formatters.escapeSQLParam(object[key]));
} else {
safeArray.push(object[key]);
}
} else {
safeArray.push('default');
}
}
const queryString = `INSERT INTO ${table} (${columns}) VALUES (${safeArray}) ON CONFLICT DO NOTHING`;
console.log(queryString);
postgresClient
.query(queryString)
.then((data: any) => {
resolve(data);
})
.catch((err: any) => {
reject(err);
});
});
},
insertMultipleRows(table: string, rows: any[], columns: any[]): any {
return new Promise((resolve, reject) => {
if (rows.length > 0) {
const rowsSplit = rows.map((value, index) => {
const safeArray: any = [];
for (const key of columns) {
if (key in value) {
if (key === 'timestamp') {
safeArray.push('to_timestamp(' + value[key] + ')');
} else if (typeof value[key] === 'string' || value[key] instanceof String) {
safeArray.push(formatters.escapeSQLParam(value[key]));
} else if (value[key] instanceof Array) {
const escapedArray = value[key].map((subValue: string, subIndex: number) => {
return formatters.escapeSQLParam(subValue);
});
safeArray.push('ARRAY[' + escapedArray.toString() + ']');
} else {
safeArray.push(value[key]);
}
} else {
safeArray.push('default');
}
}
return '(' + safeArray + ')';
});
const queryString = `INSERT INTO ${table} (${columns}) VALUES ${rowsSplit} ON CONFLICT DO NOTHING`;
postgresClient
.query(queryString)
.then((data: any) => {
resolve(data);
})
.catch((err: any) => {
// console.log(err);
reject(err);
});
} else {
resolve({});
}
});
},
};

View File

@@ -0,0 +1,234 @@
import * as commandLineArgs from 'command-line-args';
import { postgresClient } from '../postgres';
import { formatters } from '../utils';
const optionDefinitions = [
{ name: 'name', alias: 'n', type: String },
{ name: 'from', alias: 'f', type: Number },
{ name: 'to', alias: 't', type: Number },
];
const cli = commandLineArgs(optionDefinitions);
const dataInsertionQueries: any = {
events_staging: `INSERT INTO events_staging (
timestamp,
event_type,
error_id,
order_hash,
maker,
maker_amount,
maker_fee,
maker_token,
taker,
taker_amount,
taker_fee,
taker_token,
txn_hash,
fee_recipient,
block_number,
log_index
)
(SELECT
b.timestamp,
a.event_type,
a.error_id,
a.order_hash,
a.maker,
a.maker_amount,
a.maker_fee,
a.maker_token,
a.taker,
a.taker_amount,
a.taker_fee,
a.taker_token,
a.txn_hash,
a.fee_recipient,
a.block_number,
a.log_index
FROM
events_raw a
JOIN
blocks b
ON
a.block_number = b.block_number
AND
b.block_number >= $1
AND
b.block_number <= $2
) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
events: `INSERT INTO events (
timestamp,
event_type,
error_id,
order_hash,
maker,
maker_amount,
maker_fee,
maker_token,
taker,
taker_amount,
taker_fee,
taker_token,
txn_hash,
fee_recipient,
block_number,
log_index,
gas_used,
gas_price,
method_id,
salt
)
(SELECT
a.timestamp,
a.event_type,
a.error_id,
a.order_hash,
a.maker,
a.maker_amount,
a.maker_fee,
a.maker_token,
a.taker,
a.taker_amount,
a.taker_fee,
a.taker_token,
a.txn_hash,
a.fee_recipient,
a.block_number,
a.log_index,
b.gas_used,
b.gas_price,
b.method_id,
b.salt
FROM
events_staging a
JOIN
transactions b
ON
a.txn_hash = b.txn_hash
AND
a.block_number >= $1
AND
a.block_number <= $2
) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
events_full: `
INSERT INTO events_full (
timestamp,
event_type,
error_id,
order_hash,
maker,
maker_amount,
maker_fee,
maker_token,
taker,
taker_amount,
taker_fee,
taker_token,
txn_hash,
fee_recipient,
block_number,
log_index,
gas_used,
gas_price,
method_id,
salt,
taker_symbol,
taker_name,
taker_decimals,
taker_usd_price,
taker_txn_usd_value,
maker_symbol,
maker_name,
maker_decimals,
maker_usd_price,
maker_txn_usd_value
)
(SELECT
events.timestamp,
events.event_type,
events.error_id,
events.order_hash,
events.maker,
events.maker_amount,
events.maker_fee,
events.maker_token,
events.taker,
events.taker_amount,
events.taker_fee,
events.taker_token,
events.txn_hash,
events.fee_recipient,
events.block_number,
events.log_index,
events.gas_used,
events.gas_price,
events.method_id,
events.salt,
taker_token_prices.symbol,
taker_token_prices.name,
taker_token_prices.decimals,
taker_token_prices.price,
(events.taker_amount / (10 ^ taker_token_prices.decimals)) * taker_token_prices.price,
maker_token_prices.symbol,
maker_token_prices.name,
maker_token_prices.decimals,
maker_token_prices.price,
(events.maker_amount / (10 ^ maker_token_prices.decimals)) * maker_token_prices.price
FROM
events
LEFT JOIN
(SELECT
tokens.address,
tokens.name,
tokens.symbol,
tokens.decimals,
prices.timestamp,
prices.price
FROM
tokens
LEFT JOIN
prices
ON
tokens.symbol = prices.symbol) taker_token_prices
ON
(events.taker_token = taker_token_prices.address
AND
(DATE(events.timestamp) = DATE(taker_token_prices.timestamp) OR taker_token_prices.timestamp IS NULL))
LEFT JOIN
(SELECT
tokens.address,
tokens.name,
tokens.symbol,
tokens.decimals,
prices.timestamp,
prices.price
FROM
tokens
LEFT JOIN
prices
ON
tokens.symbol = prices.symbol) maker_token_prices
ON
(events.maker_token = maker_token_prices.address
AND
(DATE(events.timestamp) = DATE(maker_token_prices.timestamp) OR maker_token_prices.timestamp IS NULL))
WHERE
events.block_number >= $1
AND
events.block_number <= $2
) ON CONFLICT (order_hash, txn_hash, log_index) DO NOTHING`,
};
if (cli.name) {
const query = dataInsertionQueries[cli.name];
if (query && cli.from) {
const fromBlock = cli.from;
const toBlock = cli.to ? cli.to : cli.from + 1;
postgresClient
.query(query, [fromBlock, toBlock])
.then((data: any) => {
console.log(data);
})
.catch((err: any) => {
console.error(err);
});
}
}

View File

@@ -0,0 +1,87 @@
import { formatters } from '../utils';
export const dataFetchingQueries: any = {
get_missing_txn_hashes: `
SELECT
a.txn_hash
FROM
events_raw a
WHERE NOT EXISTS
(
SELECT
*
FROM
transactions b
WHERE
b.txn_hash = a.txn_hash
)
AND
a.block_number >= $1
AND
a.block_number < $2`,
get_used_block_numbers: `
SELECT DISTINCT
a.block_number
FROM
events_raw a
WHERE NOT EXISTS
(
SELECT
*
FROM
blocks b
WHERE
b.block_number = a.block_number
)
AND
a.block_number >= $1
AND
a.block_number < $2`,
get_token_registry: `
SELECT
*
FROM
tokens`,
get_max_block: `
SELECT
MAX(block_number)
FROM
events_raw`,
get_relayers: `
SELECT
*
FROM
relayers`,
get_most_recent_pricing_date: `
SELECT
MAX(DATE(timestamp))
FROM
prices
`,
get_top_unknown_token_addresses: `
SELECT a.token_address as address, a.txn_value / 2 as total_txn_value
FROM
(SELECT token_address, SUM(txn_value) as txn_value
FROM
(select a.timestamp, a.maker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
THEN a.taker_txn_usd_value
ELSE a.maker_txn_usd_value END) as txn_value
from events_full a
where a.event_type = 'LogFill'
and a.timestamp > (NOW() + INTERVAL '-24 hours')
union
select a.timestamp, a.taker_token as token_address, (CASE WHEN a.taker_txn_usd_value > a.maker_txn_usd_value OR a.maker_txn_usd_value IS NULL
THEN a.taker_txn_usd_value
ELSE a.maker_txn_usd_value END) as txn_value
from events_full a
where a.event_type = 'LogFill'
and a.timestamp > (NOW() + INTERVAL '-24 hours')) token_txn_values
WHERE token_address IS NOT NULL
AND txn_value > 0
GROUP BY 1
ORDER BY 2 DESC) a
LEFT JOIN tokens b
ON a.token_address = b.address
WHERE symbol is NULL
ORDER BY 2 DESC
`,
};

View File

@@ -0,0 +1,649 @@
import { ExchangeEvents, ZeroEx } from '0x.js';
import { HttpClient, Order, OrderbookRequest, OrderbookResponse, TokenPairsItem } from '@0xproject/connect';
import * as Airtable from 'airtable';
import * as commandLineArgs from 'command-line-args';
import * as _ from 'lodash';
import * as querystring from 'querystring';
import * as queue from 'queue';
import * as request from 'request';
import * as rpn from 'request-promise-native';
import { HttpRequestOptions } from '../../../connect/lib/src/types.js';
import { relayer } from '../models/relayer.js';
import { token } from '../models/tokens.js';
import { postgresClient } from '../postgres.js';
import { typeConverters } from '../utils.js';
import { web3, zrx } from '../zrx.js';
import { insertDataScripts } from './create_tables.js';
import { dataFetchingQueries } from './query_data.js';
const optionDefinitions = [
{ name: 'from', alias: 'f', type: Number },
{ name: 'to', alias: 't', type: Number },
{ name: 'type', type: String },
{ name: 'id', type: String },
{ name: 'force', type: Boolean },
{ name: 'token', type: String },
];
const cli = commandLineArgs(optionDefinitions);
const q = queue({ concurrency: 6, autostart: true });
const airtableBase = new Airtable({ apiKey: process.env.AIRTABLE_API_KEY }).base(process.env.AIRTABLE_0X_BASE);
const BLOCK_INCREMENTS = 1000;
const BASE_SYMBOL = 'USD'; // use USD as base currency against
const API_HIST_LIMIT = 2000; // cryptocompare API limits histoday price query to 2000 days
const SECONDS_PER_DAY = 86400;
const PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/pricehistorical';
const RELAYER_REGISTRY_JSON = 'https://raw.githubusercontent.com/0xProject/0x-relayer-registry/master/relayers.json';
const METAMASK_ETH_CONTRACT_METADATA_JSON =
'https://raw.githubusercontent.com/MetaMask/eth-contract-metadata/master/contract-map.json';
const ETHPLORER_BASE_URL = 'http://api.ethplorer.io';
const ETHPLORER_TOP_TOKENS_JSON = `${ETHPLORER_BASE_URL}/getTopTokens?apiKey=dyijm5418TjOJe34`;
// const HIST_PRICE_API_ENDPOINT = 'https://min-api.cryptocompare.com/data/histoday';
const AIRTABLE_RELAYER_INFO = 'Relayer Info';
export const pullDataScripts = {
getAllEvents(fromBlockNumber: number, toBlockNumber: number): any {
return new Promise((resolve, reject) => {
const getLogsPromises: any[] = [];
getLogsPromises.push(
zrx.exchange.getLogsAsync(
ExchangeEvents.LogFill,
{ fromBlock: fromBlockNumber, toBlock: toBlockNumber },
{},
),
zrx.exchange.getLogsAsync(
ExchangeEvents.LogCancel,
{ fromBlock: fromBlockNumber, toBlock: toBlockNumber },
{},
),
zrx.exchange.getLogsAsync(
ExchangeEvents.LogError,
{ fromBlock: fromBlockNumber, toBlock: toBlockNumber },
{},
),
);
Promise.all(getLogsPromises)
.then((data: any[]) => {
resolve(data);
})
.catch((err: any) => {
reject(err);
});
});
},
getBlockInfo(blockNumber: number): any {
return new Promise((resolve, reject) => {
web3.eth.getBlock(blockNumber, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
},
getTransactionInfo(transactionHash: string): any {
return new Promise((resolve, reject) => {
web3.eth.getTransaction(transactionHash, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
},
getTokenRegistry(): any {
return new Promise((resolve, reject) => {
zrx.tokenRegistry
.getTokensAsync()
.then((data: any) => {
resolve(data);
})
.catch((err: any) => {
reject(err);
});
});
},
getMetaMaskTokens(): any {
return new Promise((resolve, reject) => {
request(METAMASK_ETH_CONTRACT_METADATA_JSON, (error, response, body) => {
if (error) {
reject(error);
} else {
resolve(JSON.parse(body));
}
});
});
},
getEthplorerTopTokens(): any {
return new Promise((resolve, reject) => {
request(ETHPLORER_TOP_TOKENS_JSON, (error, response, body) => {
if (error) {
reject(error);
} else {
resolve(JSON.parse(body));
}
});
});
},
getEthplorerToken(tokenAddress: string): any {
return new Promise((resolve, reject) => {
const url = `${ETHPLORER_BASE_URL}/getTokenInfo/${tokenAddress}?apiKey=dyijm5418TjOJe34`;
request(url, (error, response, body) => {
if (error) {
reject(error);
} else {
try {
const json = JSON.parse(body);
resolve(json);
} catch (err) {
resolve({ error: 'error' });
}
}
});
});
},
getPriceData(symbol: string, timestamp: number, timeDelay?: number): any {
return new Promise((resolve, reject) => {
if (symbol === 'WETH') {
symbol = 'ETH';
}
let parsedParams = querystring.stringify({
fsym: symbol,
tsyms: 'USD',
ts: timestamp / 1000,
});
console.debug(parsedParams);
setTimeout(() => {
request(PRICE_API_ENDPOINT + '?' + parsedParams, (error, response, body) => {
if (error) {
reject(error);
} else {
resolve(JSON.parse(body));
}
});
}, timeDelay);
});
},
getRelayers(): any {
return new Promise((resolve, reject) => {
request(RELAYER_REGISTRY_JSON, (error, response, body) => {
if (error) {
reject(error);
} else {
resolve(JSON.parse(body));
}
});
});
},
async getOrderBook(sraEndpoint: string): Promise<Object> {
const relayerClient = new HttpClient(sraEndpoint);
const tokenResponse: TokenPairsItem[] = await relayerClient.getTokenPairsAsync();
const fullOrderBook: OrderbookResponse[] = [];
for (const tokenPair of tokenResponse) {
const orderBookRequest: OrderbookRequest = {
baseTokenAddress: tokenPair.tokenA.address,
quoteTokenAddress: tokenPair.tokenB.address,
};
const orderBook: OrderbookResponse = await relayerClient.getOrderbookAsync(orderBookRequest);
fullOrderBook.push(orderBook);
}
return fullOrderBook;
},
// async getHistoricalPrices(
// fromSymbol: string,
// toSymbol: string,
// fromTimestamp: number,
// toTimestamp: number,
// ): Promise<HistoricalPriceResponse> {
// const daysInQueryPeriod = Math.round((toTimestamp - fromTimestamp) / (SECONDS_PER_DAY));
// if(fromSymbol === 'WETH') {
// fromSymbol = 'ETH';
// }
// var parsedParams = {
// fsym: fromSymbol,
// tsym: toSymbol,
// limit: Math.min(daysInQueryPeriod, API_HIST_LIMIT),
// toTs: toTimestamp,
// };
// var options = {
// uri: HIST_PRICE_API_ENDPOINT,
// qs: parsedParams,
// json: false,
// };
// try {
// const response = await rpn(options);
// return Promise.resolve(JSON.parse(response));
// } catch (error) {
// console.debug(error);
// return Promise.reject(error);
// }
// },
};
export const scrapeDataScripts = {
scrapeAllPricesToDB(fromTime: number, toTime: number) {
const fromDate = new Date(fromTime);
fromDate.setUTCHours(0);
fromDate.setUTCMinutes(0);
fromDate.setUTCSeconds(0);
fromDate.setUTCMilliseconds(0);
const toDate = new Date(toTime);
postgresClient
.query(dataFetchingQueries.get_token_registry, [])
.then((result: any) => {
for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
for (const token of Object.values(result.rows)) {
console.debug('Scraping ' + curDate + ' ' + token);
q.push(_scrapePriceToDB(curDate.getTime(), token, 500));
}
}
})
.catch((err: any) => {
console.debug(err);
});
},
};
function _scrapeEventsToDB(fromBlock: number, toBlock: number): any {
return (cb: () => void) => {
pullDataScripts
.getAllEvents(fromBlock, toBlock)
.then((data: any) => {
const parsedEvents: any = {};
parsedEvents[ExchangeEvents.LogFill] = [];
parsedEvents[ExchangeEvents.LogCancel] = [];
parsedEvents[ExchangeEvents.LogError] = [];
for (const index in data) {
for (const datum of data[index]) {
const event = typeConverters.convertLogEventToEventObject(datum);
parsedEvents[event.event_type].push(event);
}
}
console.log(fromBlock + ' : ' + toBlock + ' ' + parsedEvents[ExchangeEvents.LogFill].length);
for (const event_type in parsedEvents) {
if (parsedEvents[event_type].length > 0) {
insertDataScripts
.insertMultipleRows(
'events_raw',
parsedEvents[event_type],
Object.keys(parsedEvents[event_type][0]),
)
.then(() => {})
.catch((error: any) => {});
}
}
cb();
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeBlockToDB(block: number): any {
return (cb: () => void) => {
pullDataScripts
.getBlockInfo(block)
.then((data: any) => {
const parsedBlock = typeConverters.convertLogBlockToBlockObject(data);
insertDataScripts
.insertSingleRow('blocks', parsedBlock)
.then((result: any) => {
cb();
})
.catch((err: any) => {
cb();
});
})
.catch((err: any) => {
cb();
});
};
}
// function _scrapeAllRelayersToDB(): any {
// return (cb: () => void) => {
// airtableBase(AIRTABLE_RELAYER_INFO)
// .select()
// .eachPage((records: any, fetchNextPage: () => void) => {
// const parsedRelayers: any[] = [];
// for(const record of records) {
// parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(record));
// }
// insertDataScripts.insertMultipleRows('relayers', parsedRelayers, Object.keys(parsedRelayers[0]))
// .then((result: any) => {
// cb();
// })
// .catch((err: any) => {
// cb();
// });
// })
// .catch((err: any) => {
// cb();
// });
// };
// }
function _scrapeAllRelayersToDB(): any {
return (cb: () => void) => {
pullDataScripts
.getRelayers()
.then((relayers: any[]) => {
console.log(relayers);
const parsedRelayers: any[] = [];
for (const relayer of relayers) {
parsedRelayers.push(typeConverters.convertRelayerToRelayerObject(relayer));
}
console.log(parsedRelayers);
insertDataScripts
.insertMultipleRows('relayers', parsedRelayers, Object.keys(relayer.tableProperties))
.then((result: any) => {
console.log(result);
cb();
})
.catch((err: any) => {
console.log(err);
cb();
});
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeTransactionToDB(transactionHash: string): any {
return (cb: () => void) => {
pullDataScripts
.getTransactionInfo(transactionHash)
.then((data: any) => {
const parsedTransaction = typeConverters.convertLogTransactionToTransactionObject(data);
insertDataScripts
.insertSingleRow('transactions', parsedTransaction)
.then((result: any) => {
cb();
})
.catch((err: any) => {
cb();
});
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeTokenRegistryToDB(): any {
return (cb: () => void) => {
pullDataScripts
.getTokenRegistry()
.then((data: any) => {
const parsedTokens: any = [];
for (const token of data) {
parsedTokens.push(typeConverters.convertLogTokenToTokenObject(token));
}
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
cb();
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeMetaMaskEthContractMetadataToDB(): any {
return (cb: () => void) => {
pullDataScripts
.getMetaMaskTokens()
.then((data: any) => {
const parsedTokens: any = [];
const dataArray = _.map(_.keys(data), (tokenAddress: string) => {
const value = _.get(data, tokenAddress);
return {
address: tokenAddress,
...value,
};
});
const erc20TokensOnly = _.filter(dataArray, entry => {
const isErc20 = _.get(entry, 'erc20');
return isErc20;
});
for (const token of erc20TokensOnly) {
parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
}
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
cb();
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeEthplorerTopTokensToDB(): any {
return (cb: () => void) => {
pullDataScripts
.getEthplorerTopTokens()
.then((data: any) => {
const parsedTokens: any = [];
const tokens = _.get(data, 'tokens');
for (const token of tokens) {
parsedTokens.push(typeConverters.convertMetaMaskTokenToTokenObject(token));
}
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
cb();
})
.catch((err: any) => {
cb();
});
};
}
function _scrapeUnknownTokenInformationToDB(): any {
return (cb: () => void) => {
postgresClient
.query(dataFetchingQueries.get_top_unknown_token_addresses)
.then(async (result: any) => {
const addresses = _.map(result.rows, row => _.get(row, 'address'));
const responses = await Promise.all(
_.map(addresses, address => pullDataScripts.getEthplorerToken(address)),
);
const tokens = _.filter(responses, response => _.isUndefined(_.get(response, 'error')));
const parsedTokens = _.map(tokens, tokenInfo =>
typeConverters.convertEthplorerTokenToTokenObject(tokenInfo),
);
insertDataScripts.insertMultipleRows('tokens', parsedTokens, Object.keys(parsedTokens[0]));
cb();
})
.catch((err: any) => {
cb();
});
};
}
function _scrapePriceToDB(timestamp: number, token: any, timeDelay?: number): any {
return (cb: () => void) => {
pullDataScripts
.getPriceData(token.symbol, timestamp, timeDelay)
.then((data: any) => {
const safeSymbol = token.symbol === 'WETH' ? 'ETH' : token.symbol;
const parsedPrice = {
timestamp: timestamp / 1000,
symbol: token.symbol,
base: 'USD',
price: _.has(data[safeSymbol], 'USD') ? data[safeSymbol].USD : 0,
};
console.debug('Inserting ' + timestamp);
console.debug(parsedPrice);
insertDataScripts.insertSingleRow('prices', parsedPrice);
cb();
})
.catch((err: any) => {
console.debug(err);
cb();
});
};
}
// function _scrapeHistoricalPricesToDB(token: any, fromTimestamp: number, toTimestamp: number): any {
// return (cb: () => void) => {
// pullDataScripts
// .getHistoricalPrices(token, BASE_SYMBOL, fromTimestamp, toTimestamp)
// .then((data: any) => {
// const parsedHistoricalPrices: any = [];
// for (const historicalPrice of data['Data']) {
// const parsedHistoricalPrice = typeConverters.convertLogHistoricalPricesToHistoricalPricesObject(historicalPrice);
// parsedHistoricalPrice['token'] = token;
// parsedHistoricalPrice['base'] = BASE_SYMBOL;
// parsedHistoricalPrices.push(parsedHistoricalPrice);
// }
// if (parsedHistoricalPrices.length > 0) {
// insertDataScripts
// .insertMultipleRows(
// 'historical_prices',
// parsedHistoricalPrices,
// Object.keys(parsedHistoricalPrices[0]),
// )
// .catch((error: any) => {
// console.error(error);
// });
// }
// cb();
// })
// .catch((error: any) => {
// console.error(error);
// cb();
// });
// };
// }
function _scrapeOrderBookToDB(id: string, sraEndpoint: string): any {
return (cb: () => void) => {
pullDataScripts
.getOrderBook(sraEndpoint)
.then((data: any) => {
for (const book of data) {
for (const order of book.bids) {
console.debug(order);
const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
parsedOrder.relayer_id = id;
parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
console.error(error);
});
}
for (const order of book.asks) {
console.debug(order);
const parsedOrder = typeConverters.convertLogOrderToOrderObject(order);
parsedOrder.relayer_id = id;
parsedOrder.order_hash = ZeroEx.getOrderHashHex(order);
insertDataScripts.insertSingleRow('orders', parsedOrder).catch((error: any) => {
console.error(error);
});
}
}
cb();
})
.catch((error: any) => {
console.error(error);
cb();
});
};
}
if (cli.type === 'events') {
if (cli.from && cli.to) {
const destToBlock = cli.to ? cli.to : cli.from;
let curFromBlock = cli.from;
let curToBlock = curFromBlock;
do {
curToBlock += destToBlock - curToBlock < BLOCK_INCREMENTS ? destToBlock - curToBlock : BLOCK_INCREMENTS;
q.push(_scrapeEventsToDB(curFromBlock, curToBlock));
curFromBlock = curToBlock + 1;
} while (curToBlock < destToBlock);
}
} else if (cli.type === 'blocks') {
if (cli.from && cli.to) {
if (cli.force) {
const destToBlock = cli.to ? cli.to : cli.from;
let curFromBlock = cli.from;
const curToBlock = curFromBlock;
for (; curFromBlock < destToBlock; curFromBlock++) {
q.push(_scrapeBlockToDB(curFromBlock));
}
} else {
const fetchFrom = cli.from;
const fetchTo = cli.to ? cli.to : cli.from + 1;
postgresClient
.query(dataFetchingQueries.get_used_block_numbers, [fetchFrom, fetchTo])
.then((data: any) => {
for (const row of data.rows) {
q.push(_scrapeBlockToDB(row.block_number));
}
})
.catch((err: any) => {
// console.debug(err);
});
}
}
} else if (cli.type === 'transactions') {
if (cli.id) {
q.push(_scrapeTransactionToDB(cli.id));
} else if (cli.from) {
const fetchFrom = cli.from;
const fetchTo = cli.to ? cli.to : cli.from + 1;
postgresClient
.query(dataFetchingQueries.get_missing_txn_hashes, [fetchFrom, fetchTo])
.then((data: any) => {
for (const row of data.rows) {
q.push(_scrapeTransactionToDB(row.txn_hash));
}
})
.catch((err: any) => {
// console.debug(err);
});
}
} else if (cli.type === 'tokens') {
q.push(_scrapeMetaMaskEthContractMetadataToDB());
q.push(_scrapeEthplorerTopTokensToDB());
} else if (cli.type === 'unknown_tokens') {
q.push(_scrapeUnknownTokenInformationToDB());
} else if (cli.type === 'prices' && cli.from && cli.to) {
const fromDate = new Date(cli.from);
console.debug(fromDate);
fromDate.setUTCHours(0);
fromDate.setUTCMinutes(0);
fromDate.setUTCSeconds(0);
fromDate.setUTCMilliseconds(0);
console.debug(fromDate);
const toDate = new Date(cli.to);
postgresClient
.query(dataFetchingQueries.get_token_registry, [])
.then((result: any) => {
for (const curDate = fromDate; curDate < toDate; curDate.setDate(curDate.getDate() + 1)) {
for (const token of Object.values(result.rows)) {
console.debug('Scraping ' + curDate + ' ' + token);
q.push(_scrapePriceToDB(curDate.getTime(), token));
}
}
})
.catch((err: any) => {
console.debug(err);
});
// } else if (cli.type === 'historical_prices') {
// if (cli.token && cli.from && cli.to) {
// q.push(_scrapeHistoricalPricesToDB(cli.token, cli.from, cli.to));
// }
// } else if (cli.type === 'all_historical_prices') {
// if (cli.from && cli.to) {
// postgresClient
// .query(dataFetchingQueries.get_token_registry, [])
// .then((result: any) => {
// const curTokens: any = result.rows.map((a: any): any => a.symbol);
// for (const curToken of curTokens) {
// console.debug('Historical data backfill: Pushing coin ' + curToken);
// q.push(_scrapeHistoricalPricesToDB(curToken, cli.from, cli.to));
// }
// })
// .catch((err: any) => {
// console.debug(err);
// });
// }
} else if (cli.type === 'relayers') {
q.push(_scrapeAllRelayersToDB());
} else if (cli.type === 'orders') {
postgresClient.query(dataFetchingQueries.get_relayers, []).then((result: any) => {
for (const relayer of result.rows) {
if (relayer.sra_http_url) {
q.push(_scrapeOrderBookToDB(relayer.id, relayer.sra_http_url));
}
}
});
}