From f6d5ca11790e2fd3bc2f24089e728a09ef6e3014 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 31 Jan 2022 20:42:24 -0500 Subject: [PATCH] Add commands and functions --- cli.py | 34 +++++++++++++++++++ mev | 23 ++++++++++++- mev_inspect/export.py | 78 +++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 2 ++ 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 mev_inspect/export.py diff --git a/cli.py b/cli.py index e480ea8..1f13778 100644 --- a/cli.py +++ b/cli.py @@ -8,6 +8,7 @@ import click from mev_inspect.concurrency import coro from mev_inspect.crud.prices import write_prices from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.export import s3_export, s3_export_many from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range @@ -131,6 +132,39 @@ def fetch_range(after: datetime, before: datetime): write_prices(inspect_db_session, prices) +@cli.command() +@click.argument("block_number", type=int) +@click.argument("bucket", type=str) +@click.argument("filepath", type=str) +@click.argument("region", type=str) +def s3_export_command(block_number: int, bucket: str, filepath: str, region: str): + inspect_db_session = get_inspect_session() + + logger.info(f"Exporting block {block_number}") + s3_export(inspect_db_session, block_number, bucket, filepath, region) + + return None + + +@cli.command() +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) +@click.argument("bucket", type=str) +@click.argument("filepath_base", type=str) +@click.argument("region", type=str) +def s3_export_many_command( + after_block: int, before_block: int, bucket: str, filepath_base: str, region: str +): + inspect_db_session = get_inspect_session() + + logger.info(f"Exporting blocks {after_block} to {before_block}") + s3_export_many( + inspect_db_session, after_block, before_block, bucket, filepath_base, region + ) + + return None + + def get_rpc_url() -> str: return os.environ["RPC_URL"] diff --git a/mev b/mev index eb81fc6..8be28e2 100755 --- a/mev +++ b/mev @@ -94,12 +94,33 @@ case "$1" in exit 1 esac ;; + export) + shift + case "$1" in + s3-export) + block_number=$2 + bucket=$3 + filepath=$4 + region=$5 + echo "Exporting block" + kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number $bucket $filepath $region + ;; + s3-export-many) + after_block=$2 + before_block=$3 + bucket=$4 + filepath_base=$5 + region=$6 + echo "Exporting blocks" + kubectl exec -ti deploy/mev-inspect -- poetry run s3-export-many $after_block $before_block $bucket $filepath_base $region + ;; + ;; exec) shift kubectl exec -ti deploy/mev-inspect -- $@ ;; *) - echo "Usage: "$1" {db|backfill|inspect|test}" + echo "Usage: "$1" {db|redis|listener|backfill|inspect|inspect-many|test|fetch|prices|export}" exit 1 esac diff --git a/mev_inspect/export.py b/mev_inspect/export.py new file mode 100644 index 0000000..5cf96d9 --- /dev/null +++ b/mev_inspect/export.py @@ -0,0 +1,78 @@ +from mev_inspect.crud.latest_s3_block import ( + find_latest_s3_block, + update_latest_s3_block, +) + + +def s3_export( + db_session, block_number: int, bucket: str, filepath: str, region: str +) -> None: + """Export block to S3""" + + uri = _get_uri(db_session, bucket, filepath, region) + + latest_s3_block = find_latest_s3_block(db_session) + + if latest_s3_block is not None: + if block_number > latest_s3_block: + db_session.execute( + """ + SELECT * FROM aws_s3.query_export_to_s3( + 'SELECT * + FROM mev_summary + WHERE block_number={block_number}', + :{uri} + """, + params={"block_number": block_number, "uri": uri}, + ) + update_latest_s3_block(db_session, block_number) + + +def s3_export_many( + db_session, + after_block: int, + before_block: int, + bucket: str, + filepath_base: str, + region: str, +) -> None: + """Export block range to S3""" + + latest_s3_block = find_latest_s3_block(db_session) + + for block_number in range(after_block, before_block): + if latest_s3_block is not None: + if block_number > latest_s3_block: + filepath = f"{filepath_base}" + f"{block_number}" + uri = _get_uri(db_session, bucket, filepath, region) + db_session.execute( + """ + SELECT * FROM aws_s3.query_export_to_s3( + 'SELECT * + FROM mev_summary + WHERE block_number={block_number} + :{uri} + """, + params={ + "after_block": after_block, + "before_block": before_block, + "uri": uri, + }, + ) + update_latest_s3_block(db_session, block_number) + + +def _get_uri(db_session, bucket: str, filepath: str, region: str) -> str: + + uri = db_session.execute( + """ + SELECT aws_commons.create_s3_uri( + '{bucket}', + '{filepath}', + '{region}' + ) AS s3_uri_1 \gset + """, + params={"bucket": bucket, "filepath": filepath, "region": region}, + ) + + return uri diff --git a/pyproject.toml b/pyproject.toml index d616834..f1719d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' +s3-export = 'cli:s3_export' +s3-export-many = 'cli:s3_export_many' [tool.black] exclude = '''