From 4662a1ecbc0180a74243f32817967d28c2be1b36 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 15:50:07 -0500 Subject: [PATCH 01/12] Pass DB sessions into inspector --- cli.py | 25 +++++++++++++++++-------- listener.py | 10 ++++++++-- mev_inspect/inspector.py | 34 ++++++++++++++++++++++------------ 3 files changed, 47 insertions(+), 22 deletions(-) diff --git a/cli.py b/cli.py index 2a78b75..74cbbf9 100644 --- a/cli.py +++ b/cli.py @@ -29,8 +29,13 @@ async def inspect_block_command(block_number: int, rpc: str): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) - await inspector.inspect_single_block(block=block_number) + inspector = MEVInspector(rpc) + + await inspector.inspect_single_block( + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + block=block_number, + ) @cli.command() @@ -38,11 +43,14 @@ async def inspect_block_command(block_number: int, rpc: str): @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @coro async def fetch_block_command(block_number: int, rpc: str): - inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) - block = await inspector.create_from_block(block_number=block_number) + inspector = MEVInspector(rpc) + block = await inspector.create_from_block( + block_number=block_number, + trace_db_session=trace_db_session, + ) + print(block.json()) @@ -72,13 +80,14 @@ async def inspect_many_blocks_command( inspector = MEVInspector( rpc, - inspect_db_session, - trace_db_session, max_concurrency=max_concurrency, request_timeout=request_timeout, ) await inspector.inspect_many_blocks( - after_block=after_block, before_block=before_block + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + after_block=after_block, + before_block=before_block, ) diff --git a/listener.py b/listener.py index 0965b1b..4323bf4 100644 --- a/listener.py +++ b/listener.py @@ -37,13 +37,14 @@ async def run(): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) + inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc) while not killer.kill_now: await inspect_next_block( inspector, inspect_db_session, + trace_db_session, base_provider, healthcheck_url, ) @@ -54,6 +55,7 @@ async def run(): async def inspect_next_block( inspector: MEVInspector, inspect_db_session, + trace_db_session, base_provider, healthcheck_url, ): @@ -72,7 +74,11 @@ async def inspect_next_block( logger.info(f"Writing block: {block_number}") - await inspector.inspect_single_block(block=block_number) + await inspector.inspect_single_block( + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + block=block_number, + ) update_latest_block(inspect_db_session, block_number) if healthcheck_url: diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index 4327d84..7878527 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -27,38 +27,44 @@ class MEVInspector: def __init__( self, rpc: str, - inspect_db_session: orm.Session, - trace_db_session: Optional[orm.Session], max_concurrency: int = 1, request_timeout: int = 300, ): - self.inspect_db_session = inspect_db_session - self.trace_db_session = trace_db_session - base_provider = get_base_provider(rpc, request_timeout=request_timeout) self.w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) - async def create_from_block(self, block_number: int): + async def create_from_block( + self, + trace_db_session: Optional[orm.Session], + block_number: int, + ): return await create_from_block_number( w3=self.w3, block_number=block_number, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) - async def inspect_single_block(self, block: int): + async def inspect_single_block( + self, + inspect_db_session: orm.Session, + block: int, + trace_db_session: Optional[orm.Session], + ): return await inspect_block( - self.inspect_db_session, + inspect_db_session, self.w3, self.trace_classifier, block, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) async def inspect_many_blocks( self, + inspect_db_session: orm.Session, + trace_db_session: Optional[orm.Session], after_block: int, before_block: int, block_batch_size: int = 10, @@ -71,6 +77,8 @@ class MEVInspector: tasks.append( asyncio.ensure_future( self.safe_inspect_many_blocks( + inspect_db_session, + trace_db_session, after_block_number=batch_after_block, before_block_number=batch_before_block, ) @@ -88,15 +96,17 @@ class MEVInspector: async def safe_inspect_many_blocks( self, + inspect_db_session: orm.Session, + trace_db_session: Optional[orm.Session], after_block_number: int, before_block_number: int, ): async with self.max_concurrency: return await inspect_many_blocks( - self.inspect_db_session, + inspect_db_session, self.w3, self.trace_classifier, after_block_number, before_block_number, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) From 476db2500316fbbe798928794ac6f0c1bffbfc77 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 30 Dec 2021 22:19:46 -0500 Subject: [PATCH 02/12] Add redis --- Tiltfile | 16 +++++++++++++++- mev | 21 ++++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/Tiltfile b/Tiltfile index afc0d92..0a59bb1 100644 --- a/Tiltfile +++ b/Tiltfile @@ -8,6 +8,11 @@ helm_remote("postgresql", set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"], ) +helm_remote("redis", + repo_name="bitnami", + repo_url="https://charts.bitnami.com/bitnami", +) + k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = { "url" : os.environ["RPC_URL"], })) @@ -37,7 +42,16 @@ docker_build("mev-inspect-py", ".", ], ) k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect')) -k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"]) +k8s_resource( + workload="mev-inspect", + resource_deps=["postgresql-postgresql", "redis-master"], +) + +k8s_yaml(helm('./k8s/mev-inspect-workers', name='mev-inspect-workers')) +k8s_resource( + workload="mev-inspect-workers", + resource_deps=["postgresql-postgresql", "redis-master"], +) # uncomment to enable price monitor # k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices')) diff --git a/mev b/mev index f2bb34e..17875d2 100755 --- a/mev +++ b/mev @@ -4,14 +4,18 @@ set -e DB_NAME=mev_inspect +function get_kube_secret(){ + kubectl get secrets $1 -o jsonpath="{.data.$2}" | base64 --decode +} + function get_kube_db_secret(){ kubectl get secrets mev-inspect-db-credentials -o jsonpath="{.data.$1}" | base64 --decode } function db(){ - host=$(get_kube_db_secret "host") - username=$(get_kube_db_secret "username") - password=$(get_kube_db_secret "password") + host=$(get_kube_secret "mev-inspect-db-credentials" "host") + username=$(get_kube_secret "mev-inspect-db-credentials" "username") + password=$(get_kube_secret "mev-inspect-db-credentials" "password") kubectl run -i --rm --tty postgres-client-$RANDOM \ --env="PGPASSWORD=$password" \ @@ -24,6 +28,17 @@ case "$1" in echo "Connecting to $DB_NAME" db ;; + redis) + echo "Connecting to redis" + echo "To continue enter 'shift + r'" + redis_password=$(get_kube_secret "redis" "redis-password") + kubectl run -i --rm --tty \ + --namespace default redis-client-$RANDOM \ + --restart='Never' \ + --env REDIS_PASSWORD=$redis_password \ + --image docker.io/bitnami/redis:6.2.6-debian-10-r0 \ + --command -- redis-cli -h redis-master -a $redis_password + ;; listener) kubectl exec -ti deploy/mev-inspect -- ./listener $2 ;; From b862bddfe953f9ef8f6fd70090c6e2e148a4222f Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 30 Dec 2021 22:20:25 -0500 Subject: [PATCH 03/12] Add worker deployment --- k8s/mev-inspect-workers/.helmignore | 23 ++++ k8s/mev-inspect-workers/Chart.yaml | 24 ++++ .../templates/_helpers.tpl | 62 +++++++++++ .../templates/deployment.yaml | 104 ++++++++++++++++++ k8s/mev-inspect-workers/values.yaml | 44 ++++++++ k8s/mev-inspect/templates/deployment.yaml | 5 + mev_inspect/db.py | 26 +++-- poetry.lock | 80 +++++++++++++- pyproject.toml | 1 + worker.py | 68 ++++++++++++ 10 files changed, 428 insertions(+), 9 deletions(-) create mode 100644 k8s/mev-inspect-workers/.helmignore create mode 100644 k8s/mev-inspect-workers/Chart.yaml create mode 100644 k8s/mev-inspect-workers/templates/_helpers.tpl create mode 100644 k8s/mev-inspect-workers/templates/deployment.yaml create mode 100644 k8s/mev-inspect-workers/values.yaml create mode 100644 worker.py diff --git a/k8s/mev-inspect-workers/.helmignore b/k8s/mev-inspect-workers/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/k8s/mev-inspect-workers/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/k8s/mev-inspect-workers/Chart.yaml b/k8s/mev-inspect-workers/Chart.yaml new file mode 100644 index 0000000..c664fc2 --- /dev/null +++ b/k8s/mev-inspect-workers/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: mev-inspect-workers +description: A Helm chart for Kubernetes + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +appVersion: "1.16.0" diff --git a/k8s/mev-inspect-workers/templates/_helpers.tpl b/k8s/mev-inspect-workers/templates/_helpers.tpl new file mode 100644 index 0000000..7c90136 --- /dev/null +++ b/k8s/mev-inspect-workers/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "mev-inspect-worker.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "mev-inspect-worker.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "mev-inspect-worker.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "mev-inspect-worker.labels" -}} +helm.sh/chart: {{ include "mev-inspect-worker.chart" . }} +{{ include "mev-inspect-worker.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "mev-inspect-worker.selectorLabels" -}} +app.kubernetes.io/name: {{ include "mev-inspect-worker.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "mev-inspect-worker.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "mev-inspect-worker.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/k8s/mev-inspect-workers/templates/deployment.yaml b/k8s/mev-inspect-workers/templates/deployment.yaml new file mode 100644 index 0000000..92e89b0 --- /dev/null +++ b/k8s/mev-inspect-workers/templates/deployment.yaml @@ -0,0 +1,104 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "mev-inspect-worker.fullname" . }} + labels: + {{- include "mev-inspect-worker.labels" . | nindent 4 }} +spec: + replicas: {{ .Values.replicaCount }} + selector: + matchLabels: + {{- include "mev-inspect-worker.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "mev-inspect-worker.selectorLabels" . | nindent 8 }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: ["run", "dramatiq", "worker", "--threads=1", "--processes=1"] + livenessProbe: + exec: + command: + - ls + - / + initialDelaySeconds: 20 + periodSeconds: 5 + resources: + {{- toYaml .Values.resources | nindent 12 }} + env: + - name: POSTGRES_HOST + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: host + - name: POSTGRES_USER + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: username + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: password + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis + key: redis-password + - name: TRACE_DB_HOST + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: host + optional: true + - name: TRACE_DB_USER + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: username + optional: true + - name: TRACE_DB_PASSWORD + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: password + optional: true + - name: RPC_URL + valueFrom: + configMapKeyRef: + name: mev-inspect-rpc + key: url + - name: LISTENER_HEALTHCHECK_URL + valueFrom: + configMapKeyRef: + name: mev-inspect-listener-healthcheck + key: url + optional: true + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/k8s/mev-inspect-workers/values.yaml b/k8s/mev-inspect-workers/values.yaml new file mode 100644 index 0000000..432ba43 --- /dev/null +++ b/k8s/mev-inspect-workers/values.yaml @@ -0,0 +1,44 @@ +# Default values for mev-inspect-workers +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 2 + +image: + repository: mev-inspect-py:latest + pullPolicy: IfNotPresent + +imagePullSecrets: [] +nameOverride: "" +fullnameOverride: "" + +podAnnotations: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +nodeSelector: {} + +tolerations: [] + +affinity: {} diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index ec2045c..21af65a 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -56,6 +56,11 @@ spec: secretKeyRef: name: mev-inspect-db-credentials key: password + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis + key: redis-password - name: TRACE_DB_HOST valueFrom: secretKeyRef: diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 4d87b49..15ccdc3 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -34,20 +34,32 @@ def _get_engine(uri: str): ) -def _get_session(uri: str): - Session = sessionmaker(bind=_get_engine(uri)) - return Session() +def _get_sessionmaker(uri: str): + return sessionmaker(bind=_get_engine(uri)) -def get_inspect_session() -> orm.Session: +def get_inspect_sessionmaker(): uri = get_inspect_database_uri() - return _get_session(uri) + return _get_sessionmaker(uri) -def get_trace_session() -> Optional[orm.Session]: +def get_trace_sessionmaker(): uri = get_trace_database_uri() if uri is not None: - return _get_session(uri) + return _get_sessionmaker(uri) + + return None + + +def get_inspect_session() -> orm.Session: + Session = get_inspect_sessionmaker() + return Session() + + +def get_trace_session() -> Optional[orm.Session]: + Session = get_trace_sessionmaker() + if Session is not None: + return Session() return None diff --git a/poetry.lock b/poetry.lock index ed5b75f..38c29fd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -209,6 +209,20 @@ toolz = ">=0.8.0" [package.extras] cython = ["cython"] +[[package]] +name = "deprecated" +version = "1.2.13" +description = "Python @deprecated decorator to deprecate old python classes, functions or methods." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + +[package.dependencies] +wrapt = ">=1.10,<2" + +[package.extras] +dev = ["tox", "bump2version (<1)", "sphinx (<2)", "importlib-metadata (<3)", "importlib-resources (<4)", "configparser (<5)", "sphinxcontrib-websupport (<2)", "zipp (<2)", "PyTest (<5)", "PyTest-Cov (<2.6)", "pytest", "pytest-cov"] + [[package]] name = "distlib" version = "0.3.2" @@ -217,6 +231,27 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "dramatiq" +version = "1.12.1" +description = "Background Processing for Python 3." +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +prometheus-client = ">=0.2" +redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""} + +[package.extras] +all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"] +dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"] +gevent = ["gevent (>=1.1)"] +memcached = ["pylibmc (>=1.5,<2.0)"] +rabbitmq = ["pika (>=1.0,<2.0)"] +redis = ["redis (>=2.0,<5.0)"] +watch = ["watchdog", "watchdog-gevent"] + [[package]] name = "eth-abi" version = "2.1.1" @@ -657,6 +692,17 @@ pyyaml = ">=5.1" toml = "*" virtualenv = ">=20.0.8" +[[package]] +name = "prometheus-client" +version = "0.12.0" +description = "Python client for the Prometheus monitoring system." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + +[package.extras] +twisted = ["twisted"] + [[package]] name = "protobuf" version = "3.17.3" @@ -840,6 +886,20 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" +[[package]] +name = "redis" +version = "4.0.2" +description = "Python client for Redis database and key-value store" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +deprecated = "*" + +[package.extras] +hiredis = ["hiredis (>=1.0.0)"] + [[package]] name = "regex" version = "2021.10.8" @@ -1037,7 +1097,7 @@ python-versions = ">=3.6.1" name = "wrapt" version = "1.12.1" description = "Module for decorators, wrappers and monkey patching." -category = "dev" +category = "main" optional = false python-versions = "*" @@ -1056,7 +1116,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "0aa43e887fe106d4142d68b7a891ba94f2de28df9df0ed765d285b1e5ccee391" +content-hash = "2ce3bdeb2d8bd31210026e5054a54c67fc766cdf22dc83485eca425643cdf760" [metadata.files] aiohttp = [ @@ -1272,10 +1332,18 @@ cytoolz = [ {file = "cytoolz-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:b61f23e9fa7cd5a87a503ab659f816858e2235926cd95b0c7e37403530d4a2d6"}, {file = "cytoolz-0.11.0.tar.gz", hash = "sha256:c64f3590c3eb40e1548f0d3c6b2ccde70493d0b8dc6cc7f9f3fec0bb3dcd4222"}, ] +deprecated = [ + {file = "Deprecated-1.2.13-py2.py3-none-any.whl", hash = "sha256:64756e3e14c8c5eea9795d93c524551432a0be75629f8f29e67ab8caf076c76d"}, + {file = "Deprecated-1.2.13.tar.gz", hash = "sha256:43ac5335da90c31c24ba028af536a91d41d53f9e6901ddb021bcc572ce44e38d"}, +] distlib = [ {file = "distlib-0.3.2-py2.py3-none-any.whl", hash = "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"}, {file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"}, ] +dramatiq = [ + {file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"}, + {file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"}, +] eth-abi = [ {file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"}, {file = "eth_abi-2.1.1.tar.gz", hash = "sha256:4bb1d87bb6605823379b07f6c02c8af45df01a27cc85bd6abb7cf1446ce7d188"}, @@ -1659,6 +1727,10 @@ pre-commit = [ {file = "pre_commit-2.14.0-py2.py3-none-any.whl", hash = "sha256:ec3045ae62e1aa2eecfb8e86fa3025c2e3698f77394ef8d2011ce0aedd85b2d4"}, {file = "pre_commit-2.14.0.tar.gz", hash = "sha256:2386eeb4cf6633712c7cc9ede83684d53c8cafca6b59f79c738098b51c6d206c"}, ] +prometheus-client = [ + {file = "prometheus_client-0.12.0-py2.py3-none-any.whl", hash = "sha256:317453ebabff0a1b02df7f708efbab21e3489e7072b61cb6957230dd004a0af0"}, + {file = "prometheus_client-0.12.0.tar.gz", hash = "sha256:1b12ba48cee33b9b0b9de64a1047cbd3c5f2d0ab6ebcead7ddda613a750ec3c5"}, +] protobuf = [ {file = "protobuf-3.17.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ab6bb0e270c6c58e7ff4345b3a803cc59dbee19ddf77a4719c5b635f1d547aa8"}, {file = "protobuf-3.17.3-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:13ee7be3c2d9a5d2b42a1030976f760f28755fcf5863c55b1460fd205e6cd637"}, @@ -1861,6 +1933,10 @@ pyyaml = [ {file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"}, {file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"}, ] +redis = [ + {file = "redis-4.0.2-py3-none-any.whl", hash = "sha256:c8481cf414474e3497ec7971a1ba9b998c8efad0f0d289a009a5bbef040894f9"}, + {file = "redis-4.0.2.tar.gz", hash = "sha256:ccf692811f2c1fc7a92b466aa2599e4a6d2d73d5f736a2c70be600657c0da34a"}, +] regex = [ {file = "regex-2021.10.8-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:094a905e87a4171508c2a0e10217795f83c636ccc05ddf86e7272c26e14056ae"}, {file = "regex-2021.10.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:981c786293a3115bc14c103086ae54e5ee50ca57f4c02ce7cf1b60318d1e8072"}, diff --git a/pyproject.toml b/pyproject.toml index 2161aad..f82cfc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ hexbytes = "^0.2.1" click = "^8.0.1" psycopg2 = "^2.9.1" aiohttp = "^3.8.0" +dramatiq = {extras = ["redis"], version = "^1.12.1"} [tool.poetry.dev-dependencies] pre-commit = "^2.13.0" diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..29c64d2 --- /dev/null +++ b/worker.py @@ -0,0 +1,68 @@ +import asyncio +import os +from contextlib import contextmanager + +import dramatiq +from dramatiq.brokers.redis import RedisBroker +from dramatiq.cli import main as dramatiq_worker +from dramatiq.middleware import Middleware + +from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker +from mev_inspect.inspector import MEVInspector + +InspectSession = get_inspect_sessionmaker() +TraceSession = get_trace_sessionmaker() + + +class AsyncMiddleware(Middleware): + def before_process_message( + self, _broker, message + ): # pylint: disable=unused-argument + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def after_process_message( + self, _broker, message, *, result=None, exception=None + ): # pylint: disable=unused-argument + self.loop.close() + + +rpc = os.environ["RPC_URL"] +broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) +broker.add_middleware(AsyncMiddleware()) +dramatiq.set_broker(broker) + + +@contextmanager +def session_scope(Session=None): + if Session is None: + return None + + with Session() as session: + yield session + + +@dramatiq.actor +def inspect_many_blocks_task( + after_block: int, + before_block: int, +): + with session_scope(InspectSession) as inspect_db_session: + with session_scope(TraceSession) as trace_db_session: + inspector = MEVInspector( + rpc, + inspect_db_session, + trace_db_session, + max_concurrency=5, + request_timeout=300, + ) + + asyncio.run( + inspector.inspect_many_blocks( + after_block=after_block, before_block=before_block + ) + ) + + +if __name__ == "__main__": + dramatiq_worker(processes=1, threads=1) From 815af26f288e1c30cc8cbaf90b2908746bce7b5b Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 30 Dec 2021 23:09:38 -0500 Subject: [PATCH 04/12] Enqueue messages to redis with backfill command --- cli.py | 12 ++++++++++++ mev | 5 ++--- pyproject.toml | 1 + worker.py | 8 ++++---- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/cli.py b/cli.py index 74cbbf9..d116d99 100644 --- a/cli.py +++ b/cli.py @@ -3,6 +3,7 @@ import os import sys import click +from worker import inspect_many_blocks_task from mev_inspect.concurrency import coro from mev_inspect.crud.prices import write_prices @@ -91,6 +92,17 @@ async def inspect_many_blocks_command( ) +@cli.command() +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) +@click.argument("batch_size", type=int, default=10) +def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int): + for batch_after_block in range(after_block, before_block, batch_size): + batch_before_block = min(batch_after_block + batch_size, before_block) + logger.info(f"Sending {batch_after_block} to {batch_before_block}") + inspect_many_blocks_task.send(batch_after_block, batch_before_block) + + @cli.command() @coro async def fetch_all_prices(): diff --git a/mev b/mev index 17875d2..f753e7e 100755 --- a/mev +++ b/mev @@ -45,10 +45,9 @@ case "$1" in backfill) start_block_number=$2 end_block_number=$3 - n_workers=$4 - echo "Backfilling from $start_block_number to $end_block_number with $n_workers workers" - poetry run python backfill.py $start_block_number $end_block_number $n_workers + echo "Backfilling from $start_block_number to $end_block_number" + kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number ;; inspect) block_number=$2 diff --git a/pyproject.toml b/pyproject.toml index f82cfc3..3c4be4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] inspect-block = 'cli:inspect_block_command' inspect-many-blocks = 'cli:inspect_many_blocks_command' +enqueue-many-blocks = 'cli:enqueue_many_blocks_command' fetch-block = 'cli:fetch_block_command' fetch-all-prices = 'cli:fetch_all_prices' diff --git a/worker.py b/worker.py index 29c64d2..e9969d2 100644 --- a/worker.py +++ b/worker.py @@ -36,10 +36,10 @@ dramatiq.set_broker(broker) @contextmanager def session_scope(Session=None): if Session is None: - return None - - with Session() as session: - yield session + yield None + else: + with Session() as session: + yield session @dramatiq.actor From cff148e21ffdd64e153134e5cc7456fbb5e1697a Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:11:18 -0500 Subject: [PATCH 05/12] Log when writing --- mev_inspect/inspect_block.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index 13041ab..f97de75 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -168,6 +168,7 @@ async def inspect_many_blocks( all_miner_payments.extend(miner_payments) + logger.info("Writing data") delete_blocks(inspect_db_session, after_block_number, before_block_number) write_blocks(inspect_db_session, all_blocks) @@ -224,3 +225,4 @@ async def inspect_many_blocks( inspect_db_session, after_block_number, before_block_number ) write_miner_payments(inspect_db_session, all_miner_payments) + logger.info("Done writing") From cbec5b76137c868b95a9595c6b99327b46610390 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:12:36 -0500 Subject: [PATCH 06/12] Only build inspector once --- worker.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/worker.py b/worker.py index e9969d2..78b1d05 100644 --- a/worker.py +++ b/worker.py @@ -1,5 +1,6 @@ import asyncio import os +import threading from contextlib import contextmanager import dramatiq @@ -13,6 +14,8 @@ from mev_inspect.inspector import MEVInspector InspectSession = get_inspect_sessionmaker() TraceSession = get_trace_sessionmaker() +thread_local = threading.local() + class AsyncMiddleware(Middleware): def before_process_message( @@ -27,9 +30,22 @@ class AsyncMiddleware(Middleware): self.loop.close() +class InspectorMiddleware(Middleware): + def before_process_message( + self, _broker, worker + ): # pylint: disable=unused-argument + if not hasattr(thread_local, "inspector"): + thread_local.inspector = MEVInspector( + rpc, + max_concurrency=5, + request_timeout=300, + ) + + rpc = os.environ["RPC_URL"] broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) broker.add_middleware(AsyncMiddleware()) +broker.add_middleware(InspectorMiddleware()) dramatiq.set_broker(broker) @@ -49,17 +65,12 @@ def inspect_many_blocks_task( ): with session_scope(InspectSession) as inspect_db_session: with session_scope(TraceSession) as trace_db_session: - inspector = MEVInspector( - rpc, - inspect_db_session, - trace_db_session, - max_concurrency=5, - request_timeout=300, - ) - asyncio.run( - inspector.inspect_many_blocks( - after_block=after_block, before_block=before_block + thread_local.inspector.inspect_many_blocks( + inspect_db_session=inspect_db_session, + trace_db_session=trace_db_session, + after_block=after_block, + before_block=before_block, ) ) From 01bb566478e87cd8b3dcce5123cb475185d8db6f Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:18:05 -0500 Subject: [PATCH 07/12] Drop worker count to 1 locally --- k8s/mev-inspect-workers/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/mev-inspect-workers/values.yaml b/k8s/mev-inspect-workers/values.yaml index 432ba43..f7ead0e 100644 --- a/k8s/mev-inspect-workers/values.yaml +++ b/k8s/mev-inspect-workers/values.yaml @@ -2,7 +2,7 @@ # This is a YAML-formatted file. # Declare variables to be passed into your templates. -replicaCount: 2 +replicaCount: 1 image: repository: mev-inspect-py:latest From 0516fffa9c6015ee684d28ac4ee5371467b1f157 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:18:17 -0500 Subject: [PATCH 08/12] Add some logging --- worker.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/worker.py b/worker.py index 78b1d05..380ba86 100644 --- a/worker.py +++ b/worker.py @@ -1,5 +1,7 @@ import asyncio +import logging import os +import sys import threading from contextlib import contextmanager @@ -15,6 +17,8 @@ InspectSession = get_inspect_sessionmaker() TraceSession = get_trace_sessionmaker() thread_local = threading.local() +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) class AsyncMiddleware(Middleware): @@ -35,11 +39,14 @@ class InspectorMiddleware(Middleware): self, _broker, worker ): # pylint: disable=unused-argument if not hasattr(thread_local, "inspector"): + logger.info("Building inspector") thread_local.inspector = MEVInspector( rpc, max_concurrency=5, request_timeout=300, ) + else: + logger.info("Inspector already exists") rpc = os.environ["RPC_URL"] From f296de5a20c52b17e77b3a8d9be98f27c9bad6fc Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:37:27 -0500 Subject: [PATCH 09/12] Update README to reflect new backfill --- README.md | 19 ++++++++++++++++--- Tiltfile | 6 +++++- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9b299c3..c514f71 100644 --- a/README.md +++ b/README.md @@ -103,11 +103,24 @@ And stop the listener with: ### Backfilling -For larger backfills, you can inspect many blocks in parallel using kubernetes +For larger backfills, you can inspect many blocks in parallel -To inspect blocks 12914944 to 12915044 divided across 10 worker pods: +To inspect blocks 12914944 to 12915044, run ``` -./mev backfill 12914944 12915044 10 +./mev backfill 12914944 12915044 +``` + +This queues the blocks in Redis to be pulled off by the mev-inspect-worker service + +To increase or decrease parallelism, update the replicaCount value for the mev-inspect-workers helm chart + +Locally, this can be done by editing Tiltfile and changing "replicaCount=1" to your desired parallelism: +``` +k8s_yaml(helm( + './k8s/mev-inspect-workers', + name='mev-inspect-workers', + set=["replicaCount=1"], +)) ``` You can see worker pods spin up then complete by watching the status of all pods diff --git a/Tiltfile b/Tiltfile index 0a59bb1..093e77e 100644 --- a/Tiltfile +++ b/Tiltfile @@ -47,7 +47,11 @@ k8s_resource( resource_deps=["postgresql-postgresql", "redis-master"], ) -k8s_yaml(helm('./k8s/mev-inspect-workers', name='mev-inspect-workers')) +k8s_yaml(helm( + './k8s/mev-inspect-workers', + name='mev-inspect-workers', + set=["replicaCount=1"], +)) k8s_resource( workload="mev-inspect-workers", resource_deps=["postgresql-postgresql", "redis-master"], From 139e45333bb456a60b26ccc6c0561646bbce63a0 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 16:44:22 -0500 Subject: [PATCH 10/12] Clean up redis pods --- mev | 1 - 1 file changed, 1 deletion(-) diff --git a/mev b/mev index f753e7e..1f56863 100755 --- a/mev +++ b/mev @@ -34,7 +34,6 @@ case "$1" in redis_password=$(get_kube_secret "redis" "redis-password") kubectl run -i --rm --tty \ --namespace default redis-client-$RANDOM \ - --restart='Never' \ --env REDIS_PASSWORD=$redis_password \ --image docker.io/bitnami/redis:6.2.6-debian-10-r0 \ --command -- redis-cli -h redis-master -a $redis_password From 5cad2fef436688a03a7f3f93f49ee8e0ba7853d1 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 18:00:32 -0500 Subject: [PATCH 11/12] Break redis into a function. Add reference to README for now --- README.md | 7 +++++++ mev | 18 +++++++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index c514f71..5c2deaa 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,13 @@ You can see worker pods spin up then complete by watching the status of all pods watch kubectl get pods ``` +To see progress and failed batches, connect to Redis with +``` +./mev redis +``` + +then query keys keys and values using the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) + To watch the logs for a given pod, take its pod name using the above, then run: ``` kubectl logs -f pod/mev-inspect-backfill-abcdefg diff --git a/mev b/mev index 1f56863..d42e8ee 100755 --- a/mev +++ b/mev @@ -23,6 +23,16 @@ function db(){ -- $DB_NAME --host=$host --user=$username } +function redis(){ + echo "To continue, enter 'shift + r'" + redis_password=$(get_kube_secret "redis" "redis-password") + kubectl run -i --rm --tty \ + --namespace default redis-client-$RANDOM \ + --env REDIS_PASSWORD=$redis_password \ + --image docker.io/bitnami/redis:6.2.6-debian-10-r0 \ + --command -- redis-cli -h redis-master -a $redis_password +} + case "$1" in db) echo "Connecting to $DB_NAME" @@ -30,13 +40,7 @@ case "$1" in ;; redis) echo "Connecting to redis" - echo "To continue enter 'shift + r'" - redis_password=$(get_kube_secret "redis" "redis-password") - kubectl run -i --rm --tty \ - --namespace default redis-client-$RANDOM \ - --env REDIS_PASSWORD=$redis_password \ - --image docker.io/bitnami/redis:6.2.6-debian-10-r0 \ - --command -- redis-cli -h redis-master -a $redis_password + redis ;; listener) kubectl exec -ti deploy/mev-inspect -- ./listener $2 From 0860f4f7f5ec485d49049445eb1a10b03dbf4bce Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 31 Dec 2021 18:08:04 -0500 Subject: [PATCH 12/12] More detail in the README --- README.md | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 5c2deaa..3f0854c 100644 --- a/README.md +++ b/README.md @@ -133,14 +133,30 @@ To see progress and failed batches, connect to Redis with ./mev redis ``` -then query keys keys and values using the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) - -To watch the logs for a given pod, take its pod name using the above, then run: +For total messages, query: ``` -kubectl logs -f pod/mev-inspect-backfill-abcdefg +HLEN dramatiq:default.msgs ``` -(where `mev-inspect-backfill-abcdefg` is your actual pod name) +For messages failed and waiting to retry in the delay queue (DQ), query: +``` +HGETALL dramatiq:default.DQ.msgs +``` + +For messages permanently failed in the dead letter queue (XQ), query: +``` +HGETALL dramatiq:default.XQ.msgs +``` + +For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43) + + +To watch the logs for a given worker pod, take its pod name using the above, then run: +``` +kubectl logs -f pod/mev-inspect-worker-abcdefg +``` + +(where `mev-inspect-worker-abcdefg` is your actual pod name) ### Exploring