Merge branch 'development' into pull-github-data
48  packages/pipeline/src/scripts/partition_nonfungible_dot_com_dump.ts  Normal file
@@ -0,0 +1,48 @@
+/**
+ * Needed because we store the initial dump of trades in S3, and some projects
+ * (namely cryptokitties) have dumps that are too big to be transferred easily
+ * as one big file to and from S3. This script breaks apart a dump file into a
+ * set of files containing segments of the data. The number of segments is
+ * based on the S3_CHUNK_SIZES specified for each project, or "publisher" in their
+ * parlance, in ../data_sources/nonfungible_dot_com/index.ts.
+ *
+ * Usage: $ node partition_nonfungible_dot_com_dump.ts publisher
+ * Example: $ node partition_nonfungible_dot_com_dump.ts cryptokitties
+ *
+ * Expects to find on disk a data file named
+ * `sales_summary_${publisher}.json`, as emailed by Daniel of nonfungible.com.
+ *
+ * Writes to disk a set of files named `sales_summary_${publisher}${N}.json`.
+ *
+ * Probably need to use `node` with --max-old-space-size=1024 or maybe
+ * even more.
+ */
+
+import { readFileSync, writeFileSync } from 'fs';
+
+import { splitEvery } from 'ramda';
+
+import { logUtils } from '@0x/utils';
+
+import {
+    NonfungibleDotComHistoryResponse,
+    NonfungibleDotComTradeResponse,
+    S3_CHUNK_SIZES,
+} from '../data_sources/nonfungible_dot_com';
+
+(() => {
+    const publisher = process.argv[2];
+
+    const inputFilename = `sales_summary_${publisher}.json`;
+    logUtils.log(`Reading input file ${inputFilename}`);
+    const sourceJson: NonfungibleDotComHistoryResponse = JSON.parse(readFileSync(inputFilename).toString());
+
+    const chunkSize = S3_CHUNK_SIZES[publisher];
+    logUtils.log(`Splitting data into chunks of ${chunkSize} trades each`);
+    const chunks: NonfungibleDotComTradeResponse[][] = splitEvery(chunkSize, sourceJson.data);
+
+    logUtils.log(`Writing ${chunks.length} chunks to disk`);
+    for (let chunkIndex = 0; chunkIndex < chunks.length; chunkIndex++) {
+        writeFileSync(`sales_summary_${publisher}${chunkIndex}.json`, JSON.stringify(chunks[chunkIndex]));
+    }
+})();
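
As a quick illustration of the chunking step above: ramda's `splitEvery(n, list)` slices a list into consecutive chunks of length `n`, with the final chunk holding any remainder. A minimal standalone sketch, using hypothetical numbers in place of real trade records:

```typescript
import { splitEvery } from 'ramda';

// Hypothetical stand-in for sourceJson.data; the real entries are
// NonfungibleDotComTradeResponse objects.
const trades = [1, 2, 3, 4, 5, 6, 7];

// Chunks of at most 3, mirroring splitEvery(chunkSize, sourceJson.data) above.
const chunks = splitEvery(3, trades);
console.log(chunks); // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```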
66  packages/pipeline/src/scripts/pull_greenhouse.ts  Normal file
@@ -0,0 +1,66 @@
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection } from 'typeorm';
+
+import { logUtils } from '@0x/utils';
+
+import { GreenhouseSource } from '../data_sources/greenhouse';
+import { GreenhouseApplication } from '../entities';
+import * as ormConfig from '../ormconfig';
+import { parseApplications } from '../parsers/greenhouse';
+import { handleError } from '../utils';
+let connection: Connection;
+
+const GREENHOUSE_FALLBACK_DATE = '2018-09-01';
+
+(async () => {
+    connection = await createConnection(ormConfig as ConnectionOptions);
+
+    const accessToken = process.env.GREENHOUSE_ACCESS_TOKEN;
+    if (accessToken === undefined) {
+        throw new Error('Missing required env var: GREENHOUSE_ACCESS_TOKEN');
+    }
+    const source = new GreenhouseSource(accessToken);
+
+    await fetchAndSaveApplicationsAsync(source);
+})().catch(handleError);
+
+async function getStartDateAsync(conn: Connection, sortColumn: string, tableName: string): Promise<Date> {
+    if (process.env.GREENHOUSE_START_DATE) {
+        return new Date(process.env.GREENHOUSE_START_DATE);
+    } else {
+        const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
+        if (R.isEmpty(queryResult)) {
+            return new Date(GREENHOUSE_FALLBACK_DATE);
+        } else {
+            return new Date(queryResult[0]._max);
+        }
+    }
+}
+
+function getEndDate(): Date {
+    if (process.env.GREENHOUSE_END_DATE) {
+        return new Date(process.env.GREENHOUSE_END_DATE);
+    } else {
+        return new Date();
+    }
+}
+
+async function fetchAndSaveApplicationsAsync(source: GreenhouseSource): Promise<void> {
+    const repository = connection.getRepository(GreenhouseApplication);
+    const startTime = await getStartDateAsync(connection, 'last_activity_at', 'raw.greenhouse_applications');
+    const endTime = getEndDate();
+    logUtils.log(`Fetching Greenhouse applications starting from ${startTime}...`);
+    const allApplications = await source.fetchApplicationsAsync(startTime);
+    const applications = allApplications.filter(app => {
+        const date = new Date(app.last_activity_at);
+        return date > startTime && date < endTime;
+    });
+    logUtils.log(
+        `Found ${
+            applications.length
+        } updated Greenhouse applications between ${startTime.toISOString()} and ${endTime.toISOString()}...`,
+    );
+    const parsed = applications.map(a => parseApplications(a));
+    await repository.save(parsed);
+    logUtils.log(`Saved ${parsed.length} Greenhouse applications`);
+}
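
The windowing logic above (start date from an env var, else the table's MAX(last_activity_at) watermark, else the fallback; end date from an env var, else now) can be sanity-checked in isolation. A minimal sketch with hypothetical application records, no database or Greenhouse API involved:

```typescript
// Hypothetical records shaped like Greenhouse applications, with ISO 8601
// last_activity_at strings as the API returns them.
const apps = [
    { id: 1, last_activity_at: '2018-08-15T00:00:00Z' }, // before window
    { id: 2, last_activity_at: '2018-09-15T00:00:00Z' }, // inside window
    { id: 3, last_activity_at: '2018-10-15T00:00:00Z' }, // after window
];

const startTime = new Date('2018-09-01');
const endTime = new Date('2018-10-01');

// The same exclusive-bounds filter used in fetchAndSaveApplicationsAsync above.
const inWindow = apps.filter(app => {
    const date = new Date(app.last_activity_at);
    return date > startTime && date < endTime;
});

console.log(inWindow.map(app => app.id)); // [ 2 ]
```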
@@ -13,7 +13,7 @@ import { handleError } from '../utils';
 const BATCH_SAVE_SIZE = 1000;
 
 // Max requests to make to API per second;
-const EDPS_MAX_REQUESTS_PER_SECOND = 1;
+const EDPS_MAX_REQUESTS_PER_SECOND = 0.5;
 
 // Maximum requests per second to CryptoCompare
 const CRYPTO_COMPARE_MAX_REQS_PER_SECOND = 60;
@@ -31,6 +31,7 @@ let connection: Connection;
     connection = await createConnection(ormConfig as ConnectionOptions);
     const edpsSource = new EdpsSource(EDPS_MAX_REQUESTS_PER_SECOND);
     const cryptoCompareSource = new CryptoCompareOHLCVSource(CRYPTO_COMPARE_MAX_REQS_PER_SECOND);
+    let hasFailed: boolean = false;
 
     logUtils.log('Fetching slippage records');
     const nestedSlippages: Slippage[][][] = await Promise.all(
@@ -49,6 +50,7 @@ let connection: Connection;
             } catch (e) {
                 logUtils.log(`Error getting data for symbol=${symbol}, amount=${amount}`);
                 logUtils.log(e);
+                hasFailed = true;
                 return [new Slippage()];
             }
         }),
@@ -63,5 +65,5 @@ let connection: Connection;
     logUtils.log(`Saving ${slippages.length} records to database`);
     await SlippageRepository.save(slippages, { chunk: Math.ceil(slippages.length / BATCH_SAVE_SIZE) });
     logUtils.log('Done');
-    process.exit(0);
+    process.exit(hasFailed ? 1 : 0);
 })().catch(handleError);
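
The two additions in these hunks work together: a failed fetch no longer poisons the whole run (it logs, sets the flag, and returns a placeholder record), but the process exit code still reports that something went wrong. A minimal sketch of the pattern, with hypothetical names standing in for the EDPS source:

```typescript
let hasFailed: boolean = false;

// Hypothetical fetcher standing in for the per-symbol EDPS slippage calls.
async function fetchOneAsync(symbol: string): Promise<number[]> {
    try {
        if (symbol === 'BAD') {
            throw new Error('simulated API error');
        }
        return [1, 2, 3];
    } catch (e) {
        console.log(`Error getting data for symbol=${symbol}`);
        hasFailed = true;
        return []; // placeholder result, like `[new Slippage()]` above
    }
}

(async () => {
    const results = await Promise.all(['GOOD', 'BAD'].map(fetchOneAsync));
    console.log(`Fetched ${results.length} result sets`);
    // A non-zero exit code tells cron/schedulers the run only partially succeeded.
    process.exit(hasFailed ? 1 : 0);
})();
```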