diff --git a/cli.py b/cli.py index ce034f1..aae37fc 100644 --- a/cli.py +++ b/cli.py @@ -10,7 +10,7 @@ from mev_inspect.crud.prices import write_prices from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range -from mev_inspect.s3_export import export_block_range +from mev_inspect.s3_export import export_block, export_block_range RPC_URL_ENV = "RPC_URL" @@ -122,11 +122,27 @@ def fetch_all_prices(): @cli.command() @click.argument("after_block_number", type=int) @click.argument("before_block_number", type=int) -def s3_export(after_block_number: int, before_block_number: int): +def s3_export_range(after_block_number: int, before_block_number: int): inspect_db_session = get_inspect_session() export_block_range(inspect_db_session, after_block_number, before_block_number) +@cli.command() +@click.argument("block_number", type=int) +def s3_export(block_number: int): + inspect_db_session = get_inspect_session() + export_block(inspect_db_session, block_number) + + +@cli.command() +@click.argument("block_number", type=int) +def enqueue_s3_export(block_number: int): + # pylint: disable=import-outside-toplevel + from worker import export_block_task + + export_block_task.send(block_number) + + @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/mev b/mev index 65a3556..446fb38 100755 --- a/mev +++ b/mev @@ -94,13 +94,20 @@ case "$1" in exit 1 esac ;; - s3-export) + s3-export-range) after_block_number=$2 before_block_number=$3 echo "Exporting from $after_block_number to $before_block_number" - kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $after_block_number $before_block_number + kubectl exec -ti deploy/mev-inspect -- poetry run s3-export-range $after_block_number $before_block_number ;; + s3-export) + block_number=$2 + + echo "Exporting $block_number" + kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number + ;; + exec) shift kubectl exec -ti deploy/mev-inspect -- $@ diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index e1db97f..c4db5b4 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -8,7 +8,12 @@ import boto3 from mev_inspect.text_io import BytesIteratorIO AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL" -MEV_SUMMARY_EXPORT_QUERY = """ +MEV_SUMMARY_EXPORT_BLOCK_QUERY = """ + SELECT to_json(mev_summary) + FROM mev_summary + WHERE block_number=:block_number + """ +MEV_SUMMARY_EXPORT_RANGE_QUERY = """ SELECT to_json(mev_summary) FROM mev_summary WHERE @@ -26,7 +31,7 @@ def export_block_range( client = get_s3_client() mev_summary_json_results = inspect_db_session.execute( - statement=MEV_SUMMARY_EXPORT_QUERY, + statement=MEV_SUMMARY_EXPORT_RANGE_QUERY, params={ "after_block_number": after_block_number, "before_block_number": before_block_number, @@ -48,6 +53,29 @@ def export_block_range( logger.info(f"Exported to {key}") +def export_block(inspect_db_session, block_number: int) -> None: + export_bucket_name = get_export_bucket_name() + client = get_s3_client() + + mev_summary_json_results = inspect_db_session.execute( + statement=MEV_SUMMARY_EXPORT_BLOCK_QUERY, + params={"block_number": block_number}, + ) + mev_summary_json_fileobj = BytesIteratorIO( + (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) + ) + + key = f"mev_summary/flashbots_{block_number}.json" + + client.upload_fileobj( + mev_summary_json_fileobj, + Bucket=export_bucket_name, + Key=key, + ) + + logger.info(f"Exported to {key}") + + def get_s3_client(): endpoint_url = get_endpoint_url() return boto3.client( diff --git a/pyproject.toml b/pyproject.toml index 9b42e0f..b53fbcc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' +s3-export-range = 'cli:s3_export_range' s3-export = 'cli:s3_export' [tool.black] diff --git a/worker.py b/worker.py index f798d77..e7aa038 100644 --- a/worker.py +++ b/worker.py @@ -12,6 +12,7 @@ from dramatiq.middleware import Middleware from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker from mev_inspect.inspector import MEVInspector +from mev_inspect.s3_export import export_block InspectSession = get_inspect_sessionmaker() TraceSession = get_trace_sessionmaker() @@ -83,5 +84,13 @@ def inspect_many_blocks_task( ) +@dramatiq.actor +def export_block_task( + block_number: int, +): + with session_scope(InspectSession) as inspect_db_session: + export_block(inspect_db_session, block_number) + + if __name__ == "__main__": dramatiq_worker(processes=1, threads=1)