55 lines
1.6 KiB
Python
55 lines
1.6 KiB
Python
from datetime import datetime
|
|
from typing import List
|
|
|
|
from pycoingecko import CoinGeckoAPI
|
|
|
|
from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES, Price
|
|
|
|
|
|
def fetch_prices() -> List[Price]:
|
|
coingecko_api = CoinGeckoAPI()
|
|
prices = []
|
|
|
|
for token_address in TOKEN_ADDRESSES:
|
|
coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
|
|
id=COINGECKO_ID_BY_ADDRESS[token_address],
|
|
vs_currency="usd",
|
|
days="max",
|
|
interval="daily",
|
|
)
|
|
prices += _build_token_prices(coingecko_price_data, token_address)
|
|
|
|
return prices
|
|
|
|
|
|
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
|
|
coingecko_api = CoinGeckoAPI()
|
|
prices = []
|
|
after_unix = int(after.timestamp())
|
|
before_unix = int(before.timestamp())
|
|
|
|
for token_address in TOKEN_ADDRESSES:
|
|
coingecko_price_data = coingecko_api.get_coin_market_chart_range_by_id(
|
|
COINGECKO_ID_BY_ADDRESS[token_address], "usd", after_unix, before_unix
|
|
)
|
|
|
|
prices += _build_token_prices(coingecko_price_data, token_address)
|
|
|
|
return prices
|
|
|
|
|
|
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
|
|
time_series = coingecko_price_data["prices"]
|
|
prices = []
|
|
for entry in time_series:
|
|
timestamp = datetime.fromtimestamp(entry[0] / 1000)
|
|
token_price = entry[1]
|
|
prices.append(
|
|
Price(
|
|
timestamp=timestamp,
|
|
usd_price=token_price,
|
|
token_address=token_address,
|
|
)
|
|
)
|
|
return prices
|