diff --git a/cli.py b/cli.py index 0b360e0..9c5cefb 100644 --- a/cli.py +++ b/cli.py @@ -13,8 +13,10 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( - BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, - LIVE_EXPORT_BLOCK_PRIORITY, + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY, + LOW_PRIORITY_QUEUE, export_block_task, inspect_many_blocks_task, ) @@ -110,7 +112,8 @@ def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: in inspect_many_blocks_actor = dramatiq.actor( inspect_many_blocks_task, broker=broker, - priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, ) if start_block < end_block: @@ -147,7 +150,10 @@ def fetch_all_prices(): def enqueue_s3_export(block_number: int): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index fc9659a..f7c9272 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,11 +9,11 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) -# when we have export backfill, -# create a separate actor for backfill export -# and set to same priority as backfill inspect -LIVE_EXPORT_BLOCK_PRIORITY = 0 -BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 1 +HIGH_PRIORITY_QUEUE = "high" +LOW_PRIORITY_QUEUE = "low" + +HIGH_PRIORITY = 0 +LOW_PRIORITY = 1 def inspect_many_blocks_task( diff --git a/worker.py b/worker.py index c9b58ec..82cf349 100644 --- a/worker.py +++ b/worker.py @@ -11,8 +11,8 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( - BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, - LIVE_EXPORT_BLOCK_PRIORITY, + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY_QUEUE, export_block_task, inspect_many_blocks_task, ) @@ -25,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) -dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY) +dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE) +dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)