From fa14caec1765ca0d7bfdd9e34f8e27a778b2106f Mon Sep 17 00:00:00 2001 From: Ryan Radomski Date: Thu, 13 Jan 2022 21:59:59 +0000 Subject: [PATCH 1/4] Added block-list command to enqueue a list of blocks from stdin --- README.md | 9 +++++++++ cli.py | 11 +++++++++++ mev | 4 ++++ pyproject.toml | 1 + 4 files changed, 25 insertions(+) diff --git a/README.md b/README.md index b532523..487ec56 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,15 @@ DEL dramatiq:default.DQ.msgs For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) +To query a list of missing blocks in a range, run: +``` +psql postgresql://postgres:password@localhost:5432/mev_inspect -c "select generate_series(13999471, 13999571) EXCEPT (select distinct block_number from blocks)" --csv -t +``` + +To backfill missing blocks in a range, run: +``` +psql postgresql://postgres:password@localhost:5432/mev_inspect -c "select generate_series(13999471, 13999571) EXCEPT (select distinct block_number from blocks)" --csv -t | ./mev block-list +``` To watch the logs for a given worker pod, take its pod name using the above, then run: ``` diff --git a/cli.py b/cli.py index 9c5cefb..fa0092c 100644 --- a/cli.py +++ b/cli.py @@ -1,6 +1,7 @@ import logging import os import sys +import fileinput from datetime import datetime import click @@ -102,6 +103,16 @@ async def inspect_many_blocks_command( before_block=before_block, ) +@cli.command() +def enqueue_block_list_command(): + from worker import ( # pylint: disable=import-outside-toplevel + inspect_many_blocks_task, + ) + + for block_string in fileinput.input(): + block = int(block_string) + logger.info(f"Sending {block} to {block+1}") + inspect_many_blocks_task.send(block, block+1) @cli.command() @click.argument("start_block", type=int) diff --git a/mev b/mev index 372da57..0b2248f 100755 --- a/mev +++ b/mev @@ -45,6 +45,10 @@ case "$1" in listener) kubectl exec -ti deploy/mev-inspect -- ./listener $2 ;; + block-list) + echo "Backfilling blocks from stdin" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-block-list + ;; backfill) after_block_number=$2 before_block_number=$3 diff --git a/pyproject.toml b/pyproject.toml index e673d39..0c5892f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ build-backend = "poetry.core.masonry.api" inspect-block = 'cli:inspect_block_command' inspect-many-blocks = 'cli:inspect_many_blocks_command' enqueue-many-blocks = 'cli:enqueue_many_blocks_command' +enqueue-block-list = 'cli:enqueue_block_list_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' From 1b42920dd138f9f36a4bf79a162604cab3fe01f8 Mon Sep 17 00:00:00 2001 From: Ryan Radomski Date: Mon, 17 Jan 2022 21:53:46 +0000 Subject: [PATCH 2/4] Changed backfilling a list of blocks in Readme to using a text file example --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 487ec56..5a4acef 100644 --- a/README.md +++ b/README.md @@ -162,14 +162,18 @@ DEL dramatiq:default.DQ.msgs For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) -To query a list of missing blocks in a range, run: +**Backfilling a list of blocks** + +Create a file containing a block per row, for example blocks.txt containing: ``` -psql postgresql://postgres:password@localhost:5432/mev_inspect -c "select generate_series(13999471, 13999571) EXCEPT (select distinct block_number from blocks)" --csv -t +12500000 +12500001 +12500002 ``` -To backfill missing blocks in a range, run: +Then queue the blocks with ``` -psql postgresql://postgres:password@localhost:5432/mev_inspect -c "select generate_series(13999471, 13999571) EXCEPT (select distinct block_number from blocks)" --csv -t | ./mev block-list +cat blocks.txt | ./mev block-list ``` To watch the logs for a given worker pod, take its pod name using the above, then run: From cb6f20ba634bfb7ac9fea88d0121bf446c6d5402 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Wed, 16 Feb 2022 11:56:33 -0500 Subject: [PATCH 3/4] No -t for stdin push --- mev | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mev b/mev index 0b2248f..8469268 100755 --- a/mev +++ b/mev @@ -47,7 +47,7 @@ case "$1" in ;; block-list) echo "Backfilling blocks from stdin" - kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-block-list + kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list ;; backfill) after_block_number=$2 From 19eb48aec0388b838b02fb2e71a51151f01c0aea Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Wed, 16 Feb 2022 11:56:50 -0500 Subject: [PATCH 4/4] Use the actor --- cli.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cli.py b/cli.py index fa0092c..0114744 100644 --- a/cli.py +++ b/cli.py @@ -1,7 +1,7 @@ +import fileinput import logging import os import sys -import fileinput from datetime import datetime import click @@ -103,16 +103,22 @@ async def inspect_many_blocks_command( before_block=before_block, ) + @cli.command() def enqueue_block_list_command(): - from worker import ( # pylint: disable=import-outside-toplevel + broker = connect_broker() + inspect_many_blocks_actor = dramatiq.actor( inspect_many_blocks_task, + broker=broker, + queue_name=LOW_PRIORITY_QUEUE, + priority=LOW_PRIORITY, ) for block_string in fileinput.input(): block = int(block_string) logger.info(f"Sending {block} to {block+1}") - inspect_many_blocks_task.send(block, block+1) + inspect_many_blocks_actor.send(block, block + 1) + @cli.command() @click.argument("start_block", type=int)