Compare commits

...

5 Commits

Author SHA1 Message Date
Gui Heise
8e68e251b5 Add queue rotation 2022-05-04 11:38:10 -04:00
Gui Heise
ae2e5464c3 Add rpc queue and function 2022-05-04 11:31:52 -04:00
Gui Heise
813e2034d4 Fix exception and add configmaps 2022-05-02 17:09:20 -04:00
Gui Heise
308902e62c Initial fallback logic 2022-05-02 16:40:48 -04:00
Gui Heise
f32e62ae55 Remove quiet flag 2022-05-02 13:57:21 -04:00
5 changed files with 76 additions and 32 deletions

View File

@ -15,12 +15,14 @@ helm_remote("redis",
) )
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = { k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
"url" : os.environ["RPC_URL"], "primary_url" : os.environ["RPC_URL"],
"secondary_url" : os.environ["SECONDARY_RPC_URL"],
})) }))
k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
"url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""), #k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
})) # "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
#}))
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = { k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
"username" : "postgres", "username" : "postgres",

View File

@ -84,7 +84,14 @@ spec:
valueFrom: valueFrom:
configMapKeyRef: configMapKeyRef:
name: mev-inspect-rpc name: mev-inspect-rpc
key: url key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL - name: LISTENER_HEALTHCHECK_URL
valueFrom: valueFrom:
configMapKeyRef: configMapKeyRef:

View File

@ -84,7 +84,14 @@ spec:
valueFrom: valueFrom:
configMapKeyRef: configMapKeyRef:
name: mev-inspect-rpc name: mev-inspect-rpc
key: url key: primary_url
optional: true
- name: SECONDARY_RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: secondary_url
optional: true
- name: LISTENER_HEALTHCHECK_URL - name: LISTENER_HEALTHCHECK_URL
valueFrom: valueFrom:
configMapKeyRef: configMapKeyRef:

View File

@ -15,7 +15,6 @@ case "$1" in
--chdir /app \ --chdir /app \
--chuid flashbot \ --chuid flashbot \
--start \ --start \
--quiet \
--pidfile $PIDFILE \ --pidfile $PIDFILE \
--make-pidfile \ --make-pidfile \
--startas /bin/bash -- -c "poetry run python listener.py" --startas /bin/bash -- -c "poetry run python listener.py"

View File

@ -1,6 +1,8 @@
import asyncio import asyncio
import logging import logging
import os import os
from collections import deque
from typing import Dict, Optional
import dramatiq import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient from aiohttp_retry import ExponentialRetry, RetryClient
@ -28,41 +30,35 @@ logger = logging.getLogger(__name__)
# lag to make sure the blocks we see are settled # lag to make sure the blocks we see are settled
BLOCK_NUMBER_LAG = 5 BLOCK_NUMBER_LAG = 5
primary_rpc = os.getenv("RPC_URL")
secondary_rpc = os.getenv("SECONDARY_RPC_URL")
if os.getenv("RPC_URL") is None:
raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ")
rpc_queue = deque([primary_rpc, secondary_rpc])
@coro @coro
async def run(): async def run():
rpc = os.getenv("RPC_URL")
if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL")
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
logger.info("Starting...") logger.info("Starting...")
killer = GracefulKiller() if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None:
rpc_queue.rotate(-1)
inspect_db_session = get_inspect_session() inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0])
trace_db_session = get_trace_session()
broker = connect_broker() killer = inspect_params["killer"]
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
while not killer.kill_now: while not killer.kill_now:
await inspect_next_block( await inspect_next_block(
inspector, inspect_params["inspector"],
inspect_db_session, inspect_params["inspect_db_session"],
trace_db_session, inspect_params["trace_db_session"],
base_provider, inspect_params["base_provider"],
healthcheck_url, inspect_params["healthcheck_url"],
export_actor, inspect_params["export_actor"],
) )
logger.info("Stopping...") logger.info("Stopping...")
@ -119,6 +115,39 @@ async def ping_healthcheck_url(url):
pass pass
def _get_inspector_params(rpc: str) -> Optional[Dict]:
try:
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
killer = GracefulKiller()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
return {
"inspector": inspector,
"base_provider": base_provider,
"killer": killer,
"healthcheck_url": healthcheck_url,
"inspect_db_session": inspect_db_session,
"trace_db_session": trace_db_session,
"export_actor": export_actor,
}
except Exception:
return None
if __name__ == "__main__": if __name__ == "__main__":
try: try:
run() run()