Merge pull request #266 from flashbots/priority-export

Create low and high priority queues. Put export on high priority, backfill on low
This commit is contained in:
Luke Van Seters 2022-02-15 17:20:02 -05:00 committed by GitHub
commit cd5f82733b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 8 deletions

23
cli.py
View File

@ -12,7 +12,14 @@ from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
export_block_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
RPC_URL_ENV = "RPC_URL"
@ -102,7 +109,12 @@ async def inspect_many_blocks_command(
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(inspect_many_blocks_task, broker=broker)
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
if start_block < end_block:
after_block = start_block
@ -137,7 +149,12 @@ def fetch_all_prices():
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)

View File

@ -15,7 +15,11 @@ from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import export_block_task
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
export_block_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@ -41,7 +45,12 @@ async def run():
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(export_block_task, broker=broker)
export_actor = dramatiq.actor(
export_block_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)

View File

@ -9,6 +9,13 @@ from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
HIGH_PRIORITY_QUEUE = "high"
LOW_PRIORITY_QUEUE = "low"
HIGH_PRIORITY = 0
LOW_PRIORITY = 1
def inspect_many_blocks_task(
after_block: int,
before_block: int,

View File

@ -10,7 +10,12 @@ from mev_inspect.queue.middleware import (
DbMiddleware,
InspectorMiddleware,
)
from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task
from mev_inspect.queue.tasks import (
HIGH_PRIORITY_QUEUE,
LOW_PRIORITY_QUEUE,
export_block_task,
inspect_many_blocks_task,
)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@ -20,5 +25,5 @@ broker.add_middleware(AsyncMiddleware())
broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
dramatiq.set_broker(broker)
dramatiq.actor(inspect_many_blocks_task)
dramatiq.actor(export_block_task)
dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE)
dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE)