From a58863b992d4515a4c06a8edde5526829342c392 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 15 Feb 2022 10:25:08 -0500 Subject: [PATCH] Add priorities to queue tasks --- cli.py | 17 ++++++++++++++--- mev_inspect/queue/tasks.py | 6 ++++++ worker.py | 11 ++++++++--- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/cli.py b/cli.py index 597039a..0b360e0 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,12 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -102,7 +107,11 @@ async def inspect_many_blocks_command( @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int): broker = connect_broker() - inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) + inspect_many_blocks_actor = dramatiq.actor( + inspect_many_blocks_task, + broker=broker, + priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + ) if start_block < end_block: after_block = start_block @@ -137,7 +146,9 @@ def fetch_all_prices(): @click.argument("block_number", type=int) def enqueue_s3_export(block_number: int): broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY + ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index a55efd5..8d2f6fa 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,6 +9,12 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) +# create a separate actor for backfill export +# set to same priority as backfill inspect +LIVE_EXPORT_BLOCK_PRIORITY = 1 +BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 0 + + def inspect_many_blocks_task( after_block: int, before_block: int, diff --git a/worker.py b/worker.py index e0fc895..c9b58ec 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task) -dramatiq.actor(export_block_task) +dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) +dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY)