Updating token prices with `./mev prices fetch-all` can fail due to CoinGecko's API rate limits (10-30 requests per minute; see https://www.coingecko.com/en/api/pricing_2). Error message: ValueError: {'status': {'error_code': 429, 'error_message': "You've exceeded the Rate Limit. Please visit https://www.coingecko.com/en/api/pricing to subscribe to our API plans for higher rate limits."}}. Currently, TOKEN_ADDRESSES has 14 entries, which can (and does) cause the function to fail, depending on the dynamic limit. By introducing time delays between API queries, we can avoid hitting the limit and failing the price update.
59 lines
1.7 KiB
Python
59 lines
1.7 KiB
Python
from datetime import datetime
|
|
from typing import List
|
|
import time
|
|
|
|
from pycoingecko import CoinGeckoAPI
|
|
|
|
from mev_inspect.schemas.prices import COINGECKO_ID_BY_ADDRESS, TOKEN_ADDRESSES, Price
|
|
|
|
# Delay, in seconds, that fetch_prices passes to time.sleep to space out its
# CoinGecko requests and avoid the API's rate limit.
SLEEP_TIME = 10
|
|
|
|
def fetch_prices() -> List[Price]:
    """Fetch the full daily USD price history for every tracked token.

    Issues one CoinGecko ``get_coin_market_chart_by_id`` request per address
    in ``TOKEN_ADDRESSES`` (``vs_currency="usd"``, ``days="max"``,
    ``interval="daily"``) and converts each response into ``Price`` records
    via ``_build_token_prices``.

    Consecutive requests are spaced ``SLEEP_TIME`` seconds apart to avoid
    CoinGecko's API rate limit. The pause is taken only *between* requests,
    so the first request is sent immediately.

    Returns:
        The prices of all tokens concatenated in ``TOKEN_ADDRESSES``
        iteration order.

    Errors raised by the CoinGecko client (including rate-limit errors) are
    not caught here and propagate to the caller.
    """
    coingecko_api = CoinGeckoAPI()
    prices: List[Price] = []

    for index, token_address in enumerate(TOKEN_ADDRESSES):
        # Avoid CoinGecko's API rate limits. The first request has no
        # preceding request to be spaced from, so it is not delayed.
        if index > 0:
            time.sleep(SLEEP_TIME)

        coingecko_price_data = coingecko_api.get_coin_market_chart_by_id(
            id=COINGECKO_ID_BY_ADDRESS[token_address],
            vs_currency="usd",
            days="max",
            interval="daily",
        )
        prices += _build_token_prices(coingecko_price_data, token_address)

    return prices
|
|
|
|
|
|
def fetch_prices_range(after: datetime, before: datetime) -> List[Price]:
    """Fetch USD prices for every tracked token within a time window.

    Issues one CoinGecko ``get_coin_market_chart_range_by_id`` request per
    address in ``TOKEN_ADDRESSES`` and converts each response into ``Price``
    records via ``_build_token_prices``.

    Args:
        after: Start of the window.
        before: End of the window.

    Returns:
        The prices of all tokens concatenated in ``TOKEN_ADDRESSES``
        iteration order.

    NOTE(review): requests are sent back to back with no delay between
    them; with many tokens this may run into CoinGecko's rate limit --
    confirm whether throttling is wanted here.
    """
    client = CoinGeckoAPI()
    collected = []

    # Both bounds are truncated to whole-second Unix timestamps before being
    # handed to the API.
    window_start = int(after.timestamp())
    window_end = int(before.timestamp())

    for address in TOKEN_ADDRESSES:
        coin_id = COINGECKO_ID_BY_ADDRESS[address]
        market_chart = client.get_coin_market_chart_range_by_id(
            coin_id, "usd", window_start, window_end
        )
        collected.extend(_build_token_prices(market_chart, address))

    return collected
|
|
|
|
|
|
def _build_token_prices(coingecko_price_data, token_address) -> List[Price]:
    """Convert a CoinGecko market-chart response into ``Price`` records.

    Args:
        coingecko_price_data: Response mapping; only its ``"prices"`` series
            is read. Each entry is indexed as ``[timestamp, price]``.
        token_address: Address stored on every resulting ``Price``.

    Returns:
        One ``Price`` per entry of the series, in the same order.

    The raw timestamp is divided by 1000 before conversion (presumably epoch
    milliseconds -- confirm against the CoinGecko API). Because
    ``datetime.fromtimestamp`` is called without a ``tz`` argument, the
    resulting datetimes are naive and expressed in the host's local time.
    """
    return [
        Price(
            timestamp=datetime.fromtimestamp(point[0] / 1000),
            usd_price=point[1],
            token_address=token_address,
        )
        for point in coingecko_price_data["prices"]
    ]
|