From cbec5b76137c868b95a9595c6b99327b46610390 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:12:36 -0500 Subject: [PATCH] Only build inspector once --- worker.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/worker.py b/worker.py index e9969d2..78b1d05 100644 --- a/worker.py +++ b/worker.py @@ -1,5 +1,6 @@ import asyncio import os +import threading from contextlib import contextmanager import dramatiq @@ -13,6 +14,8 @@ from mev_inspect.inspector import MEVInspector InspectSession = get_inspect_sessionmaker() TraceSession = get_trace_sessionmaker() +thread_local = threading.local() + class AsyncMiddleware(Middleware): def before_process_message( @@ -27,9 +30,22 @@ class AsyncMiddleware(Middleware): self.loop.close() +class InspectorMiddleware(Middleware): + def before_process_message( + self, _broker, worker + ): # pylint: disable=unused-argument + if not hasattr(thread_local, "inspector"): + thread_local.inspector = MEVInspector( + rpc, + max_concurrency=5, + request_timeout=300, + ) + + rpc = os.environ["RPC_URL"] broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) broker.add_middleware(AsyncMiddleware()) +broker.add_middleware(InspectorMiddleware()) dramatiq.set_broker(broker) @@ -49,17 +65,12 @@ def inspect_many_blocks_task( ): with session_scope(InspectSession) as inspect_db_session: with session_scope(TraceSession) as trace_db_session: - inspector = MEVInspector( - rpc, - inspect_db_session, - trace_db_session, - max_concurrency=5, - request_timeout=300, - ) - asyncio.run( - inspector.inspect_many_blocks( - after_block=after_block, before_block=before_block + thread_local.inspector.inspect_many_blocks( + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + after_block=after_block, + before_block=before_block, ) )