From 9dbe68b2842f161713a4704b876102dc2043a1d1 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Fri, 11 Feb 2022 16:39:50 -0500 Subject: [PATCH 1/5] Single block export function --- cli.py | 6 +++--- mev_inspect/s3_export.py | 14 +++++--------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cli.py b/cli.py index 7c7ed03..8473b83 100644 --- a/cli.py +++ b/cli.py @@ -13,7 +13,7 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import inspect_many_blocks_task -from mev_inspect.s3_export import export_block_range +from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -136,9 +136,9 @@ def fetch_all_prices(): @cli.command() @click.argument("after_block_number", type=int) @click.argument("before_block_number", type=int) -def s3_export(after_block_number: int, before_block_number: int): +def s3_export(block_number: int): inspect_db_session = get_inspect_session() - export_block_range(inspect_db_session, after_block_number, before_block_number) + export_block(inspect_db_session, block_number) @cli.command() diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index 30ea439..422dfe8 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -16,25 +16,21 @@ EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY" MEV_SUMMARY_EXPORT_QUERY = """ SELECT to_json(mev_summary) FROM mev_summary - WHERE - block_number >= :after_block_number AND - block_number < :before_block_number +WHERE + block_number = :block_number """ logger = logging.getLogger(__name__) -def export_block_range( - inspect_db_session, after_block_number: int, before_block_number -) -> None: +def export_block(inspect_db_session, block_number: int) -> None: export_bucket_name = get_export_bucket_name() client = get_s3_client() mev_summary_json_results = inspect_db_session.execute( statement=MEV_SUMMARY_EXPORT_QUERY, params={ - "after_block_number": after_block_number, - "before_block_number": before_block_number, + "block_number": block_number, }, ) @@ -42,7 +38,7 @@ def export_block_range( (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) ) - key = f"mev_summary/flashbots_{after_block_number}_{before_block_number}.json" + key = f"mev_summary/flashbots_{block_number}.json" client.upload_fileobj( mev_summary_json_fileobj, From bb06c8a958d4fe90d368bbf868e937db4cc4b9f1 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Fri, 11 Feb 2022 16:47:24 -0500 Subject: [PATCH 2/5] Add export task --- mev_inspect/queue/tasks.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index 78a4c3e..a55efd5 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -2,6 +2,8 @@ import asyncio import logging from contextlib import contextmanager +from mev_inspect.s3_export import export_block + from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) @@ -23,6 +25,11 @@ def inspect_many_blocks_task( ) +def export_block_task(block_number: int): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + export_block(inspect_db_session, block_number) + + @contextmanager def _session_scope(Session=None): if Session is None: From 95444eae24c0e8582a00902b6170a0aca80b1dab Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Fri, 11 Feb 2022 18:43:39 -0500 Subject: [PATCH 3/5] Add actor --- Tiltfile | 40 +++++++++++++---------- cli.py | 9 ----- k8s/mev-inspect/templates/deployment.yaml | 2 +- listener.py | 8 +++++ worker.py | 3 +- 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/Tiltfile b/Tiltfile index 0ccc758..b18a75c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,21 +95,25 @@ local_resource( ) # if using local S3 exports -# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { -# "export-bucket-name" : "local-export", -# "export-bucket-region": "us-east-1", -# "export-aws-access-key-id": "foobar", -# "export-aws-secret-access-key": "foobar", -# })) -# -# helm_remote( -# "localstack", -# repo_name="localstack-charts", -# repo_url="https://localstack.github.io/helm-charts", -# ) -# -# local_resource( -# 'localstack-port-forward', -# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', -# resource_deps=["localstack"] -# ) +k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { + "export-bucket-name" : "local-export", + "export-bucket-region": "us-east-1", + "export-aws-access-key-id": "foobar", + "export-aws-secret-access-key": "foobar", +})) + +helm_remote( + "localstack", + repo_name="localstack-charts", + repo_url="https://localstack.github.io/helm-charts", +) + +local_resource( + 'localstack-port-forward', + serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', + resource_deps=["localstack"] +) + +k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { + "services": "s3", +})) diff --git a/cli.py b/cli.py index 8473b83..d94373a 100644 --- a/cli.py +++ b/cli.py @@ -13,7 +13,6 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import inspect_many_blocks_task -from mev_inspect.s3_export import export_block RPC_URL_ENV = "RPC_URL" @@ -133,14 +132,6 @@ def fetch_all_prices(): write_prices(inspect_db_session, prices) -@cli.command() -@click.argument("after_block_number", type=int) -@click.argument("before_block_number", type=int) -def s3_export(block_number: int): - inspect_db_session = get_inspect_session() - export_block(inspect_db_session, block_number) - - @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index b9da25e..7993652 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -118,7 +118,7 @@ spec: {{- range .Values.extraEnv }} - name: {{ .name }} value: {{ .value }} - {{- end }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/listener.py b/listener.py index 9da3a45..f670bb6 100644 --- a/listener.py +++ b/listener.py @@ -3,6 +3,7 @@ import logging import os import aiohttp +import dramatiq from mev_inspect.block import get_latest_block_number from mev_inspect.concurrency import coro @@ -13,6 +14,8 @@ from mev_inspect.crud.latest_block_update import ( from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider +from mev_inspect.queue.broker import connect_broker +from mev_inspect.queue.tasks import export_block_task from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -82,6 +85,11 @@ async def inspect_next_block( update_latest_block(inspect_db_session, block_number) + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) + if healthcheck_url: await ping_healthcheck_url(healthcheck_url) else: diff --git a/worker.py b/worker.py index a717b47..e0fc895 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,7 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -21,3 +21,4 @@ broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) dramatiq.actor(inspect_many_blocks_task) +dramatiq.actor(export_block_task) From c6f7fd509e814fd742767ad3ad2d04e8bd354138 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 14 Feb 2022 12:37:52 -0500 Subject: [PATCH 4/5] Export command and function edits --- Tiltfile | 38 +++++++++++++++++++------------------- cli.py | 11 ++++++++++- listener.py | 8 ++++++-- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/Tiltfile b/Tiltfile index b18a75c..e89d250 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,25 +95,25 @@ local_resource( ) # if using local S3 exports -k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { - "export-bucket-name" : "local-export", - "export-bucket-region": "us-east-1", - "export-aws-access-key-id": "foobar", - "export-aws-secret-access-key": "foobar", -})) +#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { +# "export-bucket-name" : "local-export", +# "export-bucket-region": "us-east-1", +# "export-aws-access-key-id": "foobar", +# "export-aws-secret-access-key": "foobar", +#})) -helm_remote( - "localstack", - repo_name="localstack-charts", - repo_url="https://localstack.github.io/helm-charts", -) +#helm_remote( +# "localstack", +# repo_name="localstack-charts", +# repo_url="https://localstack.github.io/helm-charts", +#) -local_resource( - 'localstack-port-forward', - serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', - resource_deps=["localstack"] -) +l#ocal_resource( +# 'localstack-port-forward', +# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', +# resource_deps=["localstack"] +#) -k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { - "services": "s3", -})) +#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { +# "services": "s3", +#})) diff --git a/cli.py b/cli.py index d94373a..f752f44 100644 --- a/cli.py +++ b/cli.py @@ -12,7 +12,7 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task RPC_URL_ENV = "RPC_URL" @@ -132,6 +132,15 @@ def fetch_all_prices(): write_prices(inspect_db_session, prices) +@cli.command() +@click.argument("block_number", type=int) +def s3_export(block_number: int): + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) + + @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/listener.py b/listener.py index f670bb6..047083a 100644 --- a/listener.py +++ b/listener.py @@ -40,6 +40,9 @@ async def run(): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc) @@ -50,6 +53,7 @@ async def run(): trace_db_session, base_provider, healthcheck_url, + export_actor, ) logger.info("Stopping...") @@ -61,7 +65,9 @@ async def inspect_next_block( trace_db_session, base_provider, healthcheck_url, + export_actor, ): + latest_block_number = await get_latest_block_number(base_provider) last_written_block = find_latest_block_update(inspect_db_session) @@ -85,8 +91,6 @@ async def inspect_next_block( update_latest_block(inspect_db_session, block_number) - broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) logger.info(f"Sending block {block_number} for export") export_actor.send(block_number) From 8c7baecf2a285dae356bfa5dbe49437928f54334 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 14 Feb 2022 13:30:20 -0500 Subject: [PATCH 5/5] Syntax --- Tiltfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tiltfile b/Tiltfile index e89d250..707e8b1 100644 --- a/Tiltfile +++ b/Tiltfile @@ -108,7 +108,7 @@ local_resource( # repo_url="https://localstack.github.io/helm-charts", #) -l#ocal_resource( +#local_resource( # 'localstack-port-forward', # serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', # resource_deps=["localstack"]