Add rpc queue and function
This commit is contained in:
parent
813e2034d4
commit
ae2e5464c3
12
Tiltfile
12
Tiltfile
@ -15,16 +15,14 @@ helm_remote("redis",
|
|||||||
)
|
)
|
||||||
|
|
||||||
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
|
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
|
||||||
"url" : os.environ["RPC_URL"],
|
"primary_url" : os.environ["RPC_URL"],
|
||||||
|
"secondary_url" : os.environ["SECONDARY_RPC_URL"],
|
||||||
}))
|
}))
|
||||||
|
|
||||||
k8s_yaml(configmap_from_dict("mev-inspect-rpc-list, inputs = {
|
|
||||||
"list" : os.environ["RPC_URL"],
|
|
||||||
}))
|
|
||||||
|
|
||||||
k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
|
#k8s_yaml(configmap_from_dict("mev-inspect-listener-healthcheck", inputs = {
|
||||||
"url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
|
# "url" : os.getenv("LISTENER_HEALTHCHECK_URL", default=""),
|
||||||
}))
|
#}))
|
||||||
|
|
||||||
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
|
k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
|
||||||
"username" : "postgres",
|
"username" : "postgres",
|
||||||
|
@ -84,12 +84,14 @@ spec:
|
|||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
name: mev-inspect-rpc
|
name: mev-inspect-rpc
|
||||||
key: url
|
key: primary_url
|
||||||
- name: RPC_LIST
|
optional: true
|
||||||
|
- name: SECONDARY_RPC_URL
|
||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
name: mev-inspect-rpc-list
|
name: mev-inspect-rpc
|
||||||
key: list
|
key: secondary_url
|
||||||
|
optional: true
|
||||||
- name: LISTENER_HEALTHCHECK_URL
|
- name: LISTENER_HEALTHCHECK_URL
|
||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
|
@ -84,7 +84,14 @@ spec:
|
|||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
name: mev-inspect-rpc
|
name: mev-inspect-rpc
|
||||||
key: url
|
key: primary_url
|
||||||
|
optional: true
|
||||||
|
- name: SECONDARY_RPC_URL
|
||||||
|
valueFrom:
|
||||||
|
configMapKeyRef:
|
||||||
|
name: mev-inspect-rpc
|
||||||
|
key: secondary_url
|
||||||
|
optional: true
|
||||||
- name: LISTENER_HEALTHCHECK_URL
|
- name: LISTENER_HEALTHCHECK_URL
|
||||||
valueFrom:
|
valueFrom:
|
||||||
configMapKeyRef:
|
configMapKeyRef:
|
||||||
|
82
listener.py
82
listener.py
@ -1,6 +1,8 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
from collections import deque
|
||||||
|
from typing import Dict, Optional
|
||||||
|
|
||||||
import dramatiq
|
import dramatiq
|
||||||
from aiohttp_retry import ExponentialRetry, RetryClient
|
from aiohttp_retry import ExponentialRetry, RetryClient
|
||||||
@ -26,49 +28,37 @@ logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# lag to make sure the blocks we see are settled
|
# lag to make sure the blocks we see are settled
|
||||||
CURRENT_RPC: int = 0
|
|
||||||
BLOCK_NUMBER_LAG = 5
|
BLOCK_NUMBER_LAG = 5
|
||||||
rpc = os.getenv("RPC_URL")
|
|
||||||
rpc_list = os.getenv("RPC_LIST")
|
|
||||||
|
|
||||||
if rpc_list is None and rpc is None:
|
primary_rpc = os.getenv("RPC_URL")
|
||||||
raise RuntimeError("Missing RPC_URL or RPC_LIST environment variables.")
|
secondary_rpc = os.getenv("SECONDARY_RPC_URL")
|
||||||
|
|
||||||
if rpc_list is not None:
|
if os.getenv("RPC_URL") is None:
|
||||||
rpc = rpc_list[CURRENT_RPC]
|
raise RuntimeError("Missing primary RPC environment variable: RPC_URL. ")
|
||||||
|
|
||||||
|
rpc_queue = deque([primary_rpc, secondary_rpc])
|
||||||
|
|
||||||
|
|
||||||
@coro
|
@coro
|
||||||
async def run():
|
async def run():
|
||||||
|
|
||||||
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
|
|
||||||
|
|
||||||
logger.info("Starting...")
|
logger.info("Starting...")
|
||||||
|
|
||||||
killer = GracefulKiller()
|
if _get_inspector_params(rpc_queue[0]) is None and secondary_rpc is not None:
|
||||||
|
rpc_queue.reverse()
|
||||||
|
|
||||||
inspect_db_session = get_inspect_session()
|
inspect_params: Optional[Dict] = _get_inspector_params(rpc_queue[0])
|
||||||
trace_db_session = get_trace_session()
|
|
||||||
|
|
||||||
broker = connect_broker()
|
killer = inspect_params["killer"]
|
||||||
export_actor = dramatiq.actor(
|
|
||||||
realtime_export_task,
|
|
||||||
broker=broker,
|
|
||||||
queue_name=HIGH_PRIORITY_QUEUE,
|
|
||||||
priority=HIGH_PRIORITY,
|
|
||||||
)
|
|
||||||
|
|
||||||
inspector = MEVInspector(rpc)
|
|
||||||
base_provider = get_base_provider(rpc)
|
|
||||||
|
|
||||||
while not killer.kill_now:
|
while not killer.kill_now:
|
||||||
await inspect_next_block(
|
await inspect_next_block(
|
||||||
inspector,
|
inspect_params["inspector"],
|
||||||
inspect_db_session,
|
inspect_params["inspect_db_session"],
|
||||||
trace_db_session,
|
inspect_params["trace_db_session"],
|
||||||
base_provider,
|
inspect_params["base_provider"],
|
||||||
healthcheck_url,
|
inspect_params["healthcheck_url"],
|
||||||
export_actor,
|
inspect_params["export_actor"],
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("Stopping...")
|
logger.info("Stopping...")
|
||||||
@ -125,11 +115,41 @@ async def ping_healthcheck_url(url):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _get_inspector_params(rpc: str) -> Optional[Dict]:
|
||||||
|
try:
|
||||||
|
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
|
||||||
|
|
||||||
|
broker = connect_broker()
|
||||||
|
export_actor = dramatiq.actor(
|
||||||
|
realtime_export_task,
|
||||||
|
broker=broker,
|
||||||
|
queue_name=HIGH_PRIORITY_QUEUE,
|
||||||
|
priority=HIGH_PRIORITY,
|
||||||
|
)
|
||||||
|
|
||||||
|
killer = GracefulKiller()
|
||||||
|
|
||||||
|
inspect_db_session = get_inspect_session()
|
||||||
|
trace_db_session = get_trace_session()
|
||||||
|
|
||||||
|
inspector = MEVInspector(rpc)
|
||||||
|
base_provider = get_base_provider(rpc)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"inspector": inspector,
|
||||||
|
"base_provider": base_provider,
|
||||||
|
"killer": killer,
|
||||||
|
"healthcheck_url": healthcheck_url,
|
||||||
|
"inspect_db_session": inspect_db_session,
|
||||||
|
"trace_db_session": trace_db_session,
|
||||||
|
"export_actor": export_actor,
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
run()
|
run()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(e)
|
logger.error(e)
|
||||||
if rpc_list is not None and CURRENT_RPC < len(rpc_list):
|
|
||||||
rpc = rpc_list[CURRENT_RPC + 1]
|
|
||||||
run()
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user