Add worker deployment

This commit is contained in:
Luke Van Seters 2021-12-30 22:20:25 -05:00
parent 476db25003
commit b862bddfe9
10 changed files with 428 additions and 9 deletions

View File

@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@ -0,0 +1,24 @@
apiVersion: v2
name: mev-inspect-workers
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.16.0"

View File

@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "mev-inspect-worker.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "mev-inspect-worker.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "mev-inspect-worker.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "mev-inspect-worker.labels" -}}
helm.sh/chart: {{ include "mev-inspect-worker.chart" . }}
{{ include "mev-inspect-worker.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "mev-inspect-worker.selectorLabels" -}}
app.kubernetes.io/name: {{ include "mev-inspect-worker.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "mev-inspect-worker.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "mev-inspect-worker.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}

View File

@ -0,0 +1,104 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "mev-inspect-worker.fullname" . }}
labels:
{{- include "mev-inspect-worker.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args: ["run", "dramatiq", "worker", "--threads=1", "--processes=1"]
livenessProbe:
exec:
command:
- ls
- /
initialDelaySeconds: 20
periodSeconds: 5
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
- name: POSTGRES_HOST
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: host
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: host
optional: true
- name: TRACE_DB_USER
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: username
optional: true
- name: TRACE_DB_PASSWORD
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: password
optional: true
- name: RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-listener-healthcheck
key: url
optional: true
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -0,0 +1,44 @@
# Default values for mev-inspect-workers
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 2
image:
repository: mev-inspect-py:latest
pullPolicy: IfNotPresent
imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
nodeSelector: {}
tolerations: []
affinity: {}

View File

@ -56,6 +56,11 @@ spec:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:

View File

@ -34,20 +34,32 @@ def _get_engine(uri: str):
)
def _get_session(uri: str):
Session = sessionmaker(bind=_get_engine(uri))
return Session()
def _get_sessionmaker(uri: str):
return sessionmaker(bind=_get_engine(uri))
def get_inspect_session() -> orm.Session:
def get_inspect_sessionmaker():
uri = get_inspect_database_uri()
return _get_session(uri)
return _get_sessionmaker(uri)
def get_trace_session() -> Optional[orm.Session]:
def get_trace_sessionmaker():
uri = get_trace_database_uri()
if uri is not None:
return _get_session(uri)
return _get_sessionmaker(uri)
return None
def get_inspect_session() -> orm.Session:
Session = get_inspect_sessionmaker()
return Session()
def get_trace_session() -> Optional[orm.Session]:
Session = get_trace_sessionmaker()
if Session is not None:
return Session()
return None

80
poetry.lock generated
View File

@ -209,6 +209,20 @@ toolz = ">=0.8.0"
[package.extras]
cython = ["cython"]
[[package]]
name = "deprecated"
version = "1.2.13"
description = "Python @deprecated decorator to deprecate old python classes, functions or methods."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[package.dependencies]
wrapt = ">=1.10,<2"
[package.extras]
dev = ["tox", "bump2version (<1)", "sphinx (<2)", "importlib-metadata (<3)", "importlib-resources (<4)", "configparser (<5)", "sphinxcontrib-websupport (<2)", "zipp (<2)", "PyTest (<5)", "PyTest-Cov (<2.6)", "pytest", "pytest-cov"]
[[package]]
name = "distlib"
version = "0.3.2"
@ -217,6 +231,27 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "dramatiq"
version = "1.12.1"
description = "Background Processing for Python 3."
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
prometheus-client = ">=0.2"
redis = {version = ">=2.0,<5.0", optional = true, markers = "extra == \"redis\""}
[package.extras]
all = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)"]
dev = ["redis (>=2.0,<5.0)", "pika (>=1.0,<2.0)", "watchdog", "pylibmc (>=1.5,<2.0)", "watchdog-gevent", "gevent (>=1.1)", "alabaster", "sphinx (<1.8)", "sphinxcontrib-napoleon", "flake8", "flake8-bugbear", "flake8-quotes", "isort", "bumpversion", "hiredis", "twine", "wheel", "pytest", "pytest-benchmark", "pytest-cov", "tox"]
gevent = ["gevent (>=1.1)"]
memcached = ["pylibmc (>=1.5,<2.0)"]
rabbitmq = ["pika (>=1.0,<2.0)"]
redis = ["redis (>=2.0,<5.0)"]
watch = ["watchdog", "watchdog-gevent"]
[[package]]
name = "eth-abi"
version = "2.1.1"
@ -657,6 +692,17 @@ pyyaml = ">=5.1"
toml = "*"
virtualenv = ">=20.0.8"
[[package]]
name = "prometheus-client"
version = "0.12.0"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[package.extras]
twisted = ["twisted"]
[[package]]
name = "protobuf"
version = "3.17.3"
@ -840,6 +886,20 @@ category = "dev"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
[[package]]
name = "redis"
version = "4.0.2"
description = "Python client for Redis database and key-value store"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
deprecated = "*"
[package.extras]
hiredis = ["hiredis (>=1.0.0)"]
[[package]]
name = "regex"
version = "2021.10.8"
@ -1037,7 +1097,7 @@ python-versions = ">=3.6.1"
name = "wrapt"
version = "1.12.1"
description = "Module for decorators, wrappers and monkey patching."
category = "dev"
category = "main"
optional = false
python-versions = "*"
@ -1056,7 +1116,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "0aa43e887fe106d4142d68b7a891ba94f2de28df9df0ed765d285b1e5ccee391"
content-hash = "2ce3bdeb2d8bd31210026e5054a54c67fc766cdf22dc83485eca425643cdf760"
[metadata.files]
aiohttp = [
@ -1272,10 +1332,18 @@ cytoolz = [
{file = "cytoolz-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:b61f23e9fa7cd5a87a503ab659f816858e2235926cd95b0c7e37403530d4a2d6"},
{file = "cytoolz-0.11.0.tar.gz", hash = "sha256:c64f3590c3eb40e1548f0d3c6b2ccde70493d0b8dc6cc7f9f3fec0bb3dcd4222"},
]
deprecated = [
{file = "Deprecated-1.2.13-py2.py3-none-any.whl", hash = "sha256:64756e3e14c8c5eea9795d93c524551432a0be75629f8f29e67ab8caf076c76d"},
{file = "Deprecated-1.2.13.tar.gz", hash = "sha256:43ac5335da90c31c24ba028af536a91d41d53f9e6901ddb021bcc572ce44e38d"},
]
distlib = [
{file = "distlib-0.3.2-py2.py3-none-any.whl", hash = "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"},
{file = "distlib-0.3.2.zip", hash = "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736"},
]
dramatiq = [
{file = "dramatiq-1.12.1-py3-none-any.whl", hash = "sha256:caf8f5baed6cb4afaf73b8379ffcd07f483de990b0f93f05d336d4efdcdfdecf"},
{file = "dramatiq-1.12.1.tar.gz", hash = "sha256:0aabb8e9164a7b88b3799319bbe294f9823eaf8b9fa9f701dd45affc8ea57bbe"},
]
eth-abi = [
{file = "eth_abi-2.1.1-py3-none-any.whl", hash = "sha256:78df5d2758247a8f0766a7cfcea4575bcfe568c34a33e6d05a72c328a9040444"},
{file = "eth_abi-2.1.1.tar.gz", hash = "sha256:4bb1d87bb6605823379b07f6c02c8af45df01a27cc85bd6abb7cf1446ce7d188"},
@ -1659,6 +1727,10 @@ pre-commit = [
{file = "pre_commit-2.14.0-py2.py3-none-any.whl", hash = "sha256:ec3045ae62e1aa2eecfb8e86fa3025c2e3698f77394ef8d2011ce0aedd85b2d4"},
{file = "pre_commit-2.14.0.tar.gz", hash = "sha256:2386eeb4cf6633712c7cc9ede83684d53c8cafca6b59f79c738098b51c6d206c"},
]
prometheus-client = [
{file = "prometheus_client-0.12.0-py2.py3-none-any.whl", hash = "sha256:317453ebabff0a1b02df7f708efbab21e3489e7072b61cb6957230dd004a0af0"},
{file = "prometheus_client-0.12.0.tar.gz", hash = "sha256:1b12ba48cee33b9b0b9de64a1047cbd3c5f2d0ab6ebcead7ddda613a750ec3c5"},
]
protobuf = [
{file = "protobuf-3.17.3-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ab6bb0e270c6c58e7ff4345b3a803cc59dbee19ddf77a4719c5b635f1d547aa8"},
{file = "protobuf-3.17.3-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:13ee7be3c2d9a5d2b42a1030976f760f28755fcf5863c55b1460fd205e6cd637"},
@ -1861,6 +1933,10 @@ pyyaml = [
{file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"},
{file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"},
]
redis = [
{file = "redis-4.0.2-py3-none-any.whl", hash = "sha256:c8481cf414474e3497ec7971a1ba9b998c8efad0f0d289a009a5bbef040894f9"},
{file = "redis-4.0.2.tar.gz", hash = "sha256:ccf692811f2c1fc7a92b466aa2599e4a6d2d73d5f736a2c70be600657c0da34a"},
]
regex = [
{file = "regex-2021.10.8-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:094a905e87a4171508c2a0e10217795f83c636ccc05ddf86e7272c26e14056ae"},
{file = "regex-2021.10.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:981c786293a3115bc14c103086ae54e5ee50ca57f4c02ce7cf1b60318d1e8072"},

View File

@ -12,6 +12,7 @@ hexbytes = "^0.2.1"
click = "^8.0.1"
psycopg2 = "^2.9.1"
aiohttp = "^3.8.0"
dramatiq = {extras = ["redis"], version = "^1.12.1"}
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"

68
worker.py Normal file
View File

@ -0,0 +1,68 @@
import asyncio
import os
from contextlib import contextmanager
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.cli import main as dramatiq_worker
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
InspectSession = get_inspect_sessionmaker()
TraceSession = get_trace_sessionmaker()
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
self.loop.close()
rpc = os.environ["RPC_URL"]
broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
broker.add_middleware(AsyncMiddleware())
dramatiq.set_broker(broker)
@contextmanager
def session_scope(Session=None):
if Session is None:
return None
with Session() as session:
yield session
@dramatiq.actor
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with session_scope(InspectSession) as inspect_db_session:
with session_scope(TraceSession) as trace_db_session:
inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
max_concurrency=5,
request_timeout=300,
)
asyncio.run(
inspector.inspect_many_blocks(
after_block=after_block, before_block=before_block
)
)
if __name__ == "__main__":
dramatiq_worker(processes=1, threads=1)