From ecb3a563c16fdc3580e412381466954a2cdc7e5b Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Wed, 2 Feb 2022 13:16:36 -0500 Subject: [PATCH 1/2] Separate tasks from the worker --- cli.py | 10 ++-- mev_inspect/queue/__init__.py | 0 mev_inspect/queue/broker.py | 7 +++ mev_inspect/queue/middleware.py | 75 ++++++++++++++++++++++++++++++ mev_inspect/queue/tasks.py | 32 +++++++++++++ worker.py | 82 +++++---------------------------- 6 files changed, 131 insertions(+), 75 deletions(-) create mode 100644 mev_inspect/queue/__init__.py create mode 100644 mev_inspect/queue/broker.py create mode 100644 mev_inspect/queue/middleware.py create mode 100644 mev_inspect/queue/tasks.py diff --git a/cli.py b/cli.py index e480ea8..3730e0f 100644 --- a/cli.py +++ b/cli.py @@ -4,12 +4,15 @@ import sys from datetime import datetime import click +import dramatiq from mev_inspect.concurrency import coro from mev_inspect.crud.prices import write_prices from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range +from mev_inspect.queue.broker import connect_broker +from mev_inspect.queue.tasks import inspect_many_blocks_task RPC_URL_ENV = "RPC_URL" @@ -97,14 +100,13 @@ async def inspect_many_blocks_command( @click.argument("before_block", type=int) @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int): - from worker import ( # pylint: disable=import-outside-toplevel - inspect_many_blocks_task, - ) + broker = connect_broker() + inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) for batch_after_block in range(after_block, before_block, batch_size): batch_before_block = min(batch_after_block + batch_size, before_block) logger.info(f"Sending {batch_after_block} to {batch_before_block}") - inspect_many_blocks_task.send(batch_after_block, batch_before_block) + inspect_many_blocks_actor.send(batch_after_block, batch_before_block) @cli.command() diff --git a/mev_inspect/queue/__init__.py b/mev_inspect/queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mev_inspect/queue/broker.py b/mev_inspect/queue/broker.py new file mode 100644 index 0000000..4b157ce --- /dev/null +++ b/mev_inspect/queue/broker.py @@ -0,0 +1,7 @@ +import os + +from dramatiq.brokers.redis import RedisBroker + + +def connect_broker(): + return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) diff --git a/mev_inspect/queue/middleware.py b/mev_inspect/queue/middleware.py new file mode 100644 index 0000000..075b35a --- /dev/null +++ b/mev_inspect/queue/middleware.py @@ -0,0 +1,75 @@ +import asyncio +import logging +from threading import local + +from dramatiq.middleware import Middleware + +from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker +from mev_inspect.inspector import MEVInspector + +logger = logging.getLogger(__name__) + + +class DbMiddleware(Middleware): + STATE = local() + INSPECT_SESSION_STATE_KEY = "InspectSession" + TRACE_SESSION_STATE_KEY = "TraceSession" + + @classmethod + def get_inspect_sessionmaker(cls): + return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None) + + @classmethod + def get_trace_sessionmaker(cls): + return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None) + + def before_process_message(self, _broker, message): + if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY): + logger.info("Building sessionmakers") + setattr( + self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker() + ) + setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker()) + else: + logger.info("Sessionmakers already set") + + +class InspectorMiddleware(Middleware): + STATE = local() + INSPECT_STATE_KEY = "inspector" + + def __init__(self, rpc_url): + self._rpc_url = rpc_url + + @classmethod + def get_inspector(cls): + return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None) + + def before_process_message( + self, _broker, worker + ): # pylint: disable=unused-argument + if not hasattr(self.STATE, self.INSPECT_STATE_KEY): + logger.info("Building inspector") + inspector = MEVInspector( + self._rpc_url, + max_concurrency=5, + request_timeout=300, + ) + + setattr(self.STATE, self.INSPECT_STATE_KEY, inspector) + else: + logger.info("Inspector already exists") + + +class AsyncMiddleware(Middleware): + def before_process_message( + self, _broker, message + ): # pylint: disable=unused-argument + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def after_process_message( + self, _broker, message, *, result=None, exception=None + ): # pylint: disable=unused-argument + if hasattr(self, "loop"): + self.loop.close() diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py new file mode 100644 index 0000000..78a4c3e --- /dev/null +++ b/mev_inspect/queue/tasks.py @@ -0,0 +1,32 @@ +import asyncio +import logging +from contextlib import contextmanager + +from .middleware import DbMiddleware, InspectorMiddleware + +logger = logging.getLogger(__name__) + + +def inspect_many_blocks_task( + after_block: int, + before_block: int, +): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session: + asyncio.run( + InspectorMiddleware.get_inspector().inspect_many_blocks( + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + after_block=after_block, + before_block=before_block, + ) + ) + + +@contextmanager +def _session_scope(Session=None): + if Session is None: + yield None + else: + with Session() as session: + yield session diff --git a/worker.py b/worker.py index f798d77..3967e6a 100644 --- a/worker.py +++ b/worker.py @@ -1,87 +1,27 @@ -import asyncio import logging import os import sys -import threading -from contextlib import contextmanager import dramatiq -from dramatiq.brokers.redis import RedisBroker from dramatiq.cli import main as dramatiq_worker -from dramatiq.middleware import Middleware -from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker -from mev_inspect.inspector import MEVInspector +from mev_inspect.queue.broker import connect_broker +from mev_inspect.queue.middleware import ( + AsyncMiddleware, + DbMiddleware, + InspectorMiddleware, +) +from mev_inspect.queue.tasks import inspect_many_blocks_task -InspectSession = get_inspect_sessionmaker() -TraceSession = get_trace_sessionmaker() - -thread_local = threading.local() logging.basicConfig(stream=sys.stdout, level=logging.INFO) -logger = logging.getLogger(__name__) - -class AsyncMiddleware(Middleware): - def before_process_message( - self, _broker, message - ): # pylint: disable=unused-argument - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - def after_process_message( - self, _broker, message, *, result=None, exception=None - ): # pylint: disable=unused-argument - self.loop.close() - - -class InspectorMiddleware(Middleware): - def before_process_message( - self, _broker, worker - ): # pylint: disable=unused-argument - rpc = os.environ["RPC_URL"] - - if not hasattr(thread_local, "inspector"): - logger.info("Building inspector") - thread_local.inspector = MEVInspector( - rpc, - max_concurrency=5, - request_timeout=300, - ) - else: - logger.info("Inspector already exists") - - -broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) +broker = connect_broker() +broker.add_middleware(DbMiddleware()) broker.add_middleware(AsyncMiddleware()) -broker.add_middleware(InspectorMiddleware()) +broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) - -@contextmanager -def session_scope(Session=None): - if Session is None: - yield None - else: - with Session() as session: - yield session - - -@dramatiq.actor -def inspect_many_blocks_task( - after_block: int, - before_block: int, -): - with session_scope(InspectSession) as inspect_db_session: - with session_scope(TraceSession) as trace_db_session: - asyncio.run( - thread_local.inspector.inspect_many_blocks( - inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, - after_block=after_block, - before_block=before_block, - ) - ) - +dramatiq.actor(inspect_many_blocks_task) if __name__ == "__main__": dramatiq_worker(processes=1, threads=1) From 4db05526b321c99db97bd84394d47a2b758d7243 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 3 Feb 2022 14:50:19 -0500 Subject: [PATCH 2/2] Remove unused __main__ --- worker.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/worker.py b/worker.py index 3967e6a..a717b47 100644 --- a/worker.py +++ b/worker.py @@ -3,7 +3,6 @@ import os import sys import dramatiq -from dramatiq.cli import main as dramatiq_worker from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.middleware import ( @@ -22,6 +21,3 @@ broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) dramatiq.actor(inspect_many_blocks_task) - -if __name__ == "__main__": - dramatiq_worker(processes=1, threads=1)