Export block/range commands and task

This commit is contained in:
Gui Heise 2022-02-09 11:31:56 -05:00
parent 0c7aaa069f
commit c3f39d38c6
5 changed files with 67 additions and 6 deletions

20
cli.py
View File

@ -10,7 +10,7 @@ from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.s3_export import export_block_range from mev_inspect.s3_export import export_block, export_block_range
RPC_URL_ENV = "RPC_URL" RPC_URL_ENV = "RPC_URL"
@ -122,11 +122,27 @@ def fetch_all_prices():
@cli.command() @cli.command()
@click.argument("after_block_number", type=int) @click.argument("after_block_number", type=int)
@click.argument("before_block_number", type=int) @click.argument("before_block_number", type=int)
def s3_export(after_block_number: int, before_block_number: int): def s3_export_range(after_block_number: int, before_block_number: int):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
export_block_range(inspect_db_session, after_block_number, before_block_number) export_block_range(inspect_db_session, after_block_number, before_block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
# pylint: disable=import-outside-toplevel
from worker import export_block_task
export_block_task.send(block_number)
@cli.command() @cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))

11
mev
View File

@ -94,13 +94,20 @@ case "$1" in
exit 1 exit 1
esac esac
;; ;;
s3-export) s3-export-range)
after_block_number=$2 after_block_number=$2
before_block_number=$3 before_block_number=$3
echo "Exporting from $after_block_number to $before_block_number" echo "Exporting from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $after_block_number $before_block_number kubectl exec -ti deploy/mev-inspect -- poetry run s3-export-range $after_block_number $before_block_number
;; ;;
s3-export)
block_number=$2
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec) exec)
shift shift
kubectl exec -ti deploy/mev-inspect -- $@ kubectl exec -ti deploy/mev-inspect -- $@

View File

@ -8,7 +8,12 @@ import boto3
from mev_inspect.text_io import BytesIteratorIO from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL" AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
MEV_SUMMARY_EXPORT_QUERY = """ MEV_SUMMARY_EXPORT_BLOCK_QUERY = """
SELECT to_json(mev_summary)
FROM mev_summary
WHERE block_number=:block_number
"""
MEV_SUMMARY_EXPORT_RANGE_QUERY = """
SELECT to_json(mev_summary) SELECT to_json(mev_summary)
FROM mev_summary FROM mev_summary
WHERE WHERE
@ -26,7 +31,7 @@ def export_block_range(
client = get_s3_client() client = get_s3_client()
mev_summary_json_results = inspect_db_session.execute( mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_QUERY, statement=MEV_SUMMARY_EXPORT_RANGE_QUERY,
params={ params={
"after_block_number": after_block_number, "after_block_number": after_block_number,
"before_block_number": before_block_number, "before_block_number": before_block_number,
@ -48,6 +53,29 @@ def export_block_range(
logger.info(f"Exported to {key}") logger.info(f"Exported to {key}")
def export_block(inspect_db_session, block_number: int) -> None:
export_bucket_name = get_export_bucket_name()
client = get_s3_client()
mev_summary_json_results = inspect_db_session.execute(
statement=MEV_SUMMARY_EXPORT_BLOCK_QUERY,
params={"block_number": block_number},
)
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
key = f"mev_summary/flashbots_{block_number}.json"
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=key,
)
logger.info(f"Exported to {key}")
def get_s3_client(): def get_s3_client():
endpoint_url = get_endpoint_url() endpoint_url = get_endpoint_url()
return boto3.client( return boto3.client(

View File

@ -41,6 +41,7 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command' fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices' fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range' fetch-range = 'cli:fetch_range'
s3-export-range = 'cli:s3_export_range'
s3-export = 'cli:s3_export' s3-export = 'cli:s3_export'
[tool.black] [tool.black]

View File

@ -12,6 +12,7 @@ from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.s3_export import export_block
InspectSession = get_inspect_sessionmaker() InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker() TraceSession = get_trace_sessionmaker()
@ -83,5 +84,13 @@ def inspect_many_blocks_task(
) )
@dramatiq.actor
def export_block_task(
block_number: int,
):
with session_scope(InspectSession) as inspect_db_session:
export_block(inspect_db_session, block_number)
if __name__ == "__main__": if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1) dramatiq_worker(processes=1, threads=1)