Add separate queue names to consume both then internally prioritize

This commit is contained in:
Luke Van Seters 2022-02-15 11:59:04 -05:00
parent 94f4ec7d40
commit 7a3f3874b6
3 changed files with 19 additions and 13 deletions

14
cli.py
View File

@ -13,8 +13,10 @@ from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import ( from mev_inspect.queue.tasks import (
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, HIGH_PRIORITY,
LIVE_EXPORT_BLOCK_PRIORITY, HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task, export_block_task,
inspect_many_blocks_task, inspect_many_blocks_task,
) )
@ -110,7 +112,8 @@ def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: in
inspect_many_blocks_actor = dramatiq.actor( inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task, inspect_many_blocks_task,
broker=broker, broker=broker,
priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
) )
if start_block < end_block: if start_block < end_block:
@ -147,7 +150,10 @@ def fetch_all_prices():
def enqueue_s3_export(block_number: int): def enqueue_s3_export(block_number: int):
broker = connect_broker() broker = connect_broker()
export_actor = dramatiq.actor( export_actor = dramatiq.actor(
export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
) )
logger.info(f"Sending block {block_number} export to queue") logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number) export_actor.send(block_number)

View File

@ -9,11 +9,11 @@ from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# when we have export backfill, HIGH_PRIORITY_QUEUE = "high"
# create a separate actor for backfill export LOW_PRIORITY_QUEUE = "low"
# and set to same priority as backfill inspect
LIVE_EXPORT_BLOCK_PRIORITY = 0 HIGH_PRIORITY = 0
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 1 LOW_PRIORITY = 1
def inspect_many_blocks_task( def inspect_many_blocks_task(

View File

@ -11,8 +11,8 @@ from mev_inspect.queue.middleware import (
InspectorMiddleware, InspectorMiddleware,
) )
from mev_inspect.queue.tasks import ( from mev_inspect.queue.tasks import (
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY, HIGH_PRIORITY_QUEUE,
LIVE_EXPORT_BLOCK_PRIORITY, LOW_PRIORITY_QUEUE,
export_block_task, export_block_task,
inspect_many_blocks_task, inspect_many_blocks_task,
) )
@ -25,5 +25,5 @@ broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker) dramatiq.set_broker(broker)
dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY) dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE)
dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY) dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)