From c9536d1bde819c1335d67d42c78545a607e070d0 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Thu, 17 Feb 2022 11:08:21 -0500 Subject: [PATCH] Add functions --- mev_inspect/s3_export.py | 109 +++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 61 deletions(-) diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index bdceccf..292e29b 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -14,78 +14,65 @@ EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION" EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID" EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY" -MEV_SUMMARY_EXPORT_QUERY = """ - SELECT to_json(mev_summary) - FROM mev_summary -WHERE - block_number = :block_number - """ - -ARBITRAGES_EXPORT_QUERY = """ - SELECT to_json(arbitrages) - FROM arbitrages -WHERE - block_number = :block_number - """ -LIQUIDATIONS_EXPORT_QUERY = """ - SELECT to_json(liquidations) - FROM liquidations -WHERE - block_number = :block_number - """ -SANDWICHES_EXPORT_QUERY = """ - SELECT to_json(sandwiches) - FROM sandwiches -WHERE - block_number = :block_number - """ - -query_by_table = { - "mev_summary": MEV_SUMMARY_EXPORT_QUERY, - "arbitrages": ARBITRAGES_EXPORT_QUERY, - "liquidations": LIQUIDATIONS_EXPORT_QUERY, - "sandwiches": SANDWICHES_EXPORT_QUERY, -} +supported_tables = [ + "mev_summary", + "arbitrages", + "liquidations", + "sandwiches", + "sandwiched_swaps", +] logger = logging.getLogger(__name__) def export_block(inspect_db_session, block_number: int) -> None: - export_bucket_name = get_export_bucket_name() + + for table in supported_tables: + + _export_table(inspect_db_session, block_number, table) + + +def _export_table(inspect_db_session, block_number: int, table: str) -> None: client = get_s3_client() + export_bucket_name = get_export_bucket_name() + export_statement = _get_export_statement(table) - for table in query_by_table.keys(): - object_key = f"{table}/flashbots_{block_number}.json" - mev_summary_json_results = inspect_db_session.execute( - statement=query_by_table[table], - params={ - "block_number": block_number, - }, - ) + object_key = f"{table}/flashbots_{block_number}.json" - first_value, mev_summary_json_results = _peek(mev_summary_json_results) - if first_value is None: - existing_object_size = _get_object_size( - client, export_bucket_name, object_key - ) - if existing_object_size is None or existing_object_size == 0: - logger.info(f"Skipping {table} for block {block_number} - no data") - continue + mev_summary_json_results = inspect_db_session.execute( + statement=export_statement, + params={ + "block_number": block_number, + }, + ) - mev_summary_json_fileobj = BytesIteratorIO( - ( - f"{json.dumps(row)}\n".encode("utf-8") - for (row,) in mev_summary_json_results - ) - ) + first_value, mev_summary_json_results = _peek(mev_summary_json_results) + if first_value is None: + existing_object_size = _get_object_size(client, export_bucket_name, object_key) + if existing_object_size is None or existing_object_size == 0: + logger.info(f"Skipping {table} for block {block_number} - no data") + return - client.upload_fileobj( - mev_summary_json_fileobj, - Bucket=export_bucket_name, - Key=object_key, - ) + mev_summary_json_fileobj = BytesIteratorIO( + (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) + ) - logger.info(f"Exported to {object_key}") + client.upload_fileobj( + mev_summary_json_fileobj, + Bucket=export_bucket_name, + Key=object_key, + ) + + logger.info(f"Exported to {object_key}") + + +def _get_export_statement(table: str) -> str: + return f""" + SELECT to_json({table}) + FROM {table} + WHERE + block_number = :block_number + """ def _get_object_size(client, bucket: str, key: str) -> Optional[int]: