From a58863b992d4515a4c06a8edde5526829342c392 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 15 Feb 2022 10:25:08 -0500 Subject: [PATCH 1/4] Add priorities to queue tasks --- cli.py | 17 ++++++++++++++--- mev_inspect/queue/tasks.py | 6 ++++++ worker.py | 11 ++++++++--- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/cli.py b/cli.py index 597039a..0b360e0 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,12 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -102,7 +107,11 @@ async def inspect_many_blocks_command( @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int): broker = connect_broker() - inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) + inspect_many_blocks_actor = dramatiq.actor( + inspect_many_blocks_task, + broker=broker, + priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + ) if start_block < end_block: after_block = start_block @@ -137,7 +146,9 @@ def fetch_all_prices(): @click.argument("block_number", type=int) def enqueue_s3_export(block_number: int): broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY + ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index a55efd5..8d2f6fa 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,6 +9,12 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) +# create a separate actor for backfill export +# set to same priority as backfill inspect +LIVE_EXPORT_BLOCK_PRIORITY = 1 +BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 0 + + def inspect_many_blocks_task( after_block: int, before_block: int, diff --git a/worker.py b/worker.py index e0fc895..c9b58ec 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task +from mev_inspect.queue.tasks import ( + BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + LIVE_EXPORT_BLOCK_PRIORITY, + export_block_task, + inspect_many_blocks_task, +) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task) -dramatiq.actor(export_block_task) +dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) +dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY) From 94f4ec7d40043e58e66eecbe6f938d8e846543a1 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 15 Feb 2022 10:28:06 -0500 Subject: [PATCH 2/4] Fix priorities. Lower comes first --- mev_inspect/queue/tasks.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index 8d2f6fa..fc9659a 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,10 +9,11 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) +# when we have export backfill, # create a separate actor for backfill export -# set to same priority as backfill inspect -LIVE_EXPORT_BLOCK_PRIORITY = 1 -BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 0 +# and set to same priority as backfill inspect +LIVE_EXPORT_BLOCK_PRIORITY = 0 +BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 1 def inspect_many_blocks_task( From 7a3f3874b6139b6ac52c8cfaf5ea4124fa3bfa71 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 15 Feb 2022 11:59:04 -0500 Subject: [PATCH 3/4] Add separate queue names to consume both then internally prioritize --- cli.py | 14 ++++++++++---- mev_inspect/queue/tasks.py | 10 +++++----- worker.py | 8 ++++---- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/cli.py b/cli.py index 0b360e0..9c5cefb 100644 --- a/cli.py +++ b/cli.py @@ -13,8 +13,10 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( - BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, - LIVE_EXPORT_BLOCK_PRIORITY, + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY, + LOW_PRIORITY_QUEUE, export_block_task, inspect_many_blocks_task, ) @@ -110,7 +112,8 @@ def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: in inspect_many_blocks_actor = dramatiq.actor( inspect_many_blocks_task, broker=broker, - priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, ) if start_block < end_block: @@ -147,7 +150,10 @@ def fetch_all_prices(): def enqueue_s3_export(block_number: int): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index fc9659a..f7c9272 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -9,11 +9,11 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) -# when we have export backfill, -# create a separate actor for backfill export -# and set to same priority as backfill inspect -LIVE_EXPORT_BLOCK_PRIORITY = 0 -BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 1 +HIGH_PRIORITY_QUEUE = "high" +LOW_PRIORITY_QUEUE = "low" + +HIGH_PRIORITY = 0 +LOW_PRIORITY = 1 def inspect_many_blocks_task( diff --git a/worker.py b/worker.py index c9b58ec..82cf349 100644 --- a/worker.py +++ b/worker.py @@ -11,8 +11,8 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( - BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, - LIVE_EXPORT_BLOCK_PRIORITY, + HIGH_PRIORITY_QUEUE, + LOW_PRIORITY_QUEUE, export_block_task, inspect_many_blocks_task, ) @@ -25,5 +25,5 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) -dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY) +dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE) +dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE) From 2e40c8bd5ec586c306ba5a2b110ed9ab3a010229 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 15 Feb 2022 12:01:02 -0500 Subject: [PATCH 4/4] Also add priority to listener --- listener.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/listener.py b/listener.py index 047083a..cf7736a 100644 --- a/listener.py +++ b/listener.py @@ -15,7 +15,11 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import export_block_task +from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, + export_block_task, +) from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -41,7 +45,12 @@ async def run(): trace_db_session = get_trace_session() broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, + ) inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc)