Merge pull request #270 from flashbots/only-write-if-newly-empty

Only export empty blocks if there's an existing non-empty one
This commit is contained in:
Luke Van Seters 2022-02-16 09:47:55 -05:00 committed by GitHub
commit 4b93f95d50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,7 +1,8 @@
import itertools
import json
import logging
import os
from typing import Optional
from typing import Iterator, Optional, Tuple, TypeVar
import boto3
@ -26,6 +27,7 @@ logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
export_bucket_name = get_export_bucket_name()
client = get_s3_client()
object_key = f"mev_summary/flashbots_{block_number}.json"
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_QUERY,
@ -34,19 +36,37 @@ def export_block(inspect_db_session, block_number: int) -> None:
},
)
first_value, mev_summary_json_results = _peek(mev_summary_json_results)
if first_value is None:
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
if existing_object_size is None or existing_object_size == 0:
logger.info(f"Skipping block {block_number} - no data")
return
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
key = f"mev_summary/flashbots_{block_number}.json"
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=key,
Key=object_key,
)
logger.info(f"Exported to {key}")
logger.info(f"Exported to {object_key}")
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
response = client.list_objects_v2(
Bucket=bucket,
Prefix=key,
)
for obj in response.get("Contents", []):
if obj["Key"] == key:
return obj["Size"]
return None
def get_s3_client():
@ -78,3 +98,15 @@ def get_export_aws_access_key_id() -> Optional[str]:
def get_export_aws_secret_access_key() -> Optional[str]:
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
_T = TypeVar("_T")
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
try:
first = next(iterable)
except StopIteration:
return None, iter([])
return first, itertools.chain([first], iterable)