From 95444eae24c0e8582a00902b6170a0aca80b1dab Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Fri, 11 Feb 2022 18:43:39 -0500 Subject: [PATCH] Add actor --- Tiltfile | 40 +++++++++++++---------- cli.py | 9 ----- k8s/mev-inspect/templates/deployment.yaml | 2 +- listener.py | 8 +++++ worker.py | 3 +- 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/Tiltfile b/Tiltfile index 0ccc758..b18a75c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,21 +95,25 @@ local_resource( ) # if using local S3 exports -# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { -# "export-bucket-name" : "local-export", -# "export-bucket-region": "us-east-1", -# "export-aws-access-key-id": "foobar", -# "export-aws-secret-access-key": "foobar", -# })) -# -# helm_remote( -# "localstack", -# repo_name="localstack-charts", -# repo_url="https://localstack.github.io/helm-charts", -# ) -# -# local_resource( -# 'localstack-port-forward', -# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', -# resource_deps=["localstack"] -# ) +k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { + "export-bucket-name" : "local-export", + "export-bucket-region": "us-east-1", + "export-aws-access-key-id": "foobar", + "export-aws-secret-access-key": "foobar", +})) + +helm_remote( + "localstack", + repo_name="localstack-charts", + repo_url="https://localstack.github.io/helm-charts", +) + +local_resource( + 'localstack-port-forward', + serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', + resource_deps=["localstack"] +) + +k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { + "services": "s3", +})) diff --git a/cli.py b/cli.py index 8473b83..d94373a 100644 --- a/cli.py +++ b/cli.py @@ -13,7 +13,6 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import inspect_many_blocks_task -from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -133,14 +132,6 @@ def fetch_all_prices(): write_prices(inspect_db_session, prices) -@cli.command() -@click.argument("after_block_number", type=int) -@click.argument("before_block_number", type=int) -def s3_export(block_number: int): - inspect_db_session = get_inspect_session() - export_block(inspect_db_session, block_number) - - @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index b9da25e..7993652 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -118,7 +118,7 @@ spec: {{- range .Values.extraEnv }} - name: {{ .name }} value: {{ .value }} - {{- end }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/listener.py b/listener.py index 9da3a45..f670bb6 100644 --- a/listener.py +++ b/listener.py @@ -3,6 +3,7 @@ import logging import os import aiohttp +import dramatiq from mev_inspect.block import get_latest_block_number from mev_inspect.concurrency import coro @@ -13,6 +14,8 @@ from mev_inspect.crud.latest_block_update import ( from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider +from mev_inspect.queue.broker import connect_broker +from mev_inspect.queue.tasks import export_block_task from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -82,6 +85,11 @@ async def inspect_next_block( update_latest_block(inspect_db_session, block_number) + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) + if healthcheck_url: await ping_healthcheck_url(healthcheck_url) else: diff --git a/worker.py b/worker.py index a717b47..e0fc895 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,7 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -21,3 +21,4 @@ broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) dramatiq.actor(inspect_many_blocks_task) +dramatiq.actor(export_block_task)