Only build inspector once
This commit is contained in:
parent
cff148e21f
commit
cbec5b7613
31
worker.py
31
worker.py
@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
import threading
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
import dramatiq
|
import dramatiq
|
||||||
@ -13,6 +14,8 @@ from mev_inspect.inspector import MEVInspector
|
|||||||
InspectSession = get_inspect_sessionmaker()
|
InspectSession = get_inspect_sessionmaker()
|
||||||
TraceSession = get_trace_sessionmaker()
|
TraceSession = get_trace_sessionmaker()
|
||||||
|
|
||||||
|
thread_local = threading.local()
|
||||||
|
|
||||||
|
|
||||||
class AsyncMiddleware(Middleware):
|
class AsyncMiddleware(Middleware):
|
||||||
def before_process_message(
|
def before_process_message(
|
||||||
@ -27,9 +30,22 @@ class AsyncMiddleware(Middleware):
|
|||||||
self.loop.close()
|
self.loop.close()
|
||||||
|
|
||||||
|
|
||||||
|
class InspectorMiddleware(Middleware):
|
||||||
|
def before_process_message(
|
||||||
|
self, _broker, worker
|
||||||
|
): # pylint: disable=unused-argument
|
||||||
|
if not hasattr(thread_local, "inspector"):
|
||||||
|
thread_local.inspector = MEVInspector(
|
||||||
|
rpc,
|
||||||
|
max_concurrency=5,
|
||||||
|
request_timeout=300,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
rpc = os.environ["RPC_URL"]
|
rpc = os.environ["RPC_URL"]
|
||||||
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
|
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
|
||||||
broker.add_middleware(AsyncMiddleware())
|
broker.add_middleware(AsyncMiddleware())
|
||||||
|
broker.add_middleware(InspectorMiddleware())
|
||||||
dramatiq.set_broker(broker)
|
dramatiq.set_broker(broker)
|
||||||
|
|
||||||
|
|
||||||
@ -49,17 +65,12 @@ def inspect_many_blocks_task(
|
|||||||
):
|
):
|
||||||
with session_scope(InspectSession) as inspect_db_session:
|
with session_scope(InspectSession) as inspect_db_session:
|
||||||
with session_scope(TraceSession) as trace_db_session:
|
with session_scope(TraceSession) as trace_db_session:
|
||||||
inspector = MEVInspector(
|
|
||||||
rpc,
|
|
||||||
inspect_db_session,
|
|
||||||
trace_db_session,
|
|
||||||
max_concurrency=5,
|
|
||||||
request_timeout=300,
|
|
||||||
)
|
|
||||||
|
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
inspector.inspect_many_blocks(
|
thread_local.inspector.inspect_many_blocks(
|
||||||
after_block=after_block, before_block=before_block
|
inspect_db_session=inspect_db_session,
|
||||||
|
trace_db_session=trace_db_session,
|
||||||
|
after_block=after_block,
|
||||||
|
before_block=before_block,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user