From b862bddfe953f9ef8f6fd70090c6e2e148a4222f Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Thu, 30 Dec 2021 22:20:25 -0500 Subject: [PATCH] Add worker deployment --- k8s/mev-inspect-workers/.helmignore | 23 ++++ k8s/mev-inspect-workers/Chart.yaml | 24 ++++ .../templates/_helpers.tpl | 62 +++++++++++ .../templates/deployment.yaml | 104 ++++++++++++++++++ k8s/mev-inspect-workers/values.yaml | 44 ++++++++ k8s/mev-inspect/templates/deployment.yaml | 5 + mev_inspect/db.py | 26 +++-- poetry.lock | 80 +++++++++++++- pyproject.toml | 1 + worker.py | 68 ++++++++++++ 10 files changed, 428 insertions(+), 9 deletions(-) create mode 100644 k8s/mev-inspect-workers/.helmignore create mode 100644 k8s/mev-inspect-workers/Chart.yaml create mode 100644 k8s/mev-inspect-workers/templates/_helpers.tpl create mode 100644 k8s/mev-inspect-workers/templates/deployment.yaml create mode 100644 k8s/mev-inspect-workers/values.yaml create mode 100644 worker.py diff --git a/k8s/mev-inspect-workers/.helmignore b/k8s/mev-inspect-workers/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/k8s/mev-inspect-workers/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/k8s/mev-inspect-workers/Chart.yaml b/k8s/mev-inspect-workers/Chart.yaml new file mode 100644 index 0000000..c664fc2 --- /dev/null +++ b/k8s/mev-inspect-workers/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: mev-inspect-workers +description: A Helm chart for Kubernetes + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +appVersion: "1.16.0" diff --git a/k8s/mev-inspect-workers/templates/_helpers.tpl b/k8s/mev-inspect-workers/templates/_helpers.tpl new file mode 100644 index 0000000..7c90136 --- /dev/null +++ b/k8s/mev-inspect-workers/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "mev-inspect-worker.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "mev-inspect-worker.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "mev-inspect-worker.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "mev-inspect-worker.labels" -}} +helm.sh/chart: {{ include "mev-inspect-worker.chart" . }} +{{ include "mev-inspect-worker.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "mev-inspect-worker.selectorLabels" -}} +app.kubernetes.io/name: {{ include "mev-inspect-worker.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "mev-inspect-worker.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "mev-inspect-worker.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/k8s/mev-inspect-workers/templates/deployment.yaml b/k8s/mev-inspect-workers/templates/deployment.yaml new file mode 100644 index 0000000..92e89b0 --- /dev/null +++ b/k8s/mev-inspect-workers/templates/deployment.yaml @@ -0,0 +1,104 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "mev-inspect-worker.fullname" . }} + labels: + {{- include "mev-inspect-worker.labels" . | nindent 4 }} +spec: + replicas: {{ .Values.replicaCount }} + selector: + matchLabels: + {{- include "mev-inspect-worker.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "mev-inspect-worker.selectorLabels" . | nindent 8 }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: ["run", "dramatiq", "worker", "--threads=1", "--processes=1"] + livenessProbe: + exec: + command: + - ls + - / + initialDelaySeconds: 20 + periodSeconds: 5 + resources: + {{- toYaml .Values.resources | nindent 12 }} + env: + - name: POSTGRES_HOST + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: host + - name: POSTGRES_USER + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: username + - name: POSTGRES_PASSWORD + valueFrom: + secretKeyRef: + name: mev-inspect-db-credentials + key: password + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis + key: redis-password + - name: TRACE_DB_HOST + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: host + optional: true + - name: TRACE_DB_USER + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: username + optional: true + - name: TRACE_DB_PASSWORD + valueFrom: + secretKeyRef: + name: trace-db-credentials + key: password + optional: true + - name: RPC_URL + valueFrom: + configMapKeyRef: + name: mev-inspect-rpc + key: url + - name: LISTENER_HEALTHCHECK_URL + valueFrom: + configMapKeyRef: + name: mev-inspect-listener-healthcheck + key: url + optional: true + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/k8s/mev-inspect-workers/values.yaml b/k8s/mev-inspect-workers/values.yaml new file mode 100644 index 0000000..432ba43 --- /dev/null +++ b/k8s/mev-inspect-workers/values.yaml @@ -0,0 +1,44 @@ +# Default values for mev-inspect-workers +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 2 + +image: + repository: mev-inspect-py:latest + pullPolicy: IfNotPresent + +imagePullSecrets: [] +nameOverride: "" +fullnameOverride: "" + +podAnnotations: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +resources: {} + # We usually recommend not to specify default resources and to leave this as a conscious + # choice for the user. This also increases chances charts run on environments with little + # resources, such as Minikube. If you do want to specify resources, uncomment the following + # lines, adjust them as necessary, and remove the curly braces after 'resources:'. + # limits: + # cpu: 100m + # memory: 128Mi + # requests: + # cpu: 100m + # memory: 128Mi + +nodeSelector: {} + +tolerations: [] + +affinity: {} diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index ec2045c..21af65a 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -56,6 +56,11 @@ spec: secretKeyRef: name: mev-inspect-db-credentials key: password + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis + key: redis-password - name: TRACE_DB_HOST valueFrom: secretKeyRef: diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 4d87b49..15ccdc3 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -34,20 +34,32 @@ def _get_engine(uri: str): ) -def _get_session(uri: str): - Session = sessionmaker(bind=_get_engine(uri)) - return Session() +def _get_sessionmaker(uri: str): + return sessionmaker(bind=_get_engine(uri)) -def get_inspect_session() -> orm.Session: +def get_inspect_sessionmaker(): uri = get_inspect_database_uri() - return _get_session(uri) + return _get_sessionmaker(uri) -def get_trace_session() -> Optional[orm.Session]: +def get_trace_sessionmaker(): uri = get_trace_database_uri() if uri is not None: - return _get_session(uri) + return _get_sessionmaker(uri) + + return None + + +def get_inspect_session() -> orm.Session: + Session = get_inspect_sessionmaker() + return Session() + + +def get_trace_session() -> Optional[orm.Session]: + Session = get_trace_sessionmaker() + if Session is not None: + return Session() return None diff --git a/poetry.lock b/poetry.lock index ed5b75f..38c29fd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -209,6 +209,20 @@ toolz = ">=0.8.0" [package.extras] cython = ["cython"] +[[package]] +name = "deprecated" +version = "1.2.13" +description = "Python @deprecated decorator to deprecate old python classes, functions or methods." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + +[package.dependencies] +wrapt = ">=1.10,<2" + +[package.extras] +dev = ["tox", "bump2version (<1)", "sphinx (<2)", "importlib-metadata (<3)", "importlib-resources (<4)", "configparser (<5)", "sphinxcontrib-websupport (<2)", "zipp (<2)", "PyTest (<5)", "PyTest-Cov (<2.6)", "pytest", "pytest-cov"] + [[package]] name = "distlib" version = "0.3.2" @@ -217,6 +231,27 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "dramatiq" +version = "1.12.1" +description = "Background Processing for Python 3." +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +prometheus-client = ">=0.2" +redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""} + +[package.extras] +all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"] +dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"] +gevent = ["gevent (>=1.1)"] +memcached = ["pylibmc (>=1.5,<2.0)"] +rabbitmq = ["pika (>=1.0,<2.0)"] +redis = ["redis (>=2.0,<5.0)"] +watch = ["watchdog", "watchdog-gevent"] + [[package]] name = "eth-abi" version = "2.1.1" @@ -657,6 +692,17 @@ pyyaml = ">=5.1" toml = "*" virtualenv = ">=20.0.8" +[[package]] +name = "prometheus-client" +version = "0.12.0" +description = "Python client for the Prometheus monitoring system." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + +[package.extras] +twisted = ["twisted"] + [[package]] name = "protobuf" version = "3.17.3" @@ -840,6 +886,20 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" +[[package]] +name = "redis" +version = "4.0.2" +description = "Python client for Redis database and key-value store" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +deprecated = "*" + +[package.extras] +hiredis = ["hiredis (>=1.0.0)"] + [[package]] name = "regex" version = "2021.10.8" @@ -1037,7 +1097,7 @@ python-versions = ">=3.6.1" name = "wrapt" version = "1.12.1" description = "Module for decorators, wrappers and monkey patching." -category = "dev" +category = "main" optional = false python-versions = "*" @@ -1056,7 +1116,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "0aa43e887fe106d4142d68b7a891ba94f2de28df9df0ed765d285b1e5ccee391" +content-hash = "2ce3bdeb2d8bd31210026e5054a54c67fc766cdf22dc83485eca425643cdf760" [metadata.files] aiohttp = [ @@ -1272,10 +1332,18 @@ cytoolz = [ {file = "cytoolz-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:b61f23e9fa7cd5a87a503ab659f816858e2235926cd95b0c7e37403530d4a2d6"}, {file = "cytoolz-0.11.0.tar.gz", hash = "sha256:c64f3590c3eb40e1548f0d3c6b2ccde70493d0b8dc6cc7f9f3fec0bb3dcd4222"}, ] +deprecated = [ + {file = "Deprecated-1.2.13-py2.py3-none-any.whl", hash = "sha256:64756e3e14c8c5eea9795d93c524551432a0be75629f8f29e67ab8caf076c76d"}, + {file = "Deprecated-1.2.13.tar.gz", hash = "sha256:43ac5335da90c31c24ba028af536a91d41d53f9e6901ddb021bcc572ce44e38d"}, +] distlib = [ {file = "distlib-0.3.2-py2.py3-none-any.whl", hash = "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"}, {file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"}, ] +dramatiq = [ + {file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"}, + {file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"}, +] eth-abi = [ {file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"}, {file = "eth_abi-2.1.1.tar.gz", hash = "sha256:4bb1d87bb6605823379b07f6c02c8af45df01a27cc85bd6abb7cf1446ce7d188"}, @@ -1659,6 +1727,10 @@ pre-commit = [ {file = "pre_commit-2.14.0-py2.py3-none-any.whl", hash = "sha256:ec3045ae62e1aa2eecfb8e86fa3025c2e3698f77394ef8d2011ce0aedd85b2d4"}, {file = "pre_commit-2.14.0.tar.gz", hash = "sha256:2386eeb4cf6633712c7cc9ede83684d53c8cafca6b59f79c738098b51c6d206c"}, ] +prometheus-client = [ + {file = "prometheus_client-0.12.0-py2.py3-none-any.whl", hash = "sha256:317453ebabff0a1b02df7f708efbab21e3489e7072b61cb6957230dd004a0af0"}, + {file = "prometheus_client-0.12.0.tar.gz", hash = "sha256:1b12ba48cee33b9b0b9de64a1047cbd3c5f2d0ab6ebcead7ddda613a750ec3c5"}, +] protobuf = [ {file = "protobuf-3.17.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ab6bb0e270c6c58e7ff4345b3a803cc59dbee19ddf77a4719c5b635f1d547aa8"}, {file = "protobuf-3.17.3-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:13ee7be3c2d9a5d2b42a1030976f760f28755fcf5863c55b1460fd205e6cd637"}, @@ -1861,6 +1933,10 @@ pyyaml = [ {file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"}, {file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"}, ] +redis = [ + {file = "redis-4.0.2-py3-none-any.whl", hash = "sha256:c8481cf414474e3497ec7971a1ba9b998c8efad0f0d289a009a5bbef040894f9"}, + {file = "redis-4.0.2.tar.gz", hash = "sha256:ccf692811f2c1fc7a92b466aa2599e4a6d2d73d5f736a2c70be600657c0da34a"}, +] regex = [ {file = "regex-2021.10.8-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:094a905e87a4171508c2a0e10217795f83c636ccc05ddf86e7272c26e14056ae"}, {file = "regex-2021.10.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:981c786293a3115bc14c103086ae54e5ee50ca57f4c02ce7cf1b60318d1e8072"}, diff --git a/pyproject.toml b/pyproject.toml index 2161aad..f82cfc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ hexbytes = "^0.2.1" click = "^8.0.1" psycopg2 = "^2.9.1" aiohttp = "^3.8.0" +dramatiq = {extras = ["redis"], version = "^1.12.1"} [tool.poetry.dev-dependencies] pre-commit = "^2.13.0" diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..29c64d2 --- /dev/null +++ b/worker.py @@ -0,0 +1,68 @@ +import asyncio +import os +from contextlib import contextmanager + +import dramatiq +from dramatiq.brokers.redis import RedisBroker +from dramatiq.cli import main as dramatiq_worker +from dramatiq.middleware import Middleware + +from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker +from mev_inspect.inspector import MEVInspector + +InspectSession = get_inspect_sessionmaker() +TraceSession = get_trace_sessionmaker() + + +class AsyncMiddleware(Middleware): + def before_process_message( + self, _broker, message + ): # pylint: disable=unused-argument + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def after_process_message( + self, _broker, message, *, result=None, exception=None + ): # pylint: disable=unused-argument + self.loop.close() + + +rpc = os.environ["RPC_URL"] +broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) +broker.add_middleware(AsyncMiddleware()) +dramatiq.set_broker(broker) + + +@contextmanager +def session_scope(Session=None): + if Session is None: + return None + + with Session() as session: + yield session + + +@dramatiq.actor +def inspect_many_blocks_task( + after_block: int, + before_block: int, +): + with session_scope(InspectSession) as inspect_db_session: + with session_scope(TraceSession) as trace_db_session: + inspector = MEVInspector( + rpc, + inspect_db_session, + trace_db_session, + max_concurrency=5, + request_timeout=300, + ) + + asyncio.run( + inspector.inspect_many_blocks( + after_block=after_block, before_block=before_block + ) + ) + + +if __name__ == "__main__": + dramatiq_worker(processes=1, threads=1)