Fix/pipeline/ohlcv (#1393)

The OHLCV script in data pipeline quits early when we get no data from Crypto Compare. Sometimes Crypto Compare gives us a valid empty response (e.g. when we query for way back in time) and we need to just continue. This adds better filtering for the types of Crypto Compare responses to detect when we should continue and when we should really quit.
This commit is contained in:
Xianny
2018-12-05 16:05:06 -08:00
committed by GitHub
parent 21122f0137
commit 78d0ab1aa2
2 changed files with 43 additions and 31 deletions

View File

@@ -10,11 +10,9 @@ import { fetchOHLCVTradingPairsAsync, TradingPair } from '../utils/get_ohlcv_tra
const SOURCE_NAME = 'CryptoCompare';
const TWO_HOURS_AGO = new Date().getTime() - 2 * 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_HOUR_AGO = new Date().getTime() - 60 * 60 * 1000; // tslint:disable-line:custom-no-magic-numbers
const ONE_SECOND = 1000;
const MAX_CONCURRENT_REQUESTS = parseInt(process.env.CRYPTOCOMPARE_MAX_CONCURRENT_REQUESTS || '14', 10); // tslint:disable-line:custom-no-magic-numbers
const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2010-09-01'; // the time when BTC/USD info starts appearing on Crypto Compare
const EARLIEST_BACKFILL_DATE = process.env.OHLCV_EARLIEST_BACKFILL_DATE || '2014-06-01';
const EARLIEST_BACKFILL_TIME = new Date(EARLIEST_BACKFILL_DATE).getTime();
let connection: Connection;
@@ -60,28 +58,24 @@ async function fetchAndSaveAsync(
if (pair.latestSavedTime > TWO_HOURS_AGO) {
break;
}
const rawRecords = await source.getHourlyOHLCVAsync(pair);
const records = rawRecords.filter(rec => {
return rec.time * ONE_SECOND < ONE_HOUR_AGO && rec.time * ONE_SECOND > pair.latestSavedTime;
}); // Crypto Compare can take ~30mins to finalise records
if (records.length === 0) {
console.log(`No more records, stopping task for ${JSON.stringify(pair)}`);
break;
}
const metadata: OHLCVMetadata = {
exchange: source.default_exchange,
fromSymbol: pair.fromSymbol,
toSymbol: pair.toSymbol,
source: SOURCE_NAME,
observedTimestamp: jobTime,
interval: source.intervalBetweenRecords,
};
const parsedRecords = parseRecords(records, metadata);
try {
await saveRecordsAsync(repository, parsedRecords);
const records = await source.getHourlyOHLCVAsync(pair);
console.log(`Retrieved ${records.length} records for ${JSON.stringify(pair)}`);
if (records.length > 0) {
const metadata: OHLCVMetadata = {
exchange: source.default_exchange,
fromSymbol: pair.fromSymbol,
toSymbol: pair.toSymbol,
source: SOURCE_NAME,
observedTimestamp: jobTime,
interval: source.intervalBetweenRecords,
};
const parsedRecords = parseRecords(records, metadata);
await saveRecordsAsync(repository, parsedRecords);
}
i++;
} catch (err) {
console.log(`Error saving OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
console.log(`Error scraping OHLCVRecords, stopping task for ${JSON.stringify(pair)} [${err}]`);
break;
}
}