diff --git a/cli.py b/cli.py index 597039a..0b360e0 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,12 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -102,7 +107,11 @@ async def inspect_many_blocks_command( @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int): broker = connect_broker() - inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) + inspect_many_blocks_actor = dramatiq.actor( + inspect_many_blocks_task, + broker=broker, + priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + ) if start_block < end_block: after_block = start_block @@ -137,7 +146,9 @@ def fetch_all_prices(): @click.argument("block_number", type=int) def enqueue_s3_export(block_number: int): broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY + ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index a55efd5..8d2f6fa 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,6 +9,12 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) +# create a separate actor for backfill export +# set to same priority as backfill inspect +LIVE_EXPORT_BLOCK_PRIORITY = 1 +BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 0 + + def inspect_many_blocks_task( after_block: int, before_block: int, diff --git a/worker.py b/worker.py index e0fc895..c9b58ec 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task) -dramatiq.actor(export_block_task) +dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) +dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY)