diff --git a/Tiltfile b/Tiltfile index 3a98416..90c7301 100644 --- a/Tiltfile +++ b/Tiltfile @@ -15,16 +15,14 @@ helm_remote("redis", ) k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = { - "url" : os.environ["RPC_URL"], + "primary_url" : os.environ["RPC_URL"], + "secondary_url" : os.environ["SECONDARY_RPC_URL"], })) -k8s_yaml(configmap_from_dict("mev-inspect-rpc-list, inputs = { - "list" : os.environ["RPC_URL"], -})) -k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = { - "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""), -})) +#k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = { +# "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""), +#})) k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = { "username" : "postgres", diff --git a/k8s/mev-inspect-workers/templates/deployment.yaml b/k8s/mev-inspect-workers/templates/deployment.yaml index de961bf..d59112b 100644 --- a/k8s/mev-inspect-workers/templates/deployment.yaml +++ b/k8s/mev-inspect-workers/templates/deployment.yaml @@ -84,12 +84,14 @@ spec: valueFrom: configMapKeyRef: name: mev-inspect-rpc - key: url - - name: RPC_LIST + key: primary_url + optional: true + - name: SECONDARY_RPC_URL valueFrom: configMapKeyRef: - name: mev-inspect-rpc-list - key: list + name: mev-inspect-rpc + key: secondary_url + optional: true - name: LISTENER_HEALTHCHECK_URL valueFrom: configMapKeyRef: diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index 7993652..14c3a06 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -84,7 +84,14 @@ spec: valueFrom: configMapKeyRef: name: mev-inspect-rpc - key: url + key: primary_url + optional: true + - name: SECONDARY_RPC_URL + valueFrom: + configMapKeyRef: + name: mev-inspect-rpc + key: secondary_url + optional: true - name: LISTENER_HEALTHCHECK_URL valueFrom: configMapKeyRef: diff --git a/listener.py b/listener.py index c38e4ce..502836d 100644 --- a/listener.py +++ b/listener.py @@ -1,6 +1,8 @@ import asyncio import logging import os +from collections import deque +from typing import Dict, Optional import dramatiq from aiohttp_retry import ExponentialRetry, RetryClient @@ -26,49 +28,37 @@ logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) logger = logging.getLogger(__name__) # lag to make sure the blocks we see are settled -CURRENT_RPC: int = 0 BLOCK_NUMBER_LAG = 5 -rpc = os.getenv("RPC_URL") -rpc_list = os.getenv("RPC_LIST") -if rpc_list is None and rpc is None: - raise RuntimeError("Missing RPC_URL or RPC_LIST environment variables.") +primary_rpc = os.getenv("RPC_URL") +secondary_rpc = os.getenv("SECONDARY_RPC_URL") -if rpc_list is not None: - rpc = rpc_list[CURRENT_RPC] +if os.getenv("RPC_URL") is None: + raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ") + +rpc_queue = deque([primary_rpc, secondary_rpc]) @coro async def run(): - healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL") - logger.info("Starting...") - killer = GracefulKiller() + if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None: + rpc_queue.reverse() - inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() + inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0]) - broker = connect_broker() - export_actor = dramatiq.actor( - realtime_export_task, - broker=broker, - queue_name=HIGH_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, - ) - - inspector = MEVInspector(rpc) - base_provider = get_base_provider(rpc) + killer = inspect_params["killer"] while not killer.kill_now: await inspect_next_block( - inspector, - inspect_db_session, - trace_db_session, - base_provider, - healthcheck_url, - export_actor, + inspect_params["inspector"], + inspect_params["inspect_db_session"], + inspect_params["trace_db_session"], + inspect_params["base_provider"], + inspect_params["healthcheck_url"], + inspect_params["export_actor"], ) logger.info("Stopping...") @@ -125,11 +115,41 @@ async def ping_healthcheck_url(url): pass +def _get_inspector_params(rpc: str) -> Optional[Dict]: + try: + healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL") + + broker = connect_broker() + export_actor = dramatiq.actor( + realtime_export_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, + ) + + killer = GracefulKiller() + + inspect_db_session = get_inspect_session() + trace_db_session = get_trace_session() + + inspector = MEVInspector(rpc) + base_provider = get_base_provider(rpc) + + return { + "inspector": inspector, + "base_provider": base_provider, + "killer": killer, + "healthcheck_url": healthcheck_url, + "inspect_db_session": inspect_db_session, + "trace_db_session": trace_db_session, + "export_actor": export_actor, + } + except Exception: + return None + + if __name__ == "__main__": try: run() except Exception as e: logger.error(e) - if rpc_list is not None and CURRENT_RPC < len(rpc_list): - rpc = rpc_list[CURRENT_RPC + 1] - run()