diff --git a/Tiltfile b/Tiltfile index 7dc1ec1..0ccc758 100644 --- a/Tiltfile +++ b/Tiltfile @@ -42,17 +42,43 @@ docker_build("mev-inspect-py", ".", trigger="./pyproject.toml"), ], ) -k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect')) + +k8s_yaml(helm( + './k8s/mev-inspect', + name='mev-inspect', + set=[ + "extraEnv[0].name=AWS_ACCESS_KEY_ID", + "extraEnv[0].value=foobar", + "extraEnv[1].name=AWS_SECRET_ACCESS_KEY", + "extraEnv[1].value=foobar", + "extraEnv[2].name=AWS_REGION", + "extraEnv[2].value=us-east-1", + "extraEnv[3].name=AWS_ENDPOINT_URL", + "extraEnv[3].value=http://localstack:4566", + ], +)) + +k8s_yaml(helm( + './k8s/mev-inspect-workers', + name='mev-inspect-workers', + set=[ + "extraEnv[0].name=AWS_ACCESS_KEY_ID", + "extraEnv[0].value=foobar", + "extraEnv[1].name=AWS_SECRET_ACCESS_KEY", + "extraEnv[1].value=foobar", + "extraEnv[2].name=AWS_REGION", + "extraEnv[2].value=us-east-1", + "extraEnv[3].name=AWS_ENDPOINT_URL", + "extraEnv[3].value=http://localstack:4566", + "replicaCount=1", + ], +)) + k8s_resource( workload="mev-inspect", resource_deps=["postgresql", "redis-master"], ) -k8s_yaml(helm( - './k8s/mev-inspect-workers', - name='mev-inspect-workers', - set=["replicaCount=1"], -)) k8s_resource( workload="mev-inspect-workers", resource_deps=["postgresql", "redis-master"], @@ -67,3 +93,23 @@ local_resource( serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432', resource_deps=["postgresql"] ) + +# if using local S3 exports +# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { +# "export-bucket-name" : "local-export", +# "export-bucket-region": "us-east-1", +# "export-aws-access-key-id": "foobar", +# "export-aws-secret-access-key": "foobar", +# })) +# +# helm_remote( +# "localstack", +# repo_name="localstack-charts", +# repo_url="https://localstack.github.io/helm-charts", +# ) +# +# local_resource( +# 'localstack-port-forward', +# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', +# resource_deps=["localstack"] +# ) diff --git a/cli.py b/cli.py index d94373a..7c7ed03 100644 --- a/cli.py +++ b/cli.py @@ -13,6 +13,7 @@ from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.s3_export import export_block_range RPC_URL_ENV = "RPC_URL" @@ -132,6 +133,14 @@ def fetch_all_prices(): write_prices(inspect_db_session, prices) +@cli.command() +@click.argument("after_block_number", type=int) +@click.argument("before_block_number", type=int) +def s3_export(after_block_number: int, before_block_number: int): + inspect_db_session = get_inspect_session() + export_block_range(inspect_db_session, after_block_number, before_block_number) + + @cli.command() @click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) @click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"])) diff --git a/k8s/mev-inspect-workers/templates/deployment.yaml b/k8s/mev-inspect-workers/templates/deployment.yaml index 8b979aa..0a3238f 100644 --- a/k8s/mev-inspect-workers/templates/deployment.yaml +++ b/k8s/mev-inspect-workers/templates/deployment.yaml @@ -91,6 +91,34 @@ spec: name: mev-inspect-listener-healthcheck key: url optional: true + - name: EXPORT_BUCKET_NAME + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-bucket-name + optional: true + - name: EXPORT_BUCKET_REGION + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-bucket-region + optional: true + - name: EXPORT_AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-aws-access-key-id + optional: true + - name: EXPORT_AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-aws-secret-access-key + optional: true + {{- range .Values.extraEnv }} + - name: {{ .name }} + value: {{ .value }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index fa2123f..b9da25e 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -91,6 +91,34 @@ spec: name: mev-inspect-listener-healthcheck key: url optional: true + - name: EXPORT_BUCKET_NAME + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-bucket-name + optional: true + - name: EXPORT_BUCKET_REGION + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-bucket-region + optional: true + - name: EXPORT_AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-aws-access-key-id + optional: true + - name: EXPORT_AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: mev-inspect-export + key: export-aws-secret-access-key + optional: true + {{- range .Values.extraEnv }} + - name: {{ .name }} + value: {{ .value }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/mev b/mev index eb81fc6..65a3556 100755 --- a/mev +++ b/mev @@ -46,11 +46,11 @@ case "$1" in kubectl exec -ti deploy/mev-inspect -- ./listener $2 ;; backfill) - start_block_number=$2 - end_block_number=$3 + after_block_number=$2 + before_block_number=$3 - echo "Backfilling from $start_block_number to $end_block_number" - kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number + echo "Backfilling from $after_block_number to $before_block_number" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number ;; inspect) block_number=$2 @@ -58,11 +58,11 @@ case "$1" in kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number ;; inspect-many) - start_block_number=$2 - end_block_number=$3 - echo "Inspecting from block $start_block_number to $end_block_number" + after_block_number=$2 + before_block_number=$3 + echo "Inspecting from block $after_block_number to $before_block_number" kubectl exec -ti deploy/mev-inspect -- \ - poetry run inspect-many-blocks $start_block_number $end_block_number + poetry run inspect-many-blocks $after_block_number $before_block_number ;; test) shift @@ -94,6 +94,13 @@ case "$1" in exit 1 esac ;; + s3-export) + after_block_number=$2 + before_block_number=$3 + + echo "Exporting from $after_block_number to $before_block_number" + kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $after_block_number $before_block_number + ;; exec) shift kubectl exec -ti deploy/mev-inspect -- $@ diff --git a/mev_inspect/db.py b/mev_inspect/db.py index dd7c66a..7fb1a97 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional from sqlalchemy import create_engine, orm from sqlalchemy.orm import sessionmaker -from mev_inspect.string_io import StringIteratorIO +from mev_inspect.text_io import StringIteratorIO def get_trace_database_uri() -> Optional[str]: diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py new file mode 100644 index 0000000..30ea439 --- /dev/null +++ b/mev_inspect/s3_export.py @@ -0,0 +1,84 @@ +import json +import logging +import os +from typing import Optional + +import boto3 + +from mev_inspect.text_io import BytesIteratorIO + +AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL" +EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME" +EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION" +EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID" +EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY" + +MEV_SUMMARY_EXPORT_QUERY = """ + SELECT to_json(mev_summary) + FROM mev_summary + WHERE + block_number >= :after_block_number AND + block_number < :before_block_number + """ + +logger = logging.getLogger(__name__) + + +def export_block_range( + inspect_db_session, after_block_number: int, before_block_number +) -> None: + export_bucket_name = get_export_bucket_name() + client = get_s3_client() + + mev_summary_json_results = inspect_db_session.execute( + statement=MEV_SUMMARY_EXPORT_QUERY, + params={ + "after_block_number": after_block_number, + "before_block_number": before_block_number, + }, + ) + + mev_summary_json_fileobj = BytesIteratorIO( + (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) + ) + + key = f"mev_summary/flashbots_{after_block_number}_{before_block_number}.json" + + client.upload_fileobj( + mev_summary_json_fileobj, + Bucket=export_bucket_name, + Key=key, + ) + + logger.info(f"Exported to {key}") + + +def get_s3_client(): + endpoint_url = get_endpoint_url() + return boto3.client( + "s3", + endpoint_url=endpoint_url, + region_name=get_export_bucket_region(), + aws_access_key_id=get_export_aws_access_key_id(), + aws_secret_access_key=get_export_aws_secret_access_key(), + ) + + +def get_endpoint_url() -> Optional[str]: + return os.environ.get(AWS_ENDPOINT_URL_ENV) + + +def get_export_bucket_name() -> str: + return os.environ[EXPORT_BUCKET_NAME_ENV] + + +def get_export_bucket_region() -> Optional[str]: + return os.environ.get(EXPORT_BUCKET_REGION_ENV) + + +def get_export_aws_access_key_id() -> Optional[str]: + return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV) + + +def get_export_aws_secret_access_key() -> Optional[str]: + return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV) diff --git a/mev_inspect/string_io.py b/mev_inspect/text_io.py similarity index 52% rename from mev_inspect/string_io.py rename to mev_inspect/text_io.py index 37efb5f..e787e72 100644 --- a/mev_inspect/string_io.py +++ b/mev_inspect/text_io.py @@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase): n -= len(m) line.append(m) return "".join(line) + + +class BytesIteratorIO(io.BufferedIOBase): + def __init__(self, iter: Iterator[bytes]): + self._iter = iter + self._buff = b"" + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> bytes: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret) :] + return ret + + def read(self, n: Optional[int] = None) -> bytes: + line = [] + if n is None or n < 0: + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + n -= len(m) + line.append(m) + return b"".join(line) diff --git a/poetry.lock b/poetry.lock index fc708b4..2299d5a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -119,6 +119,38 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "boto3" +version = "1.20.48" +description = "The AWS SDK for Python" +category = "main" +optional = false +python-versions = ">= 3.6" + +[package.dependencies] +botocore = ">=1.23.48,<1.24.0" +jmespath = ">=0.7.1,<1.0.0" +s3transfer = ">=0.5.0,<0.6.0" + +[package.extras] +crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] + +[[package]] +name = "botocore" +version = "1.23.48" +description = "Low-level, data-driven core of boto 3." +category = "main" +optional = false +python-versions = ">= 3.6" + +[package.dependencies] +jmespath = ">=0.7.1,<1.0.0" +python-dateutil = ">=2.1,<3.0.0" +urllib3 = ">=1.25.4,<1.27" + +[package.extras] +crt = ["awscrt (==0.12.5)"] + [[package]] name = "bottle" version = "0.12.19" @@ -233,7 +265,7 @@ python-versions = "*" [[package]] name = "dramatiq" -version = "1.12.1" +version = "1.12.3" description = "Background Processing for Python 3." category = "main" optional = false @@ -244,8 +276,8 @@ prometheus-client = ">=0.2" redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""} [package.extras] -all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"] -dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"] +all = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)"] +dev = ["gevent (>=1.1)", "redis (>=2.0,<5.0)", "watchdog", "pika (>=1.0,<2.0)", "watchdog-gevent", "pylibmc (>=1.5,<2.0)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"] gevent = ["gevent (>=1.1)"] memcached = ["pylibmc (>=1.5,<2.0)"] rabbitmq = ["pika (>=1.0,<2.0)"] @@ -504,6 +536,14 @@ requirements_deprecated_finder = ["pipreqs", "pip-api"] colors = ["colorama (>=0.4.3,<0.5.0)"] plugins = ["setuptools"] +[[package]] +name = "jmespath" +version = "0.10.0" +description = "JSON Matching Expressions" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "jsonschema" version = "3.2.0" @@ -866,7 +906,7 @@ termcolor = ">=1.1.0" name = "python-dateutil" version = "2.8.2" description = "Extensions to the standard Python datetime module" -category = "dev" +category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" @@ -955,6 +995,20 @@ lint = ["flake8 (==3.4.1)"] rust-backend = ["rusty-rlp (>=0.1.15,<0.2)"] test = ["hypothesis (==5.19.0)", "pytest (==5.4.3)", "tox (>=2.9.1,<3)"] +[[package]] +name = "s3transfer" +version = "0.5.1" +description = "An Amazon S3 Transfer Manager" +category = "main" +optional = false +python-versions = ">= 3.6" + +[package.dependencies] +botocore = ">=1.12.36,<2.0a.0" + +[package.extras] +crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"] + [[package]] name = "six" version = "1.16.0" @@ -1127,7 +1181,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "955c3df01b275e9b4807190e468a2df4d3d18b6a45a7c1659599ef476b35be51" +content-hash = "063e246b07155c7bbc227ffd8a0d237d402a3eb00a804dbb389b67b7a0e35354" [metadata.files] aiohttp = [ @@ -1239,6 +1293,14 @@ base58 = [ bitarray = [ {file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"}, ] +boto3 = [ + {file = "boto3-1.20.48-py3-none-any.whl", hash = "sha256:1c6301d9676cb18f2b0feddec393e52b9d5fa8147e6fe9a1665e39fd9739efc3"}, + {file = "boto3-1.20.48.tar.gz", hash = "sha256:6a8111492a571aeefbac2e4b6df5ce38bdbc16c7d8326f2a60a61c86032c49b0"}, +] +botocore = [ + {file = "botocore-1.23.48-py3-none-any.whl", hash = "sha256:768acb9a2247155b974a4184b29be321242ef8f61827f4bb958e60f00e476e90"}, + {file = "botocore-1.23.48.tar.gz", hash = "sha256:8652c11ff05d11d6cea7096aca8df7f8eb87980469860036ff47e196e4625c96"}, +] bottle = [ {file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"}, {file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"}, @@ -1352,8 +1414,8 @@ distlib = [ {file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"}, ] dramatiq = [ - {file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"}, - {file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"}, + {file = "dramatiq-1.12.3-py3-none-any.whl", hash = "sha256:eccb0f54d44ebd9e2c79e00d67b808397589a1a621ba7c5fd58df5fb6204a0a8"}, + {file = "dramatiq-1.12.3.tar.gz", hash = "sha256:380bd77b6b19d642f417b642935049ff71ddf4b4e57d821e4f55b92541430f21"}, ] eth-abi = [ {file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"}, @@ -1544,6 +1606,10 @@ isort = [ {file = "isort-5.9.3-py3-none-any.whl", hash = "sha256:e17d6e2b81095c9db0a03a8025a957f334d6ea30b26f9ec70805411e5c7c81f2"}, {file = "isort-5.9.3.tar.gz", hash = "sha256:9c2ea1e62d871267b78307fe511c0838ba0da28698c5732d54e2790bf3ba9899"}, ] +jmespath = [ + {file = "jmespath-0.10.0-py2.py3-none-any.whl", hash = "sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f"}, + {file = "jmespath-0.10.0.tar.gz", hash = "sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9"}, +] jsonschema = [ {file = "jsonschema-3.2.0-py2.py3-none-any.whl", hash = "sha256:4e5b3cf8216f577bee9ce139cbe72eca3ea4f292ec60928ff24758ce626cd163"}, {file = "jsonschema-3.2.0.tar.gz", hash = "sha256:c8a85b28d377cc7737e46e2d9f2b4f44ee3c0e1deac6bf46ddefc7187d30797a"}, @@ -2009,6 +2075,10 @@ rlp = [ {file = "rlp-2.0.1-py2.py3-none-any.whl", hash = "sha256:52a57c9f53f03c88b189283734b397314288250cc4a3c4113e9e36e2ac6bdd16"}, {file = "rlp-2.0.1.tar.gz", hash = "sha256:665e8312750b3fc5f7002e656d05b9dcb6e93b6063df40d95c49ad90c19d1f0e"}, ] +s3transfer = [ + {file = "s3transfer-0.5.1-py3-none-any.whl", hash = "sha256:25c140f5c66aa79e1ac60be50dcd45ddc59e83895f062a3aab263b870102911f"}, + {file = "s3transfer-0.5.1.tar.gz", hash = "sha256:69d264d3e760e569b78aaa0f22c97e955891cd22e32b10c51f784eeda4d9d10a"}, +] six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, diff --git a/pyproject.toml b/pyproject.toml index d616834..9b42e0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ psycopg2 = "^2.9.1" aiohttp = "^3.8.0" dramatiq = {extras = ["redis"], version = "^1.12.1"} pycoingecko = "^2.2.0" +boto3 = "^1.20.48" [tool.poetry.dev-dependencies] pre-commit = "^2.13.0" @@ -40,6 +41,7 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' fetch-range = 'cli:fetch_range' +s3-export = 'cli:s3_export' [tool.black] exclude = '''