From c6f7fd509e814fd742767ad3ad2d04e8bd354138 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 14 Feb 2022 12:37:52 -0500 Subject: [PATCH] Export command and function edits --- Tiltfile | 38 +++++++++++++++++++------------------- cli.py | 11 ++++++++++- listener.py | 8 ++++++-- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/Tiltfile b/Tiltfile index b18a75c..e89d250 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,25 +95,25 @@ local_resource( ) # if using local S3 exports -k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { - "export-bucket-name" : "local-export", - "export-bucket-region": "us-east-1", - "export-aws-access-key-id": "foobar", - "export-aws-secret-access-key": "foobar", -})) +#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { +# "export-bucket-name" : "local-export", +# "export-bucket-region": "us-east-1", +# "export-aws-access-key-id": "foobar", +# "export-aws-secret-access-key": "foobar", +#})) -helm_remote( - "localstack", - repo_name="localstack-charts", - repo_url="https://localstack.github.io/helm-charts", -) +#helm_remote( +# "localstack", +# repo_name="localstack-charts", +# repo_url="https://localstack.github.io/helm-charts", +#) -local_resource( - 'localstack-port-forward', - serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', - resource_deps=["localstack"] -) +l#ocal_resource( +# 'localstack-port-forward', +# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', +# resource_deps=["localstack"] +#) -k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { - "services": "s3", -})) +#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { +# "services": "s3", +#})) diff --git a/cli.py b/cli.py index d94373a..f752f44 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,7 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task RPC_URL_ENV = "RPC_URL" @@ -132,6 +132,15 @@ def fetch_all_prices(): write_prices(inspect_db_session, prices) +@cli.command() +@click.argument("block_number", type=int) +def s3_export(block_number: int): + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) + + @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/listener.py b/listener.py index f670bb6..047083a 100644 --- a/listener.py +++ b/listener.py @@ -40,6 +40,9 @@ async def run(): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc) @@ -50,6 +53,7 @@ async def run(): trace_db_session, base_provider, healthcheck_url, + export_actor, ) logger.info("Stopping...") @@ -61,7 +65,9 @@ async def inspect_next_block( trace_db_session, base_provider, healthcheck_url, + export_actor, ): + latest_block_number = await get_latest_block_number(base_provider) last_written_block = find_latest_block_update(inspect_db_session) @@ -85,8 +91,6 @@ async def inspect_next_block( update_latest_block(inspect_db_session, block_number) - broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) logger.info(f"Sending block {block_number} for export") export_actor.send(block_number)