Run query. Export to S3

This commit is contained in:
Luke Van Seters 2022-02-04 15:20:59 -05:00
parent d1529521d6
commit ee93eba7e4
4 changed files with 104 additions and 4 deletions

6
cli.py
View File

@ -10,7 +10,7 @@ from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.s3_export import list_export_bucket from mev_inspect.s3_export import export_block_range, list_export_bucket
RPC_URL_ENV = "RPC_URL" RPC_URL_ENV = "RPC_URL"
@ -123,10 +123,10 @@ def fetch_all_prices():
@click.argument("after_block_number", type=int) @click.argument("after_block_number", type=int)
@click.argument("before_block_number", type=int) @click.argument("before_block_number", type=int)
def s3_export(after_block_number: int, before_block_number: int): def s3_export(after_block_number: int, before_block_number: int):
logger.info(after_block_number) inspect_db_session = get_inspect_session()
logger.info(before_block_number)
print(list_export_bucket()) print(list_export_bucket())
export_block_range(inspect_db_session, after_block_number, before_block_number)
@cli.command() @cli.command()

View File

@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO from mev_inspect.text_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]: def get_trace_database_uri() -> Optional[str]:

64
mev_inspect/s3_export.py Normal file
View File

@ -0,0 +1,64 @@
import json
import boto3
from mev_inspect.text_io import BytesIteratorIO
MEV_SUMMARY_EXPORT_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
"""
def export_block_range(
inspect_db_session, after_block_number: int, before_block_number
) -> None:
client = get_s3_client()
bucket_name = get_export_bucket_name()
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
key = f"mev_summary/{after_block_number}-{before_block_number}.json"
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=bucket_name,
Key=key,
)
def list_export_bucket():
client = get_s3_client()
return client.list_objects_v2(
Bucket=get_export_bucket_name(),
Prefix="/",
)
# TODO - replaced by ConfigMap
def get_export_bucket_name() -> str:
return "local-export"
# TODO
def get_s3_client():
return boto3.client(
"s3",
region_name="us-east-1",
endpoint_url="http://localstack:4566",
aws_access_key_id="test",
aws_secret_access_key="test",
)

View File

@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
n -= len(m) n -= len(m)
line.append(m) line.append(m)
return "".join(line) return "".join(line)
class BytesIteratorIO(io.BufferedIOBase):
def __init__(self, iter: Iterator[bytes]):
self._iter = iter
self._buff = b""
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> bytes:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret) :]
return ret
def read(self, n: Optional[int] = None) -> bytes:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return b"".join(line)