From d9439dfe27c0ea4564602cc2260c6f6373c8f766 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 4 Feb 2022 15:20:59 -0500 Subject: [PATCH] Run query. Export to S3 --- cli.py | 6 +-- mev_inspect/db.py | 2 +- mev_inspect/s3_export.py | 64 ++++++++++++++++++++++++ mev_inspect/{string_io.py => text_io.py} | 36 +++++++++++++ 4 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 mev_inspect/s3_export.py rename mev_inspect/{string_io.py => text_io.py} (52%) diff --git a/cli.py b/cli.py index 6021d31..8a78314 100644 --- a/cli.py +++ b/cli.py @@ -13,7 +13,7 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import inspect_many_blocks_task -from mev_inspect.s3_export import list_export_bucket +from mev_inspect.s3_export import export_block_range, list_export_bucket RPC_URL_ENV = "RPC_URL" @@ -125,10 +125,10 @@ def fetch_all_prices(): @click.argument("after_block_number", type=int) @click.argument("before_block_number", type=int) def s3_export(after_block_number: int, before_block_number: int): - logger.info(after_block_number) - logger.info(before_block_number) + inspect_db_session = get_inspect_session() print(list_export_bucket()) + export_block_range(inspect_db_session, after_block_number, before_block_number) @cli.command() diff --git a/mev_inspect/db.py b/mev_inspect/db.py index dd7c66a..7fb1a97 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional from sqlalchemy import create_engine, orm from sqlalchemy.orm import sessionmaker -from mev_inspect.string_io import StringIteratorIO +from mev_inspect.text_io import StringIteratorIO def get_trace_database_uri() -> Optional[str]: diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py new file mode 100644 index 0000000..b27c472 --- /dev/null +++ b/mev_inspect/s3_export.py @@ -0,0 +1,64 @@ +import json + +import boto3 + +from mev_inspect.text_io import BytesIteratorIO + +MEV_SUMMARY_EXPORT_QUERY = """ + SELECT to_json(mev_summary) + FROM mev_summary + WHERE + block_number >= :after_block_number AND + block_number < :before_block_number + """ + + +def export_block_range( + inspect_db_session, after_block_number: int, before_block_number +) -> None: + client = get_s3_client() + bucket_name = get_export_bucket_name() + + mev_summary_json_results = inspect_db_session.execute( + statement=MEV_SUMMARY_EXPORT_QUERY, + params={ + "after_block_number": after_block_number, + "before_block_number": before_block_number, + }, + ) + + mev_summary_json_fileobj = BytesIteratorIO( + (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) + ) + + key = f"mev_summary/{after_block_number}-{before_block_number}.json" + + client.upload_fileobj( + mev_summary_json_fileobj, + Bucket=bucket_name, + Key=key, + ) + + +def list_export_bucket(): + client = get_s3_client() + return client.list_objects_v2( + Bucket=get_export_bucket_name(), + Prefix="/", + ) + + +# TODO - replaced by ConfigMap +def get_export_bucket_name() -> str: + return "local-export" + + +# TODO +def get_s3_client(): + return boto3.client( + "s3", + region_name="us-east-1", + endpoint_url="http://localstack:4566", + aws_access_key_id="test", + aws_secret_access_key="test", + ) diff --git a/mev_inspect/string_io.py b/mev_inspect/text_io.py similarity index 52% rename from mev_inspect/string_io.py rename to mev_inspect/text_io.py index 37efb5f..e787e72 100644 --- a/mev_inspect/string_io.py +++ b/mev_inspect/text_io.py @@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase): n -= len(m) line.append(m) return "".join(line) + + +class BytesIteratorIO(io.BufferedIOBase): + def __init__(self, iter: Iterator[bytes]): + self._iter = iter + self._buff = b"" + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> bytes: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret) :] + return ret + + def read(self, n: Optional[int] = None) -> bytes: + line = [] + if n is None or n < 0: + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + n -= len(m) + line.append(m) + return b"".join(line)