diff --git a/Tiltfile b/Tiltfile index 707e8b1..68f5b72 100644 --- a/Tiltfile +++ b/Tiltfile @@ -107,13 +107,13 @@ local_resource( # repo_name="localstack-charts", # repo_url="https://localstack.github.io/helm-charts", #) - +# #local_resource( # 'localstack-port-forward', # serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', # resource_deps=["localstack"] #) - +# #k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { # "services": "s3", #})) diff --git a/cli.py b/cli.py index 0114744..b5a6674 100644 --- a/cli.py +++ b/cli.py @@ -14,11 +14,9 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( - HIGH_PRIORITY, - HIGH_PRIORITY_QUEUE, LOW_PRIORITY, LOW_PRIORITY_QUEUE, - export_block_task, + backfill_export_task, inspect_many_blocks_task, ) from mev_inspect.s3_export import export_block @@ -167,15 +165,31 @@ def fetch_all_prices(): def enqueue_s3_export(block_number: int): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, + backfill_export_task, broker=broker, - queue_name=HIGH_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) +@cli.command() +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) +def enqueue_many_s3_exports(after_block: int, before_block: int): + broker = connect_broker() + export_actor = dramatiq.actor( + backfill_export_task, + broker=broker, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, + ) + logger.info(f"Sending blocks {after_block} to {before_block} to queue") + for block_number in range(after_block, before_block): + export_actor.send(block_number) + + @cli.command() @click.argument("block_number", type=int) def s3_export(block_number: int): diff --git a/listener.py b/listener.py index fe4295d..123d2fe 100644 --- a/listener.py +++ b/listener.py @@ -18,7 +18,7 @@ from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( HIGH_PRIORITY, HIGH_PRIORITY_QUEUE, - export_block_task, + realtime_export_task, ) from mev_inspect.signal_handler import GracefulKiller @@ -46,7 +46,7 @@ async def run(): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, + realtime_export_task, broker=broker, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY, diff --git a/mev b/mev index 8469268..47ffb52 100755 --- a/mev +++ b/mev @@ -98,6 +98,13 @@ case "$1" in exit 1 esac ;; + backfill-export) + after_block=$2 + before_block=$3 + + echo "Sending $after_block to $before_block export to queue" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block + ;; enqueue-s3-export) block_number=$2 diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index f7c9272..9e45b4d 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -32,7 +32,12 @@ def inspect_many_blocks_task( ) -def export_block_task(block_number: int): +def realtime_export_task(block_number: int): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + export_block(inspect_db_session, block_number) + + +def backfill_export_task(block_number: int): with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: export_block(inspect_db_session, block_number) diff --git a/pyproject.toml b/pyproject.toml index 3c87066..a63a0f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' s3-export = 'cli:s3_export' enqueue-s3-export = 'cli:enqueue_s3_export' +enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports' [tool.black] exclude = ''' diff --git a/worker.py b/worker.py index 82cf349..7842a8a 100644 --- a/worker.py +++ b/worker.py @@ -11,10 +11,13 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, HIGH_PRIORITY_QUEUE, + LOW_PRIORITY, LOW_PRIORITY_QUEUE, - export_block_task, + backfill_export_task, inspect_many_blocks_task, + realtime_export_task, ) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -25,5 +28,12 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE) -dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE) +dramatiq.actor( + inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY +) +dramatiq.actor( + backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY_QUEUE +) +dramatiq.actor( + realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY +)