diff --git a/cli.py b/cli.py index 597039a..9c5cefb 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,14 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY, + LOW_PRIORITY_QUEUE, + export_block_task, + inspect_many_blocks_task, +) from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -102,7 +109,12 @@ async def inspect_many_blocks_command( @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int): broker = connect_broker() - inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) + inspect_many_blocks_actor = dramatiq.actor( + inspect_many_blocks_task, + broker=broker, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, + ) if start_block < end_block: after_block = start_block @@ -137,7 +149,12 @@ def fetch_all_prices(): @click.argument("block_number", type=int) def enqueue_s3_export(block_number: int): broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, + ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/listener.py b/listener.py index 047083a..cf7736a 100644 --- a/listener.py +++ b/listener.py @@ -15,7 +15,11 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task +from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, + export_block_task, +) from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -41,7 +45,12 @@ async def run(): trace_db_session = get_trace_session() broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, + ) inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index a55efd5..f7c9272 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,6 +9,13 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) +HIGH_PRIORITY_QUEUE = "high" +LOW_PRIORITY_QUEUE = "low" + +HIGH_PRIORITY = 0 +LOW_PRIORITY = 1 + + def inspect_many_blocks_task( after_block: int, before_block: int, diff --git a/worker.py b/worker.py index e0fc895..82cf349 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY_QUEUE, + export_block_task, + inspect_many_blocks_task, +) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task) -dramatiq.actor(export_block_task) +dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE) +dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)