From 815af26f288e1c30cc8cbaf90b2908746bce7b5b Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 30 Dec 2021 23:09:38 -0500 Subject: [PATCH] Enqueue messages to redis with backfill command --- cli.py | 12 ++++++++++++ mev | 5 ++--- pyproject.toml | 1 + worker.py | 8 ++++---- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/cli.py b/cli.py index 74cbbf9..d116d99 100644 --- a/cli.py +++ b/cli.py @@ -3,6 +3,7 @@ import os import sys import click +from worker import inspect_many_blocks_task from mev_inspect.concurrency import coro from mev_inspect.crud.prices import write_prices @@ -91,6 +92,17 @@ async def inspect_many_blocks_command( ) +@cli.command() +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) +@click.argument("batch_size", type=int, default=10) +def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int): + for batch_after_block in range(after_block, before_block, batch_size): + batch_before_block = min(batch_after_block + batch_size, before_block) + logger.info(f"Sending {batch_after_block} to {batch_before_block}") + inspect_many_blocks_task.send(batch_after_block, batch_before_block) + + @cli.command() @coro async def fetch_all_prices(): diff --git a/mev b/mev index 17875d2..f753e7e 100755 --- a/mev +++ b/mev @@ -45,10 +45,9 @@ case "$1" in backfill) start_block_number=$2 end_block_number=$3 - n_workers=$4 - echo "Backfilling from $start_block_number to $end_block_number with $n_workers workers" - poetry run python backfill.py $start_block_number $end_block_number $n_workers + echo "Backfilling from $start_block_number to $end_block_number" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number ;; inspect) block_number=$2 diff --git a/pyproject.toml b/pyproject.toml index f82cfc3..3c4be4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] inspect-block = 'cli:inspect_block_command' inspect-many-blocks = 'cli:inspect_many_blocks_command' +enqueue-many-blocks = 'cli:enqueue_many_blocks_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' diff --git a/worker.py b/worker.py index 29c64d2..e9969d2 100644 --- a/worker.py +++ b/worker.py @@ -36,10 +36,10 @@ dramatiq.set_broker(broker) @contextmanager def session_scope(Session=None): if Session is None: - return None - - with Session() as session: - yield session + yield None + else: + with Session() as session: + yield session @dramatiq.actor