From b30d6be0c5af9f12a6387db75507423f8fb9dbad Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Wed, 16 Feb 2022 08:55:48 -0500 Subject: [PATCH 1/2] Add peek that preserves the iterable --- mev_inspect/s3_export.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index 422dfe8..6d220bf 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -1,7 +1,8 @@ +import itertools import json import logging import os -from typing import Optional +from typing import Iterator, Optional, Tuple, TypeVar import boto3 @@ -34,6 +35,12 @@ def export_block(inspect_db_session, block_number: int) -> None: }, ) + first_value, mev_summary_json_results = _peek(mev_summary_json_results) + if first_value is None: + logger.info("No data for this block") + else: + logger.info("We have data for this block") + mev_summary_json_fileobj = BytesIteratorIO( (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) ) @@ -78,3 +85,15 @@ def get_export_aws_access_key_id() -> Optional[str]: def get_export_aws_secret_access_key() -> Optional[str]: return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV) + + +_T = TypeVar("_T") + + +def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]: + try: + first = next(iterable) + except StopIteration: + return None, iter([]) + + return first, itertools.chain([first], iterable) From 4ef214540983093628f02b7c31cd7eb62918d044 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Wed, 16 Feb 2022 09:06:51 -0500 Subject: [PATCH 2/2] Skip write if no data and no key or current upload has no data --- mev_inspect/s3_export.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index 6d220bf..39da5f9 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -27,6 +27,7 @@ logger = logging.getLogger(__name__) def export_block(inspect_db_session, block_number: int) -> None: export_bucket_name = get_export_bucket_name() client = get_s3_client() + object_key = f"mev_summary/flashbots_{block_number}.json" mev_summary_json_results = inspect_db_session.execute( statement=MEV_SUMMARY_EXPORT_QUERY, @@ -37,23 +38,35 @@ def export_block(inspect_db_session, block_number: int) -> None: first_value, mev_summary_json_results = _peek(mev_summary_json_results) if first_value is None: - logger.info("No data for this block") - else: - logger.info("We have data for this block") + existing_object_size = _get_object_size(client, export_bucket_name, object_key) + if existing_object_size is None or existing_object_size == 0: + logger.info(f"Skipping block {block_number} - no data") + return mev_summary_json_fileobj = BytesIteratorIO( (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) ) - key = f"mev_summary/flashbots_{block_number}.json" - client.upload_fileobj( mev_summary_json_fileobj, Bucket=export_bucket_name, - Key=key, + Key=object_key, ) - logger.info(f"Exported to {key}") + logger.info(f"Exported to {object_key}") + + +def _get_object_size(client, bucket: str, key: str) -> Optional[int]: + response = client.list_objects_v2( + Bucket=bucket, + Prefix=key, + ) + + for obj in response.get("Contents", []): + if obj["Key"] == key: + return obj["Size"] + + return None def get_s3_client():