Add tasks to CLI

This commit is contained in:
Gui Heise 2022-02-21 10:59:14 -05:00
parent cbad9e79b6
commit da04bc4351
2 changed files with 5 additions and 11 deletions

11
cli.py
View File

@ -15,10 +15,9 @@ from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import ( from mev_inspect.queue.tasks import (
HIGH_PRIORITY, HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY, LOW_PRIORITY,
LOW_PRIORITY_QUEUE, LOW_PRIORITY_QUEUE,
export_block_task, backfill_export_task,
inspect_many_blocks_task, inspect_many_blocks_task,
) )
from mev_inspect.s3_export import export_block from mev_inspect.s3_export import export_block
@ -167,9 +166,9 @@ def fetch_all_prices():
def enqueue_s3_export(block_number: int): def enqueue_s3_export(block_number: int):
broker = connect_broker() broker = connect_broker()
export_actor = dramatiq.actor( export_actor = dramatiq.actor(
export_block_task, backfill_export_task,
broker=broker, broker=broker,
queue_name=HIGH_PRIORITY_QUEUE, queue_name=LOW_PRIORITY_QUEUE,
priority=HIGH_PRIORITY, priority=HIGH_PRIORITY,
) )
logger.info(f"Sending block {block_number} export to queue") logger.info(f"Sending block {block_number} export to queue")
@ -182,10 +181,10 @@ def enqueue_s3_export(block_number: int):
def enqueue_many_s3_exports(after_block: int, before_block: int): def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker() broker = connect_broker()
export_actor = dramatiq.actor( export_actor = dramatiq.actor(
export_block_task, backfill_export_task,
broker=broker, broker=broker,
queue_name=LOW_PRIORITY_QUEUE, queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY, priority=HIGH_PRIORITY,
) )
logger.info(f"Sending blocks {after_block} to {before_block} to queue") logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block): for block_number in range(after_block, before_block):

View File

@ -32,11 +32,6 @@ def inspect_many_blocks_task(
) )
def export_block_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
def realtime_export_task(block_number: int): def realtime_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number) export_block(inspect_db_session, block_number)