Compare commits


102 Commits
s3...main

Author SHA1 Message Date
sukoneck
ce8179f07e
Update README.md
graduate repo
2024-11-27 14:42:51 -07:00
m-r-g-t
b3438d7353
feat: remove need for pg within mev-inspect env (#344)
* instructions for running without kubernetes ('monolithic mode')

* added docker instructions

* chore: remove pgsql as hard dependency

* chore: update deps

* docs: updated docs to remove local pg engine for docker install

* docs: reword docs

* ci: update poetry source

* fix: refactor tests for mypy

* fix: search miner for eth2

* feat: improve eth2 miner fn

* refactor: unnecessary comma

* test: add miner generation tests

---------

Co-authored-by: pintail <you@example.com>
2023-10-11 16:16:44 -04:00
Taarush Vemulapalli
26aa190b03
Create LICENSE 2023-05-22 13:42:43 -04:00
pintail-xyz
51c7345d26
exclude reverted punk bid acceptances (#330)
Co-authored-by: pintail <you@example.com>
2023-02-15 08:16:08 -08:00
pintail-xyz
b6777a25dc
check that swaps involve both pool assets (#328)
Co-authored-by: pintail <you@example.com>
2023-02-08 09:01:36 -08:00
pintail-xyz
297afec364
fix arbitrage by filtering out sequence where the start/finish swap contract is the same (#325)
Co-authored-by: pintail <you@example.com>
2023-01-17 08:34:33 -08:00
pintail-xyz
0e42ab6ba4
ignore reverted transactions from liquidation classifier (#317) 2022-12-05 07:59:52 -08:00
Gui Heise
d917ae72de
Merge pull request #306 from maxholloway/mh/fix-broken-readme-link
fix broken README link
2022-08-04 11:09:19 -04:00
Max Holloway
b720bb2c03 fix broken README link 2022-07-25 17:54:03 -05:00
Gui Heise
3c9fe3dca9
Merge pull request #269 from pintail-xyz/separate-cream-compound
separate out the cream and compound liquidation classifiers
2022-04-13 12:01:26 -04:00
pintail
60357cb449 update pre-commit packages 2022-04-09 00:30:43 +01:00
pintail
04d1c86dbd separate cream and compound classifiers 2022-04-07 19:27:47 +01:00
Gui Heise
22135c42dc
Merge pull request #290 from flashbots/export-deletions
Support export deletions
2022-03-22 17:05:32 -04:00
Gui Heise
8cb91afddc Add created_at to blocks table 2022-03-22 16:38:42 -04:00
Gui Heise
a9f57bcd2e Object deletions 2022-03-22 13:27:05 -04:00
Gui Heise
baf370c9fb Remove date from filename 2022-03-22 10:52:07 -04:00
Gui Heise
852ac93ba7
Merge pull request #287 from flashbots/blocks-export
add blocks to export
2022-03-04 14:09:34 -05:00
Gui Heise
c1b26f8888 add blocks to export 2022-03-04 14:03:34 -05:00
Gui Heise
7a0b21b006
Merge pull request #286 from flashbots/export-filename
Add timestamp to filename
2022-03-02 15:15:45 -05:00
Gui Heise
440c5f0754 Add timestamp to filename 2022-03-02 11:51:18 -05:00
Gui Heise
be280ab113
Merge pull request #284 from flashbots/fix-export-backfill
Fix worker actor priority
2022-02-28 13:33:45 -05:00
Gui Heise
77957e07cf Fix priority 2022-02-28 13:13:16 -05:00
Luke Van Seters
38cd60cf88
Merge pull request #282 from Dire-0x/fix/sandwicher-address-not-any-uniswap-router
fix check sandwicher against correct uniswap routers
2022-02-25 10:05:47 -05:00
Dire
75efdb4afe fix check sandwicher against correct uniswap routers 2022-02-24 11:01:44 -06:00
Gui Heise
6aca1d292d
Merge pull request #274 from flashbots/backfill-export
Backfill export
2022-02-22 13:30:39 -05:00
Gui Heise
1fbecbec58 Worker low priority 2022-02-21 13:19:25 -05:00
Gui Heise
180a987a61 Add low priority to cli tasks 2022-02-21 12:45:36 -05:00
Luke Van Seters
af7ae2c3b0
Merge pull request #272 from flashbots/healthcheck-retry
Retry on healthcheck
2022-02-21 12:43:34 -05:00
Gui Heise
5eef1b7a8f Add worker and listener task 2022-02-21 11:16:22 -05:00
Gui Heise
c6e6d694ec Add task to the listener 2022-02-21 11:02:30 -05:00
Gui Heise
da04bc4351 Add tasks to CLI 2022-02-21 10:59:14 -05:00
Gui Heise
cbad9e79b6 Separate tasks 2022-02-21 10:55:26 -05:00
Gui Heise
b486d53012 Remove priorities 2022-02-18 14:47:36 -05:00
Gui Heise
fe9253ca5e Comment Tiltfile 2022-02-18 14:47:36 -05:00
Gui Heise
db6b55ad38 Task priority and queue 2022-02-18 14:47:36 -05:00
Gui Heise
c7e94b55d4 Fix poetry config 2022-02-18 14:47:36 -05:00
Gui Heise
54cc4f1dc6 Add bash script 2022-02-18 14:47:36 -05:00
Gui Heise
c58d75118d Fix task priorities 2022-02-18 14:47:36 -05:00
Gui Heise
b86ecbca87 Add commands 2022-02-18 14:47:36 -05:00
Gui Heise
ed01c155b3
Merge pull request #278 from flashbots/export-tables
Add logic for more tables
2022-02-18 13:53:38 -05:00
Gui Heise
1edd39c382 Spacing 2022-02-18 11:52:45 -05:00
Gui Heise
ca6978a693 Add logic for more tables 2022-02-18 11:10:24 -05:00
Luke Van Seters
8767f27fe6
Merge pull request #275 from flashbots/block-list-2
Enqueue a list of blocks
2022-02-16 12:08:27 -05:00
Luke Van Seters
19eb48aec0 Use the actor 2022-02-16 11:56:50 -05:00
Luke Van Seters
cb6f20ba63 No -t for stdin push 2022-02-16 11:56:33 -05:00
Ryan Radomski
1b42920dd1 Changed backfilling a list of blocks in Readme to use a text file example 2022-02-16 11:36:23 -05:00
Ryan Radomski
fa14caec17 Added block-list command to enqueue a list of blocks from stdin 2022-02-16 11:35:38 -05:00
Luke Van Seters
eda0485fa5 Retry on healthcheck 2022-02-16 10:52:49 -05:00
Luke Van Seters
4b93f95d50
Merge pull request #270 from flashbots/only-write-if-newly-empty
Only export empty blocks if there's an existing non-empty one
2022-02-16 09:47:55 -05:00
Luke Van Seters
25f77a54fc
Merge pull request #267 from flashbots/lukevs-pr-template
Create pull_request_template.md
2022-02-16 09:27:35 -05:00
Luke Van Seters
48cf3612bd
Update pull_request_template.md 2022-02-16 09:19:36 -05:00
Luke Van Seters
4ef2145409 Skip write if no data and no key or current upload has no data 2022-02-16 09:06:51 -05:00
Luke Van Seters
b30d6be0c5 Add peek that preserves the iterable 2022-02-16 08:55:48 -05:00
Luke Van Seters
cd5f82733b
Merge pull request #266 from flashbots/priority-export
Create low and high priority queues. Put export on high priority, backfill on low
2022-02-15 17:20:02 -05:00
Luke Van Seters
6af61dac74
Update pull_request_template.md 2022-02-15 13:23:39 -05:00
Luke Van Seters
d0304474e6
Merge pull request #268 from flashbots/lukevs-contributing-fix
Update CONTRIBUTING.md to include new test command
2022-02-15 12:35:33 -05:00
Luke Van Seters
8544eb46ef
Update CONTRIBUTING.md 2022-02-15 12:26:16 -05:00
Luke Van Seters
a8856521d7
Create pull_request_template.md 2022-02-15 12:24:29 -05:00
Luke Van Seters
2e40c8bd5e Also add priority to listener 2022-02-15 12:01:02 -05:00
Luke Van Seters
7a3f3874b6 Add separate queue names to consume both then internally prioritize 2022-02-15 11:59:04 -05:00
Luke Van Seters
94f4ec7d40 Fix priorities. Lower comes first 2022-02-15 10:28:06 -05:00
Luke Van Seters
a58863b992 Add priorities to queue tasks 2022-02-15 10:25:08 -05:00
Gui Heise
0f4cd2f31d
Merge pull request #263 from flashbots/export-v3
Add Enqueue/Direct exports commands
2022-02-14 17:48:37 -05:00
Gui Heise
26ce3229ef Fix mev 2022-02-14 17:12:44 -05:00
Gui Heise
6e51443ab3 Add enqueue/direct commands 2022-02-14 16:48:38 -05:00
Gui Heise
328215bacb Fix mev 2022-02-14 15:45:38 -05:00
Gui Heise
5f5bafa7e1
Merge pull request #262 from flashbots/export-v2
Add S3 export task
2022-02-14 13:34:43 -05:00
Gui Heise
8c7baecf2a Syntax 2022-02-14 13:30:20 -05:00
Gui Heise
c6f7fd509e Export command and function edits 2022-02-14 12:37:52 -05:00
Gui Heise
95444eae24 Add actor 2022-02-11 18:43:39 -05:00
Gui Heise
bb06c8a958 Add export task 2022-02-11 16:47:24 -05:00
Gui Heise
9dbe68b284 Single block export function 2022-02-11 16:39:50 -05:00
Luke Van Seters
debcb8731a
Merge pull request #258 from flashbots/aws-s3-local
Export a range of blocks in mev_summary to S3
2022-02-11 11:22:48 -05:00
Luke Van Seters
88b5e0ce2a Move ENV names to variables. Make region and keys optional 2022-02-11 11:19:59 -05:00
Luke Van Seters
4dbe6ed2d7 Pass through AWS creds as well. Turn into a secret. Make all optional for folks not using the export 2022-02-11 11:16:06 -05:00
Luke Van Seters
c079ac9aa6 Add region for the export bucket 2022-02-11 11:16:06 -05:00
Luke Van Seters
001b6e2b85 Add a flashbots prefix 2022-02-11 11:16:06 -05:00
Luke Van Seters
aa5c90ae96 only one mev inspect helm 2022-02-11 11:16:06 -05:00
Gui Heise
751059c534 Remove some comments 2022-02-11 11:16:06 -05:00
Gui Heise
dbebb57b9c Tiltfile comments and services constraint 2022-02-11 11:16:06 -05:00
Luke Van Seters
462bff387a Break env piece into a function 2022-02-11 11:16:06 -05:00
Luke Van Seters
040be01e9d Set aws creds through environment variables locally 2022-02-11 11:16:06 -05:00
Gui Heise
00dba743d9 ConfigMap 2022-02-11 11:16:06 -05:00
Luke Van Seters
b1d4cb852b Add some logging. Remove unused list function 2022-02-11 11:16:05 -05:00
Luke Van Seters
d9439dfe27 Run query. Export to S3 2022-02-11 11:15:54 -05:00
Luke Van Seters
06c39d1495 Add boto3. Remove boto. Add a test connection to localstack 2022-02-11 11:15:36 -05:00
Luke Van Seters
e0fc9e7776 Add a shell of a command to do the export 2022-02-11 11:15:05 -05:00
Luke Van Seters
17dec2b203 Expose localhost port 2022-02-11 11:15:05 -05:00
Luke Van Seters
bb875cc45a Add boto 2022-02-11 11:15:05 -05:00
Luke Van Seters
f696bb72f4 Add localstack 2022-02-11 11:15:05 -05:00
Luke Van Seters
1a5aa6308c
Merge pull request #260 from pintail-xyz/reverse-backfill
implement reverse backfill
2022-02-10 15:57:14 -05:00
Luke Van Seters
6b6d80b3da
Merge pull request #261 from flashbots/revert-259-patch-1
Revert "Add new stablecoins and router contracts"
2022-02-10 10:57:59 -05:00
Luke Van Seters
b332bb703f
Revert "Add new stablecoins and router contracts" 2022-02-10 10:57:37 -05:00
Luke Van Seters
31bc65d617
Merge pull request #259 from ivigamberdiev/patch-1
Add new stablecoins and router contracts
2022-02-10 10:56:22 -05:00
pintail
c77869abd5 implement reverse backfill 2022-02-09 22:16:27 +00:00
Luke Van Seters
3965c5f7ba
Merge pull request #255 from flashbots/split-out-workers-from-task
Separate importing tasks from importing the worker
2022-02-08 13:06:37 -05:00
Igor Igamberdiev
0293ea3ed4
Add new stablecoins and router contracts 2022-02-07 21:11:26 +03:00
Luke Van Seters
f836b50ef5
Merge pull request #256 from tmikulin/fix-new-bitnami-postgres-update
adjust the new name for postgres bitnami
2022-02-04 10:02:48 -05:00
Tomislav Mikulin
7b236b7a71 change info in Tiltfile for postgres 2022-02-04 14:25:41 +01:00
Tomislav Mikulin
1b13e975a6 adjust the new name for postgres bitnami 2022-02-04 13:23:25 +01:00
Luke Van Seters
4db05526b3 Remove unused __main__ 2022-02-03 14:50:19 -05:00
Luke Van Seters
ecb3a563c1 Separate tasks from the worker 2022-02-02 13:16:36 -05:00
42 changed files with 29726 additions and 1522 deletions


@ -21,7 +21,7 @@ jobs:
- name: Bootstrap poetry
shell: bash
run: |
curl -sL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py \
curl -sSL https://install.python-poetry.org \
| python - -y
- name: Update PATH

3
.gitignore vendored

@ -25,3 +25,6 @@ cache
# pycharm
.idea
.env
.python-version


@ -1,6 +1,6 @@
repos:
- repo: https://github.com/ambv/black
rev: 20.8b1
rev: 22.3.0
hooks:
- id: black
language_version: python3.9
@ -20,7 +20,7 @@ repos:
language: system
types: [python]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
rev: v0.942
hooks:
- id: 'mypy'
additional_dependencies:


@ -23,7 +23,7 @@ poetry run pre-commit install
Run tests with:
```
kubectl exec deploy/mev-inspect-deployment -- poetry run pytest --cov=mev_inspect tests
./mev test
```
## Send a pull request

21
LICENSE Normal file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 Flashbots
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

126
MONOLITHIC.md Normal file

@ -0,0 +1,126 @@
# Running mev-inspect-py without kubernetes ('monolithic mode')
Running mev-inspect-py outside of kubernetes can be useful for debugging purposes. In this case, the steps for installation are:
1. Install dependencies (pyenv, poetry, postgres)
1. Set up python virtual environment using matching python version (3.9.x) and install required python modules using poetry
1. Create postgres database
1. Run database migrations
The database credentials and archive node address used by mev-inspect-py need to be loaded into environment variables (both for database migrations and to run mev-inspect-py).
## Ubuntu install instructions
Starting from a clean Ubuntu 22.04 installation, the prerequisites for pyenv and psycopg2 (python3-dev, libpq-dev) can be installed with
`sudo apt install -y make build-essential git libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev liblzma-dev python3-dev libpq-dev`
### pyenv
Install pyenv using the web installer
`curl https://pyenv.run | bash`
and add the following to `~/.bashrc` (if running locally) or `~/.profile` (if running over ssh).
```
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
```
Then update the current shell by running `source ~/.bashrc` or `source ~/.profile` as appropriate.
### Poetry
Install Poetry using the web installer
`curl -sSL https://install.python-poetry.org | python3 -`
and add the following to `~/.bashrc` (if running locally) or `~/.profile` (if running over ssh).
`export PATH="/home/user/.local/bin:$PATH"`
If running over ssh you should also add the following to `~/.profile` to prevent [Poetry errors](https://github.com/python-poetry/poetry/issues/1917) from a lack of active keyring:
`export PYTHON_KEYRING_BACKEND=keyring.backends.null.Keyring`
Again, update the current shell by running `source ~/.bashrc` or `source ~/.profile` as appropriate.
### postgres
We have tested two alternatives for postgres: installing locally or running it in a container.
#### Option 1: Installing locally
To install locally from a clean Ubuntu 22.04 installation, run:
`sudo apt install postgresql postgresql-contrib`
Note: You may need to reconfigure your pg_hba.conf to allow local access.
#### Option 2: Installing docker
To avoid interfering with your local postgres instance, you may prefer to run postgres within a docker container.
For docker installation instructions, please refer to https://docs.docker.com/engine/install/ubuntu/
### mev-inspect-py
With all dependencies now installed, clone the mev-inspect-py repo
```
git clone https://github.com/flashbots/mev-inspect-py.git
cd mev-inspect-py
```
We now install the required Python version and use Poetry to install the required Python modules into a virtual environment.
```
pyenv install 3.9.16
pyenv local 3.9.16
poetry env use 3.9.16
poetry install
```
### Create database
mev-inspect-py outputs to a postgres database, so we need to set this up. There are various ways of doing this; two options are presented here.
#### Option 1 — Run postgres locally
```
sudo -u postgres psql
\password
postgres
create database mev_inspect;
\q
```
#### Option 2 — Use postgres docker image
To avoid interfering with your local postgres instance, you may prefer to run postgres within a docker container. First make sure that postgres is not currently running so that port `5432` is available:
`sudo systemctl stop postgresql`
and then start a containerised postgres instance:
`sudo docker run -d -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=mev_inspect postgres`
### Environment variables
We will need to set a few environment variables to use mev-inspect-py. **These will be required every time mev-inspect-py runs**, so again you may wish to add these to your `~/.bashrc` and/or `~/.profile` as appropriate. Note that you need to substitute the correct URL for your archive node below if you are not running Erigon locally.
```
export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=postgres
export POSTGRES_HOST=localhost
export RPC_URL="http://127.0.0.1:8545"
```
### Database migrations
Finally run the database migrations and fetch price information:
```
poetry run alembic upgrade head
poetry run fetch-all-prices
```
## Usage instructions
The same functionality available through kubernetes can be run in 'monolithic mode', but the relevant functions now need to be invoked by Poetry directly. For example, to inspect a single block, run:
`poetry run inspect-block 16379706`
Or to inspect a range of blocks:
`poetry run inspect-many-blocks 16379606 16379706`
Or to run the test suite:
`poetry run pytest tests`


@ -1,3 +1,5 @@
⚠️ This tool has been deprecated. You can visit [Flashbots Data](https://datasets.flashbots.net/) for historical mev-inspect data on Ethereum and join us on the [Flashbots forum](https://collective.flashbots.net). ⚠️
# mev-inspect-py
[![standard-readme compliant](https://img.shields.io/badge/readme%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
@ -37,7 +39,7 @@ Set an environment variable `RPC_URL` to an RPC for fetching blocks.
mev-inspect-py currently requires a node with support for Erigon traces and receipts (not geth yet 😔).
[pokt.network](pokt.network)'s "Ethereum Mainnet Archival with trace calls" is a good hosted option.
[pokt.network](https://www.pokt.network/)'s "Ethereum Mainnet Archival with trace calls" is a good hosted option.
Example:
@ -66,6 +68,10 @@ And load prices data
./mev prices fetch-all
```
## Monolithic (non-kubernetes) install instructions
For an alternative means of running mev-inspect-py for smaller set-ups or debug purposes see the [monolithic install instructions](MONOLITHIC.md).
## Usage
### Inspect a single block
@ -162,6 +168,19 @@ DEL dramatiq:default.DQ.msgs
For more information on queues, see the [spec shared by dramatiq](https://github.com/Bogdanp/dramatiq/blob/24cbc0dc551797783f41b08ea461e1b5d23a4058/dramatiq/brokers/redis/dispatch.lua#L24-L43)
**Backfilling a list of blocks**
Create a file containing one block number per line, for example blocks.txt containing:
```
12500000
12500001
12500002
```
Then queue the blocks with
```
cat blocks.txt | ./mev block-list
```
To watch the logs for a given worker pod, take its pod name using the above, then run:
```


@ -5,7 +5,7 @@ load("ext://configmap", "configmap_from_dict")
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
set=["postgresqlPassword=password", "postgresqlDatabase=mev_inspect"],
set=["auth.postgresPassword=password", "auth.database=mev_inspect"],
)
helm_remote("redis",
@ -42,28 +42,78 @@ docker_build("mev-inspect-py", ".",
trigger="./pyproject.toml"),
],
)
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql-postgresql", "redis-master"],
)
k8s_yaml(helm(
'./k8s/mev-inspect',
name='mev-inspect',
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
],
))
k8s_yaml(helm(
'./k8s/mev-inspect-workers',
name='mev-inspect-workers',
set=["replicaCount=1"],
set=[
"extraEnv[0].name=AWS_ACCESS_KEY_ID",
"extraEnv[0].value=foobar",
"extraEnv[1].name=AWS_SECRET_ACCESS_KEY",
"extraEnv[1].value=foobar",
"extraEnv[2].name=AWS_REGION",
"extraEnv[2].value=us-east-1",
"extraEnv[3].name=AWS_ENDPOINT_URL",
"extraEnv[3].value=http://localstack:4566",
"replicaCount=1",
],
))
k8s_resource(
workload="mev-inspect",
resource_deps=["postgresql", "redis-master"],
)
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql-postgresql", "redis-master"],
resource_deps=["postgresql", "redis-master"],
)
# uncomment to enable price monitor
# k8s_yaml(helm('./k8s/mev-inspect-prices', name='mev-inspect-prices'))
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql-postgresql"])
# k8s_resource(workload="mev-inspect-prices", resource_deps=["postgresql"])
local_resource(
'pg-port-forward',
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql-postgresql"]
resource_deps=["postgresql"]
)
# if using local S3 exports
#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = {
# "export-bucket-name" : "local-export",
# "export-bucket-region": "us-east-1",
# "export-aws-access-key-id": "foobar",
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",
#}))

98
cli.py

@ -1,15 +1,25 @@
import fileinput
import logging
import os
import sys
from datetime import datetime
import click
import dramatiq
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
LOW_PRIORITY,
LOW_PRIORITY_QUEUE,
backfill_export_task,
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
RPC_URL_ENV = "RPC_URL"
@ -93,18 +103,50 @@ async def inspect_many_blocks_command(
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
from worker import ( # pylint: disable=import-outside-toplevel
def enqueue_block_list_command():
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_task.send(batch_after_block, batch_before_block)
for block_string in fileinput.input():
block = int(block_string)
logger.info(f"Sending {block} to {block+1}")
inspect_many_blocks_actor.send(block, block + 1)
@cli.command()
@click.argument("start_block", type=int)
@click.argument("end_block", type=int)
@click.argument("batch_size", type=int, default=10)
def enqueue_many_blocks_command(start_block: int, end_block: int, batch_size: int):
broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
inspect_many_blocks_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
if start_block < end_block:
after_block = start_block
before_block = end_block
for batch_after_block in range(after_block, before_block, batch_size):
batch_before_block = min(batch_after_block + batch_size, before_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
else:
after_block = end_block
before_block = start_block
for batch_before_block in range(before_block, after_block, -1 * batch_size):
batch_after_block = max(batch_before_block - batch_size, after_block)
logger.info(f"Sending {batch_after_block} to {batch_before_block}")
inspect_many_blocks_actor.send(batch_after_block, batch_before_block)
@cli.command()
@ -118,6 +160,44 @@ def fetch_all_prices():
write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
def enqueue_s3_export(block_number: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending block {block_number} export to queue")
export_actor.send(block_number)
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
def enqueue_many_s3_exports(after_block: int, before_block: int):
broker = connect_broker()
export_actor = dramatiq.actor(
backfill_export_task,
broker=broker,
queue_name=LOW_PRIORITY_QUEUE,
priority=LOW_PRIORITY,
)
logger.info(f"Sending blocks {after_block} to {before_block} to queue")
for block_number in range(after_block, before_block):
export_actor.send(block_number)
@cli.command()
@click.argument("block_number", type=int)
def s3_export(block_number: int):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting {block_number}")
export_block(inspect_db_session, block_number)
@cli.command()
@click.argument("after", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))
@click.argument("before", type=click.DateTime(formats=["%Y-%m-%d", "%m-%d-%Y"]))


@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}


@ -91,6 +91,34 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: EXPORT_BUCKET_NAME
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-name
optional: true
- name: EXPORT_BUCKET_REGION
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-bucket-region
optional: true
- name: EXPORT_AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-access-key-id
optional: true
- name: EXPORT_AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: mev-inspect-export
key: export-aws-secret-access-key
optional: true
{{- range .Values.extraEnv }}
- name: {{ .name }}
value: {{ .value }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}


@ -2,7 +2,8 @@ import asyncio
import logging
import os
import aiohttp
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@ -13,6 +14,12 @@ from mev_inspect.crud.latest_block_update import (
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@ -37,6 +44,14 @@ async def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
@ -47,6 +62,7 @@ async def run():
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
)
logger.info("Stopping...")
@ -58,7 +74,9 @@ async def inspect_next_block(
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
@ -82,6 +100,9 @@ async def inspect_next_block(
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:
@ -89,8 +110,12 @@ async def inspect_next_block(
async def ping_healthcheck_url(url):
async with aiohttp.ClientSession() as session:
async with session.get(url):
retry_options = ExponentialRetry(attempts=3)
async with RetryClient(
raise_for_status=False, retry_options=retry_options
) as client:
async with client.get(url) as _response:
pass

39
mev

@ -45,12 +45,16 @@ case "$1" in
listener)
kubectl exec -ti deploy/mev-inspect -- ./listener $2
;;
block-list)
echo "Backfilling blocks from stdin"
kubectl exec -i deploy/mev-inspect -- poetry run enqueue-block-list
;;
backfill)
start_block_number=$2
end_block_number=$3
after_block_number=$2
before_block_number=$3
echo "Backfilling from $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $start_block_number $end_block_number
echo "Backfilling from $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-blocks $after_block_number $before_block_number
;;
inspect)
block_number=$2
@ -58,11 +62,11 @@ case "$1" in
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
;;
inspect-many)
start_block_number=$2
end_block_number=$3
echo "Inspecting from block $start_block_number to $end_block_number"
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $start_block_number $end_block_number
poetry run inspect-many-blocks $after_block_number $before_block_number
;;
test)
shift
@ -94,6 +98,25 @@ case "$1" in
exit 1
esac
;;
backfill-export)
after_block=$2
before_block=$3
echo "Sending $after_block to $before_block export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-many-s3-exports $after_block $before_block
;;
enqueue-s3-export)
block_number=$2
echo "Sending $block_number export to queue"
kubectl exec -ti deploy/mev-inspect -- poetry run enqueue-s3-export $block_number
;;
s3-export)
block_number=$2
echo "Exporting $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run s3-export $block_number
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@


@ -163,6 +163,8 @@ def _get_all_start_end_swaps(swaps: List[Swap]) -> List[Tuple[Swap, List[Swap]]]
if (
potential_start_swap.token_in_address
== potential_end_swap.token_out_address
and potential_start_swap.contract_address
!= potential_end_swap.contract_address
and potential_start_swap.from_address == potential_end_swap.to_address
and not potential_start_swap.from_address in pool_addrs
):


@ -34,8 +34,7 @@ async def create_from_block_number(
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
)
miner_address = _get_miner_address_from_traces(traces)
miner_address = await _find_or_fetch_miner_address(w3, block_number, traces)
return Block(
block_number=block_number,
@ -180,11 +179,27 @@ def _find_base_fee_per_gas(
return base_fee
async def _find_or_fetch_miner_address(
w3,
block_number: int,
traces: List[Trace],
) -> Optional[str]:
# eth1 blocks
miner_address = _get_miner_address_from_traces(traces)
if miner_address is not None:
return miner_address
return await _fetch_miner_eth2(w3, block_number)
async def _fetch_miner_eth2(w3, block_number: int) -> Optional[str]:
block_json = await w3.eth.get_block(block_number)
return block_json["miner"]
def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]:
for trace in traces:
if trace.type == TraceType.reward:
return trace.action["author"]
return None
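Post-merge blocks carry no reward traces, so the new `_find_or_fetch_miner_address` falls back to the block header's `miner` field. A condensed, runnable sketch of that fallback using a stubbed `w3` (the address below is hypothetical):

```python
import asyncio

class _StubEth:
    async def get_block(self, block_number):
        # stand-in for web3's async get_block; hypothetical header
        return {"miner": "0x4a536c1f6a5d5a9c1aeca9f6d04fbbf5f0d8f4e3"}

class _StubW3:
    eth = _StubEth()

async def find_miner(w3, block_number, traces):
    for trace in traces:  # eth1: a reward trace names the block author
        if trace.get("type") == "reward":
            return trace["action"]["author"]
    block = await w3.eth.get_block(block_number)  # eth2: read the header
    return block["miner"]

print(asyncio.run(find_miner(_StubW3(), 16379706, traces=[])))
```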


@ -94,6 +94,9 @@ def create_swap_from_pool_transfers(
transfer_in = transfers_to_pool[-1]
transfer_out = transfers_from_pool_to_recipient[0]
if transfer_in.token_address == transfer_out.token_address:
return None
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,


@ -7,6 +7,7 @@ from .aave import AAVE_CLASSIFIER_SPECS
from .balancer import BALANCER_CLASSIFIER_SPECS
from .bancor import BANCOR_CLASSIFIER_SPECS
from .compound import COMPOUND_CLASSIFIER_SPECS
from .cream import CREAM_CLASSIFIER_SPECS
from .cryptopunks import CRYPTOPUNKS_CLASSIFIER_SPECS
from .curve import CURVE_CLASSIFIER_SPECS
from .erc20 import ERC20_CLASSIFIER_SPECS
@ -24,6 +25,7 @@ ALL_CLASSIFIER_SPECS = (
+ ZEROX_CLASSIFIER_SPECS
+ BALANCER_CLASSIFIER_SPECS
+ COMPOUND_CLASSIFIER_SPECS
+ CREAM_CLASSIFIER_SPECS
+ CRYPTOPUNKS_CLASSIFIER_SPECS
+ OPENSEA_CLASSIFIER_SPECS
+ BANCOR_CLASSIFIER_SPECS


@ -85,16 +85,6 @@ COMPOUND_V2_CETH_SPEC = ClassifierSpec(
},
)
CREAM_CETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.compound_v2,
@ -123,113 +113,9 @@ COMPOUND_V2_CTOKEN_SPEC = ClassifierSpec(
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CompoundLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
COMPOUND_CLASSIFIER_SPECS: List[ClassifierSpec] = [
COMPOUND_V2_CETH_SPEC,
COMPOUND_V2_CTOKEN_SPEC,
CREAM_CETH_SPEC,
CREAM_CTOKEN_SPEC,
]


@ -0,0 +1,204 @@
from typing import List, Optional
from mev_inspect.classifiers.helpers import get_debt_transfer, get_received_transfer
from mev_inspect.schemas.classifiers import (
Classification,
ClassifiedTrace,
ClassifierSpec,
DecodedCallTrace,
LiquidationClassifier,
SeizeClassifier,
)
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from mev_inspect.schemas.transfers import Transfer
CRETH_TOKEN_ADDRESS = "0xd06527d5e56a3495252a528c4987003b712860ee"
class CreamLiquidationClassifier(LiquidationClassifier):
@staticmethod
def parse_liquidation(
liquidation_trace: DecodedCallTrace,
child_transfers: List[Transfer],
child_traces: List[ClassifiedTrace],
) -> Optional[Liquidation]:
liquidator = liquidation_trace.from_address
liquidated = liquidation_trace.inputs["borrower"]
debt_token_address = liquidation_trace.to_address
received_token_address = liquidation_trace.inputs["cTokenCollateral"]
debt_purchase_amount = None
received_amount = None
debt_purchase_amount, debt_token_address = (
(liquidation_trace.value, ETH_TOKEN_ADDRESS)
if debt_token_address == CRETH_TOKEN_ADDRESS
and liquidation_trace.value != 0
else (liquidation_trace.inputs["repayAmount"], CRETH_TOKEN_ADDRESS)
)
debt_transfer = get_debt_transfer(liquidator, child_transfers)
received_transfer = get_received_transfer(liquidator, child_transfers)
seize_trace = _get_seize_call(child_traces)
if debt_transfer is not None:
debt_token_address = debt_transfer.token_address
debt_purchase_amount = debt_transfer.amount
if received_transfer is not None:
received_token_address = received_transfer.token_address
received_amount = received_transfer.amount
elif seize_trace is not None and seize_trace.inputs is not None:
received_amount = seize_trace.inputs["seizeTokens"]
if received_amount is None:
return None
return Liquidation(
liquidated_user=liquidated,
debt_token_address=debt_token_address,
liquidator_user=liquidator,
debt_purchase_amount=debt_purchase_amount,
protocol=liquidation_trace.protocol,
received_amount=received_amount,
received_token_address=received_token_address,
transaction_hash=liquidation_trace.transaction_hash,
trace_address=liquidation_trace.trace_address,
block_number=liquidation_trace.block_number,
error=liquidation_trace.error,
)
return None
CREAM_CRETH_SPEC = ClassifierSpec(
abi_name="CEther",
protocol=Protocol.cream,
valid_contract_addresses=["0xD06527D5e56A3495252A528C4987003b712860eE"],
classifiers={
"liquidateBorrow(address,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CTOKEN_SPEC = ClassifierSpec(
abi_name="CToken",
protocol=Protocol.cream,
valid_contract_addresses=[
"0xd06527d5e56a3495252a528c4987003b712860ee",
"0x51f48b638f82e8765f7a26373a2cb4ccb10c07af",
"0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
"0xcbae0a83f4f9926997c8339545fb8ee32edc6b76",
"0xce4fe9b4b8ff61949dcfeb7e03bc9faca59d2eb3",
"0x19d1666f543d42ef17f66e376944a22aea1a8e46",
"0x9baf8a5236d44ac410c0186fe39178d5aad0bb87",
"0x797aab1ce7c01eb727ab980762ba88e7133d2157",
"0x892b14321a4fcba80669ae30bd0cd99a7ecf6ac0",
"0x697256caa3ccafd62bb6d3aa1c7c5671786a5fd9",
"0x8b86e0598616a8d4f1fdae8b59e55fb5bc33d0d6",
"0xc7fd8dcee4697ceef5a2fd4608a7bd6a94c77480",
"0x17107f40d70f4470d20cb3f138a052cae8ebd4be",
"0x1ff8cdb51219a8838b52e9cac09b71e591bc998e",
"0x3623387773010d9214b10c551d6e7fc375d31f58",
"0x4ee15f44c6f0d8d1136c83efd2e8e4ac768954c6",
"0x338286c0bc081891a4bda39c7667ae150bf5d206",
"0x10fdbd1e48ee2fd9336a482d746138ae19e649db",
"0x01da76dea59703578040012357b81ffe62015c2d",
"0xef58b2d5a1b8d3cde67b8ab054dc5c831e9bc025",
"0xe89a6d0509faf730bd707bf868d9a2a744a363c7",
"0xeff039c3c1d668f408d09dd7b63008622a77532c",
"0x22b243b96495c547598d9042b6f94b01c22b2e9e",
"0x8b3ff1ed4f36c2c2be675afb13cc3aa5d73685a5",
"0x2a537fa9ffaea8c1a41d3c2b68a9cb791529366d",
"0x7ea9c63e216d5565c3940a2b3d150e59c2907db3",
"0x3225e3c669b39c7c8b3e204a8614bb218c5e31bc",
"0xf55bbe0255f7f4e70f63837ff72a577fbddbe924",
"0x903560b1cce601794c584f58898da8a8b789fc5d",
"0x054b7ed3f45714d3091e82aad64a1588dc4096ed",
"0xd5103afcd0b3fa865997ef2984c66742c51b2a8b",
"0xfd609a03b393f1a1cfcacedabf068cad09a924e2",
"0xd692ac3245bb82319a31068d6b8412796ee85d2c",
"0x92b767185fb3b04f881e3ac8e5b0662a027a1d9f",
"0x10a3da2bb0fae4d591476fd97d6636fd172923a8",
"0x3c6c553a95910f9fc81c98784736bd628636d296",
"0x21011bc93d9e515b9511a817a1ed1d6d468f49fc",
"0x85759961b116f1d36fd697855c57a6ae40793d9b",
"0x7c3297cfb4c4bbd5f44b450c0872e0ada5203112",
"0x7aaa323d7e398be4128c7042d197a2545f0f1fea",
"0x011a014d5e8eb4771e575bb1000318d509230afa",
"0xe6c3120f38f56deb38b69b65cc7dcaf916373963",
"0x4fe11bc316b6d7a345493127fbe298b95adaad85",
"0xcd22c4110c12ac41acefa0091c432ef44efaafa0",
"0x228619cca194fbe3ebeb2f835ec1ea5080dafbb2",
"0x73f6cba38922960b7092175c0add22ab8d0e81fc",
"0x38f27c03d6609a86ff7716ad03038881320be4ad",
"0x5ecad8a75216cea7dff978525b2d523a251eea92",
"0x5c291bc83d15f71fb37805878161718ea4b6aee9",
"0x6ba0c66c48641e220cf78177c144323b3838d375",
"0xd532944df6dfd5dd629e8772f03d4fc861873abf",
"0x197070723ce0d3810a0e47f06e935c30a480d4fc",
"0xc25eae724f189ba9030b2556a1533e7c8a732e14",
"0x25555933a8246ab67cbf907ce3d1949884e82b55",
"0xc68251421edda00a10815e273fa4b1191fac651b",
"0x65883978ada0e707c3b2be2a6825b1c4bdf76a90",
"0x8b950f43fcac4931d408f1fcda55c6cb6cbf3096",
"0x59089279987dd76fc65bf94cb40e186b96e03cb3",
"0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
"0xb092b4601850e23903a42eacbc9d8a0eec26a4d5",
"0x081fe64df6dc6fc70043aedf3713a3ce6f190a21",
"0x1d0986fb43985c88ffa9ad959cc24e6a087c7e35",
"0xc36080892c64821fa8e396bc1bd8678fa3b82b17",
"0x8379baa817c5c5ab929b03ee8e3c48e45018ae41",
"0x299e254a8a165bbeb76d9d69305013329eea3a3b",
"0xf8445c529d363ce114148662387eba5e62016e20",
"0x28526bb33d7230e65e735db64296413731c5402e",
"0x45406ba53bb84cd32a58e7098a2d4d1b11b107f6",
"0x6d1b9e01af17dd08d6dec08e210dfd5984ff1c20",
"0x1f9b4756b008106c806c7e64322d7ed3b72cb284",
"0xab10586c918612ba440482db77549d26b7abf8f7",
"0xdfff11dfe6436e42a17b86e7f419ac8292990393",
"0xdbb5e3081def4b6cdd8864ac2aeda4cbf778fecf",
"0x71cefcd324b732d4e058afacba040d908c441847",
"0x1a122348b73b58ea39f822a89e6ec67950c2bbd0",
"0x523effc8bfefc2948211a05a905f761cba5e8e9e",
"0x4202d97e00b9189936edf37f8d01cff88bdd81d4",
"0x4baa77013ccd6705ab0522853cb0e9d453579dd4",
"0x98e329eb5aae2125af273102f3440de19094b77c",
"0x8c3b7a4320ba70f8239f83770c4015b5bc4e6f91",
"0xe585c76573d7593abf21537b607091f76c996e73",
"0x81e346729723c4d15d0fb1c5679b9f2926ff13c6",
"0x766175eac1a99c969ddd1ebdbe7e270d508d8fff",
"0xd7394428536f63d5659cc869ef69d10f9e66314b",
"0x1241b10e7ea55b22f5b2d007e8fecdf73dcff999",
"0x2a867fd776b83e1bd4e13c6611afd2f6af07ea6d",
"0x250fb308199fe8c5220509c1bf83d21d60b7f74a",
"0x4112a717edd051f77d834a6703a1ef5e3d73387f",
"0xf04ce2e71d32d789a259428ddcd02d3c9f97fb4e",
"0x89e42987c39f72e2ead95a8a5bc92114323d5828",
"0x58da9c9fc3eb30abbcbbab5ddabb1e6e2ef3d2ef",
],
classifiers={
"liquidateBorrow(address,uint256,address)": CreamLiquidationClassifier,
"seize(address,address,uint256)": SeizeClassifier,
},
)
CREAM_CLASSIFIER_SPECS: List[ClassifierSpec] = [
CREAM_CRETH_SPEC,
CREAM_CTOKEN_SPEC,
]
def _get_seize_call(traces: List[ClassifiedTrace]) -> Optional[ClassifiedTrace]:
"""Find the call to `seize` in the child traces (successful liquidation)"""
for trace in traces:
if trace.classification == Classification.seize:
return trace
return None


@ -4,7 +4,7 @@ from typing import Any, Iterable, List, Optional
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker
from mev_inspect.string_io import StringIteratorIO
from mev_inspect.text_io import StringIteratorIO
def get_trace_database_uri() -> Optional[str]:


@ -30,6 +30,9 @@ def get_liquidations(classified_traces: List[ClassifiedTrace]) -> List[Liquidati
if _is_child_liquidation(trace, parent_liquidations):
continue
if trace.error == "Reverted":
continue
if trace.classification == Classification.liquidate:
parent_liquidations.append(trace)


@ -74,7 +74,10 @@ def _get_punk_bid_acceptances_for_transaction(
if not isinstance(trace, DecodedCallTrace):
continue
elif trace.classification == Classification.punk_accept_bid:
elif (
trace.classification == Classification.punk_accept_bid
and trace.error is None
):
punk_accept_bid = PunkBidAcceptance(
block_number=trace.block_number,
transaction_hash=trace.transaction_hash,


@ -0,0 +1,7 @@
import os
from dramatiq.brokers.redis import RedisBroker
def connect_broker():
return RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
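Actors are not declared in this module; call sites build them against the shared broker, as in the cli.py changes above. A usage sketch (the block range is hypothetical):

```python
import dramatiq

from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
    LOW_PRIORITY,
    LOW_PRIORITY_QUEUE,
    inspect_many_blocks_task,
)

broker = connect_broker()
inspect_many_blocks_actor = dramatiq.actor(
    inspect_many_blocks_task,
    broker=broker,
    queue_name=LOW_PRIORITY_QUEUE,
    priority=LOW_PRIORITY,
)
inspect_many_blocks_actor.send(16379606, 16379706)  # hypothetical range
```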


@ -0,0 +1,75 @@
import asyncio
import logging
from threading import local
from dramatiq.middleware import Middleware
from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspector import MEVInspector
logger = logging.getLogger(__name__)
class DbMiddleware(Middleware):
STATE = local()
INSPECT_SESSION_STATE_KEY = "InspectSession"
TRACE_SESSION_STATE_KEY = "TraceSession"
@classmethod
def get_inspect_sessionmaker(cls):
return getattr(cls.STATE, cls.INSPECT_SESSION_STATE_KEY, None)
@classmethod
def get_trace_sessionmaker(cls):
return getattr(cls.STATE, cls.TRACE_SESSION_STATE_KEY, None)
def before_process_message(self, _broker, message):
if not hasattr(self.STATE, self.INSPECT_SESSION_STATE_KEY):
logger.info("Building sessionmakers")
setattr(
self.STATE, self.INSPECT_SESSION_STATE_KEY, get_inspect_sessionmaker()
)
setattr(self.STATE, self.TRACE_SESSION_STATE_KEY, get_trace_sessionmaker())
else:
logger.info("Sessionmakers already set")
class InspectorMiddleware(Middleware):
STATE = local()
INSPECT_STATE_KEY = "inspector"
def __init__(self, rpc_url):
self._rpc_url = rpc_url
@classmethod
def get_inspector(cls):
return getattr(cls.STATE, cls.INSPECT_STATE_KEY, None)
def before_process_message(
self, _broker, worker
): # pylint: disable=unused-argument
if not hasattr(self.STATE, self.INSPECT_STATE_KEY):
logger.info("Building inspector")
inspector = MEVInspector(
self._rpc_url,
max_concurrency=5,
request_timeout=300,
)
setattr(self.STATE, self.INSPECT_STATE_KEY, inspector)
else:
logger.info("Inspector already exists")
class AsyncMiddleware(Middleware):
def before_process_message(
self, _broker, message
): # pylint: disable=unused-argument
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def after_process_message(
self, _broker, message, *, result=None, exception=None
): # pylint: disable=unused-argument
if hasattr(self, "loop"):
self.loop.close()


@ -0,0 +1,51 @@
import asyncio
import logging
from contextlib import contextmanager
from mev_inspect.s3_export import export_block
from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__)
HIGH_PRIORITY_QUEUE = "high"
LOW_PRIORITY_QUEUE = "low"
HIGH_PRIORITY = 0
LOW_PRIORITY = 1
def inspect_many_blocks_task(
after_block: int,
before_block: int,
):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session:
asyncio.run(
InspectorMiddleware.get_inspector().inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block,
before_block=before_block,
)
)
def realtime_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
def backfill_export_task(block_number: int):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
export_block(inspect_db_session, block_number)
@contextmanager
def _session_scope(Session=None):
if Session is None:
yield None
else:
with Session() as session:
yield session

143
mev_inspect/s3_export.py Normal file

@ -0,0 +1,143 @@
import itertools
import json
import logging
import os
from typing import Iterator, Optional, Tuple, TypeVar
import boto3
from mev_inspect.text_io import BytesIteratorIO
AWS_ENDPOINT_URL_ENV = "AWS_ENDPOINT_URL"
EXPORT_BUCKET_NAME_ENV = "EXPORT_BUCKET_NAME"
EXPORT_BUCKET_REGION_ENV = "EXPORT_BUCKET_REGION"
EXPORT_AWS_ACCESS_KEY_ID_ENV = "EXPORT_AWS_ACCESS_KEY_ID"
EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY"
supported_tables = [
"mev_summary",
"arbitrages",
"liquidations",
"sandwiches",
"sandwiched_swaps",
"blocks",
]
logger = logging.getLogger(__name__)
def export_block(inspect_db_session, block_number: int) -> None:
for table in supported_tables:
_export_block_by_table(inspect_db_session, block_number, table)
def _export_block_by_table(inspect_db_session, block_number: int, table: str) -> None:
client = get_s3_client()
export_bucket_name = get_export_bucket_name()
export_statement = _get_export_statement(table)
object_key = f"{table}/flashbots_{block_number}.json"
mev_summary_json_results = inspect_db_session.execute(
statement=export_statement,
params={
"block_number": block_number,
},
)
first_value, mev_summary_json_results = _peek(mev_summary_json_results)
if first_value is None:
existing_object_size = _get_object_size(client, export_bucket_name, object_key)
if existing_object_size is None or existing_object_size == 0:
logger.info(f"Skipping {table} for block {block_number} - no data")
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
return
mev_summary_json_fileobj = BytesIteratorIO(
(f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results)
)
client.delete_object(
Bucket=export_bucket_name,
Key=object_key,
)
client.upload_fileobj(
mev_summary_json_fileobj,
Bucket=export_bucket_name,
Key=object_key,
)
logger.info(f"Exported to {object_key}")
def _get_export_statement(table: str) -> str:
return f"""
SELECT to_json(json)
FROM (
SELECT *, CURRENT_TIMESTAMP(0) as timestamp
FROM {table}
) json
WHERE
block_number = :block_number
"""
def _get_object_size(client, bucket: str, key: str) -> Optional[int]:
response = client.list_objects_v2(
Bucket=bucket,
Prefix=key,
)
for obj in response.get("Contents", []):
if obj["Key"] == key:
return obj["Size"]
return None
def get_s3_client():
endpoint_url = get_endpoint_url()
return boto3.client(
"s3",
endpoint_url=endpoint_url,
region_name=get_export_bucket_region(),
aws_access_key_id=get_export_aws_access_key_id(),
aws_secret_access_key=get_export_aws_secret_access_key(),
)
def get_endpoint_url() -> Optional[str]:
return os.environ.get(AWS_ENDPOINT_URL_ENV)
def get_export_bucket_name() -> str:
return os.environ[EXPORT_BUCKET_NAME_ENV]
def get_export_bucket_region() -> Optional[str]:
return os.environ.get(EXPORT_BUCKET_REGION_ENV)
def get_export_aws_access_key_id() -> Optional[str]:
return os.environ.get(EXPORT_AWS_ACCESS_KEY_ID_ENV)
def get_export_aws_secret_access_key() -> Optional[str]:
return os.environ.get(EXPORT_AWS_SECRET_ACCESS_KEY_ENV)
_T = TypeVar("_T")
def _peek(iterable: Iterator[_T]) -> Tuple[Optional[_T], Iterator[_T]]:
try:
first = next(iterable)
except StopIteration:
return None, iter([])
return first, itertools.chain([first], iterable)
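`_peek` lets the export detect an empty result set without consuming the first row. A quick illustration, using the `_peek` defined above:

```python
rows = iter([("a",), ("b",)])
first, rows = _peek(rows)
print(first)       # ('a',)
print(list(rows))  # [('a',), ('b',)] -- the peeked row is not lost
```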


@ -3,8 +3,9 @@ from typing import List, Optional
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNISWAP_V3_ROUTER = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
UNISWAP_V2_ROUTER = "0x7a250d5630b4cf539739df2c5dacb4c659f2488d"
UNISWAP_V3_ROUTER = "0xe592427a0aece92de3edee1f18e0157c05861564"
UNISWAP_V3_ROUTER_2 = "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45"
def get_sandwiches(swaps: List[Swap]) -> List[Sandwich]:
@ -34,7 +35,11 @@ def _get_sandwich_starting_with_swap(
sandwicher_address = front_swap.to_address
sandwiched_swaps = []
if sandwicher_address in [UNISWAP_V2_ROUTER, UNISWAP_V3_ROUTER]:
if sandwicher_address in [
UNISWAP_V2_ROUTER,
UNISWAP_V3_ROUTER,
UNISWAP_V3_ROUTER_2,
]:
return None
for other_swap in rest_swaps:


@ -13,7 +13,7 @@ class CallResult(CamelModel):
gas_used: int
@validator("gas_used", pre=True)
def maybe_hex_to_int(v):
def maybe_hex_to_int(cls, v):
if isinstance(v, str):
return hex_to_int(v)
return v
@ -27,7 +27,7 @@ class CallAction(Web3Model):
gas: int
@validator("value", "gas", pre=True)
def maybe_hex_to_int(v):
def maybe_hex_to_int(cls, v):
if isinstance(v, str):
return hex_to_int(v)
return v


@ -24,7 +24,7 @@ class Receipt(CamelModel):
"cumulative_gas_used",
pre=True,
)
def maybe_hex_to_int(v):
def maybe_hex_to_int(cls, v):
if isinstance(v, str):
return hex_to_int(v)
return v
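These signature fixes make the validators' class-method binding explicit: pydantic v1 treats `@validator` functions as class methods, so the first parameter should be `cls` (this also keeps static checkers happy). A minimal sketch of the pattern:

```python
from pydantic import BaseModel, validator  # pydantic v1 API, as pinned here

def hex_to_int(v: str) -> int:
    return int(v, 16)

class Receipt(BaseModel):
    gas_used: int

    @validator("gas_used", pre=True)
    def maybe_hex_to_int(cls, v):
        # pre=True runs before int coercion, so hex strings are accepted
        return hex_to_int(v) if isinstance(v, str) else v

print(Receipt(gas_used="0x1a").gas_used)  # 26
```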


@ -38,3 +38,39 @@ class StringIteratorIO(io.TextIOBase):
n -= len(m)
line.append(m)
return "".join(line)
class BytesIteratorIO(io.BufferedIOBase):
def __init__(self, iter: Iterator[bytes]):
self._iter = iter
self._buff = b""
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> bytes:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret) :]
return ret
def read(self, n: Optional[int] = None) -> bytes:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return b"".join(line)

2603
poetry.lock generated

File diff suppressed because it is too large

18
pull_request_template.md Normal file

@ -0,0 +1,18 @@
## What does this PR do?
A short description of what the PR does.
## Related issue
Link to the issue this PR addresses.
If there isn't already an open issue, create an issue first. This will be our home for discussing the problem itself.
## Testing
What testing was performed to verify this works? Unit tests are a big plus!
## Checklist before merging
- [ ] Read the [contributing guide](https://github.com/flashbots/mev-inspect-py/blob/main/CONTRIBUTING.md)
- [ ] Installed and ran pre-commit hooks
- [ ] All tests pass with `./mev test`


@@ -10,10 +10,13 @@ web3 = "^5.23.0"
 pydantic = "^1.8.2"
 hexbytes = "^0.2.1"
 click = "^8.0.1"
-psycopg2 = "^2.9.1"
+psycopg2-binary = "^2.9.7"
 aiohttp = "^3.8.0"
 dramatiq = {extras = ["redis"], version = "^1.12.1"}
 pycoingecko = "^2.2.0"
+boto3 = "^1.20.48"
+aiohttp-retry = "^2.4.6"
+pyyaml = "^6.0.1"

 [tool.poetry.dev-dependencies]
 pre-commit = "^2.13.0"
@@ -28,6 +31,7 @@ alembic = "^1.6.5"
 CProfileV = "^1.0.7"
 regex = "^2021.10.8"
 pytest-profiling = "^1.7.0"
+sqlalchemy = "^1.4.23"

 [build-system]
 requires = ["poetry-core>=1.0.0"]
@@ -37,9 +41,13 @@ build-backend = "poetry.core.masonry.api"
 inspect-block = 'cli:inspect_block_command'
 inspect-many-blocks = 'cli:inspect_many_blocks_command'
 enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
+enqueue-block-list = 'cli:enqueue_block_list_command'
 fetch-block = 'cli:fetch_block_command'
 fetch-all-prices = 'cli:fetch_all_prices'
 fetch-range = 'cli:fetch_range'
+s3-export = 'cli:s3_export'
+enqueue-s3-export = 'cli:enqueue_s3_export'
+enqueue-many-s3-exports = 'cli:enqueue_many_s3_exports'

 [tool.black]
 exclude = '''
@@ -76,3 +84,6 @@ filter_files = true
 known_first_party = "mev_inspect"
 known_third_party = "alembic"
 py_version=39
+
+[pytest]
+asyncio_mode = "auto"

tests/blocks/10921990.json (new file, 27188 lines)

File diff suppressed because one or more lines are too long


tests/test_block.py (new file, 65 lines)

@@ -0,0 +1,65 @@
from unittest.mock import MagicMock, patch

import pytest

from mev_inspect.block import _find_or_fetch_miner_address
from tests.utils import load_test_block


@pytest.fixture
def mocked_web3():
    with patch("mev_inspect.block.Web3") as mock_web3:
        yield mock_web3


@pytest.mark.asyncio
# pylint: disable=redefined-outer-name
async def test_eth1_block_miner(mocked_web3):
    # Create a mock Web3 instance
    mock_web3_instance = mocked_web3.return_value

    # Set up the mock for web3.eth.get_block
    mock_eth = mock_web3_instance.eth
    mock_eth.get_block.return_value = {
        "miner": "0x4a536c1f6a5d5a9c1aeca9f6d04fbbf5f0d8f4e3"
    }

    # Load a sample block and remove the miner
    block_number = 10921991
    block = load_test_block(block_number)
    block.miner = None

    # Test that the miner is fetched
    miner_address = await _find_or_fetch_miner_address(
        w3=mock_web3_instance, traces=block.traces, block_number=block_number
    )  # Use 'await'

    # this is within the traces object
    assert miner_address == "0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5"


@pytest.mark.asyncio
# pylint: disable=redefined-outer-name
async def test_eth2_block_miner(mocked_web3):
    # Create a mock Web3 instance
    mock_web3_instance = mocked_web3.return_value

    # Create a coroutine function to mock w3.eth.get_block
    # pylint: disable=unused-argument
    async def mock_get_block(block_number):
        return {"miner": "0x4a536c1f6a5d5a9c1aeca9f6d04fbbf5f0d8f4e3"}

    # Mock w3.eth.get_block with the coroutine function
    mock_web3_instance.eth.get_block = MagicMock(side_effect=mock_get_block)

    # Load a sample block and remove the miner
    block_number = 10921990
    block = load_test_block(block_number)
    block.miner = None

    # Test that the miner is fetched
    miner_address = await _find_or_fetch_miner_address(
        w3=mock_web3_instance, traces=block.traces, block_number=block_number
    )  # Use 'await'

    assert miner_address == "0x4a536c1f6a5d5a9c1aeca9f6d04fbbf5f0d8f4e3"
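Taken together, the two tests pin down the behavior of _find_or_fetch_miner_address: an eth1-era block carries the miner in its reward traces, which win over whatever the node returns, while a post-merge block has no reward trace and falls back to get_block, awaiting the result when the provider is async. A rough sketch of that logic under those assumptions (not the repo's exact code; the trace field names are guesses):

import inspect
from typing import Any, List, Optional

async def find_or_fetch_miner_address(w3, traces: List[Any], block_number: int) -> Optional[str]:
    for trace in traces:
        if getattr(trace, "type", None) == "reward":  # assumed trace shape
            return trace.action["author"]
    block = w3.eth.get_block(block_number)
    if inspect.isawaitable(block):  # async providers return an awaitable
        block = await block
    return block["miner"]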


@@ -3,10 +3,9 @@ from mev_inspect.liquidations import get_liquidations
 from mev_inspect.schemas.liquidations import Liquidation
 from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
 from mev_inspect.schemas.traces import Protocol
-from tests.utils import load_comp_markets, load_cream_markets, load_test_block
+from tests.utils import load_comp_markets, load_test_block

 comp_markets = load_comp_markets()
-cream_markets = load_cream_markets()


 def test_c_ether_liquidations(trace_classifier: TraceClassifier):
@@ -118,29 +117,16 @@ def test_c_token_liquidation(trace_classifier: TraceClassifier):
     assert liquidation in result


-def test_cream_token_liquidation(trace_classifier: TraceClassifier):
-    block_number = 12674514
+def test_reverted_liquidation(trace_classifier: TraceClassifier):
+    block_number = 15049646
     transaction_hash = (
-        "0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
+        "0x6dd0d8be8a77651f64ef399b47fbc87011bd796b43349c3164ff7da965e0b345"
     )
-    liquidations = [
-        Liquidation(
-            liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
-            liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
-            debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
-            debt_purchase_amount=14857434973806369550,
-            received_amount=1547215810826,
-            received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
-            protocol=Protocol.cream,
-            transaction_hash=transaction_hash,
-            trace_address=[],
-            block_number=block_number,
-        )
-    ]

     block = load_test_block(block_number)
     classified_traces = trace_classifier.classify(block.traces)
     result = get_liquidations(classified_traces)
-    for liquidation in liquidations:
-        assert liquidation in result
+    assert transaction_hash not in [
+        liquidation.transaction_hash for liquidation in result
+    ]
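The replacement test asserts the negative: block 15049646 contains a liquidation call that reverted, and after #317 the classifier must not emit it. A hedged sketch of the kind of guard this implies (the field name is an assumption, not the repo's exact code):

def is_reverted(trace) -> bool:
    return getattr(trace, "error", None) == "Reverted"

classified_traces = [t for t in classified_traces if not is_reverted(t)]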

tests/test_cream.py (new file, 64 lines)

@@ -0,0 +1,64 @@
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.liquidations import get_liquidations
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Protocol
from tests.utils import load_cream_markets, load_test_block

cream_markets = load_cream_markets()


def test_cream_ether_liquidation(trace_classifier: TraceClassifier):
    block_number = 13404932
    transaction_hash = (
        "0xf5f3df6ec9b51e8e88d0d9078b04373742294530b6bcb9be045525fcab71b915"
    )

    liquidations = [
        Liquidation(
            liquidated_user="0x44f9636ef615a73688a84da1d714a40be503157d",
            liquidator_user="0x949ed86c385d191e96af136e2024d96e467d7651",
            debt_token_address=ETH_TOKEN_ADDRESS,
            debt_purchase_amount=1002704779407853614,
            received_amount=417926832636968,
            received_token_address="0x2db6c82ce72c8d7d770ba1b5f5ed0b6e075066d6",
            protocol=Protocol.cream,
            transaction_hash=transaction_hash,
            trace_address=[1, 0, 5, 1],
            block_number=block_number,
        )
    ]
    block = load_test_block(block_number)
    classified_traces = trace_classifier.classify(block.traces)
    result = get_liquidations(classified_traces)

    for liquidation in liquidations:
        assert liquidation in result


def test_cream_token_liquidation(trace_classifier: TraceClassifier):
    block_number = 12674514
    transaction_hash = (
        "0x0809bdbbddcf566e5392682a9bd9d0006a92a4dc441163c791b1136f982994b1"
    )

    liquidations = [
        Liquidation(
            liquidated_user="0x46bf9479dc569bc796b7050344845f6564d45fba",
            liquidator_user="0xa2863cad9c318669660eb4eca8b3154b90fb4357",
            debt_token_address="0x514910771af9ca656af840dff83e8264ecf986ca",
            debt_purchase_amount=14857434973806369550,
            received_amount=1547215810826,
            received_token_address="0x44fbebd2f576670a6c33f6fc0b00aa8c5753b322",
            protocol=Protocol.cream,
            transaction_hash=transaction_hash,
            trace_address=[],
            block_number=block_number,
        )
    ]
    block = load_test_block(block_number)
    classified_traces = trace_classifier.classify(block.traces)
    result = get_liquidations(classified_traces)

    for liquidation in liquidations:
        assert liquidation in result


@@ -2,8 +2,6 @@ import json
 import os
 from typing import Dict, List

-from pydantic import parse_file_as
-
 from mev_inspect.schemas.blocks import Block
 from mev_inspect.schemas.sandwiches import Sandwich
@@ -14,7 +12,10 @@ TEST_SANDWICHES_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "sandwiches")
 def load_test_sandwiches(block_number: int) -> List[Sandwich]:
     sandwiches_path = f"{TEST_SANDWICHES_DIRECTORY}/{block_number}.json"
-    return parse_file_as(List[Sandwich], sandwiches_path)
+
+    with open(sandwiches_path, "r") as file:
+        sandwiches_data = json.load(file)
+    return [Sandwich(**sandwich) for sandwich in sandwiches_data]
def load_test_block(block_number: int) -> Block:

worker.py (102 lines changed)

@@ -1,87 +1,39 @@
-import asyncio
 import logging
 import os
 import sys
-import threading
-from contextlib import contextmanager

 import dramatiq
-from dramatiq.brokers.redis import RedisBroker
-from dramatiq.cli import main as dramatiq_worker
-from dramatiq.middleware import Middleware

-from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
-from mev_inspect.inspector import MEVInspector
+from mev_inspect.queue.broker import connect_broker
+from mev_inspect.queue.middleware import (
+    AsyncMiddleware,
+    DbMiddleware,
+    InspectorMiddleware,
+)
+from mev_inspect.queue.tasks import (
+    HIGH_PRIORITY,
+    HIGH_PRIORITY_QUEUE,
+    LOW_PRIORITY,
+    LOW_PRIORITY_QUEUE,
+    backfill_export_task,
+    inspect_many_blocks_task,
+    realtime_export_task,
+)

-InspectSession = get_inspect_sessionmaker()
-TraceSession = get_trace_sessionmaker()
-
-thread_local = threading.local()
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
 logger = logging.getLogger(__name__)

-
-class AsyncMiddleware(Middleware):
-    def before_process_message(
-        self, _broker, message
-    ):  # pylint: disable=unused-argument
-        self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(self.loop)
-
-    def after_process_message(
-        self, _broker, message, *, result=None, exception=None
-    ):  # pylint: disable=unused-argument
-        self.loop.close()
-
-
-class InspectorMiddleware(Middleware):
-    def before_process_message(
-        self, _broker, worker
-    ):  # pylint: disable=unused-argument
-        rpc = os.environ["RPC_URL"]
-
-        if not hasattr(thread_local, "inspector"):
-            logger.info("Building inspector")
-            thread_local.inspector = MEVInspector(
-                rpc,
-                max_concurrency=5,
-                request_timeout=300,
-            )
-        else:
-            logger.info("Inspector already exists")
-
-
-broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
+broker = connect_broker()
+broker.add_middleware(DbMiddleware())
 broker.add_middleware(AsyncMiddleware())
-broker.add_middleware(InspectorMiddleware())
+broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"]))
 dramatiq.set_broker(broker)

-
-@contextmanager
-def session_scope(Session=None):
-    if Session is None:
-        yield None
-    else:
-        with Session() as session:
-            yield session
-
-
-@dramatiq.actor
-def inspect_many_blocks_task(
-    after_block: int,
-    before_block: int,
-):
-    with session_scope(InspectSession) as inspect_db_session:
-        with session_scope(TraceSession) as trace_db_session:
-            asyncio.run(
-                thread_local.inspector.inspect_many_blocks(
-                    inspect_db_session=inspect_db_session,
-                    trace_db_session=trace_db_session,
-                    after_block=after_block,
-                    before_block=before_block,
-                )
-            )
-
-
-if __name__ == "__main__":
-    dramatiq_worker(processes=1, threads=1)
+dramatiq.actor(
+    inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
+)
+dramatiq.actor(
+    backfill_export_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
+)
+dramatiq.actor(
+    realtime_export_task, queue_name=HIGH_PRIORITY_QUEUE, priority=HIGH_PRIORITY
+)
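The rewrite moves the middleware and task bodies into mev_inspect.queue and registers the shared task functions as actors, with exports split across a high-priority realtime queue and a low-priority backfill queue. One detail worth a usage sketch: dramatiq.actor() returns an Actor wrapper, so a producer enqueues work by capturing that return value and calling .send() (the block numbers below are made up):

inspect_blocks = dramatiq.actor(
    inspect_many_blocks_task, queue_name=LOW_PRIORITY_QUEUE, priority=LOW_PRIORITY
)
inspect_blocks.send(15_000_000, 15_000_010)  # enqueue blocks [after, before)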