Add priorities to queue tasks

This commit is contained in:
Luke Van Seters 2022-02-15 10:25:08 -05:00
parent 0f4cd2f31d
commit a58863b992
3 changed files with 28 additions and 6 deletions

17
cli.py
View File

@ -12,7 +12,12 @@ from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task from mev_inspect.queue.tasks import (
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY,
LIVE_EXPORT_BLOCK_PRIORITY,
export_block_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block from mev_inspect.s3_export import export_block
RPC_URL_ENV = "RPC_URL" RPC_URL_ENV = "RPC_URL"
@ -102,7 +107,11 @@ async def inspect_many_blocks_command(
@click.argument("batch_size", type=int, default=10) @click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int): def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker() broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker) inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY,
)
if start_block < end_block: if start_block < end_block:
after_block = start_block after_block = start_block
@ -137,7 +146,9 @@ def fetch_all_prices():
@click.argument("block_number", type=int) @click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int): def enqueue_s3_export(block_number: int):
broker = connect_broker() broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker) export_actor = dramatiq.actor(
export_block_task, broker=broker, priority=LIVE_EXPORT_BLOCK_PRIORITY
)
logger.info(f"Sending block {block_number} export to queue") logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number) export_actor.send(block_number)

View File

@ -9,6 +9,12 @@ from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# create a separate actor for backfill export
# set to same priority as backfill inspect
LIVE_EXPORT_BLOCK_PRIORITY = 1
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY = 0
def inspect_many_blocks_task( def inspect_many_blocks_task(
after_block: int, after_block: int,
before_block: int, before_block: int,

View File

@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import (
DbMiddleware, DbMiddleware,
InspectorMiddleware, InspectorMiddleware,
) )
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task from mev_inspect.queue.tasks import (
BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY,
LIVE_EXPORT_BLOCK_PRIORITY,
export_block_task,
inspect_many_blocks_task,
)
logging.basicConfig(stream=sys.stdout, level=logging.INFO) logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker) dramatiq.set_broker(broker)
dramatiq.actor(inspect_many_blocks_task) dramatiq.actor(inspect_many_blocks_task, priority=BACKFILL_INSPECT_MANY_BLOCKS_PRIORITY)
dramatiq.actor(export_block_task) dramatiq.actor(export_block_task, priority=LIVE_EXPORT_BLOCK_PRIORITY)