From b86ecbca870a7607483ea0c0d522090b2d79ff2f Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Wed, 16 Feb 2022 10:34:21 -0500 Subject: [PATCH 01/13] Add commands --- cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cli.py b/cli.py index 0114744..e1ea466 100644 --- a/cli.py +++ b/cli.py @@ -175,6 +175,15 @@ def enqueue_s3_export(block_number: int): logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) +@cli.command() +@click.argument("block_number", type=int) +def enqueue_many_s3_exports(after_block: int, before_block: int): + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + for block_number in range(after_block, before_block): + logger.info(f"Sending block {block_number} export to queue") + export_actor.send(block_number) + @cli.command() @click.argument("block_number", type=int) From c58d75118d6b83b695faf94db0536cfe7f94ca5c Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Wed, 16 Feb 2022 10:41:48 -0500 Subject: [PATCH 02/13] Fix task priorities --- worker.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/worker.py b/worker.py index 82cf349..7daef96 100644 --- a/worker.py +++ b/worker.py @@ -11,7 +11,9 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, HIGH_PRIORITY_QUEUE, + LOW_PRIORITY, LOW_PRIORITY_QUEUE, export_block_task, inspect_many_blocks_task, @@ -25,5 +27,9 @@ broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) -dramatiq.actor(inspect_many_blocks_task, queue_name=HIGH_PRIORITY_QUEUE) -dramatiq.actor(export_block_task, queue_name=LOW_PRIORITY_QUEUE) +dramatiq.actor( + inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY +) +dramatiq.actor( + export_block_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY +) From 54cc4f1dc697db8a443f85697fa7e011afa928a4 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Wed, 16 Feb 2022 10:45:34 -0500 Subject: [PATCH 03/13] Add bash script --- cli.py | 4 +++- mev | 7 +++++++ pyproject.toml | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/cli.py b/cli.py index e1ea466..8b9b9a9 100644 --- a/cli.py +++ b/cli.py @@ -175,8 +175,10 @@ def enqueue_s3_export(block_number: int): logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) + @cli.command() -@click.argument("block_number", type=int) +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) def enqueue_many_s3_exports(after_block: int, before_block: int): broker = connect_broker() export_actor = dramatiq.actor(export_block_task, broker=broker) diff --git a/mev b/mev index 8469268..2fa0a06 100755 --- a/mev +++ b/mev @@ -98,6 +98,13 @@ case "$1" in exit 1 esac ;; + backfill-export) + after_block=$2 + before_block=$3 + + echo "Sending $block_number export to queue" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block + ;; enqueue-s3-export) block_number=$2 diff --git a/pyproject.toml b/pyproject.toml index 0c5892f..c8d050b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' s3-export = 'cli:s3_export' enqueue-s3-export = 'cli:enqueue_s3_export' +backfill-export = 'cli:enqueue_many_s3_exports' [tool.black] exclude = ''' From c7e94b55d4d0e2dec3b4f82af0f9f4940bdfd118 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Wed, 16 Feb 2022 10:51:22 -0500 Subject: [PATCH 04/13] Fix poetry config --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c8d050b..0faecf7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' s3-export = 'cli:s3_export' enqueue-s3-export = 'cli:enqueue_s3_export' -backfill-export = 'cli:enqueue_many_s3_exports' +enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports' [tool.black] exclude = ''' From db6b55ad383a1cf8a74f14c3d3fa69cc5c1ec16e Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Wed, 16 Feb 2022 16:21:27 -0500 Subject: [PATCH 05/13] Task priority and queue --- Tiltfile | 38 +++++++++++++++++++------------------- cli.py | 9 +++++++-- mev | 2 +- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/Tiltfile b/Tiltfile index 707e8b1..b18a75c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,25 +95,25 @@ local_resource( ) # if using local S3 exports -#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { -# "export-bucket-name" : "local-export", -# "export-bucket-region": "us-east-1", -# "export-aws-access-key-id": "foobar", -# "export-aws-secret-access-key": "foobar", -#})) +k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { + "export-bucket-name" : "local-export", + "export-bucket-region": "us-east-1", + "export-aws-access-key-id": "foobar", + "export-aws-secret-access-key": "foobar", +})) -#helm_remote( -# "localstack", -# repo_name="localstack-charts", -# repo_url="https://localstack.github.io/helm-charts", -#) +helm_remote( + "localstack", + repo_name="localstack-charts", + repo_url="https://localstack.github.io/helm-charts", +) -#local_resource( -# 'localstack-port-forward', -# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', -# resource_deps=["localstack"] -#) +local_resource( + 'localstack-port-forward', + serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', + resource_deps=["localstack"] +) -#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { -# "services": "s3", -#})) +k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { + "services": "s3", +})) diff --git a/cli.py b/cli.py index 8b9b9a9..fb3ec4e 100644 --- a/cli.py +++ b/cli.py @@ -181,9 +181,14 @@ def enqueue_s3_export(block_number: int): @click.argument("before_block", type=int) def enqueue_many_s3_exports(after_block: int, before_block: int): broker = connect_broker() - export_actor = dramatiq.actor(export_block_task, broker=broker) + export_actor = dramatiq.actor( + export_block_task, + broker=broker, + queue_name=HIGH_PRIORITY_QUEUE, + priority=HIGH_PRIORITY, + ) + logger.info(f"Sending blocks {after_block} to {before_block} to queue") for block_number in range(after_block, before_block): - logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) diff --git a/mev b/mev index 2fa0a06..47ffb52 100755 --- a/mev +++ b/mev @@ -102,7 +102,7 @@ case "$1" in after_block=$2 before_block=$3 - echo "Sending $block_number export to queue" + echo "Sending $after_block to $before_block export to queue" kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block ;; enqueue-s3-export) From fe9253ca5e2bb1368e61acb7c4ce897bfaefea51 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Thu, 17 Feb 2022 10:41:47 -0500 Subject: [PATCH 06/13] Comment Tiltfile --- Tiltfile | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/Tiltfile b/Tiltfile index b18a75c..68f5b72 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,25 +95,25 @@ local_resource( ) # if using local S3 exports -k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { - "export-bucket-name" : "local-export", - "export-bucket-region": "us-east-1", - "export-aws-access-key-id": "foobar", - "export-aws-secret-access-key": "foobar", -})) +#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { +# "export-bucket-name" : "local-export", +# "export-bucket-region": "us-east-1", +# "export-aws-access-key-id": "foobar", +# "export-aws-secret-access-key": "foobar", +#})) -helm_remote( - "localstack", - repo_name="localstack-charts", - repo_url="https://localstack.github.io/helm-charts", -) - -local_resource( - 'localstack-port-forward', - serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', - resource_deps=["localstack"] -) - -k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { - "services": "s3", -})) +#helm_remote( +# "localstack", +# repo_name="localstack-charts", +# repo_url="https://localstack.github.io/helm-charts", +#) +# +#local_resource( +# 'localstack-port-forward', +# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', +# resource_deps=["localstack"] +#) +# +#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { +# "services": "s3", +#})) From b486d53012db0d4d27abd8e4fd5fe3d87bd9784e Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Thu, 17 Feb 2022 11:59:29 -0500 Subject: [PATCH 07/13] Remove priorities --- cli.py | 4 ++-- worker.py | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/cli.py b/cli.py index fb3ec4e..e03de61 100644 --- a/cli.py +++ b/cli.py @@ -184,8 +184,8 @@ def enqueue_many_s3_exports(after_block: int, before_block: int): export_actor = dramatiq.actor( export_block_task, broker=broker, - queue_name=HIGH_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, ) logger.info(f"Sending blocks {after_block} to {before_block} to queue") for block_number in range(after_block, before_block): diff --git a/worker.py b/worker.py index 7daef96..ce852f0 100644 --- a/worker.py +++ b/worker.py @@ -11,8 +11,6 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( - HIGH_PRIORITY, - HIGH_PRIORITY_QUEUE, LOW_PRIORITY, LOW_PRIORITY_QUEUE, export_block_task, @@ -30,6 +28,4 @@ dramatiq.set_broker(broker) dramatiq.actor( inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY ) -dramatiq.actor( - export_block_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY -) +dramatiq.actor(export_block_task) From cbad9e79b625df9428ecb9d1f7b9a7673d874a2d Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 10:55:26 -0500 Subject: [PATCH 08/13] Separate tasks --- mev_inspect/queue/tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index f7c9272..51359a6 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -37,6 +37,16 @@ def export_block_task(block_number: int): export_block(inspect_db_session, block_number) +def realtime_export_task(block_number: int): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + export_block(inspect_db_session, block_number) + + +def backfill_export_task(block_number: int): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + export_block(inspect_db_session, block_number) + + @contextmanager def _session_scope(Session=None): if Session is None: From da04bc4351d4bd95fa36fded078602ba90b47557 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 10:59:14 -0500 Subject: [PATCH 09/13] Add tasks to CLI --- cli.py | 11 +++++------ mev_inspect/queue/tasks.py | 5 ----- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/cli.py b/cli.py index e03de61..ca37135 100644 --- a/cli.py +++ b/cli.py @@ -15,10 +15,9 @@ from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( HIGH_PRIORITY, - HIGH_PRIORITY_QUEUE, LOW_PRIORITY, LOW_PRIORITY_QUEUE, - export_block_task, + backfill_export_task, inspect_many_blocks_task, ) from mev_inspect.s3_export import export_block @@ -167,9 +166,9 @@ def fetch_all_prices(): def enqueue_s3_export(block_number: int): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, + backfill_export_task, broker=broker, - queue_name=HIGH_PRIORITY_QUEUE, + queue_name=LOW_PRIORITY_QUEUE, priority=HIGH_PRIORITY, ) logger.info(f"Sending block {block_number} export to queue") @@ -182,10 +181,10 @@ def enqueue_s3_export(block_number: int): def enqueue_many_s3_exports(after_block: int, before_block: int): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, + backfill_export_task, broker=broker, queue_name=LOW_PRIORITY_QUEUE, - priority=LOW_PRIORITY, + priority=HIGH_PRIORITY, ) logger.info(f"Sending blocks {after_block} to {before_block} to queue") for block_number in range(after_block, before_block): diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index 51359a6..9e45b4d 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -32,11 +32,6 @@ def inspect_many_blocks_task( ) -def export_block_task(block_number: int): - with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: - export_block(inspect_db_session, block_number) - - def realtime_export_task(block_number: int): with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: export_block(inspect_db_session, block_number) From c6e6d694ec37e21388e7fb7a41216941acf4e891 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 11:02:30 -0500 Subject: [PATCH 10/13] Add task to the listener --- listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/listener.py b/listener.py index cf7736a..8c147c1 100644 --- a/listener.py +++ b/listener.py @@ -18,7 +18,7 @@ from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( HIGH_PRIORITY, HIGH_PRIORITY_QUEUE, - export_block_task, + realtime_export_task, ) from mev_inspect.signal_handler import GracefulKiller From 5eef1b7a8f883995f22c61f0e03c7f3e56ce84e3 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 11:16:22 -0500 Subject: [PATCH 11/13] Add worker and listener task --- listener.py | 2 +- worker.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/listener.py b/listener.py index 8c147c1..149d76d 100644 --- a/listener.py +++ b/listener.py @@ -46,7 +46,7 @@ async def run(): broker = connect_broker() export_actor = dramatiq.actor( - export_block_task, + realtime_export_task, broker=broker, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY, diff --git a/worker.py b/worker.py index ce852f0..d7e0e10 100644 --- a/worker.py +++ b/worker.py @@ -11,10 +11,13 @@ from mev_inspect.queue.middleware import ( InspectorMiddleware, ) from mev_inspect.queue.tasks import ( + HIGH_PRIORITY, + HIGH_PRIORITY_QUEUE, LOW_PRIORITY, LOW_PRIORITY_QUEUE, - export_block_task, + backfill_export_task, inspect_many_blocks_task, + realtime_export_task, ) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -28,4 +31,9 @@ dramatiq.set_broker(broker) dramatiq.actor( inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY ) -dramatiq.actor(export_block_task) +dramatiq.actor( + backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=HIGH_PRIORITY_QUEUE +) +dramatiq.actor( + realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY +) From 180a987a6113f970233f9d8fd5378a7ad9f9a396 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 12:45:36 -0500 Subject: [PATCH 12/13] Add low priority to cli tasks --- cli.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cli.py b/cli.py index ca37135..b5a6674 100644 --- a/cli.py +++ b/cli.py @@ -14,7 +14,6 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import ( - HIGH_PRIORITY, LOW_PRIORITY, LOW_PRIORITY_QUEUE, backfill_export_task, @@ -169,7 +168,7 @@ def enqueue_s3_export(block_number: int): backfill_export_task, broker=broker, queue_name=LOW_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, + priority=LOW_PRIORITY, ) logger.info(f"Sending block {block_number} export to queue") export_actor.send(block_number) @@ -184,7 +183,7 @@ def enqueue_many_s3_exports(after_block: int, before_block: int): backfill_export_task, broker=broker, queue_name=LOW_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, + priority=LOW_PRIORITY, ) logger.info(f"Sending blocks {after_block} to {before_block} to queue") for block_number in range(after_block, before_block): From 1fbecbec58c462013821af15b6b484d097e7f3a8 Mon Sep 17 00:00:00 2001 From: Gui Heise Date: Mon, 21 Feb 2022 13:19:25 -0500 Subject: [PATCH 13/13] Worker low priority --- worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker.py b/worker.py index d7e0e10..7842a8a 100644 --- a/worker.py +++ b/worker.py @@ -32,7 +32,7 @@ dramatiq.actor( inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY ) dramatiq.actor( - backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=HIGH_PRIORITY_QUEUE + backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY_QUEUE ) dramatiq.actor( realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY