Compare commits

..

1 Commits

Author SHA1 Message Date
Gui Heise
19f203e198 Create migration 2021-12-13 21:52:08 -05:00
168 changed files with 2526 additions and 32660 deletions

View File

@ -21,7 +21,7 @@ jobs:
- name: Bootstrap poetry
shell: bash
run: |
curl -sSL https://install.python-poetry.org \
curl -sL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py \
| python - -y
- name: Update PATH

6
.gitignore vendored
View File

@ -22,9 +22,3 @@ cache
# env
.envrc
# pycharm
.idea
.env
.python-version

View File

@ -1,16 +1,9 @@
repos:
- repo: https://github.com/ambv/black
rev: 22.3.0
rev: 20.8b1
hooks:
- id: black
language_version: python3.9
- repo: local
hooks:
- id: isort
name: isort
entry: poetry run isort .
language: system
types: [python]
- id: black
language_version: python3.9
- repo: local
hooks:
- id: pylint
@ -20,7 +13,7 @@ repos:
language: system
types: [python]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.942
rev: v0.910
hooks:
- id: 'mypy'
additional_dependencies:

View File

@ -433,7 +433,7 @@ int-import-graph=
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=alembic
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=

View File

@ -23,7 +23,7 @@ poetry run pre-commit install
Run tests with:
```
./mev test
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
```
## Send a pull request

View File

@ -1,26 +1,19 @@
FROM python:3.9-slim-buster
FROM python:3.9
ENV POETRY_VERSION=1.1.12
RUN useradd --create-home flashbot \
RUN pip install -U pip \
&& apt-get update \
&& apt-get install -y --no-install-recommends build-essential libffi-dev libpq-dev gcc procps \
&& pip install poetry==$POETRY_VERSION \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
&& curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
ENV PATH="${PATH}:/home/flashbot/.local/bin"
ENV PATH="${PATH}:/root/.poetry/bin"
COPY --chown=flashbot ./pyproject.toml /app/pyproject.toml
COPY --chown=flashbot ./poetry.lock /app/poetry.lock
COPY ./pyproject.toml /app/pyproject.toml
COPY ./poetry.lock /app/poetry.lock
WORKDIR /app/
USER flashbot
RUN poetry config virtualenvs.create false && \
poetry install
RUN poetry config virtualenvs.create false \
&& poetry install
COPY --chown=flashbot . /app
COPY . /app
# easter eggs 😝
RUN echo "PS1='🕵️:\[\033[1;36m\]\h \[\033[1;34m\]\W\[\033[0;35m\]\[\033[1;36m\]$ \[\033[0m\]'" >> ~/.bashrc

21
LICENSE
View File

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2023 Flashbots
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,126 +0,0 @@
# Running mev-inspect-py without kubernetes ('monolithic mode')
Running mev-inspect-py outside of kubernetes can be useful for debug purposes. In this case, the steps for installation are:
1. Install dependencies (pyenv, poetry, postgres)
1. Set up python virtual environment using matching python version (3.9.x) and install required python modules using poetry
1. Create postgres database
1. Run database migrations
The database credentials and archive node address used by mev-inspect-py need to be loaded into environment variables (both for database migrations and to run mev-inspect-py).
## Ubuntu install instructions
So, starting from a clean Ubuntu 22.04 installation, the prerequisites for pyenv, psycopg2 (python3-dev libpq-dev) can be installed with
`sudo apt install -y make build-essential git libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev liblzma-dev python3-dev libpq-dev`
### pyenv
Install pyenv using the web installer
`curl https://pyenv.run | bash`
and add the following to `~/.bashrc` (if running locally) or `~/.profile` (if running over ssh).
```
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
```
Then update the current shell by running `source ~/.bashrc` or `source ~/.profile` as appropriate.
### Poetry
Install Poetry using the web installer
`curl -sSL https://install.python-poetry.org | python3 -`
add the following to `~/.bashrc` (if running locally) or `~/.profile` (if running over ssh)
`export PATH="/home/user/.local/bin:$PATH"`
If running over ssh you should also add the following to `~/.profile` to prevent [Poetry errors](https://github.com/python-poetry/poetry/issues/1917) from a lack of active keyring:
`export PYTHON_KEYRING_BACKEND=keyring.backends.null.Keyring`
Again update current shell by running `source ~/.bashrc` or `source ~/.profile` as appropriate.
### postgres
We have tested two alternatives for postgres - installing locally or as a container.
#### Option 1: Installing locally
To install locally from a clean Ubuntu 22.04 installation, run:
`sudo apt install postgresql postgresql-contrib`
Note: You may need to reconfigure your pg-hba.conf to allow local access.
#### Option 2: Installing docker
To avoid interfering with your local postgres instance, you may prefer to run postgres within a docker container.
For docker installation instructions, please refer to https://docs.docker.com/engine/install/ubuntu/
### mev-inspect-py
With all dependencies now installed, clone the mev-inspec-py repo
```
git clone https://github.com/flashbots/mev-inspect-py.git
cd mev-inspect-py
```
We now install the required pythn version and use Poetry to install the required python modules into a virtual environment.
```
pyenv install 3.9.16
pyenv local 3.9.16
poetry env use 3.9.16
poetry install
```
### Create database
mev-inspect-py outputs to a postgres database, so we need to set this up. There are various ways of doing this, two options are presented here.
#### Option 1 — Run postgres locally
```
sudo -u postgres psql
\password
postgres
create database mev_inspect;
\q
```
#### Option 2 — Use postgres docker image
To avoid interfering with your local postgres instance, you may prefer to run postgres within a docker container. First ensure that postgres is not currently running to ensure port `5432` is available:
`sudo systemctl stop postgresql`
and then start a containerised postgres instance:
`sudo docker run -d -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=mev_inspect postgres`
### Environment variables
We will need to set a few environment variables to use mev-inspect-py. **These will be required every time mev-inspect-py runs**, so again you may wish to add these to your `~/.bashrc` and/or `~/.profile` as appropriate. Note that you need to substitute the correct URL for your archive node below if you are not running Erigon locally.
```
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=postgres
export POSTGRES_HOST=localhost
export RPC_URL="http://127.0.0.1:8545"
```
### Database migrations
Finally run the database migrations and fetch price information:
```
poetry run alembic upgrade head
poetry run fetch-all-prices
```
## Usage instructions
The same functionality available through kubernetes can be run in 'monolithic mode', but the relevant functions now need to be invoked by Poetry directly. So to inspect a single block, run for example:
`poetry run inspect-block 16379706`
Or to inspect a range of blocks:
`poetry run inspect-many-blocks 16379606 16379706`
Or to run the test suite:
`poetry run pytest tests`

View File

@ -1,5 +1,3 @@
⚠️ This tool has been deprecated. You can visit [Flashbots Data](https://datasets.flashbots.net/) for historical mev-inspect data on Ethereum and join us on the [Flashbots forum](https://collective.flashbots.net). ⚠️
# mev-inspect-py
[![standard-readme compliant](https://img.shields.io/badge/readme%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
@ -39,7 +37,7 @@ Set an environment variable `RPC_URL` to an RPC for fetching blocks.
mev-inspect-py currently requires a node with support for Erigon traces and receipts (not geth yet 😔).
[pokt.network](https://www.pokt.network/)'s "Ethereum Mainnet Archival with trace calls" is a good hosted option.
[pokt.network](pokt.network)'s "Ethereum Mainnet Archival with trace calls" is a good hosted option.
Example:
@ -62,16 +60,6 @@ On first startup, you'll need to apply database migrations with:
./mev exec alembic upgrade head
```
And load prices data
```
./mev prices fetch-all
```
## Monolithic (non-kubernetes) install instructions
For an alternative means of running mev-inspect-py for smaller set-ups or debug purposes see the [monolithic install instructions](MONOLITHIC.md).
## Usage
### Inspect a single block
@ -115,24 +103,11 @@ And stop the listener with:
### Backfilling
For larger backfills, you can inspect many blocks in parallel
For larger backfills, you can inspect many blocks in parallel using kubernetes
To inspect blocks 12914944 to 12915044, run
To inspect blocks 12914944 to 12915044 divided across 10 worker pods:
```
./mev backfill 12914944 12915044
```
This queues the blocks in Redis to be pulled off by the mev-inspect-worker service
To increase or decrease parallelism, update the replicaCount value for the mev-inspect-workers helm chart
Locally, this can be done by editing Tiltfile and changing "replicaCount=1" to your desired parallelism:
```
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
))
./mev backfill 12914944 12915044 10
```
You can see worker pods spin up then complete by watching the status of all pods
@ -140,54 +115,12 @@ You can see worker pods spin up then complete by watching the status of all pods
watch kubectl get pods
```
To see progress and failed batches, connect to Redis with
To watch the logs for a given pod, take its pod name using the above, then run:
```
./mev redis
kubectl logs -f pod/mev-inspect-backfill-abcdefg
```
For total messages, query:
```
HLEN dramatiq:default.msgs
```
For messages failed and waiting to retry in the delay queue (DQ), query:
```
HGETALL dramatiq:default.DQ.msgs
```
For messages permanently failed in the dead letter queue (XQ), query:
```
HGETALL dramatiq:default.XQ.msgs
```
To clear the queue, delete keys for the main queue and delay queue
```
DEL dramatiq:default.msgs
DEL dramatiq:default.DQ.msgs
```
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
**Backfilling a list of blocks**
Create a file containing a block per row, for example blocks.txt containing:
```
12500000
12500001
12500002
```
Then queue the blocks with
```
cat blocks.txt | ./mev block-list
```
To watch the logs for a given worker pod, take its pod name using the above, then run:
```
kubectl logs -f pod/mev-inspect-worker-abcdefg
```
(where `mev-inspect-worker-abcdefg` is your actual pod name)
(where `mev-inspect-backfill-abcdefg` is your actual pod name)
### Exploring

View File

@ -5,13 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
)
helm_remote("redis",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["global.redis.password=password"],
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
)
k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {
@ -42,78 +36,15 @@ docker_build("mev-inspect-py", ".",
trigger="./pyproject.toml"),
],
)
k8s_yaml(helm(
'./k8s/mev-inspect',
name='mev-inspect',
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
],
))
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
"replicaCount=1",
],
))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql", "redis-master"],
)
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql", "redis-master"],
)
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"])
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
local_resource(
'pg-port-forward',
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql"]
resource_deps=["postgresql-postgresql"]
)
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))

View File

@ -1,7 +1,9 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from sqlalchemy import engine_from_config, pool
from mev_inspect.db import get_inspect_database_uri

View File

@ -7,6 +7,7 @@ Create Date: 2021-11-02 22:42:01.702538
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "04a3bb3740c3"
down_revision = "a10d68643476"

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-26 15:31:21.111693
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "04b76ab1d2af"
down_revision = "0cef835f7b36"

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-26 18:25:13.402822
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "d498bdb0a641"
down_revision = "b9fa1ecc9929"

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-30 17:42:25.548130
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "083978d6e455"
down_revision = "92f28a2b4f52"

View File

@ -7,6 +7,7 @@ Create Date: 2021-11-19 15:36:15.152622
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "0cef835f7b36"
down_revision = "5427d62a2cc0"

View File

@ -1,28 +0,0 @@
"""Add nullable transaction_position field to swaps and traces
Revision ID: 15ba9c27ee8a
Revises: 04b76ab1d2af
Create Date: 2021-12-02 18:24:18.218880
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "15ba9c27ee8a"
down_revision = "ead7eb8283b9"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"classified_traces",
sa.Column("transaction_position", sa.Numeric, nullable=True),
)
op.add_column("swaps", sa.Column("transaction_position", sa.Numeric, nullable=True))
def downgrade():
op.drop_column("classified_traces", "transaction_position")
op.drop_column("swaps", "transaction_position")

View File

@ -8,6 +8,7 @@ Create Date: 2021-10-04 19:52:40.017084
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "205ce02374b3"
down_revision = "c8363617aa07"

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-17 18:29:13.065944
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "2c90b2b8a80b"
down_revision = "04a3bb3740c3"

View File

@ -7,6 +7,7 @@ Create Date: 2021-09-14 11:11:41.559137
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "320e56b0a99f"
down_revision = "a02f3f2c469f"

View File

@ -7,6 +7,7 @@ Create Date: 2021-11-02 20:50:32.854996
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "3417f49d97b3"
down_revision = "205ce02374b3"

View File

@ -1,40 +0,0 @@
"""Create NFT Trades table
Revision ID: 3c54832385e3
Revises: 4b9d289f2d74
Create Date: 2021-12-19 22:50:28.936516
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "3c54832385e3"
down_revision = "4b9d289f2d74"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"nft_trades",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("abi_name", sa.String(1024), nullable=False),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("transaction_position", sa.Numeric, nullable=False),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("trace_address", sa.String(256), nullable=False),
sa.Column("protocol", sa.String(256), nullable=False),
sa.Column("error", sa.String(256), nullable=True),
sa.Column("seller_address", sa.String(256), nullable=False),
sa.Column("buyer_address", sa.String(256), nullable=False),
sa.Column("payment_token_address", sa.String(256), nullable=False),
sa.Column("payment_amount", sa.Numeric, nullable=False),
sa.Column("collection_address", sa.String(256), nullable=False),
sa.Column("token_id", sa.Numeric, nullable=False),
sa.PrimaryKeyConstraint("transaction_hash", "trace_address"),
)
def downgrade():
op.drop_table("nft_trades")

View File

@ -1,23 +0,0 @@
"""Add error column to liquidations
Revision ID: 4b9d289f2d74
Revises: 99d376cb93cc
Create Date: 2021-12-23 14:54:28.406159
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "4b9d289f2d74"
down_revision = "99d376cb93cc"
branch_labels = None
depends_on = None
def upgrade():
op.add_column("liquidations", sa.Column("error", sa.String(256), nullable=True))
def downgrade():
op.drop_column("liquidations", "error")

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-26 20:35:58.954138
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "52d75a7e0533"
down_revision = "7cf0eeb41da0"

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-19 13:25:11.252774
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "5427d62a2cc0"
down_revision = "d540242ae368"

View File

@ -1,32 +0,0 @@
"""Add block_number to nft_trades primary key
Revision ID: 5c5375de15fd
Revises: e616420acd18
Create Date: 2022-01-21 15:27:57.790340
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "5c5375de15fd"
down_revision = "e616420acd18"
branch_labels = None
depends_on = None
def upgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["block_number", "transaction_hash", "trace_address"],
)
def downgrade():
op.execute("ALTER TABLE nft_trades DROP CONSTRAINT nft_trades_pkey")
op.create_primary_key(
"nft_trades_pkey",
"nft_trades",
["transaction_hash", "trace_address"],
)

View File

@ -1,22 +0,0 @@
"""Make gross profit nullable on summary
Revision ID: 630783c18a93
Revises: ab9a9e449ff9
Create Date: 2022-01-19 23:09:51.816948
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "630783c18a93"
down_revision = "ab9a9e449ff9"
branch_labels = None
depends_on = None
def upgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=True)
def downgrade():
op.alter_column("mev_summary", "gross_profit_usd", nullable=False)

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-26 20:27:28.936516
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "7cf0eeb41da0"
down_revision = "d498bdb0a641"

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-06 15:58:04.556762
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "7eec417a4f3e"
down_revision = "9d8c69b3dccb"

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-17 03:46:21.498821
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "92f28a2b4f52"
down_revision = "9b8ae51c5d56"

View File

@ -1,23 +0,0 @@
"""error column
Revision ID: 99d376cb93cc
Revises: c4a7620a2d33
Create Date: 2021-12-21 21:26:12.142484
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "99d376cb93cc"
down_revision = "c4a7620a2d33"
branch_labels = None
depends_on = None
def upgrade():
op.add_column("arbitrages", sa.Column("error", sa.String(256), nullable=True))
def downgrade():
op.drop_column("arbitrages", "error")

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-06 17:06:55.364516
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "9b8ae51c5d56"
down_revision = "7eec417a4f3e"

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-05 21:46:35.209199
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "9d8c69b3dccb"
down_revision = "2116e2f36a19"

View File

@ -8,6 +8,7 @@ Create Date: 2021-09-13 21:32:27.181344
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "a02f3f2c469f"
down_revision = "d70c08b4db6f"

View File

@ -7,6 +7,7 @@ Create Date: 2021-11-02 22:03:26.312317
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "a10d68643476"
down_revision = "3417f49d97b3"

View File

@ -1,40 +0,0 @@
"""Create mev_summary table
Revision ID: ab9a9e449ff9
Revises: b26ab0051a88
Create Date: 2022-01-18 18:36:42.865154
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ab9a9e449ff9"
down_revision = "b26ab0051a88"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"mev_summary",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("block_timestamp", sa.TIMESTAMP, nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("type", sa.String(256), nullable=False),
sa.Column("gross_profit_usd", sa.Numeric, nullable=False),
sa.Column("miner_payment_usd", sa.Numeric, nullable=False),
sa.Column("gas_used", sa.Numeric, nullable=False),
sa.Column("gas_price", sa.Numeric, nullable=False),
sa.Column("coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("gas_price_with_coinbase_transfer", sa.Numeric, nullable=False),
sa.Column("miner_address", sa.String(256), nullable=False),
sa.Column("base_fee_per_gas", sa.Numeric, nullable=False),
sa.Column("error", sa.String(256), nullable=True),
)
def downgrade():
op.drop_table("mev_summary")

View File

@ -1,27 +0,0 @@
"""add profit_amount column to sandwiches table
Revision ID: b26ab0051a88
Revises: 3c54832385e3
Create Date: 2022-01-16 13:45:10.190969
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "b26ab0051a88"
down_revision = "3c54832385e3"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"sandwiches", sa.Column("profit_token_address", sa.String(256), nullable=True)
)
op.add_column("sandwiches", sa.Column("profit_amount", sa.Numeric, nullable=True))
def downgrade():
op.drop_column("sandwiches", "profit_token_address")
op.drop_column("sandwiches", "profit_amount")

View File

@ -8,6 +8,7 @@ Create Date: 2021-12-01 23:32:40.574108
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "b9fa1ecc9929"
down_revision = "04b76ab1d2af"

View File

@ -1,40 +0,0 @@
"""Add tokens to database
Revision ID: bba80d21c5a4
Revises: b26ab0051a88
Create Date: 2022-01-19 22:19:59.514998
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "bba80d21c5a4"
down_revision = "630783c18a93"
branch_labels = None
depends_on = None
def upgrade():
op.execute(
"""
INSERT INTO tokens (token_address,decimals) VALUES
('0x514910771af9ca656af840dff83e8264ecf986ca',18),
('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',18),
('0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee',18),
('0x0bc529c00c6401aef6d220be8c6ea1667f6ad93e',18),
('0x5d3a536e4d6dbd6114cc1ead35777bab948e3643',8),
('0x2260fac5e5542a773aa44fbcfedf7c193bc2c599',8),
('0x80fb784b7ed66730e8b1dbd9820afd29931aab03',18),
('0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5',8),
('0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',6),
('0xdac17f958d2ee523a2206206994597c13d831ec7',6),
('0x6b175474e89094c44da98b954eedeac495271d0f',18),
('0x0000000000085d4780b73119b644ae5ecd22b376',18),
('0x39aa39c021dfbae8fac545936693ac917d5e7563',8),
('0x7fc66500c84a76ad7e9c93437bfc5ac33e2ddae9',18);
"""
)
def downgrade():
op.execute("DELETE FROM tokens")

View File

@ -1,26 +0,0 @@
"""Add protocols column to arbitrages
Revision ID: bdbb545f6c03
Revises: bba80d21c5a4
Create Date: 2022-01-20 23:17:19.316008
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "bdbb545f6c03"
down_revision = "bba80d21c5a4"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"arbitrages",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("arbitrages", "protocols")

View File

@ -1,28 +0,0 @@
"""Create tokens table
Revision ID: c4a7620a2d33
Revises: 15ba9c27ee8a
Create Date: 2021-12-21 19:12:33.940117
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "c4a7620a2d33"
down_revision = "15ba9c27ee8a"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"tokens",
sa.Column("token_address", sa.String(256), nullable=False),
sa.Column("decimals", sa.Numeric, nullable=False),
sa.PrimaryKeyConstraint("token_address"),
)
def downgrade():
op.drop_table("tokens")

View File

@ -7,6 +7,7 @@ Create Date: 2021-07-30 17:37:27.335475
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "c5da44eb072c"
down_revision = "0660432b9840"

View File

@ -8,6 +8,7 @@ Create Date: 2021-09-29 14:00:06.857103
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "c8363617aa07"
down_revision = "cd96af55108e"

View File

@ -8,6 +8,7 @@ Create Date: 2021-09-17 12:44:45.245137
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "cd96af55108e"
down_revision = "320e56b0a99f"

View File

@ -8,6 +8,7 @@ Create Date: 2021-11-18 04:30:06.802857
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "d540242ae368"
down_revision = "2c90b2b8a80b"

View File

@ -8,6 +8,7 @@ Create Date: 2021-08-30 22:10:04.186251
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "d70c08b4db6f"
down_revision = "083978d6e455"

View File

@ -1,26 +0,0 @@
"""Add protocols column to mev_summary
Revision ID: e616420acd18
Revises: bdbb545f6c03
Create Date: 2022-01-21 00:11:51.516459
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e616420acd18"
down_revision = "bdbb545f6c03"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"mev_summary",
sa.Column("protocols", sa.ARRAY(sa.String(256)), server_default="{}"),
)
def downgrade():
op.drop_column("mev_summary", "protocols")

View File

@ -1,69 +0,0 @@
"""Create sandwiches and sandwiched swaps tables
Revision ID: ead7eb8283b9
Revises: a5d80460f0e6
Create Date: 2021-12-03 16:37:28.077158
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ead7eb8283b9"
down_revision = "52d75a7e0533"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"sandwiches",
sa.Column("id", sa.String(256), primary_key=True),
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("sandwicher_address", sa.String(256), nullable=False),
sa.Column("frontrun_swap_transaction_hash", sa.String(256), nullable=False),
sa.Column("frontrun_swap_trace_address", sa.ARRAY(sa.Integer), nullable=False),
sa.Column("backrun_swap_transaction_hash", sa.String(256), nullable=False),
sa.Column("backrun_swap_trace_address", sa.ARRAY(sa.Integer), nullable=False),
)
op.create_index(
"ik_sandwiches_frontrun",
"sandwiches",
[
"block_number",
"frontrun_swap_transaction_hash",
"frontrun_swap_trace_address",
],
)
op.create_index(
"ik_sandwiches_backrun",
"sandwiches",
["block_number", "backrun_swap_transaction_hash", "backrun_swap_trace_address"],
)
op.create_table(
"sandwiched_swaps",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("sandwich_id", sa.String(1024), primary_key=True),
sa.Column("block_number", sa.Numeric, primary_key=True),
sa.Column("transaction_hash", sa.String(66), primary_key=True),
sa.Column("trace_address", sa.ARRAY(sa.Integer), primary_key=True),
sa.ForeignKeyConstraint(["sandwich_id"], ["sandwiches.id"], ondelete="CASCADE"),
)
op.create_index(
"ik_sandwiched_swaps_secondary",
"sandwiched_swaps",
["block_number", "transaction_hash", "trace_address"],
)
def downgrade():
op.drop_index("ik_sandwiched_swaps_secondary")
op.drop_table("sandwiched_swaps")
op.drop_index("ik_sandwiches_frontrun")
op.drop_index("ik_sandwiches_backrun")
op.drop_table("sandwiches")

View File

@ -0,0 +1,28 @@
"""Add block timestamp to liquidations
Revision ID: f19270a53410
Revises: 52d75a7e0533
Create Date: 2021-12-14 02:40:46.802125
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "f19270a53410"
down_revision = "52d75a7e0533"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"liquidations", sa.Column("block_timestamp", sa.TIMESTAMP, nullable=True)
)
pass
def downgrade():
sa.drop_column("block_timestamp", "liquidations")
pass

57
backfill.py Normal file
View File

@ -0,0 +1,57 @@
import subprocess
import sys
from typing import Iterator, Tuple
def get_block_after_before_chunks(
after_block: int,
before_block: int,
n_workers: int,
) -> Iterator[Tuple[int, int]]:
n_blocks = before_block - after_block
remainder = n_blocks % n_workers
floor_chunk_size = n_blocks // n_workers
last_before_block = None
for worker_index in range(n_workers):
chunk_size = floor_chunk_size
if worker_index < remainder:
chunk_size += 1
batch_after_block = (
last_before_block if last_before_block is not None else after_block
)
batch_before_block = batch_after_block + chunk_size
yield batch_after_block, batch_before_block
last_before_block = batch_before_block
def backfill(after_block: int, before_block: int, n_workers: int):
if n_workers <= 0:
raise ValueError("Need at least one worker")
for batch_after_block, batch_before_block in get_block_after_before_chunks(
after_block,
before_block,
n_workers,
):
print(f"Backfilling {batch_after_block} to {batch_before_block}")
backfill_command = f"sh backfill.sh {batch_after_block} {batch_before_block}"
process = subprocess.Popen(backfill_command.split(), stdout=subprocess.PIPE)
output, _ = process.communicate()
print(output)
def main():
after_block = int(sys.argv[1])
before_block = int(sys.argv[2])
n_workers = int(sys.argv[3])
backfill(after_block, before_block, n_workers)
if __name__ == "__main__":
main()

6
backfill.sh Normal file
View File

@ -0,0 +1,6 @@
current_image=$(kubectl get deployment mev-inspect -o=jsonpath='{$.spec.template.spec.containers[:1].image}')
helm template mev-inspect-backfill ./k8s/mev-inspect-backfill \
--set image.repository=$current_image \
--set command.startBlockNumber=$1 \
--set command.endBlockNumber=$2 | kubectl apply -f -

141
cli.py
View File

@ -1,25 +1,14 @@
import fileinput
import logging
import os
import sys
from datetime import datetime
import click
import dramatiq
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
backfill_export_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
from mev_inspect.prices import fetch_all_supported_prices
RPC_URL_ENV = "RPC_URL"
@ -40,13 +29,8 @@ async def inspect_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number,
)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
await inspector.inspect_single_block(block=block_number)
@cli.command()
@ -54,14 +38,11 @@ async def inspect_block_command(block_number: int, rpc: str):
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@coro
async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
block = await inspector.create_from_block(
block_number=block_number,
trace_db_session=trace_db_session,
)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
block = await inspector.create_from_block(block_number=block_number)
print(block.json())
@ -91,121 +72,23 @@ async def inspect_many_blocks_command(
inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)
await inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
after_block=after_block, before_block=before_block
)
@cli.command()
def enqueue_block_list_command():
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
for block_string in fileinput.input():
block = int(block_string)
logger.info(f"Sending {block} to {block+1}")
inspect_many_blocks_actor.send(block, block + 1)
@cli.command()
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
if start_block < end_block:
after_block = start_block
before_block = end_block
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
@cli.command()
def fetch_all_prices():
@coro
async def fetch_all_prices():
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = fetch_prices()
logger.info("Writing prices")
write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block):
export_actor.send(block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting {block_number}")
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
def fetch_range(after: datetime, before: datetime):
inspect_db_session = get_inspect_session()
logger.info("Fetching prices")
prices = fetch_prices_range(after, before)
prices = await fetch_all_supported_prices()
logger.info("Writing prices")
write_prices(inspect_db_session, prices)

View File

@ -1,5 +1,5 @@
apiVersion: v2
name: mev-inspect-workers
name: mev-inspect-backfill
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.

View File

@ -1,7 +1,7 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "mev-inspect-worker.name" -}}
{{- define "mev-inspect-backfill.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
@ -10,7 +10,7 @@ Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "mev-inspect-worker.fullname" -}}
{{- define "mev-inspect-backfill.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
@ -26,16 +26,16 @@ If release name contains chart name it will be used as a full name.
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "mev-inspect-worker.chart" -}}
{{- define "mev-inspect-backfill.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "mev-inspect-worker.labels" -}}
helm.sh/chart: {{ include "mev-inspect-worker.chart" . }}
{{ include "mev-inspect-worker.selectorLabels" . }}
{{- define "mev-inspect-backfill.labels" -}}
helm.sh/chart: {{ include "mev-inspect-backfill.chart" . }}
{{ include "mev-inspect-backfill.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
@ -45,17 +45,17 @@ app.kubernetes.io/managed-by: {{ .Release.Service }}
{{/*
Selector labels
*/}}
{{- define "mev-inspect-worker.selectorLabels" -}}
app.kubernetes.io/name: {{ include "mev-inspect-worker.name" . }}
{{- define "mev-inspect-backfill.selectorLabels" -}}
app.kubernetes.io/name: {{ include "mev-inspect-backfill.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "mev-inspect-worker.serviceAccountName" -}}
{{- define "mev-inspect-backfill.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "mev-inspect-worker.fullname" .) .Values.serviceAccount.name }}
{{- default (include "mev-inspect-backfill.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}

View File

@ -0,0 +1,68 @@
apiVersion: batch/v1
kind: Job
metadata:
name: {{ include "mev-inspect-backfill.fullname" . }}-{{ randAlphaNum 5 | lower }}
labels:
{{- include "mev-inspect-backfill.labels" . | nindent 4 }}
spec:
completions: 1
parallelism: 1
ttlSecondsAfterFinished: 5
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- run
- inspect-many-blocks
- {{ .Values.command.startBlockNumber | quote }}
- {{ .Values.command.endBlockNumber | quote }}
env:
- name: POSTGRES_HOST
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: host
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: host
optional: true
- name: TRACE_DB_USER
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: username
optional: true
- name: TRACE_DB_PASSWORD
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: password
optional: true
- name: RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
restartPolicy: OnFailure

View File

@ -1,11 +1,9 @@
# Default values for mev-inspect-workers
# Default values for mev-inspect.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
image:
repository: mev-inspect-py:latest
repository: mev-inspect-py
pullPolicy: IfNotPresent
imagePullSecrets: []
@ -17,14 +15,13 @@ podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
# runAsNonRoot: true
# runAsUser: 1000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious

View File

@ -1,133 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "mev-inspect-worker.fullname" . }}
labels:
{{- include "mev-inspect-worker.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "mev-inspect-worker.selectorLabels" . | nindent 8 }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args: ["run", "dramatiq", "worker", "--threads=1", "--processes=1"]
livenessProbe:
exec:
command:
- ls
- /
initialDelaySeconds: 20
periodSeconds: 10
timeoutSeconds: 5
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
- name: POSTGRES_HOST
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: host
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: host
optional: true
- name: TRACE_DB_USER
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: username
optional: true
- name: TRACE_DB_PASSWORD
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: password
optional: true
- name: RPC_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-rpc
key: url
- name: LISTENER_HEALTHCHECK_URL
valueFrom:
configMapKeyRef:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@ -37,8 +37,7 @@ spec:
- ls
- /
initialDelaySeconds: 20
periodSeconds: 10
timeoutSeconds: 5
periodSeconds: 5
resources:
{{- toYaml .Values.resources | nindent 12 }}
env:
@ -57,11 +56,6 @@ spec:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
@ -91,34 +85,6 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View File

@ -17,15 +17,13 @@ podAnnotations: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- all
#readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious

View File

@ -3,9 +3,9 @@
set -e
NAME=listener
PIDFILE=/home/flashbot/$NAME.pid
DAEMON=/bin/bash
DAEMON_OPTS='-c "poetry run python listener.py"'
PIDFILE=/var/run/$NAME.pid
DAEMON=/root/.poetry/bin/poetry
DAEMON_OPTS="run python listener.py"
case "$1" in
start)
@ -13,18 +13,16 @@ case "$1" in
start-stop-daemon \
--background \
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas /bin/bash -- -c "poetry run python listener.py"
--startas $DAEMON -- $DAEMON_OPTS
echo "."
;;
stop)
echo -n "Stopping daemon: "$NAME
start-stop-daemon --stop --quiet --oknodo --pidfile $PIDFILE
rm $PIDFILE
echo "."
;;
tail)
@ -33,16 +31,14 @@ case "$1" in
restart)
echo -n "Restarting daemon: "$NAME
start-stop-daemon --stop --quiet --oknodo --retry 30 --pidfile $PIDFILE
rm $PIDFILE
start-stop-daemon \
--background \
--chdir /app \
--chuid flashbot \
--start \
--quiet \
--pidfile $PIDFILE \
--make-pidfile \
--startas /bin/bash -- -c "poetry run python listener.py"
--startas $DAEMON -- $DAEMON_OPTS
echo "."
;;

View File

@ -2,8 +2,7 @@ import asyncio
import logging
import os
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
import aiohttp
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@ -14,14 +13,9 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
logger = logging.getLogger(__name__)
@ -44,25 +38,15 @@ async def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
base_provider = get_base_provider(rpc)
while not killer.kill_now:
await inspect_next_block(
inspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
)
logger.info("Stopping...")
@ -71,12 +55,9 @@ async def run():
async def inspect_next_block(
inspector: MEVInspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
@ -85,24 +66,20 @@ async def inspect_next_block(
if last_written_block is None:
# maintain lag if no blocks written yet
last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1
last_written_block = latest_block_number - 1
if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):
block_number = last_written_block + 1
block_number = (
latest_block_number
if last_written_block is None
else last_written_block + 1
)
logger.info(f"Writing block: {block_number}")
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number,
)
await inspector.inspect_single_block(block=block_number)
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:
@ -110,12 +87,8 @@ async def inspect_next_block(
async def ping_healthcheck_url(url):
retry_options = ExponentialRetry(attempts=3)
async with RetryClient(
raise_for_status=False, retry_options=retry_options
) as client:
async with client.get(url) as _response:
async with aiohttp.ClientSession() as session:
async with session.get(url):
pass

View File

@ -3,6 +3,7 @@ import time
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="loop.log", level=logging.INFO)
logger = logging.getLogger(__name__)

78
mev
View File

@ -4,57 +4,36 @@ set -e
DB_NAME=mev_inspect
function get_kube_secret(){
kubectl get secrets $1 -o jsonpath="{.data.$2}" | base64 --decode
}
function get_kube_db_secret(){
kubectl get secrets mev-inspect-db-credentials -o jsonpath="{.data.$1}" | base64 --decode
}
function db(){
host=$(get_kube_secret "mev-inspect-db-credentials" "host")
username=$(get_kube_secret "mev-inspect-db-credentials" "username")
password=$(get_kube_secret "mev-inspect-db-credentials" "password")
host=$(get_kube_db_secret "host")
username=$(get_kube_db_secret "username")
password=$(get_kube_db_secret "password")
kubectl run -i --rm --tty postgres-client-$RANDOM \
kubectl run -i --rm --tty postgres-client \
--env="PGPASSWORD=$password" \
--image=jbergknoff/postgresql-client \
-- $DB_NAME --host=$host --user=$username
}
function redis(){
echo "To continue, enter 'shift + r'"
redis_password=$(get_kube_secret "redis" "redis-password")
kubectl run -i --rm --tty \
--namespace default redis-client-$RANDOM \
--env REDIS_PASSWORD=$redis_password \
--image docker.io/bitnami/redis:6.2.6-debian-10-r0 \
--command -- redis-cli -h redis-master -a $redis_password
}
case "$1" in
db)
echo "Connecting to $DB_NAME"
db
;;
redis)
echo "Connecting to redis"
redis
;;
listener)
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
block-list)
echo "Backfilling blocks from stdin"
kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
;;
backfill)
after_block_number=$2
before_block_number=$3
start_block_number=$2
end_block_number=$3
n_workers=$4
echo "Backfilling from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
echo "Backfilling from $start_block_number to $end_block_number with $n_workers workers"
python backfill.py $start_block_number $end_block_number $n_workers
;;
inspect)
block_number=$2
@ -62,16 +41,15 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
;;
inspect-many)
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
start_block_number=$2
end_block_number=$3
echo "Inspecting from block $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $after_block_number $before_block_number
poetry run inspect-many-blocks $start_block_number $end_block_number
;;
test)
shift
echo "Running tests"
kubectl exec -ti deploy/mev-inspect -- poetry run pytest tests $@
kubectl exec -ti deploy/mev-inspect -- poetry run pytest tests
;;
fetch)
block_number=$2
@ -86,37 +64,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-all-prices
;;
fetch-range)
after=$2
before=$3
echo "Running price fetch-range"
kubectl exec -ti deploy/mev-inspect -- \
poetry run fetch-range $after $before
;;
*)
*)
echo "prices usage: "$1" {fetch-all}"
exit 1
esac
;;
backfill-export)
after_block=$2
before_block=$3
echo "Sending $after_block to $before_block export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
;;
enqueue-s3-export)
block_number=$2
echo "Sending $block_number export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
;;
s3-export)
block_number=$2
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@

View File

@ -0,0 +1,105 @@
from typing import List, Tuple, Optional
from mev_inspect.traces import (
get_child_traces,
is_child_of_any_address,
)
from mev_inspect.schemas.traces import (
ClassifiedTrace,
CallTrace,
DecodedCallTrace,
Classification,
Protocol,
)
from mev_inspect.transfers import get_transfer
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.liquidations import Liquidation
AAVE_CONTRACT_ADDRESSES: List[str] = [
# AAVE Proxy
"0x398ec7346dcd622edc5ae82352f02be94c62d119",
# AAVE V2
"0x7d2768de32b0b80b7a3454c06bdac94a69ddc7a9",
# AAVE V1
"0x3dfd23a6c5e8bbcfc9581d2e864a68feb6a076d3",
# AAVE V2 WETH
"0x030ba81f1c18d280636f32af80b9aad02cf0854e",
# AAVE AMM Market DAI
"0x79be75ffc64dd58e66787e4eae470c8a1fd08ba4",
# AAVE i
"0x030ba81f1c18d280636f32af80b9aad02cf0854e",
"0xbcca60bb61934080951369a648fb03df4f96263c",
]
def get_aave_liquidations(
traces: List[ClassifiedTrace],
) -> List[Liquidation]:
"""Inspect list of classified traces and identify liquidation"""
liquidations: List[Liquidation] = []
parent_liquidations: List[List[int]] = []
for trace in traces:
if (
trace.classification == Classification.liquidate
and isinstance(trace, DecodedCallTrace)
and not is_child_of_any_address(trace, parent_liquidations)
and trace.protocol == Protocol.aave
):
parent_liquidations.append(trace.trace_address)
liquidator = trace.from_address
child_traces = get_child_traces(
trace.transaction_hash, trace.trace_address, traces
)
(
received_token_address,
received_amount,
) = _get_payback_token_and_amount(trace, child_traces, liquidator)
liquidations.append(
Liquidation(
liquidated_user=trace.inputs["_user"],
debt_token_address=trace.inputs["_reserve"],
liquidator_user=liquidator,
debt_purchase_amount=trace.inputs["_purchaseAmount"],
protocol=Protocol.aave,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
)
)
return liquidations
def _get_payback_token_and_amount(
liquidation: DecodedCallTrace, child_traces: List[ClassifiedTrace], liquidator: str
) -> Tuple[str, int]:
"""Look for and return liquidator payback from liquidation"""
for child in child_traces:
if isinstance(child, CallTrace):
child_transfer: Optional[Transfer] = get_transfer(child)
if child_transfer is not None:
if (
child_transfer.to_address == liquidator
and child.from_address in AAVE_CONTRACT_ADDRESSES
):
return child_transfer.token_address, child_transfer.amount
return liquidation.inputs["_collateral"], 0

View File

@ -7,6 +7,7 @@ from pydantic import parse_obj_as
from mev_inspect.schemas.abi import ABI
from mev_inspect.schemas.traces import Protocol
THIS_FILE_DIRECTORY = Path(__file__).parents[0]
ABI_DIRECTORY_PATH = THIS_FILE_DIRECTORY / "abis"

File diff suppressed because one or more lines are too long

View File

@ -1,11 +1,8 @@
from itertools import groupby
from typing import List, Optional, Tuple
from typing import List, Tuple
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.swaps import Swap
from mev_inspect.utils import equal_within_percent
MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE = 0.01
def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]:
@ -48,23 +45,17 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
if len(start_ends) == 0:
return []
used_swaps: List[Swap] = []
# for (start, end) in filtered_start_ends:
for (start, end) in start_ends:
potential_intermediate_swaps = [
swap for swap in swaps if swap is not start and swap is not end
]
routes = _get_all_routes(start, end, potential_intermediate_swaps)
for (start, ends) in start_ends:
if start in used_swaps:
continue
unused_ends = [end for end in ends if end not in used_swaps]
route = _get_shortest_route(start, unused_ends, swaps)
if route is not None:
for route in routes:
start_amount = route[0].token_in_amount
end_amount = route[-1].token_out_amount
profit_amount = end_amount - start_amount
error = None
for swap in route:
if swap.error is not None:
error = swap.error
arb = Arbitrage(
swaps=route,
@ -75,12 +66,8 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
start_amount=start_amount,
end_amount=end_amount,
profit_amount=profit_amount,
error=error,
)
all_arbitrages.append(arb)
used_swaps.extend(route)
if len(all_arbitrages) == 1:
return all_arbitrages
else:
@ -91,103 +78,57 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
]
def _get_shortest_route(
start_swap: Swap,
end_swaps: List[Swap],
all_swaps: List[Swap],
max_route_length: Optional[int] = None,
) -> Optional[List[Swap]]:
if len(end_swaps) == 0:
return None
if max_route_length is not None and max_route_length < 2:
return None
for end_swap in end_swaps:
if _swap_outs_match_swap_ins(start_swap, end_swap):
return [start_swap, end_swap]
if max_route_length is not None and max_route_length == 2:
return None
other_swaps = [
swap for swap in all_swaps if (swap is not start_swap and swap not in end_swaps)
]
if len(other_swaps) == 0:
return None
shortest_remaining_route = None
max_remaining_route_length = (
None if max_route_length is None else max_route_length - 1
)
for next_swap in other_swaps:
if _swap_outs_match_swap_ins(start_swap, next_swap):
shortest_from_next = _get_shortest_route(
next_swap,
end_swaps,
other_swaps,
max_route_length=max_remaining_route_length,
)
if shortest_from_next is not None and (
shortest_remaining_route is None
or len(shortest_from_next) < len(shortest_remaining_route)
):
shortest_remaining_route = shortest_from_next
max_remaining_route_length = len(shortest_from_next) - 1
if shortest_remaining_route is None:
return None
else:
return [start_swap] + shortest_remaining_route
def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, List[Swap]]]:
def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, Swap]]:
"""
Gets the set of all possible openings and corresponding closing swaps for an arbitrage via
Gets the set of all possible opening and closing swap pairs in an arbitrage via
- swap[start].token_in == swap[end].token_out
- swap[start].from_address == swap[end].to_address
- not swap[start].from_address in all_pool_addresses
- not swap[end].to_address in all_pool_addresses
"""
pool_addrs = [swap.contract_address for swap in swaps]
valid_start_ends: List[Tuple[Swap, List[Swap]]] = []
valid_start_ends: List[Tuple[Swap, Swap]] = []
for index, potential_start_swap in enumerate(swaps):
ends_for_start: List[Swap] = []
remaining_swaps = swaps[:index] + swaps[index + 1 :]
for potential_end_swap in remaining_swaps:
if (
potential_start_swap.token_in_address
== potential_end_swap.token_out_address
and potential_start_swap.contract_address
!= potential_end_swap.contract_address
and potential_start_swap.from_address == potential_end_swap.to_address
and not potential_start_swap.from_address in pool_addrs
):
ends_for_start.append(potential_end_swap)
if len(ends_for_start) > 0:
valid_start_ends.append((potential_start_swap, ends_for_start))
valid_start_ends.append((potential_start_swap, potential_end_swap))
return valid_start_ends
def _swap_outs_match_swap_ins(swap_out, swap_in) -> bool:
return (
swap_out.token_out_address == swap_in.token_in_address
and (
swap_out.contract_address == swap_in.from_address
or swap_out.to_address == swap_in.contract_address
or swap_out.to_address == swap_in.from_address
)
and equal_within_percent(
swap_out.token_out_amount,
swap_in.token_in_amount,
MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE,
)
)
def _get_all_routes(
start_swap: Swap, end_swap: Swap, other_swaps: List[Swap]
) -> List[List[Swap]]:
"""
Returns all routes (List[Swap]) from start to finish between a start_swap and an end_swap only accounting for token_address_in and token_address_out.
"""
# If the path is complete, return
if start_swap.token_out_address == end_swap.token_in_address:
return [[start_swap, end_swap]]
elif len(other_swaps) == 0:
return []
# Collect all potential next steps, check if valid, recursively find routes from next_step to end_swap
routes: List[List[Swap]] = []
for potential_next_swap in other_swaps:
if start_swap.token_out_address == potential_next_swap.token_in_address and (
start_swap.contract_address == potential_next_swap.from_address
or start_swap.to_address == potential_next_swap.contract_address
or start_swap.to_address == potential_next_swap.from_address
):
remaining_swaps = [
swap for swap in other_swaps if swap != potential_next_swap
]
next_swap_routes = _get_all_routes(
potential_next_swap, end_swap, remaining_swaps
)
if len(next_swap_routes) > 0:
for next_swap_route in next_swap_routes:
next_swap_route.insert(0, start_swap)
routes.append(next_swap_route)
return routes

View File

@ -11,6 +11,7 @@ from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
logger = logging.getLogger(__name__)
@ -24,17 +25,77 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
base_provider,
w3: Web3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session),
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
block: Optional[Block] = None
if trace_db_session is not None:
block = _find_block(trace_db_session, block_number)
if block is None:
block = await _fetch_block(w3, base_provider, block_number)
return block
else:
return block
async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
fetch_base_fee_per_gas(w3, block_number),
)
miner_address = await _find_or_fetch_miner_address(w3, block_number, traces)
try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
else:
raise
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
block_timestamp = _find_block_timestamp(trace_db_session, block_number)
traces = _find_traces(trace_db_session, block_number)
receipts = _find_receipts(trace_db_session, block_number)
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if (
block_timestamp is None
or traces is None
or receipts is None
or base_fee_per_gas is None
):
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
@ -46,75 +107,6 @@ async def create_from_block_number(
)
async def _find_or_fetch_block_timestamp(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if existing_block_timestamp is not None:
return existing_block_timestamp
return await _fetch_block_timestamp(w3, block_number)
async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
return await _fetch_block_receipts(w3, block_number)
async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
return await _fetch_block_traces(w3, block_number)
async def _find_or_fetch_base_fee_per_gas(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> int:
if trace_db_session is not None:
existing_base_fee_per_gas = _find_base_fee_per_gas(
trace_db_session, block_number
)
if existing_base_fee_per_gas is not None:
return existing_base_fee_per_gas
return await fetch_base_fee_per_gas(w3, block_number)
async def _fetch_block_timestamp(w3, block_number: int) -> int:
block_json = await w3.eth.get_block(block_number)
return block_json["timestamp"]
async def _fetch_block_receipts(w3, block_number: int) -> List[Receipt]:
receipts_json = await w3.eth.get_block_receipts(block_number)
return [Receipt(**receipt) for receipt in receipts_json]
async def _fetch_block_traces(w3, block_number: int) -> List[Trace]:
traces_json = await w3.eth.trace_block(block_number)
return [Trace(**trace_json) for trace_json in traces_json]
def _find_block_timestamp(
trace_db_session: orm.Session,
block_number: int,
@ -131,7 +123,7 @@ def _find_block_timestamp(
return block_timestamp
def _find_block_traces(
def _find_traces(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Trace]]:
@ -147,7 +139,7 @@ def _find_block_traces(
return [Trace(**trace_json) for trace_json in traces_json]
def _find_block_receipts(
def _find_receipts(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Receipt]]:
@ -163,7 +155,7 @@ def _find_block_receipts(
return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee_per_gas(
def _find_base_fee(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[int]:
@ -179,27 +171,11 @@ def _find_base_fee_per_gas(
return base_fee
async def _find_or_fetch_miner_address(
w3,
block_number: int,
traces: List[Trace],
) -> Optional[str]:
# eth1 blocks
miner_address = _get_miner_address_from_traces(traces)
if miner_address is not None:
return miner_address
return await _fetch_miner_eth2(w3, block_number)
async def _fetch_miner_eth2(w3, block_number: int) -> Optional[str]:
block_json = await w3.eth.get_block(block_number)
return block_json["miner"]
def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]:
for trace in traces:
if trace.type == TraceType.reward:
return trace.action["author"]
return None

View File

@ -1,65 +1,9 @@
from typing import List, Optional, Sequence
from typing import Optional, List, Sequence
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace, DecodedCallTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.transfers import Transfer, ETH_TOKEN_ADDRESS
def create_nft_trade_from_transfers(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
collection_address: str,
seller_address: str,
buyer_address: str,
exchange_wallet_address: str,
) -> Optional[NftTrade]:
transfers_to_buyer = _filter_transfers(child_transfers, to_address=buyer_address)
transfers_to_seller = _filter_transfers(child_transfers, to_address=seller_address)
if len(transfers_to_buyer) != 1 or len(transfers_to_seller) != 1:
return None
if transfers_to_buyer[0].token_address != collection_address:
return None
payment_token_address = transfers_to_seller[0].token_address
payment_amount = transfers_to_seller[0].amount
token_id = transfers_to_buyer[0].amount
transfers_from_seller_to_exchange = _filter_transfers(
child_transfers,
from_address=seller_address,
to_address=exchange_wallet_address,
)
transfers_from_buyer_to_exchange = _filter_transfers(
child_transfers,
from_address=buyer_address,
to_address=exchange_wallet_address,
)
for fee in [
*transfers_from_seller_to_exchange,
*transfers_from_buyer_to_exchange,
]:
# Assumes that exchange fees are paid with the same token as the sale
payment_amount -= fee.amount
return NftTrade(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
trace_address=trace.trace_address,
protocol=trace.protocol,
error=trace.error,
seller_address=seller_address,
buyer_address=buyer_address,
payment_token_address=payment_token_address,
payment_amount=payment_amount,
collection_address=collection_address,
token_id=token_id,
)
from mev_inspect.schemas.traces import DecodedCallTrace, ClassifiedTrace
def create_swap_from_pool_transfers(
@ -94,13 +38,9 @@ def create_swap_from_pool_transfers(
transfer_in = transfers_to_pool[-1]
transfer_out = transfers_from_pool_to_recipient[0]
if transfer_in.token_address == transfer_out.token_address:
return None
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
trace_address=trace.trace_address,
contract_address=pool_address,
@ -138,7 +78,6 @@ def create_swap_from_recipient_transfers(
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
trace_address=trace.trace_address,
contract_address=pool_address,
@ -182,27 +121,3 @@ def _filter_transfers(
filtered_transfers.append(transfer)
return filtered_transfers
def get_received_transfer(
liquidator: str, child_transfers: List[Transfer]
) -> Optional[Transfer]:
"""Get transfer from AAVE to liquidator"""
for transfer in child_transfers:
if transfer.to_address == liquidator:
return transfer
return None
def get_debt_transfer(
liquidator: str, child_transfers: List[Transfer]
) -> Optional[Transfer]:
"""Get transfer from liquidator to AAVE"""
for transfer in child_transfers:
if transfer.from_address == liquidator:
return transfer
return None

View File

@ -1,20 +1,18 @@
from typing import Dict, Optional, Tuple, Type
from mev_inspect.schemas.classifiers import Classifier, ClassifierSpec
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from mev_inspect.schemas.classifiers import ClassifierSpec, Classifier
from .aave import AAVE_CLASSIFIER_SPECS
from .balancer import BALANCER_CLASSIFIER_SPECS
from .bancor import BANCOR_CLASSIFIER_SPECS
from .compound import COMPOUND_CLASSIFIER_SPECS
from .cream import CREAM_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .curve import CURVE_CLASSIFIER_SPECS
from .erc20 import ERC20_CLASSIFIER_SPECS
from .opensea import OPENSEA_CLASSIFIER_SPECS
from .uniswap import UNISWAP_CLASSIFIER_SPECS
from .weth import WETH_CLASSIFIER_SPECS
from .weth import WETH_CLASSIFIER_SPECS, WETH_ADDRESS
from .zero_ex import ZEROX_CLASSIFIER_SPECS
from .balancer import BALANCER_CLASSIFIER_SPECS
from .compound import COMPOUND_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .bancor import BANCOR_CLASSIFIER_SPECS
ALL_CLASSIFIER_SPECS = (
ERC20_CLASSIFIER_SPECS
@ -25,9 +23,7 @@ ALL_CLASSIFIER_SPECS = (
+ ZEROX_CLASSIFIER_SPECS
+ BALANCER_CLASSIFIER_SPECS
+ COMPOUND_CLASSIFIER_SPECS
+ CREAM_CLASSIFIER_SPECS
+ CRYPTOPUNKS_CLASSIFIER_SPECS
+ OPENSEA_CLASSIFIER_SPECS
+ BANCOR_CLASSIFIER_SPECS
)

View File

@ -1,65 +1,13 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.classifiers import (
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
TransferClassifier,
LiquidationClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
class AaveLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["_user"]
debt_token_address = liquidation_trace.inputs["_reserve"]
received_token_address = liquidation_trace.inputs["_collateral"]
debt_purchase_amount = None
received_amount = None
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
if debt_transfer is not None and received_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=Protocol.aave,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
else:
return None
class AaveTransferClassifier(TransferClassifier):
@staticmethod
def get_transfer(trace: DecodedCallTrace) -> Transfer:
@ -78,7 +26,7 @@ AAVE_SPEC = ClassifierSpec(
abi_name="AaveLendingPool",
protocol=Protocol.aave,
classifiers={
"liquidationCall(address,address,address,uint256,bool)": AaveLiquidationClassifier,
"liquidationCall(address,address,address,uint256,bool)": LiquidationClassifier,
},
)
@ -87,7 +35,8 @@ ATOKENS_SPEC = ClassifierSpec(
protocol=Protocol.aave,
classifiers={
"transferOnLiquidation(address,address,uint256)": AaveTransferClassifier,
"transferFrom(address,address,uint256)": AaveTransferClassifier,
},
)
AAVE_CLASSIFIER_SPECS: List[ClassifierSpec] = [AAVE_SPEC, ATOKENS_SPEC]
AAVE_CLASSIFIER_SPECS = [AAVE_SPEC, ATOKENS_SPEC]

View File

@ -1,10 +1,15 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, SwapClassifier
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from typing import Optional, List
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
DecodedCallTrace,
Protocol,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
SwapClassifier,
)
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
BALANCER_V1_POOL_ABI_NAME = "BPool"

View File

@ -1,10 +1,17 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_swap_from_recipient_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, SwapClassifier
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from typing import Optional, List
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
DecodedCallTrace,
Protocol,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
SwapClassifier,
)
from mev_inspect.classifiers.helpers import (
create_swap_from_recipient_transfers,
)
BANCOR_NETWORK_ABI_NAME = "BancorNetwork"
BANCOR_NETWORK_CONTRACT_ADDRESS = "0x2F9EC37d6CcFFf1caB21733BdaDEdE11c823cCB0"

View File

@ -1,86 +1,28 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.traces import (
Protocol,
)
from mev_inspect.schemas.classifiers import (
Classification,
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import CETH_TOKEN_ADDRESS, ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
class CompoundLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["borrower"]
debt_token_address = liquidation_trace.to_address
received_token_address = liquidation_trace.inputs["cTokenCollateral"]
debt_purchase_amount = None
received_amount = None
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CETH_TOKEN_ADDRESS and liquidation_trace.value != 0
else (liquidation_trace.inputs["repayAmount"], CETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
seize_trace = _get_seize_call(child_traces)
if debt_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
if received_transfer is not None:
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
elif seize_trace is not None and seize_trace.inputs is not None:
received_amount = seize_trace.inputs["seizeTokens"]
if received_amount is None:
return None
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=liquidation_trace.protocol,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
return None
COMPOUND_V2_CETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.compound_v2,
valid_contract_addresses=["0x4ddc2d193948926d02f9b1fe9e1daa0718270ed5"],
classifiers={
"liquidateBorrow(address,address)": CompoundLiquidationClassifier,
"liquidateBorrow(address,address)": LiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": LiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
@ -108,20 +50,116 @@ COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
"0x80a2ae356fc9ef4305676f7a3e2ed04e12c33946",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CompoundLiquidationClassifier,
"liquidateBorrow(address,uint256,address)": LiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_CLASSIFIER_SPECS: List[ClassifierSpec] = [
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": LiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_CLASSIFIER_SPECS = [
COMPOUND_V2_CETH_SPEC,
COMPOUND_V2_CTOKEN_SPEC,
CREAM_CETH_SPEC,
CREAM_CTOKEN_SPEC,
]
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None

View File

@ -1,204 +0,0 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.classifiers import (
Classification,
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
CRETH_TOKEN_ADDRESS = "0xd06527d5e56a3495252a528c4987003b712860ee"
class CreamLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["borrower"]
debt_token_address = liquidation_trace.to_address
received_token_address = liquidation_trace.inputs["cTokenCollateral"]
debt_purchase_amount = None
received_amount = None
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CRETH_TOKEN_ADDRESS
and liquidation_trace.value != 0
else (liquidation_trace.inputs["repayAmount"], CRETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
seize_trace = _get_seize_call(child_traces)
if debt_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
if received_transfer is not None:
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
elif seize_trace is not None and seize_trace.inputs is not None:
received_amount = seize_trace.inputs["seizeTokens"]
if received_amount is None:
return None
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=liquidation_trace.protocol,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
return None
CREAM_CRETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CLASSIFIER_SPECS: List[ClassifierSpec] = [
CREAM_CRETH_SPEC,
CREAM_CTOKEN_SPEC,
]
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None

View File

@ -1,5 +1,9 @@
from mev_inspect.schemas.classifiers import Classifier, ClassifierSpec
from mev_inspect.schemas.traces import Classification, Protocol
from mev_inspect.schemas.traces import Protocol, Classification
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
Classifier,
)
class PunkBidAcceptanceClassifier(Classifier):

View File

@ -1,10 +1,16 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, SwapClassifier
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from typing import Optional, List
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
Protocol,
DecodedCallTrace,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
SwapClassifier,
)
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
class CurveSwapClassifier(SwapClassifier):

View File

@ -1,5 +1,8 @@
from mev_inspect.schemas.classifiers import ClassifierSpec, TransferClassifier
from mev_inspect.schemas.traces import DecodedCallTrace
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
TransferClassifier,
)
from mev_inspect.schemas.transfers import Transfer

View File

@ -1,42 +0,0 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_nft_trade_from_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, NftTradeClassifier
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from mev_inspect.schemas.transfers import Transfer
OPENSEA_WALLET_ADDRESS = "0x5b3256965e7c3cf26e11fcaf296dfc8807c01073"
class OpenseaClassifier(NftTradeClassifier):
@staticmethod
def parse_trade(
trace: DecodedCallTrace,
child_transfers: List[Transfer],
) -> Optional[NftTrade]:
addresses = trace.inputs["addrs"]
buy_maker = addresses[1]
sell_maker = addresses[8]
target = addresses[4]
return create_nft_trade_from_transfers(
trace,
child_transfers,
collection_address=target,
seller_address=sell_maker,
buyer_address=buy_maker,
exchange_wallet_address=OPENSEA_WALLET_ADDRESS,
)
OPENSEA_SPEC = ClassifierSpec(
abi_name="WyvernExchange",
protocol=Protocol.opensea,
valid_contract_addresses=["0x7be8076f4ea4a4ad08075c2508e481d6c946d12b"],
classifiers={
"atomicMatch_(address[14],uint256[18],uint8[8],bytes,bytes,bytes,bytes,bytes,bytes,uint8[2],bytes32[5])": OpenseaClassifier,
},
)
OPENSEA_CLASSIFIER_SPECS = [OPENSEA_SPEC]

View File

@ -1,10 +1,16 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
from mev_inspect.schemas.classifiers import ClassifierSpec, SwapClassifier
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from typing import Optional, List
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
DecodedCallTrace,
Protocol,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
SwapClassifier,
)
from mev_inspect.classifiers.helpers import create_swap_from_pool_transfers
UNISWAP_V2_PAIR_ABI_NAME = "UniswapV2Pair"
UNISWAP_V3_POOL_ABI_NAME = "UniswapV3Pool"
@ -103,7 +109,6 @@ UNISWAP_V3_CONTRACT_SPECS = [
UNISWAP_V3_GENERAL_SPECS = [
ClassifierSpec(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
protocol=Protocol.uniswap_v3,
classifiers={
"swap(address,bool,int256,uint160,bytes)": UniswapV3SwapClassifier,
},
@ -135,7 +140,6 @@ UNISWAPPY_V2_CONTRACT_SPECS = [
UNISWAPPY_V2_PAIR_SPEC = ClassifierSpec(
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
protocol=Protocol.uniswap_v2,
classifiers={
"swap(uint256,uint256,address,bytes)": UniswapV2SwapClassifier,
},

View File

@ -1,10 +1,11 @@
from mev_inspect.schemas.traces import (
Protocol,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
DecodedCallTrace,
TransferClassifier,
)
from mev_inspect.schemas.prices import WETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
@ -22,10 +23,12 @@ class WethTransferClassifier(TransferClassifier):
)
WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
WETH_SPEC = ClassifierSpec(
abi_name="WETH9",
protocol=Protocol.weth,
valid_contract_addresses=[WETH_TOKEN_ADDRESS],
valid_contract_addresses=[WETH_ADDRESS],
classifiers={
"transferFrom(address,address,uint256)": WethTransferClassifier,
"transfer(address,uint256)": WethTransferClassifier,

View File

@ -1,9 +1,14 @@
from typing import List, Optional, Tuple
from mev_inspect.schemas.classifiers import ClassifierSpec, SwapClassifier
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import DecodedCallTrace, Protocol
from typing import Optional, List, Tuple
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
DecodedCallTrace,
Protocol,
)
from mev_inspect.schemas.classifiers import (
ClassifierSpec,
SwapClassifier,
)
ANY_TAKER_ADDRESS = "0x0000000000000000000000000000000000000000"
@ -25,19 +30,16 @@ class ZeroExSwapClassifier(SwapClassifier):
prior_transfers: List[Transfer],
child_transfers: List[Transfer],
) -> Optional[Swap]:
if len(child_transfers) < 2:
return None
token_out_address, token_out_amount = _get_0x_token_out_data(
token_in_address, token_in_amount = _get_0x_token_in_data(
trace, child_transfers
)
token_in_address, token_in_amount = _get_0x_token_in_data(trace)
token_out_address, token_out_amount = _get_0x_token_out_data(trace)
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
transaction_position=trace.transaction_position,
block_number=trace.block_number,
trace_address=trace.trace_address,
contract_address=trace.to_address,
@ -222,38 +224,32 @@ ZEROX_GENERIC_SPECS = [
ZEROX_CLASSIFIER_SPECS = ZEROX_CONTRACT_SPECS + ZEROX_GENERIC_SPECS
def _get_taker_token_transfer_amount(
trace: DecodedCallTrace,
taker_address: str,
token_address: str,
child_transfers: List[Transfer],
def _get_taker_token_in_amount(
taker_address: str, token_in_address: str, child_transfers: List[Transfer]
) -> int:
if trace.error is not None:
return 0
if len(child_transfers) < 2:
if len(child_transfers) != 2:
raise ValueError(
f"A settled order should consist of 2 child transfers, not {len(child_transfers)}."
)
if taker_address == ANY_TAKER_ADDRESS:
for transfer in child_transfers:
if transfer.token_address == token_address:
if transfer.token_address == token_in_address:
return transfer.amount
else:
for transfer in child_transfers:
if transfer.to_address == taker_address:
return transfer.amount
raise RuntimeError("Unable to find transfers matching 0x order.")
return 0
def _get_0x_token_out_data(
def _get_0x_token_in_data(
trace: DecodedCallTrace, child_transfers: List[Transfer]
) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_out_address = order[0]
token_in_address = order[0]
if trace.function_signature in RFQ_SIGNATURES:
taker_address = order[5]
@ -266,16 +262,17 @@ def _get_0x_token_out_data(
f"0x orderbook function {trace.function_signature} is not supported"
)
token_out_amount = _get_taker_token_transfer_amount(
trace, taker_address, token_out_address, child_transfers
token_in_amount = _get_taker_token_in_amount(
taker_address, token_in_address, child_transfers
)
return token_out_address, token_out_amount
def _get_0x_token_in_data(trace: DecodedCallTrace) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_in_address = order[1]
token_in_amount = trace.inputs["takerTokenFillAmount"]
return token_in_address, token_in_amount
def _get_0x_token_out_data(trace: DecodedCallTrace) -> Tuple[str, int]:
order: List = trace.inputs["order"]
token_out_address = order[1]
token_out_amount = trace.inputs["takerTokenFillAmount"]
return token_out_address, token_out_amount

View File

@ -4,13 +4,12 @@ from mev_inspect.abi import get_abi
from mev_inspect.decode import ABIDecoder
from mev_inspect.schemas.blocks import CallAction, CallResult
from mev_inspect.schemas.traces import (
CallTrace,
Classification,
ClassifiedTrace,
CallTrace,
DecodedCallTrace,
Trace,
TraceType,
)
from mev_inspect.schemas.traces import Trace, TraceType
from .specs import ALL_CLASSIFIER_SPECS

40
mev_inspect/coinbase.py Normal file
View File

@ -0,0 +1,40 @@
import aiohttp
from mev_inspect.classifiers.specs.weth import WETH_ADDRESS
from mev_inspect.schemas.transfers import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.coinbase import CoinbasePrices, CoinbasePricesResponse
from mev_inspect.schemas.prices import (
WBTC_TOKEN_ADDRESS,
LINK_TOKEN_ADDRESS,
YEARN_TOKEN_ADDRESS,
AAVE_TOKEN_ADDRESS,
UNI_TOKEN_ADDRESS,
USDC_TOKEN_ADDRESS_ADDRESS,
REN_TOKEN_ADDRESS,
)
COINBASE_API_BASE = "https://www.coinbase.com/api/v2"
COINBASE_TOKEN_NAME_BY_ADDRESS = {
WETH_ADDRESS: "weth",
ETH_TOKEN_ADDRESS: "ethereum",
WBTC_TOKEN_ADDRESS: "wrapped-bitcoin",
LINK_TOKEN_ADDRESS: "link",
YEARN_TOKEN_ADDRESS: "yearn-finance",
AAVE_TOKEN_ADDRESS: "aave",
UNI_TOKEN_ADDRESS: "uniswap",
USDC_TOKEN_ADDRESS_ADDRESS: "usdc",
REN_TOKEN_ADDRESS: "ren",
}
async def fetch_coinbase_prices(token_address: str) -> CoinbasePrices:
if token_address not in COINBASE_TOKEN_NAME_BY_ADDRESS:
raise ValueError(f"Unsupported token_address {token_address}")
coinbase_token_name = COINBASE_TOKEN_NAME_BY_ADDRESS[token_address]
url = f"{COINBASE_API_BASE}/assets/prices/{coinbase_token_name}"
async with aiohttp.ClientSession() as session:
async with session.get(url, params={"base": "USD"}) as response:
json_data = await response.json()
return CoinbasePricesResponse(**json_data).data.prices

View File

@ -0,0 +1,81 @@
from typing import List, Optional
from mev_inspect.traces import get_child_traces
from mev_inspect.schemas.traces import (
ClassifiedTrace,
Classification,
Protocol,
)
from mev_inspect.schemas.liquidations import Liquidation
V2_COMPTROLLER_ADDRESS = "0x3d9819210A31b4961b30EF54bE2aeD79B9c9Cd3B"
V2_C_ETHER = "0x4Ddc2D193948926D02f9B1fE9e1daa0718270ED5"
CREAM_COMPTROLLER_ADDRESS = "0x3d5BC3c8d13dcB8bF317092d84783c2697AE9258"
CREAM_CR_ETHER = "0xD06527D5e56A3495252A528C4987003b712860eE"
def get_compound_liquidations(
traces: List[ClassifiedTrace],
) -> List[Liquidation]:
"""Inspect list of classified traces and identify liquidation"""
liquidations: List[Liquidation] = []
for trace in traces:
if (
trace.classification == Classification.liquidate
and (
trace.protocol == Protocol.compound_v2
or trace.protocol == Protocol.cream
)
and trace.inputs is not None
and trace.to_address is not None
):
# First, we look for cEther liquidations (position paid back via tx.value)
child_traces = get_child_traces(
trace.transaction_hash, trace.trace_address, traces
)
seize_trace = _get_seize_call(child_traces)
if seize_trace is not None and seize_trace.inputs is not None:
c_token_collateral = trace.inputs["cTokenCollateral"]
if trace.abi_name == "CEther":
liquidations.append(
Liquidation(
liquidated_user=trace.inputs["borrower"],
debt_token_address=c_token_collateral,
liquidator_user=seize_trace.inputs["liquidator"],
debt_purchase_amount=trace.value,
protocol=trace.protocol,
received_amount=seize_trace.inputs["seizeTokens"],
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
)
)
elif (
trace.abi_name == "CToken"
): # cToken liquidations where liquidator pays back via token transfer
liquidations.append(
Liquidation(
liquidated_user=trace.inputs["borrower"],
debt_token_address=c_token_collateral,
liquidator_user=seize_trace.inputs["liquidator"],
debt_purchase_amount=trace.inputs["repayAmount"],
protocol=trace.protocol,
received_amount=seize_trace.inputs["seizeTokens"],
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
block_number=trace.block_number,
)
)
return liquidations
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None

View File

@ -4,20 +4,17 @@ from uuid import uuid4
from mev_inspect.models.arbitrages import ArbitrageModel
from mev_inspect.schemas.arbitrages import Arbitrage
from .shared import delete_by_block_range
def delete_arbitrages_for_blocks(
def delete_arbitrages_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
ArbitrageModel,
after_block_number,
before_block_number,
(
db_session.query(ArbitrageModel)
.filter(ArbitrageModel.block_number == block_number)
.delete()
)
db_session.commit()
@ -40,8 +37,6 @@ def write_arbitrages(
start_amount=arbitrage.start_amount,
end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount,
error=arbitrage.error,
protocols={swap.protocol.value for swap in arbitrage.swaps},
)
)

View File

@ -1,39 +1,28 @@
from datetime import datetime
from typing import List
from mev_inspect.db import write_as_csv
from mev_inspect.schemas.blocks import Block
def delete_blocks(
def delete_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
db_session.execute(
"""
DELETE FROM blocks
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
"DELETE FROM blocks WHERE block_number = :block_number",
params={"block_number": block_number},
)
db_session.commit()
def write_blocks(
def write_block(
db_session,
blocks: List[Block],
block: Block,
) -> None:
items_generator = (
(
block.block_number,
datetime.fromtimestamp(block.block_timestamp),
)
for block in blocks
db_session.execute(
"INSERT INTO blocks (block_number, block_timestamp) VALUES (:block_number, :block_timestamp)",
params={
"block_number": block.block_number,
"block_timestamp": datetime.fromtimestamp(block.block_timestamp),
},
)
write_as_csv(db_session, "blocks", items_generator)
db_session.commit()

View File

@ -4,20 +4,17 @@ from typing import List
from mev_inspect.models.liquidations import LiquidationModel
from mev_inspect.schemas.liquidations import Liquidation
from .shared import delete_by_block_range
def delete_liquidations_for_blocks(
def delete_liquidations_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
LiquidationModel,
after_block_number,
before_block_number,
(
db_session.query(LiquidationModel)
.filter(LiquidationModel.block_number == block_number)
.delete()
)
db_session.commit()

View File

@ -4,20 +4,17 @@ from typing import List
from mev_inspect.models.miner_payments import MinerPaymentModel
from mev_inspect.schemas.miner_payments import MinerPayment
from .shared import delete_by_block_range
def delete_miner_payments_for_blocks(
def delete_miner_payments_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
MinerPaymentModel,
after_block_number,
before_block_number,
(
db_session.query(MinerPaymentModel)
.filter(MinerPaymentModel.block_number == block_number)
.delete()
)
db_session.commit()

View File

@ -1,30 +0,0 @@
import json
from typing import List
from mev_inspect.crud.shared import delete_by_block_range
from mev_inspect.models.nft_trades import NftTradeModel
from mev_inspect.schemas.nft_trades import NftTrade
def delete_nft_trades_for_blocks(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
delete_by_block_range(
db_session,
NftTradeModel,
after_block_number,
before_block_number,
)
db_session.commit()
def write_nft_trades(
db_session,
nft_trades: List[NftTrade],
) -> None:
models = [NftTradeModel(**json.loads(nft_trade.json())) for nft_trade in nft_trades]
db_session.bulk_save_objects(models)
db_session.commit()

View File

@ -2,28 +2,25 @@ import json
from typing import List
from mev_inspect.models.punks import (
PunkBidAcceptanceModel,
PunkBidModel,
PunkSnipeModel,
PunkBidModel,
PunkBidAcceptanceModel,
)
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from .shared import delete_by_block_range
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
def delete_punk_bid_acceptances_for_blocks(
def delete_punk_bid_acceptances_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
PunkBidAcceptanceModel,
after_block_number,
before_block_number,
(
db_session.query(PunkBidAcceptanceModel)
.filter(PunkBidAcceptanceModel.block_number == block_number)
.delete()
)
db_session.commit()
@ -40,17 +37,16 @@ def write_punk_bid_acceptances(
db_session.commit()
def delete_punk_bids_for_blocks(
def delete_punk_bids_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
PunkBidModel,
after_block_number,
before_block_number,
(
db_session.query(PunkBidModel)
.filter(PunkBidModel.block_number == block_number)
.delete()
)
db_session.commit()
@ -64,17 +60,16 @@ def write_punk_bids(
db_session.commit()
def delete_punk_snipes_for_blocks(
def delete_punk_snipes_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
PunkSnipeModel,
after_block_number,
before_block_number,
(
db_session.query(PunkSnipeModel)
.filter(PunkSnipeModel.block_number == block_number)
.delete()
)
db_session.commit()

View File

@ -1,69 +0,0 @@
from typing import List
from uuid import uuid4
from mev_inspect.models.sandwiches import SandwichModel
from mev_inspect.schemas.sandwiches import Sandwich
from .shared import delete_by_block_range
def delete_sandwiches_for_blocks(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
delete_by_block_range(
db_session,
SandwichModel,
after_block_number,
before_block_number,
)
db_session.commit()
def write_sandwiches(
db_session,
sandwiches: List[Sandwich],
) -> None:
sandwich_models = []
sandwiched_swaps = []
for sandwich in sandwiches:
sandwich_id = str(uuid4())
sandwich_models.append(
SandwichModel(
id=sandwich_id,
block_number=sandwich.block_number,
sandwicher_address=sandwich.sandwicher_address,
frontrun_swap_transaction_hash=sandwich.frontrun_swap.transaction_hash,
frontrun_swap_trace_address=sandwich.frontrun_swap.trace_address,
backrun_swap_transaction_hash=sandwich.backrun_swap.transaction_hash,
backrun_swap_trace_address=sandwich.backrun_swap.trace_address,
profit_token_address=sandwich.profit_token_address,
profit_amount=sandwich.profit_amount,
)
)
for swap in sandwich.sandwiched_swaps:
sandwiched_swaps.append(
{
"sandwich_id": sandwich_id,
"block_number": swap.block_number,
"transaction_hash": swap.transaction_hash,
"trace_address": swap.trace_address,
}
)
if len(sandwich_models) > 0:
db_session.bulk_save_objects(sandwich_models)
db_session.execute(
"""
INSERT INTO sandwiched_swaps
(sandwich_id, block_number, transaction_hash, trace_address)
VALUES
(:sandwich_id, :block_number, :transaction_hash, :trace_address)
""",
params=sandwiched_swaps,
)
db_session.commit()

View File

@ -1,20 +0,0 @@
from typing import Type
from mev_inspect.models.base import Base
def delete_by_block_range(
db_session,
model_class: Type[Base],
after_block_number,
before_block_number,
) -> None:
(
db_session.query(model_class)
.filter(model_class.block_number >= after_block_number)
.filter(model_class.block_number < before_block_number)
.delete()
)
db_session.commit()

View File

@ -1,203 +0,0 @@
INSERT_ARBITRAGE_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
a.block_number,
b.block_timestamp,
NULL AS protocol,
a.transaction_hash,
'arbitrage' AS type,
(
(
SELECT usd_price
FROM prices
WHERE
token_address = a.profit_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
) * a.profit_amount / POWER(10, profit_token.decimals)
) AS gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
a.protocols
FROM arbitrages a
JOIN blocks b ON b.block_number = a.block_number
JOIN tokens profit_token ON profit_token.token_address = a.profit_token_address
JOIN classified_traces ct ON
ct.block_number = a.block_number AND
ct.transaction_hash = a.transaction_hash
JOIN miner_payments mp ON
mp.block_number = a.block_number AND
mp.transaction_hash = a.transaction_hash
WHERE
b.block_number >= :after_block_number
AND b.block_number < :before_block_number
AND ct.trace_address = '{}'
AND NOT EXISTS (
SELECT 1
FROM sandwiches front_sandwich
WHERE
front_sandwich.block_number = a.block_number AND
front_sandwich.frontrun_swap_transaction_hash = a.transaction_hash
)
AND NOT EXISTS (
SELECT 1
FROM sandwiches back_sandwich
WHERE
back_sandwich.block_number = a.block_number AND
back_sandwich.backrun_swap_transaction_hash = a.transaction_hash
)
)
"""
INSERT_LIQUIDATIONS_SUMMARY_QUERY = """
INSERT INTO mev_summary (
SELECT
NULL,
l.block_number,
b.block_timestamp,
l.protocol as protocol,
l.transaction_hash,
'liquidation' as type,
l.received_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.received_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, received_token.decimals)
-
l.debt_purchase_amount*
(
SELECT usd_price
FROM prices
WHERE token_address = l.debt_token_address
AND timestamp <= b.block_timestamp
ORDER BY timestamp DESC
LIMIT 1
)
/POWER(10, debt_token.decimals) as gross_profit_usd,
(
(
((mp.gas_used * mp.gas_price) + mp.coinbase_transfer) /
POWER(10, 18)
) *
(
SELECT usd_price
FROM prices p
WHERE
p.timestamp <= b.block_timestamp
AND p.token_address = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'
ORDER BY p.timestamp DESC
LIMIT 1
)
) AS miner_payment_usd,
mp.gas_used,
mp.gas_price,
mp.coinbase_transfer,
mp.gas_price_with_coinbase_transfer,
mp.miner_address,
mp.base_fee_per_gas,
ct.error as error,
ARRAY[l.protocol]
FROM liquidations l
JOIN blocks b ON b.block_number = l.block_number
JOIN tokens received_token ON received_token.token_address = l.received_token_address
JOIN tokens debt_token ON debt_token.token_address = l.debt_token_address
JOIN miner_payments mp ON
mp.block_number = l.block_number AND
mp.transaction_hash = l.transaction_hash
JOIN classified_traces ct ON
ct.block_number = l.block_number AND
ct.transaction_hash = l.transaction_hash
WHERE
b.block_number >= :after_block_number AND
b.block_number < :before_block_number AND
ct.trace_address = '{}' AND
l.debt_purchase_amount > 0 AND
l.received_amount > 0 AND
l.debt_purchase_amount < 115792089237316195423570985008687907853269984665640564039457584007913129639935
)
"""
def update_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
_delete_summary_for_block_range(db_session, after_block_number, before_block_number)
_insert_into_summary_for_block_range(
db_session, after_block_number, before_block_number
)
def _delete_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"""
DELETE FROM mev_summary
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()
def _insert_into_summary_for_block_range(
db_session,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
INSERT_ARBITRAGE_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.execute(
INSERT_LIQUIDATIONS_SUMMARY_QUERY,
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()

View File

@ -4,20 +4,17 @@ from typing import List
from mev_inspect.models.swaps import SwapModel
from mev_inspect.schemas.swaps import Swap
from .shared import delete_by_block_range
def delete_swaps_for_blocks(
def delete_swaps_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
SwapModel,
after_block_number,
before_block_number,
(
db_session.query(SwapModel)
.filter(SwapModel.block_number == block_number)
.delete()
)
db_session.commit()

View File

@ -1,24 +1,18 @@
import json
from datetime import datetime, timezone
from typing import List
from mev_inspect.db import to_postgres_list, write_as_csv
from mev_inspect.models.traces import ClassifiedTraceModel
from mev_inspect.schemas.traces import ClassifiedTrace
from .shared import delete_by_block_range
def delete_classified_traces_for_blocks(
def delete_classified_traces_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
ClassifiedTraceModel,
after_block_number,
before_block_number,
(
db_session.query(ClassifiedTraceModel)
.filter(ClassifiedTraceModel.block_number == block_number)
.delete()
)
db_session.commit()
@ -28,35 +22,29 @@ def write_classified_traces(
db_session,
classified_traces: List[ClassifiedTrace],
) -> None:
classified_at = datetime.now(timezone.utc)
items = (
(
classified_at,
trace.transaction_hash,
trace.block_number,
trace.classification.value,
trace.type.value,
str(trace.protocol),
trace.abi_name,
trace.function_name,
trace.function_signature,
_inputs_as_json(trace),
trace.from_address,
trace.to_address,
trace.gas,
trace.value,
trace.gas_used,
trace.error,
to_postgres_list(trace.trace_address),
trace.transaction_position,
models = []
for trace in classified_traces:
inputs_json = (json.loads(trace.json(include={"inputs"}))["inputs"],)
models.append(
ClassifiedTraceModel(
transaction_hash=trace.transaction_hash,
block_number=trace.block_number,
classification=trace.classification.value,
trace_type=trace.type.value,
trace_address=trace.trace_address,
protocol=str(trace.protocol),
abi_name=trace.abi_name,
function_name=trace.function_name,
function_signature=trace.function_signature,
inputs=inputs_json,
from_address=trace.from_address,
to_address=trace.to_address,
gas=trace.gas,
value=trace.value,
gas_used=trace.gas_used,
error=trace.error,
)
)
for trace in classified_traces
)
write_as_csv(db_session, "classified_traces", items)
def _inputs_as_json(trace) -> str:
inputs = json.dumps(json.loads(trace.json(include={"inputs"}))["inputs"])
inputs_with_array = f"[{inputs}]"
return inputs_with_array
db_session.bulk_save_objects(models)
db_session.commit()

View File

@ -4,19 +4,15 @@ from typing import List
from mev_inspect.models.transfers import TransferModel
from mev_inspect.schemas.transfers import Transfer
from .shared import delete_by_block_range
def delete_transfers_for_blocks(
def delete_transfers_for_block(
db_session,
after_block_number: int,
before_block_number: int,
block_number: int,
) -> None:
delete_by_block_range(
db_session,
TransferModel,
after_block_number,
before_block_number,
(
db_session.query(TransferModel)
.filter(TransferModel.block_number == block_number)
.delete()
)
db_session.commit()

Some files were not shown because too many files have changed in this diff Show More