Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
8e68e251b5 | ||
|
ae2e5464c3 | ||
|
813e2034d4 | ||
|
308902e62c | ||
|
f32e62ae55 |
10
Tiltfile
10
Tiltfile
@ -15,12 +15,14 @@ helm_remote("redis",
|
||||
)
|
||||
|
||||
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
|
||||
"url" : os.environ["RPC_URL"],
|
||||
"primary_url" : os.environ["RPC_URL"],
|
||||
"secondary_url" : os.environ["SECONDARY_RPC_URL"],
|
||||
}))
|
||||
|
||||
k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
|
||||
"url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
|
||||
}))
|
||||
|
||||
#k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
|
||||
# "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
|
||||
#}))
|
||||
|
||||
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
|
||||
"username" : "postgres",
|
||||
|
@ -84,7 +84,14 @@ spec:
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: mev-inspect-rpc
|
||||
key: url
|
||||
key: primary_url
|
||||
optional: true
|
||||
- name: SECONDARY_RPC_URL
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: mev-inspect-rpc
|
||||
key: secondary_url
|
||||
optional: true
|
||||
- name: LISTENER_HEALTHCHECK_URL
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
|
@ -84,7 +84,14 @@ spec:
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: mev-inspect-rpc
|
||||
key: url
|
||||
key: primary_url
|
||||
optional: true
|
||||
- name: SECONDARY_RPC_URL
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: mev-inspect-rpc
|
||||
key: secondary_url
|
||||
optional: true
|
||||
- name: LISTENER_HEALTHCHECK_URL
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
|
1
listener
1
listener
@ -15,7 +15,6 @@ case "$1" in
|
||||
--chdir /app \
|
||||
--chuid flashbot \
|
||||
--start \
|
||||
--quiet \
|
||||
--pidfile $PIDFILE \
|
||||
--make-pidfile \
|
||||
--startas /bin/bash -- -c "poetry run python listener.py"
|
||||
|
77
listener.py
77
listener.py
@ -1,6 +1,8 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from collections import deque
|
||||
from typing import Dict, Optional
|
||||
|
||||
import dramatiq
|
||||
from aiohttp_retry import ExponentialRetry, RetryClient
|
||||
@ -28,41 +30,35 @@ logger = logging.getLogger(__name__)
|
||||
# lag to make sure the blocks we see are settled
|
||||
BLOCK_NUMBER_LAG = 5
|
||||
|
||||
primary_rpc = os.getenv("RPC_URL")
|
||||
secondary_rpc = os.getenv("SECONDARY_RPC_URL")
|
||||
|
||||
if os.getenv("RPC_URL") is None:
|
||||
raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ")
|
||||
|
||||
rpc_queue = deque([primary_rpc, secondary_rpc])
|
||||
|
||||
|
||||
@coro
|
||||
async def run():
|
||||
rpc = os.getenv("RPC_URL")
|
||||
if rpc is None:
|
||||
raise RuntimeError("Missing environment variable RPC_URL")
|
||||
|
||||
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
|
||||
|
||||
logger.info("Starting...")
|
||||
|
||||
killer = GracefulKiller()
|
||||
if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None:
|
||||
rpc_queue.rotate(-1)
|
||||
|
||||
inspect_db_session = get_inspect_session()
|
||||
trace_db_session = get_trace_session()
|
||||
inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0])
|
||||
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
realtime_export_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
)
|
||||
|
||||
inspector = MEVInspector(rpc)
|
||||
base_provider = get_base_provider(rpc)
|
||||
killer = inspect_params["killer"]
|
||||
|
||||
while not killer.kill_now:
|
||||
await inspect_next_block(
|
||||
inspector,
|
||||
inspect_db_session,
|
||||
trace_db_session,
|
||||
base_provider,
|
||||
healthcheck_url,
|
||||
export_actor,
|
||||
inspect_params["inspector"],
|
||||
inspect_params["inspect_db_session"],
|
||||
inspect_params["trace_db_session"],
|
||||
inspect_params["base_provider"],
|
||||
inspect_params["healthcheck_url"],
|
||||
inspect_params["export_actor"],
|
||||
)
|
||||
|
||||
logger.info("Stopping...")
|
||||
@ -119,6 +115,39 @@ async def ping_healthcheck_url(url):
|
||||
pass
|
||||
|
||||
|
||||
def _get_inspector_params(rpc: str) -> Optional[Dict]:
|
||||
try:
|
||||
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
|
||||
|
||||
broker = connect_broker()
|
||||
export_actor = dramatiq.actor(
|
||||
realtime_export_task,
|
||||
broker=broker,
|
||||
queue_name=HIGH_PRIORITY_QUEUE,
|
||||
priority=HIGH_PRIORITY,
|
||||
)
|
||||
|
||||
killer = GracefulKiller()
|
||||
|
||||
inspect_db_session = get_inspect_session()
|
||||
trace_db_session = get_trace_session()
|
||||
|
||||
inspector = MEVInspector(rpc)
|
||||
base_provider = get_base_provider(rpc)
|
||||
|
||||
return {
|
||||
"inspector": inspector,
|
||||
"base_provider": base_provider,
|
||||
"killer": killer,
|
||||
"healthcheck_url": healthcheck_url,
|
||||
"inspect_db_session": inspect_db_session,
|
||||
"trace_db_session": trace_db_session,
|
||||
"export_actor": export_actor,
|
||||
}
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
run()
|
||||
|
Loading…
x
Reference in New Issue
Block a user