Merge branch 'main' of https://github.com/flashbots/mev-inspect-py into flashbots-main

Patrick Daly 2021-07-28 20:30:49 -07:00
commit 5586e8ff76
61 changed files with 50527 additions and 40802 deletions

13
.env Normal file

@@ -0,0 +1,13 @@
# Postgres
POSTGRES_SERVER=db
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
POSTGRES_DB=mev_inspect
# PgAdmin
PGADMIN_LISTEN_PORT=5050
PGADMIN_DEFAULT_EMAIL=admin@example.com
PGADMIN_DEFAULT_PASSWORD=password
# SQLAlchemy
SQLALCHEMY_DATABASE_URI=postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_SERVER/$POSTGRES_DB

58
.github/workflows/github-actions.yml vendored Normal file

@@ -0,0 +1,58 @@
name: Python package
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Get full Python version
id: full-python-version
shell: bash
run: echo ::set-output name=version::$(python -c "import sys; print('-'.join(str(v) for v in sys.version_info))")
- name: Bootstrap poetry
shell: bash
run: |
curl -sL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py \
| python - -y
- name: Update PATH
if: ${{ matrix.os != 'Windows' }}
shell: bash
run: echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Configure poetry
shell: bash
run: poetry config virtualenvs.in-project true
- name: Set up cache
uses: actions/cache@v2
id: cache
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('**/poetry.lock') }}
- name: Ensure cache is healthy
if: steps.cache.outputs.cache-hit == 'true'
shell: bash
run: timeout 10s poetry run pip --version || rm -rf .venv
- name: Install dependencies
shell: bash
run: poetry install
- name: Run precommit
run: |
poetry run pre-commit
- name: Test with pytest
shell: bash
run: poetry run test

15
.gitignore vendored Normal file

@@ -0,0 +1,15 @@
# venv and test cache files
env/
__pycache__
.mypy_cache
# vim temp
*.sw?
.*.sw?
# pytest cache
.pytest_cache/
# coverage
htmlcov
.coverage*

20
.pre-commit-config.yaml Normal file

@@ -0,0 +1,20 @@
repos:
- repo: https://github.com/ambv/black
rev: 20.8b1
hooks:
- id: black
language_version: python3.9
- repo: local
hooks:
- id: pylint
name: pylint
entry: poetry run pylint
args: ['--rcfile=.pylintrc', --disable=redefined-builtin]
language: system
types: [python]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.910
hooks:
- id: 'mypy'
additional_dependencies:
- 'pydantic'

503
.pylintrc Normal file

@@ -0,0 +1,503 @@
[MASTER]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-whitelist=pydantic
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Use multiple processes to speed up Pylint. Specifying 0 will auto-detect the
# number of processors available to use.
jobs=1
# Control the amount of potential inferred values when inferring a single
# object. This can help the performance when dealing with large functions or
# complex, nested conditions.
limit-inference-results=100
# List of plugins (as comma separated values of python module names) to load,
# usually to register additional checkers.
load-plugins=
# Pickle collected data for later comparisons.
persistent=yes
# Specify a configuration file.
#rcfile=
# When enabled, pylint would attempt to guess common misconfiguration and emit
# user-friendly hints instead of false-positive error messages.
suggestion-mode=yes
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED.
confidence=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once). You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W".
disable=all
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member, imports, variables
[REPORTS]
# Python expression which should return a score less than or equal to 10. You
# have access to the variables 'error', 'warning', 'refactor', and 'convention'
# which contain the number of messages in each category, as well as 'statement'
# which is the total number of statements analyzed. This score is used by the
# global evaluation report (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details.
#msg-template=
# Set the output format. Available formats are text, parseable, colorized, json
# and msvs (visual studio). You can also give a reporter class, e.g.
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages.
reports=no
# Activate the evaluation score.
score=yes
[REFACTORING]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
# Complete name of functions that never returns. When checking for
# inconsistent-return-statements if a never returning function is called then
# it will be considered as an explicit return statement and no message will be
# printed.
never-returning-functions=sys.exit
[LOGGING]
# Format style used to check logging format string. `old` means using %
# formatting, `new` is for `{}` formatting,and `fstr` is for f-strings.
logging-format-style=old
# Logging modules to check that the string format arguments are in logging
# function parameter format.
logging-modules=logging
[SPELLING]
# Limits count of emitted suggestions for spelling mistakes.
max-spelling-suggestions=4
# Spelling dictionary name. Available dictionaries: none. To make it work,
# install the python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains the private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to the private dictionary (see the
# --spelling-private-dict-file option) instead of raising a message.
spelling-store-unknown-words=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=numpy.*,torch.*,spacy.attrs.*
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# Tells whether to warn about missing members when the owner of the attribute
# is inferred to be None.
ignore-none=yes
# This flag controls whether pylint should warn about no-member and similar
# checks whenever an opaque object is returned when inferring. The inference
# can return multiple potential results while evaluating a Python object, but
# some branches might not be evaluated, which results in partial inference. In
# that case, it might be useful to still emit no-member and other checks for
# the rest of the inferred objects.
ignore-on-opaque-inference=yes
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# Show a hint with possible names when a member name was not found. The aspect
# of finding the hint is based on edit distance.
missing-member-hint=yes
# The minimum edit distance a name should have in order to be considered a
# similar match for a missing member name.
missing-member-hint-distance=1
# The total number of similar names that should be taken in consideration when
# showing a hint for a missing member.
missing-member-max-choices=1
# List of decorators that change the signature of a decorated function.
signature-mutators=
[VARIABLES]
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid defining new builtins when possible.
additional-builtins=
# Tells whether unused global variables should be treated as a violation.
allow-global-unused-variables=yes
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,
_cb
# A regular expression matching the name of dummy variables (i.e. expected to
# not be used).
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
# Argument names that match this expression will be ignored. Default to name
# with leading underscore.
ignored-argument-names=_.*|^ignored_|^unused_
# Tells whether we should check for unused import in __init__ files.
init-import=no
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[FORMAT]
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
indent-string=' '
# Maximum number of characters on a single line.
max-line-length=100
# Maximum number of lines in a module.
max-module-lines=1000
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,
dict-separator
# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
single-line-class-stmt=no
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
[SIMILARITIES]
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
# Minimum lines number of a similarity.
min-similarity-lines=4
[BASIC]
# Naming style matching correct argument names.
argument-naming-style=snake_case
# Regular expression matching correct argument names. Overrides argument-
# naming-style.
#argument-rgx=
# Naming style matching correct attribute names.
attr-naming-style=snake_case
# Regular expression matching correct attribute names. Overrides attr-naming-
# style.
#attr-rgx=
# Bad variable names which should always be refused, separated by a comma.
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
# Naming style matching correct class attribute names.
class-attribute-naming-style=any
# Regular expression matching correct class attribute names. Overrides class-
# attribute-naming-style.
#class-attribute-rgx=
# Naming style matching correct class names.
class-naming-style=PascalCase
# Regular expression matching correct class names. Overrides class-naming-
# style.
#class-rgx=
# Naming style matching correct constant names.
const-naming-style=UPPER_CASE
# Regular expression matching correct constant names. Overrides const-naming-
# style.
#const-rgx=
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
# Naming style matching correct function names.
function-naming-style=snake_case
# Regular expression matching correct function names. Overrides function-
# naming-style.
#function-rgx=
# Good variable names which should always be accepted, separated by a comma.
good-names=i,
j,
k,
ex,
Run,
_
# Include a hint for the correct naming format with invalid-name.
include-naming-hint=no
# Naming style matching correct inline iteration names.
inlinevar-naming-style=any
# Regular expression matching correct inline iteration names. Overrides
# inlinevar-naming-style.
#inlinevar-rgx=
# Naming style matching correct method names.
method-naming-style=snake_case
# Regular expression matching correct method names. Overrides method-naming-
# style.
#method-rgx=
# Naming style matching correct module names.
module-naming-style=snake_case
# Regular expression matching correct module names. Overrides module-naming-
# style.
#module-rgx=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
# These decorators are taken in consideration only for invalid-name.
property-classes=abc.abstractproperty
# Naming style matching correct variable names.
variable-naming-style=snake_case
# Regular expression matching correct variable names. Overrides variable-
# naming-style.
#variable-rgx=
[STRING]
# This flag controls whether the implicit-str-concat-in-sequence should
# generate a warning on implicit string concatenation in sequences defined over
# several lines.
check-str-concat-over-line-jumps=no
[IMPORTS]
# List of modules that can be imported at any level, not just the top level
# one.
allow-any-import-level=
# Allow wildcard imports from modules that define __all__.
allow-wildcard-with-all=no
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
# Deprecated modules which should not be used, separated by a comma.
deprecated-modules=optparse,tkinter.tix
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled).
ext-import-graph=
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled).
import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled).
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Couples of modules and preferred modules, separated by a comma.
preferred-modules=
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=cls
[DESIGN]
# Maximum number of arguments for function / method.
max-args=5
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=12
# Maximum number of locals for function / method body.
max-locals=15
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
max-statements=50
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "BaseException, Exception".
overgeneral-exceptions=BaseException,
Exception

19
Dockerfile Normal file

@@ -0,0 +1,19 @@
FROM python:3.9
RUN pip install -U pip \
&& apt-get update \
&& curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
ENV PATH="${PATH}:/root/.poetry/bin"
COPY . /app
WORKDIR /app/
# poetry uses virtual env by default, turn this off inside container
RUN poetry config virtualenvs.create false && \
poetry install
# easter eggs 😝
RUN echo "PS1='🕵️:\[\033[1;36m\]\h \[\033[1;34m\]\W\[\033[0;35m\]\[\033[1;36m\]$ \[\033[0m\]'" >> ~/.bashrc
CMD /bin/bash

106
README.md

@@ -1,4 +1,104 @@
# mev-inspect-py
MEV-inspect-py is a script which "inspects" an Ethereum block, or range of blocks, and tries to identify and analyze transactions which extract MEV. For example, it will identify and quantify arbitrage trades which capture profit from mispricing across two DEXes in a single transaction.
# mev-inspect
A [WIP] Ethereum MEV Inspector in Python managed by Poetry
MEV-inspect-py is currently a work in progress that builds on the work done in [MEV-inspect-rs](https://github.com/flashbots/mev-inspect-rs). In the coming weeks we will release a foundation from which contributors can add new inspectors.
## Containers
mev-inspect's local setup is built on [Docker Compose](https://docs.docker.com/compose/)
By default it starts up:
- `mev-inspect` - a container with the code in this repo used for running scripts
- `db` - a postgres database instance
- `pgadmin` - a postgres DB UI for querying and more (available at localhost:5050)
## Running locally
Set up [Docker](https://www.docker.com/products/docker-desktop)
Set up [Poetry](https://python-poetry.org/docs/#osx--linux--bashonwindows-install-instructions)
Install dependencies through poetry
```
poetry install
```
Start the services (optionally as background processes)
```
poetry run start [-b]
```
Apply the latest migrations against the local DB:
```
poetry run exec alembic upgrade head
```
Run inspect on a block
```
poetry run inspect --block-number 11931270 --rpc 'http://111.11.11.111:8545/'
```
To stop the services (if running in the background, otherwise just ctrl+c)
```
poetry run stop
```
The mev-inspect container can be attached to via
```
poetry run attach
```
Additional compose commands can be run through standard `docker
compose ...` calls. Check `docker compose help` for the available tools.
## Executing scripts
Any script can be run from the mev-inspect container like
```
poetry run exec <your command here>
```
For example
```
poetry run exec python examples/uniswap_inspect.py -block_number=123 -rpc='111.111.111'
```
### Poetry Scripts
```bash
# code check
poetry run lint # linting via Pylint
poetry run test # testing and code coverage with Pytest
poetry run isort # fixing imports
poetry run mypy # type checking
poetry run black # style guide
poetry run pre-commit run --all-files # runs Black, PyLint and MyPy
# docker management
poetry run start [-b] # starts all services, optionally in the background
poetry run stop # shuts down all services (or just ctrl+c if running in the foreground)
poetry run build # rebuilds containers
poetry run attach # enters the mev-inspect container in interactive mode
# launches inspection script
poetry run inspect --block-number 11931270 --rpc 'http://111.11.11.111:8545/'
```
## Rebuilding containers
After changes to the app's Dockerfile, rebuild with
```
poetry run build
```
## Using PGAdmin
1. Go to [localhost:5050](http://localhost:5050)
2. Log in with the PGAdmin username and password in `.env`
3. Add a new engine for mev_inspect with
- host: db
- user / password: see `.env`
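
For programmatic access, here is a minimal sketch (assuming the services are up and migrations have been applied) using this repo's `get_session` helper and `ClassifiedTraceModel`:
```
from mev_inspect.db import get_session
from mev_inspect.models.classified_traces import ClassifiedTraceModel

session = get_session()
swap_count = (
    session.query(ClassifiedTraceModel)
    .filter(ClassifiedTraceModel.classification == "swap")
    .count()
)
print(f"Classified swaps so far: {swap_count}")
session.close()
```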
## Contributing
Pre-commit is used to maintain a consistent style, prevent errors and ensure test coverage.
Install pre-commit with:
```
poetry run pre-commit install
```
Update the README if needed

89
alembic.ini Normal file

@@ -0,0 +1,89 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; this defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat alembic/versions
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# sqlalchemy.url = postgresql://postgres:password@db/mev_inspect
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

78
alembic/env.py Normal file

@@ -0,0 +1,78 @@
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.set_main_option("sqlalchemy.url", os.environ["SQLALCHEMY_DATABASE_URI"])
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

@@ -0,0 +1,47 @@
"""Create classifications table
Revision ID: 0660432b9840
Revises:
Create Date: 2021-07-23 20:08:42.016711
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "0660432b9840"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"classified_traces",
sa.Column("classified_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column(
"classification",
sa.String(256),
nullable=False,
),
sa.Column("trace_type", sa.String(256), nullable=False),
sa.Column("trace_address", sa.String(256), nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("abi_name", sa.String(1024), nullable=True),
sa.Column("function_name", sa.String(2048), nullable=True),
sa.Column("function_signature", sa.String(2048), nullable=True),
sa.Column("inputs", sa.JSON, nullable=True),
sa.Column("from_address", sa.String(256), nullable=True),
sa.Column("to_address", sa.String(256), nullable=True),
sa.Column("gas", sa.Numeric, nullable=True),
sa.Column("value", sa.Numeric, nullable=True),
sa.Column("gas_used", sa.Numeric, nullable=True),
sa.Column("error", sa.String(256), nullable=True),
sa.PrimaryKeyConstraint("transaction_hash", "trace_address"),
)
def downgrade():
op.drop_table("classified_traces")

@@ -1,98 +0,0 @@
from web3 import Web3
from pathlib import Path
import json
cache_directory = './cache'
class BlockData:
def __init__(self, block_number, data, receipts, calls, logs) -> None:
self.block_number = block_number
self.data = data
self.receipts = receipts
self.calls = calls
self.logs = logs
self.transaction_hashes = self.get_transaction_hashes()
    ## Gets a list of unique transaction hashes in the calls of this block
def get_transaction_hashes(self):
result = []
for call in self.calls:
if call['type'] != 'reward':
if call['transactionHash'] in result:
continue
else:
result.append(call['transactionHash'])
return result
## Makes a nicely formatted JSON object out of this data object.
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)
## Writes this object to a JSON file for loading later
def writeJSON(self):
json_data = self.toJSON()
        cache_file = '{cacheDirectory}/{blockNumber}.json'.format(cacheDirectory=cache_directory, blockNumber=self.block_number)
file_exists = Path(cache_file).is_file()
if file_exists:
f = open(cache_file, "w")
f.write(json_data)
f.close()
else:
f = open(cache_file, "x")
f.write(json_data)
f.close()
## Gets all the calls associated with a transaction hash
def get_filtered_calls(self, hash):
result = []
for call in self.calls:
if call['transactionHash'] == hash:
result.append(call)
return result
## Creates a block object, either from the cache or from the chain itself
## Note that you need to pass in the provider, not the web3 wrapped provider object!
## This is because only the provider allows you to make json rpc requests
def createFromBlockNumber(block_number, base_provider):
    cache_file = '{cacheDirectory}/{blockNumber}.json'.format(cacheDirectory=cache_directory, blockNumber=block_number)
## Check to see if the data already exists in the cache
## if it exists load the data from cache
## If not then get the data from the chain and save it to the cache
if (Path(cache_file).is_file()):
print(('Cache for block {block_number} exists, loading data from cache').format(block_number=block_number))
block_file = open(cache_file)
block_json = json.load(block_file)
block = BlockData(block_number, block_json['data'], block_json['receipts'], block_json['calls'], block_json['logs'])
return block
else:
w3 = Web3(base_provider)
print(("Cache for block {block_number} did not exist, getting data").format(block_number=block_number))
## Get block data
block_data = w3.eth.get_block(block_number, True)
## Get the block receipts
## TODO: evaluate whether or not this is sufficient or if gas used needs to be converted to a proper big number.
## In inspect-ts it needed to be converted
block_receipts_raw = base_provider.make_request("eth_getBlockReceipts", [block_number])
## Trace the whole block, return those calls
block_calls = w3.parity.trace_block(block_number)
## Get the logs
block_hash = (block_data.hash).hex()
block_logs = w3.eth.get_logs({'blockHash': block_hash})
## Create a new object
block = BlockData(block_number, block_data, block_receipts_raw, block_calls, block_logs)
## Write the result to a JSON file for loading in the future
block.writeJSON()
return block

40552
cache/12412732.json vendored

File diff suppressed because it is too large

33
docker-compose.yml Normal file

@@ -0,0 +1,33 @@
services:
mev-inspect:
build: .
depends_on:
- db
env_file:
- .env
volumes:
- .:/app
tty: true
db:
image: postgres:12
volumes:
- mev-inspect-db-data:/var/lib/postgresql/data/pgdata
env_file:
- .env
environment:
- PGDATA=/var/lib/postgresql/data/pgdata
pgadmin:
image: dpage/pgadmin4
networks:
- default
depends_on:
- db
env_file:
- .env
ports:
- "5050:5050"
volumes:
mev-inspect-db-data:

61
inspect_block.py Normal file

@@ -0,0 +1,61 @@
import json
import click
from web3 import Web3
from mev_inspect import block
from mev_inspect.crud.classified_traces import write_classified_traces
from mev_inspect.db import get_session
from mev_inspect.classifier_specs import CLASSIFIER_SPECS
from mev_inspect.trace_classifier import TraceClassifier
@click.command()
@click.argument("block_number", type=int)
@click.argument("rpc")
def inspect_block(block_number: int, rpc: str):
base_provider = Web3.HTTPProvider(rpc)
block_data = block.create_from_block_number(block_number, base_provider)
print(f"Total traces: {len(block_data.traces)}")
total_transactions = len(
set(
t.transaction_hash
for t in block_data.traces
if t.transaction_hash is not None
)
)
print(f"Total transactions: {total_transactions}")
    trace_classifier = TraceClassifier(CLASSIFIER_SPECS)
    classified_traces = trace_classifier.classify(block_data.traces)
print(f"Returned {len(classified_traces)} classified traces")
db_session = get_session()
write_classified_traces(db_session, classified_traces)
db_session.close()
stats = get_stats(classified_traces)
print(json.dumps(stats, indent=4))
def get_stats(classified_traces) -> dict:
stats: dict = {}
for trace in classified_traces:
abi_name = trace.abi_name
classification = trace.classification.value
signature = trace.function_signature
abi_name_stats = stats.get(abi_name, {})
class_stats = abi_name_stats.get(classification, {})
signature_count = class_stats.get(signature, 0)
class_stats[signature] = signature_count + 1
abi_name_stats[classification] = class_stats
stats[abi_name] = abi_name_stats
return stats
if __name__ == "__main__":
inspect_block()
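
`get_stats` above nests counts as abi_name → classification → function signature; an illustrative result shape (hypothetical counts):
```
{
    "UniswapV2Router": {
        "swap": {
            "swapExactETHForTokens(uint256,address[],address,uint256)": 2
        }
    },
    "ERC20": {
        "transfer": {
            "transfer(address,uint256)": 5
        }
    }
}
```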

@@ -1,81 +0,0 @@
from web3 import Web3
import configparser
import json
import utils
## Config file is used for addresses/ABIs
config = configparser.ConfigParser()
config.read('./utils/config.ini')
uniswap_router_abi = json.loads(config['ABI']['UniswapV2Router'])
uniswap_router_address = (config['ADDRESSES']['UniswapV2Router'])
sushiswap_router_address = (config['ADDRESSES']['SushiswapV2Router'])
uniswap_pair_abi = json.loads(config['ABI']['UniswapV2Pair'])
class UniswapInspector:
def __init__(self, base_provider) -> None:
self.w3 = Web3(base_provider)
self.trading_functions = self.get_trading_functions()
self.uniswap_v2_router_contract = self.w3.eth.contract(abi=uniswap_router_abi, address=uniswap_router_address)
self.uniswap_router_trade_signatures = self.get_router_signatures()
self.uniswap_v2_pair_contract = self.w3.eth.contract(abi=uniswap_pair_abi)
self.uniswap_v2_pair_swap_signatures = self.uniswap_v2_pair_contract.functions.swap(0, 0, uniswap_router_address, "").selector ## Note the address here doesn't matter, but it must be filled out
        self.uniswap_v2_pair_reserves_signatures = self.uniswap_v2_pair_contract.functions.getReserves().selector ## Called "checksigs" in mev-inspect.ts
print("Built Uniswap inspector")
def get_trading_functions(self):
## Gets all functions used for swapping
result = []
## For each entry in the ABI
for abi in uniswap_router_abi:
## Check to see if the entry is a function and if it is if the function's name starts with swap
if abi['type'] == 'function' and abi['name'].startswith('swap'):
## If so add it to our array
result.append(abi['name'])
return result
def get_router_signatures(self):
## Gets the selector / function signatures of all the router swap functions
result = []
## For each entry in the ABI
for abi in uniswap_router_abi:
## Check to see if the entry is a function and if it is if the function's name starts with swap
if abi['type'] == 'function' and abi['name'].startswith('swap'):
                ## Add an opening parenthesis
function = abi['name'] + '('
## For each input in the function's input
for input in abi['inputs']:
## Concat them into a string with commas
function = function + input['internalType'] + ','
## Take off the last comma, add a ')' to close the parentheses
function = function[:-1] + ')'
## The result looks like this: 'swapETHForExactTokens(uint256,address[],address,uint256)'
## Take the first 4 bytes of the sha3 hash of the above string.
selector = (Web3.sha3(text=function)[0:4])
## Add that to an array
result.append(selector)
return result
def inspect(self, calls):
result = []
trade_calls = []
for call in calls:
print('\n',call)
if (call['action']['to'] == uniswap_router_address.lower() or call['action']['to'] == sushiswap_router_address.lower()) and utils.check_call_for_signature(call, self.uniswap_router_trade_signatures):
# print("WIP, here is where there is a call that matches what we are looking for")
                pass

0
mev_inspect/__init__.py Normal file

22
mev_inspect/abi.py Normal file

@@ -0,0 +1,22 @@
import json
from pathlib import Path
from typing import Optional
from pydantic import parse_obj_as
from mev_inspect.schemas import ABI
THIS_FILE_DIRECTORY = Path(__file__).parents[0]
ABI_DIRECTORY_PATH = THIS_FILE_DIRECTORY / "abis"
def get_abi(abi_name: str) -> Optional[ABI]:
abi_path = ABI_DIRECTORY_PATH / f"{abi_name}.json"
if abi_path.is_file():
with abi_path.open() as abi_file:
abi_json = json.load(abi_file)
return parse_obj_as(ABI, abi_json)
return None

File diff suppressed because one or more lines are too long

@@ -0,0 +1 @@
[{"inputs":[{"internalType":"string","name":"name_","type":"string"},{"internalType":"string","name":"symbol_","type":"string"}],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"owner","type":"address"},{"indexed":true,"internalType":"address","name":"spender","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"from","type":"address"},{"indexed":true,"internalType":"address","name":"to","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Transfer","type":"event"},{"inputs":[{"internalType":"address","name":"owner","type":"address"},{"internalType":"address","name":"spender","type":"address"}],"name":"allowance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"approve","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"subtractedValue","type":"uint256"}],"name":"decreaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"addedValue","type":"uint256"}],"name":"increaseAllowance","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"name","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"symbol","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"totalSupply","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"recipient","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transfer","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"sender","type":"address"},{"internalType":"address","name":"recipient","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"transferFrom","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"}]

@@ -0,0 +1 @@
[{"inputs": [{"internalType": "contract UniswapV2Factory", "name": "_uniswapFactory", "type": "address"}, {"internalType": "uint256", "name": "_start", "type": "uint256"}, {"internalType": "uint256", "name": "_stop", "type": "uint256"}], "name": "getPairsByIndexRange", "outputs": [{"internalType": "address[3][]", "name": "", "type": "address[3][]"}], "stateMutability": "view", "type": "function"}, {"inputs": [{"internalType": "contract IUniswapV2Pair[]", "name": "_pairs", "type": "address[]"}], "name": "getReservesByPairs", "outputs": [{"internalType": "uint256[3][]", "name": "", "type": "uint256[3][]"}], "stateMutability": "view", "type": "function"}]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

104
mev_inspect/block.py Normal file

@@ -0,0 +1,104 @@
from pathlib import Path
from typing import Any, Dict, List
from web3 import Web3
from mev_inspect.schemas import Block, Trace, TraceType
cache_directory = "./cache"
## Creates a block object, either from the cache or from the chain itself
## Note that you need to pass in the provider, not the web3 wrapped provider object!
## This is because only the provider allows you to make json rpc requests
def create_from_block_number(block_number: int, base_provider) -> Block:
cache_path = _get_cache_path(block_number)
if cache_path.is_file():
print(f"Cache for block {block_number} exists, " "loading data from cache")
return Block.parse_file(cache_path)
else:
print(f"Cache for block {block_number} did not exist, getting data")
w3 = Web3(base_provider)
block = fetch_block(w3, base_provider, block_number)
cache_block(cache_path, block)
return block
def fetch_block(w3, base_provider, block_number: int) -> Block:
## Get block data
block_data = w3.eth.get_block(block_number, True)
## Get the block receipts
## TODO: evaluate whether or not this is sufficient or if gas used needs to be converted to a proper big number.
## In inspect-ts it needed to be converted
block_receipts_raw = base_provider.make_request(
"eth_getBlockReceipts", [block_number]
)
## Trace the whole block, return those calls
traces_json = w3.parity.trace_block(block_number)
traces = [Trace(**trace_json) for trace_json in traces_json]
## Get the logs
block_hash = (block_data.hash).hex()
block_logs = w3.eth.get_logs({"blockHash": block_hash})
## Get gas used by individual txs and store them too
txs_gas_data: Dict[str, Dict[str, Any]] = {}
for transaction in block_data["transactions"]:
tx_hash = (transaction.hash).hex()
tx_data = w3.eth.get_transaction(tx_hash)
tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
txs_gas_data[tx_hash] = {
"gasUsed": tx_receipt[
"gasUsed"
], # fix: why does this return 0 for certain txs?
"gasPrice": tx_data["gasPrice"],
"netFeePaid": tx_data["gasPrice"] * tx_receipt["gasUsed"],
}
transaction_hashes = get_transaction_hashes(traces)
## Create a new object
return Block(
block_number=block_number,
data=block_data,
receipts=block_receipts_raw,
traces=traces,
logs=block_logs,
transaction_hashes=transaction_hashes,
txs_gas_data=txs_gas_data,
)
def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result = []
for call in calls:
if call.type != TraceType.reward:
if (
call.transaction_hash is not None
and call.transaction_hash not in result
):
result.append(call.transaction_hash)
return result
def cache_block(cache_path: Path, block: Block):
write_mode = "w" if cache_path.is_file() else "x"
with open(cache_path, mode=write_mode) as cache_file:
cache_file.write(block.json())
def _get_cache_path(block_number: int) -> Path:
cache_directory_path = Path(cache_directory)
return cache_directory_path / f"{block_number}-new.json"

@@ -0,0 +1,37 @@
from mev_inspect.schemas.classified_traces import (
Classification,
ClassifierSpec,
Protocol,
)
SUSHISWAP_ROUTER_ADDRESS = "0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F"
UNISWAP_V2_ROUTER_ADDRESS = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
CLASSIFIER_SPECS = [
ClassifierSpec(
abi_name="UniswapV2Router",
protocol=Protocol.uniswap_v2,
valid_contract_addresses=[UNISWAP_V2_ROUTER_ADDRESS],
),
ClassifierSpec(
abi_name="UniswapV2Router",
protocol=Protocol.sushiswap,
valid_contract_addresses=[SUSHISWAP_ROUTER_ADDRESS],
),
ClassifierSpec(
abi_name="ERC20",
classifications={
"transferFrom(address,address,uint256)": Classification.transfer,
"transfer(address,uint256)": Classification.transfer,
"burn(address)": Classification.burn,
},
),
ClassifierSpec(
abi_name="UniswapV2Pair",
classifications={
"swap(uint256,uint256,address,bytes)": Classification.swap,
},
),
]
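
These specs are consumed by `TraceClassifier`, as in `inspect_block.py` above; a minimal sketch, assuming an archive node is reachable at the given RPC URL:
```
from web3 import Web3

from mev_inspect import block
from mev_inspect.classifier_specs import CLASSIFIER_SPECS
from mev_inspect.trace_classifier import TraceClassifier

base_provider = Web3.HTTPProvider("http://localhost:8545")  # assumed endpoint
block_data = block.create_from_block_number(11931270, base_provider)
classified_traces = TraceClassifier(CLASSIFIER_SPECS).classify(block_data.traces)
```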

7
mev_inspect/config.ini Normal file

@@ -0,0 +1,7 @@
[RPC]
Endpoint = http://localhost:8545/
[ADDRESSES]
UniswapV2Router = 0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D
SushiswapV2Router = 0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F
WETH = 0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2

13
mev_inspect/config.py Normal file

@@ -0,0 +1,13 @@
import os
import configparser
THIS_FILE_DIRECTORY = os.path.dirname(__file__)
CONFIG_PATH = os.path.join(THIS_FILE_DIRECTORY, "config.ini")
def load_config():
config = configparser.ConfigParser()
config.read(CONFIG_PATH)
return config


@@ -0,0 +1,17 @@
import json
from typing import List
from mev_inspect.models.classified_traces import ClassifiedTraceModel
from mev_inspect.schemas.classified_traces import ClassifiedTrace
def write_classified_traces(
db_session,
classified_traces: List[ClassifiedTrace],
) -> None:
models = [
ClassifiedTraceModel(**json.loads(trace.json())) for trace in classified_traces
]
db_session.bulk_save_objects(models)
db_session.commit()

13
mev_inspect/db.py Normal file

@@ -0,0 +1,13 @@
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def get_engine():
return create_engine(os.getenv("SQLALCHEMY_DATABASE_URI"))
def get_session():
Session = sessionmaker(bind=get_engine())
return Session()

36
mev_inspect/decode.py Normal file

@@ -0,0 +1,36 @@
from typing import Dict, Optional
from hexbytes import HexBytes
from eth_abi import decode_abi
from mev_inspect.schemas.abi import ABI, ABIFunctionDescription
from mev_inspect.schemas.call_data import CallData
class ABIDecoder:
def __init__(self, abi: ABI):
        self._functions_by_selector: Dict[HexBytes, ABIFunctionDescription] = {
description.get_selector(): description
for description in abi
if isinstance(description, ABIFunctionDescription)
}
def decode(self, data: str) -> Optional[CallData]:
hex_data = HexBytes(data)
selector, params = hex_data[:4], hex_data[4:]
func = self._functions_by_selector.get(selector)
if func is None:
return None
names = [input.name for input in func.inputs]
types = [input.type for input in func.inputs]
decoded = decode_abi(types, params)
return CallData(
function_name=func.name,
function_signature=func.get_signature(),
inputs={name: value for name, value in zip(names, decoded)},
)
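
A minimal usage sketch combining `get_abi` and `ABIDecoder` from this diff, decoding hypothetical `transfer(address,uint256)` calldata (selector `a9059cbb`; each argument is ABI-encoded into a 32-byte word):
```
from mev_inspect.abi import get_abi
from mev_inspect.decode import ABIDecoder

decoder = ABIDecoder(get_abi("ERC20"))  # loads mev_inspect/abis/ERC20.json
calldata = (
    "0xa9059cbb"                      # transfer(address,uint256) selector
    + "00" * 12 + "11" * 20           # recipient address, left-padded to 32 bytes
    + hex(10**18)[2:].rjust(64, "0")  # amount, 32 bytes
)
call = decoder.decode(calldata)
print(call.function_signature)  # transfer(address,uint256)
print(call.inputs)              # {'recipient': '0x1111...', 'amount': 1000000000000000000}
```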


@@ -0,0 +1,3 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

@@ -0,0 +1,24 @@
from sqlalchemy import Column, JSON, Numeric, String
from .base import Base
class ClassifiedTraceModel(Base):
__tablename__ = "classified_traces"
transaction_hash = Column(String, primary_key=True)
block_number = Column(Numeric, nullable=False)
classification = Column(String, nullable=False)
trace_type = Column(String, nullable=False)
trace_address = Column(String, nullable=False)
protocol = Column(String, nullable=True)
abi_name = Column(String, nullable=True)
function_name = Column(String, nullable=True)
function_signature = Column(String, nullable=True)
inputs = Column(JSON, nullable=True)
from_address = Column(String, nullable=True)
to_address = Column(String, nullable=True)
gas = Column(Numeric, nullable=True)
value = Column(Numeric, nullable=True)
gas_used = Column(Numeric, nullable=True)
error = Column(String, nullable=True)

@@ -0,0 +1,2 @@
from .abi import ABI
from .blocks import Block, NestedTrace, Trace, TraceType

@@ -0,0 +1,50 @@
from enum import Enum
from typing import List, Union
from typing_extensions import Literal
from hexbytes import HexBytes
from pydantic import BaseModel
from web3 import Web3
class ABIDescriptionType(str, Enum):
function = "function"
constructor = "constructor"
fallback = "fallback"
event = "event"
receive = "receive"
NON_FUNCTION_DESCRIPTION_TYPES = Union[
Literal[ABIDescriptionType.constructor],
Literal[ABIDescriptionType.fallback],
Literal[ABIDescriptionType.event],
Literal[ABIDescriptionType.receive],
]
class ABIDescriptionInput(BaseModel):
name: str
type: str
class ABIGenericDescription(BaseModel):
type: NON_FUNCTION_DESCRIPTION_TYPES
class ABIFunctionDescription(BaseModel):
type: Literal[ABIDescriptionType.function]
name: str
inputs: List[ABIDescriptionInput]
def get_selector(self) -> HexBytes:
signature = self.get_signature()
return Web3.sha3(text=signature)[0:4]
def get_signature(self) -> str:
joined_input_types = ",".join(input.type for input in self.inputs)
return f"{self.name}({joined_input_types})"
ABIDescription = Union[ABIFunctionDescription, ABIGenericDescription]
ABI = List[ABIDescription]
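
As a quick sketch of the two methods above, the description of ERC20's `transfer` reproduces the familiar selector:
```
from mev_inspect.schemas.abi import (
    ABIDescriptionInput,
    ABIDescriptionType,
    ABIFunctionDescription,
)

transfer = ABIFunctionDescription(
    type=ABIDescriptionType.function,
    name="transfer",
    inputs=[
        ABIDescriptionInput(name="recipient", type="address"),
        ABIDescriptionInput(name="amount", type="uint256"),
    ],
)
assert transfer.get_signature() == "transfer(address,uint256)"
assert transfer.get_selector().hex() == "0xa9059cbb"
```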

@@ -0,0 +1,76 @@
from enum import Enum
from typing import Dict, List, Optional
from pydantic import BaseModel, validator
from mev_inspect.utils import hex_to_int
from .utils import CamelModel, Web3Model
class CallResult(CamelModel):
gas_used: int
@validator("gas_used", pre=True)
def maybe_hex_to_int(v):
if isinstance(v, str):
return hex_to_int(v)
return v
class CallAction(Web3Model):
to: str
from_: str
input: str
value: int
gas: int
@validator("value", "gas", pre=True)
def maybe_hex_to_int(v):
if isinstance(v, str):
return hex_to_int(v)
return v
class Config:
fields = {"from_": "from"}
class TraceType(Enum):
call = "call"
create = "create"
delegate_call = "delegateCall"
reward = "reward"
suicide = "suicide"
class Trace(CamelModel):
action: dict
block_hash: str
block_number: int
result: Optional[dict]
subtraces: int
trace_address: List[int]
transaction_hash: Optional[str]
transaction_position: Optional[int]
type: TraceType
error: Optional[str]
class Block(Web3Model):
block_number: int
traces: List[Trace]
data: dict
logs: List[dict]
receipts: dict
transaction_hashes: List[str]
txs_gas_data: Dict[str, dict]
def get_filtered_traces(self, hash: str) -> List[Trace]:
return [trace for trace in self.traces if trace.transaction_hash == hash]
class NestedTrace(BaseModel):
trace: Trace
subtraces: List["NestedTrace"]
NestedTrace.update_forward_refs()

@@ -0,0 +1,9 @@
from typing import Any, Dict
from pydantic import BaseModel
class CallData(BaseModel):
function_name: str
function_signature: str
inputs: Dict[str, Any]

@@ -0,0 +1,51 @@
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from .blocks import TraceType
class Classification(Enum):
unknown = "unknown"
swap = "swap"
burn = "burn"
transfer = "transfer"
class Protocol(Enum):
uniswap_v2 = "uniswap_v2"
sushiswap = "sushiswap"
class ClassifiedTrace(BaseModel):
transaction_hash: str
block_number: int
trace_type: TraceType
trace_address: List[int]
classification: Classification
protocol: Optional[Protocol]
abi_name: Optional[str]
function_name: Optional[str]
function_signature: Optional[str]
inputs: Optional[Dict[str, Any]]
to_address: Optional[str]
from_address: Optional[str]
gas: Optional[int]
value: Optional[int]
gas_used: Optional[int]
error: Optional[str]
class Config:
json_encoders = {
# a little lazy but fine for now
# this is used for bytes value inputs
bytes: lambda b: b.hex(),
}
class ClassifierSpec(BaseModel):
abi_name: str
protocol: Optional[Protocol] = None
valid_contract_addresses: Optional[List[str]] = None
classifications: Dict[str, Classification] = {}

@@ -0,0 +1,33 @@
import json
from hexbytes import HexBytes
from pydantic import BaseModel
from web3.datastructures import AttributeDict
def to_camel(string: str) -> str:
return "".join(
word.capitalize() if i > 0 else word for i, word in enumerate(string.split("_"))
)
def to_original_json_dict(model: BaseModel) -> dict:
return json.loads(model.json(by_alias=True, exclude_unset=True))
class Web3Model(BaseModel):
"""BaseModel that handles web3's unserializable objects"""
class Config:
json_encoders = {
AttributeDict: dict,
HexBytes: lambda h: h.hex(),
}
class CamelModel(BaseModel):
"""BaseModel that translates from camelCase to snake_case"""
class Config(Web3Model.Config):
alias_generator = to_camel
allow_population_by_field_name = True
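
A small sketch of the alias behavior (hypothetical model):
```
from mev_inspect.schemas.utils import CamelModel

class Example(CamelModel):
    block_number: int

# web3 responses use camelCase keys; the alias generator accepts them,
# and allow_population_by_field_name keeps snake_case construction working
assert Example.parse_obj({"blockNumber": 12412732}).block_number == 12412732
assert Example(block_number=12412732).block_number == 12412732
```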

242
mev_inspect/tokenflow.py Normal file

@@ -0,0 +1,242 @@
from typing import List, Optional
from mev_inspect.config import load_config
from mev_inspect.schemas import Block, Trace, TraceType
config = load_config()
rpc_url = config["RPC"]["Endpoint"]
weth_address = config["ADDRESSES"]["WETH"]
# w3 = Web3(HTTPProvider(rpc_url))
cache_directory = "./cache"
def is_stablecoin_address(address):
# to look for stablecoin inflow/outflows
stablecoin_addresses = [
"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", # USDC
"0xdac17f958d2ee523a2206206994597c13d831ec7", # USDT
"0x6b175474e89094c44da98b954eedeac495271d0f", # DAI
"0x0000000000085d4780b73119b644ae5ecd22b376", # TUSD
"0x4fabb145d64652a948d72533023f6e7a623c7c53", # BUSD
"0x8e870d67f660d95d5be530380d0ec0bd388289e1", # PAX
"0x956F47F50A910163D8BF957Cf5846D573E7f87CA", # FEI
"0x853d955aCEf822Db058eb8505911ED77F175b99e", # FRAX
"0xBC6DA0FE9aD5f3b0d58160288917AA56653660E9", # alUSD
"0x57Ab1ec28D129707052df4dF418D58a2D46d5f51", # sUSD
"0x5f98805A4E8be255a32880FDeC7F6728C6568bA0", # lUSD
"0x674C6Ad92Fd080e4004b2312b45f796a192D27a0", # USDN
]
return address in stablecoin_addresses
def is_known_router_address(address):
# to exclude known router addresses from token flow analysis
known_router_addresses = [
"0x3D71d79C224998E608d03C5Ec9B405E7a38505F0", # keeper dao, whitelists extraction
"0x11111254369792b2Ca5d084aB5eEA397cA8fa48B", # 1inch v1 router
"0x111111125434b319222cdbf8c261674adb56f3ae", # 1inch v2 router
"0x11111112542d85b3ef69ae05771c2dccff4faa26", # 1inch v3 router
"0xa356867fdcea8e71aeaf87805808803806231fdc", # DODO
"0xdef1c0ded9bec7f1a1670819833240f027b25eff", # 0x proxy
"0x90f765f63e7dc5ae97d6c576bf693fb6af41c129", # Set Trade
"0x7113dd99c79aff93d54cfa4b2885576535a132de", # Totle exchange
"0x9509665d015bfe3c77aa5ad6ca20c8afa1d98989", # Paraswap
"0x86969d29F5fd327E1009bA66072BE22DB6017cC6", # Paraswap v2
"0xf90e98f3d8dce44632e5020abf2e122e0f99dfab", # Paraswap v3
"0x57805e5a227937bac2b0fdacaa30413ddac6b8e1", # Furucombo
"0x17e8ca1b4798b97602895f63206afcd1fc90ca5f", # Furucombo proxy
"0x881d40237659c251811cec9c364ef91dc08d300c", # Metamask swap
"0x745daa146934b27e3f0b6bff1a6e36b9b90fb131", # DEX.ag
"0xb2be281e8b11b47fec825973fc8bb95332022a54", # Zerion SDK
"0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D", # UniswapV2Router02
"0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F", # SushiswapV2Router02
"0xE592427A0AEce92De3Edee1F18E0157C05861564", # Uniswap v3 router
"0x3E66B66Fd1d0b02fDa6C811Da9E0547970DB2f21", # Balance exchange proxy
"0x1bD435F3C054b6e901B7b108a0ab7617C808677b", # Paraswap v4
"0xC011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F", # SNX proxy synth issuer
]
return address in known_router_addresses
# we're interested in the to address to run token flow on it as well
def get_tx_to_address(tx_hash, block) -> Optional[str]:
for receipt in block.receipts["result"]:
if receipt["transactionHash"] == tx_hash:
return receipt["to"]
return None
def get_tx_proxies(tx_traces: List[Trace], to_address: Optional[str]):
proxies = []
for trace in tx_traces:
if (
trace.type == TraceType.call
and trace.action["callType"] == "delegatecall"
and trace.action["from"] == to_address
):
proxies.append(trace.action["to"])
return proxies
def get_net_gas_used(tx_hash, block):
    gas_used = 0
    for trace in block.traces:
        if trace.transaction_hash == tx_hash:
            gas_used += int(trace.result["gasUsed"], 16)
    return gas_used
def get_ether_flows(tx_traces, addresses_to_check):
eth_inflow = 0
eth_outflow = 0
for trace in tx_traces:
if trace.type == TraceType.call:
value = int(
trace.action["value"], 16
) # converting from 0x prefix to decimal
# ETH_GET
if (
trace.action["callType"] != "delegatecall"
and trace.action["from"] != weth_address
and value > 0
and trace.action["to"] in addresses_to_check
):
eth_inflow = eth_inflow + value
# ETH_GIVE
if (
trace.action["callType"] != "delegatecall"
and trace.action["to"] != weth_address
and value > 0
and trace.action["from"] in addresses_to_check
):
eth_outflow = eth_outflow + value
if trace.action["to"] == weth_address:
# WETH_GET1 & WETH_GET2 (to account for both 'transfer' and 'transferFrom' methods)
# WETH_GIVE1 & WETH_GIVE2
# transfer(address to,uint256 value) with args
if len(trace.action["input"]) == 138:
if trace.action["input"][2:10] == "a9059cbb":
transfer_to = "0x" + trace.action["input"][34:74]
transfer_value = int("0x" + trace.action["input"][74:138], 16)
if transfer_to in addresses_to_check:
eth_inflow = eth_inflow + transfer_value
elif trace.action["from"] in addresses_to_check:
eth_outflow = eth_outflow + transfer_value
                # transferFrom(address from,address to,uint256 value)
if len(trace.action["input"]) == 202:
if trace.action["input"][2:10] == "23b872dd":
transfer_from = "0x" + trace.action["input"][34:74]
transfer_to = "0x" + trace.action["input"][98:138]
transfer_value = int("0x" + trace.action["input"][138:202], 16)
if transfer_to in addresses_to_check:
eth_inflow = eth_inflow + transfer_value
elif transfer_from in addresses_to_check:
eth_outflow = eth_outflow + transfer_value
if trace.type == TraceType.suicide:
if trace.action["refundAddress"] in addresses_to_check:
refund_value = int("0x" + trace.action["balance"], 16)
eth_inflow = eth_inflow + refund_value
return [eth_inflow, eth_outflow]
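The fixed string offsets above follow the ABI encoding of an ERC-20 call: a "0x" prefix, a 4-byte selector (8 hex chars), then 32-byte (64 hex char) argument words with addresses right-aligned. A minimal sketch of why [34:74] and [74:138] line up, not part of the module, using WETH as an illustrative recipient:
# illustrative only: build and decode a transfer(address,uint256) calldata string
to_word = "c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2".rjust(64, "0")  # WETH, example recipient
value_word = format(10**18, "064x")  # 1 ETH in wei, left-padded to a 32-byte word
calldata = "0x" + "a9059cbb" + to_word + value_word
assert len(calldata) == 138
assert "0x" + calldata[34:74] == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"  # recipient
assert int("0x" + calldata[74:138], 16) == 10**18  # value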
def get_dollar_flows(tx_traces, addresses_to_check):
dollar_inflow = 0
dollar_outflow = 0
for trace in tx_traces:
if trace.type == TraceType.call and is_stablecoin_address(trace.action["to"]):
_ = int(trace.action["value"], 16) # converting from 0x prefix to decimal
# USD_GET1 & USD_GET2 (to account for both 'transfer' and 'transferFrom' methods)
# USD_GIVE1 & USD_GIVE2
# transfer(address to,uint256 value) with args
if len(trace.action["input"]) == 138:
if trace.action["input"][2:10] == "a9059cbb":
transfer_to = "0x" + trace.action["input"][34:74]
transfer_value = int("0x" + trace.action["input"][74:138], 16)
if transfer_to in addresses_to_check:
dollar_inflow = dollar_inflow + transfer_value
elif trace.action["from"] in addresses_to_check:
dollar_outflow = dollar_outflow + transfer_value
            # transferFrom(address from,address to,uint256 value)
if len(trace.action["input"]) == 202:
if trace.action["input"][2:10] == "23b872dd":
transfer_from = "0x" + trace.action["input"][34:74]
transfer_to = "0x" + trace.action["input"][98:138]
transfer_value = int("0x" + trace.action["input"][138:202], 16)
if transfer_to in addresses_to_check:
dollar_inflow = dollar_inflow + transfer_value
elif transfer_from in addresses_to_check:
dollar_outflow = dollar_outflow + transfer_value
return [dollar_inflow, dollar_outflow]
def run_tokenflow(tx_hash: str, block: Block):
tx_traces = block.get_filtered_traces(tx_hash)
to_address = get_tx_to_address(tx_hash, block)
if to_address is None:
raise ValueError("No to address found")
addresses_to_check = []
# check for proxies, add them to addresses to check
proxies = get_tx_proxies(tx_traces, to_address)
for proxy in proxies:
addresses_to_check.append(proxy.lower())
# check if the 'to' field is a known aggregator/router
# if not, add to relevant addresses to run TF on
if not is_known_router_address(to_address):
addresses_to_check.append(
to_address.lower()
) # traces need lowercase addresses to match
ether_flows = get_ether_flows(tx_traces, addresses_to_check)
dollar_flows = get_dollar_flows(tx_traces, addresses_to_check)
# print(addresses_to_check)
# print('net eth flow', ether_flows[0] - ether_flows[1])
# print('net dollar flow', dollar_flows )
return {"ether_flows": ether_flows, "dollar_flows": dollar_flows}
# note: not the gas set by user, only gas consumed upon execution
# def get_gas_used_by_tx(tx_hash):
# # tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
# return tx_receipt["gasUsed"]
# tx_traces = get_tx_traces('0x4121ce805d33e952b2e6103a5024f70c118432fd0370128d6d7845f9b2987922', 11930296)
# print(tx_traces)
# print(type(known_router_addresses))
# print(is_stablecoin_address("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"))
# run_tokenflow("0x4121ce805d33e952b2e6103a5024f70c118432fd0370128d6d7845f9b2987922", 11930296)
# delegate call test
# run_tokenflow("0x9007b339c81de366cd53539edc15c86ffc87542c65f374c0d4d1f8823a3ccf60", 12051659)
# stable flow test
# res = run_tokenflow("0x496836e0bd1520388e36c79d587a31d4b3306e4f25352164178ca0667c7f9c29", 11935012)
# print(res)
# complex arb test
# res = run_tokenflow("0x5ab21bfba50ad3993528c2828c63e311aafe93b40ee934790e545e150cb6ca73", 11931272)
# print(res)
# get_gas_used_by_tx("0x4121ce805d33e952b2e6103a5024f70c118432fd0370128d6d7845f9b2987922")

93
mev_inspect/trace_classifier.py Normal file
View File

@ -0,0 +1,93 @@
from typing import Dict, List, Optional
from mev_inspect.abi import get_abi
from mev_inspect.decode import ABIDecoder
from mev_inspect.schemas.blocks import CallAction, CallResult, Trace, TraceType
from mev_inspect.schemas.classified_traces import (
Classification,
ClassifiedTrace,
ClassifierSpec,
)
class TraceClassifier:
def __init__(self, classifier_specs: List[ClassifierSpec]) -> None:
# TODO - index by contract_addresses for speed
self._classifier_specs = classifier_specs
self._decoders_by_abi_name: Dict[str, ABIDecoder] = {}
for spec in self._classifier_specs:
abi = get_abi(spec.abi_name)
if abi is None:
raise ValueError(f"No ABI found for {spec.abi_name}")
decoder = ABIDecoder(abi)
self._decoders_by_abi_name[spec.abi_name] = decoder
def classify(
self,
traces: List[Trace],
) -> List[ClassifiedTrace]:
return [
self._classify_trace(trace)
for trace in traces
if trace.type != TraceType.reward
]
def _classify_trace(self, trace: Trace) -> ClassifiedTrace:
if trace.type == TraceType.call:
classified_trace = self._classify_call(trace)
if classified_trace is not None:
return classified_trace
return ClassifiedTrace(
**trace.dict(),
trace_type=trace.type,
classification=Classification.unknown,
)
def _classify_call(self, trace) -> Optional[ClassifiedTrace]:
action = CallAction(**trace.action)
result = CallResult(**trace.result) if trace.result is not None else None
for spec in self._classifier_specs:
if spec.valid_contract_addresses is not None:
if action.to not in spec.valid_contract_addresses:
continue
decoder = self._decoders_by_abi_name[spec.abi_name]
call_data = decoder.decode(action.input)
if call_data is not None:
signature = call_data.function_signature
classification = spec.classifications.get(
signature, Classification.unknown
)
return ClassifiedTrace(
**trace.dict(),
trace_type=trace.type,
classification=classification,
protocol=spec.protocol,
abi_name=spec.abi_name,
function_name=call_data.function_name,
function_signature=signature,
inputs=call_data.inputs,
to_address=action.to,
from_address=action.from_,
value=action.value,
gas=action.gas,
gas_used=result.gas_used if result is not None else None,
)
return ClassifiedTrace(
**trace.dict(),
trace_type=trace.type,
classification=Classification.unknown,
to_address=action.to,
from_address=action.from_,
value=action.value,
gas=action.gas,
gas_used=result.gas_used if result is not None else None,
)
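A hedged usage sketch of the classifier; the abi_name here is a hypothetical placeholder (it must match an ABI that get_abi can load), the swap signature is the standard UniswapV2 pair one, and optional ClassifierSpec fields are assumed to default as shown:
# illustrative only: wiring up a TraceClassifier
spec = ClassifierSpec(
    abi_name="UniswapV2Pair",  # hypothetical; must resolve via get_abi
    valid_contract_addresses=None,  # None means any contract address matches
    classifications={
        # placeholder mapping; unmatched signatures fall back to unknown
        "swap(uint256,uint256,address,bytes)": Classification.unknown,
    },
)
classifier = TraceClassifier([spec])
classified_traces = classifier.classify(block.traces)  # assumes a Block with .traces in scope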

80
mev_inspect/traces.py Normal file
View File

@ -0,0 +1,80 @@
from itertools import groupby
from typing import Iterable, List
from mev_inspect.schemas import Trace, NestedTrace
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]:
nested_traces = []
sorted_by_transaction_hash = sorted(traces, key=_get_transaction_hash)
for _, transaction_traces in groupby(
sorted_by_transaction_hash, _get_transaction_hash
):
nested_traces += _as_nested_traces_by_transaction(transaction_traces)
return nested_traces
def _get_transaction_hash(trace) -> str:
return trace.transaction_hash
def _as_nested_traces_by_transaction(traces: Iterable[Trace]) -> List[NestedTrace]:
"""
Turns a list of Traces into a a tree of NestedTraces
using their trace addresses
Right now this has an exponential (?) runtime because we rescan
most traces at each level of tree depth
TODO to write a better implementation if it becomes a bottleneck
Should be doable in linear time
"""
nested_traces = []
parent = None
children: List[Trace] = []
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
for trace in sorted_traces:
if parent is None:
parent = trace
children = []
continue
elif not _is_subtrace(trace, parent):
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
parent = trace
children = []
else:
children.append(trace)
if parent is not None:
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
return nested_traces
def _is_subtrace(trace: Trace, parent: Trace):
parent_trace_length = len(parent.trace_address)
if len(trace.trace_address) > parent_trace_length:
prefix = trace.trace_address[:parent_trace_length]
return prefix == parent.trace_address
return False
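As the docstring above notes, a linear-time construction should be possible. One hedged sketch (not used by the module), reusing this file's imports and _is_subtrace, and assuming NestedTrace.subtraces can be appended to after construction: it keeps the open root-to-current path on a stack, so each trace is pushed and popped at most once.
def _as_nested_traces_linear(traces: Iterable[Trace]) -> List[NestedTrace]:
    # illustrative alternative to _as_nested_traces_by_transaction;
    # assumes traces belong to a single transaction, as in the helper above
    roots: List[NestedTrace] = []
    ancestors: List[NestedTrace] = []  # open path from a root to the current trace
    for trace in sorted(traces, key=lambda t: t.trace_address):
        nested = NestedTrace(trace=trace, subtraces=[])
        # pop ancestors until the top is this trace's parent (or the stack is empty)
        while ancestors and not _is_subtrace(trace, ancestors[-1].trace):
            ancestors.pop()
        if ancestors:
            ancestors[-1].subtraces.append(nested)
        else:
            roots.append(nested)
        ancestors.append(nested)
    return roots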

5
mev_inspect/utils.py Normal file
View File

@ -0,0 +1,5 @@
from hexbytes.main import HexBytes
def hex_to_int(value: str) -> int:
return int.from_bytes(HexBytes(value), byteorder="big")
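One property worth noting (an observation, not stated in the source): routing through HexBytes tolerates inputs a plain int(value, 16) parse would not, e.g. an empty "0x":
assert hex_to_int("0x10") == 16
assert hex_to_int("0x") == 0  # int("0x", 16) would raise ValueError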

1891
poetry.lock generated Normal file

File diff suppressed because it is too large

View File

@ -1,13 +0,0 @@
class Processor:
def __init__(self, base_provider, inspectors) -> None:
self.base_provider = base_provider
self.inspectors = inspectors
def get_transaction_evaluations(self, block_data):
for transaction_hash in block_data.transaction_hashes:
calls = block_data.get_filtered_calls(transaction_hash)
for inspector in self.inspectors:
inspector.inspect(calls)
# print(calls)

72
pyproject.toml Normal file
View File

@ -0,0 +1,72 @@
[tool.poetry]
name = "mev_inspect"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.9"
web3 = "^5.21.0"
pydantic = "^1.8.2"
hexbytes = "^0.2.1"
click = "^8.0.1"
psycopg2 = "^2.9.1"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
pylint = "^2.9.5"
mypy = "^0.910"
black = "^21.7b0"
isort = "^5.9.2"
pytest = "^6.2.4"
pytest-sugar = "^0.9.4"
pytest-cov = "^2.12.1"
coverage = "^5.5"
alembic = "^1.6.5"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
lint = 'scripts.dev_tools:lint'
test = 'scripts.dev_tools:test'
isort = 'scripts.dev_tools:isort'
mypy = 'scripts.dev_tools:mypy'
black = 'scripts.dev_tools:black'
pre_commit = 'scripts.dev_tools:pre_commit'
start = 'scripts.docker:start'
stop = 'scripts.docker:stop'
build = 'scripts.docker:build'
attach = 'scripts.docker:attach'
exec = 'scripts.docker:exec'
inspect = 'scripts.inspect:inspect'
[tool.black]
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
| tests/.*/setup.py
)/
'''
[tool.isort]
profile = "black"
atomic = true
include_trailing_comma = true
lines_after_imports = 2
lines_between_types = 1
use_parentheses = true
src_paths = ["poetry", "tests"]
skip_glob = ["*/setup.py"]
filter_files = true
known_first_party = "poetry"

34
scripts/dev_tools.py Normal file
View File

@ -0,0 +1,34 @@
from subprocess import check_call
import click
def lint():
check_call(["pylint", "."])
def test():
check_call(["pytest", "--cov=mev_inspect", "tests"])
@click.command()
@click.option("-c", required=False, is_flag=True)
def isort(c: bool):
    """apply isort fixes in place if -c is passed; otherwise run in diff mode"""
    if c:
        check_call(["isort", "."])
    else:
        check_call(["isort", "--diff", "."])
def mypy():
    check_call(["mypy", "."])
@click.command()
@click.option("-c", required=False, is_flag=True)
def black(c: bool):
    """apply black fixes in place if -c is passed; otherwise run in diff mode"""
    if c:
        check_call(["black", "."])
    else:
        check_call(["black", "--diff", "--color", "."])

33
scripts/docker.py Normal file
View File

@ -0,0 +1,33 @@
from subprocess import check_call
from typing import List
import click
@click.command()
@click.option("-b", required=False, is_flag=True)
def start(b: bool):
    """if -b is present, run compose in the background"""
if b:
check_call(["docker", "compose", "up", "-d"])
click.echo("docker running in the background...")
else:
check_call(["docker", "compose", "up"])
def stop():
check_call(["docker", "compose", "down"])
def build():
check_call(["docker", "compose", "build"])
def attach():
check_call(["docker", "exec", "-it", "mev-inspect-py_mev-inspect_1", "bash"])
@click.command()
@click.argument("args", nargs=-1)
def exec(args: List[str]):
check_call(["docker", "compose", "exec", "mev-inspect", *args])

21
scripts/inspect.py Normal file
View File

@ -0,0 +1,21 @@
from subprocess import check_call
import click
@click.command()
@click.option("--block-number", type=int, help="the block number you are targetting")
@click.option("--rpc", help="rpc endpoint, this needs to have parity style traces")
def inspect(block_number: int, rpc: str):
check_call(
[
"docker",
"compose",
"exec",
"mev-inspect",
"python",
"inspect_block.py",
str(block_number),
rpc,
]
)
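Given the [tool.poetry.scripts] entry above that maps inspect to this command, a typical invocation would look like the following (the RPC URL is a placeholder):
poetry run inspect --block-number 11930296 --rpc http://127.0.0.1:8545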

View File

@ -1,26 +0,0 @@
from processor import Processor
from web3.providers import base
from inspector_uniswap import UniswapInspector
import block
from web3 import Web3
import argparse
parser = argparse.ArgumentParser(description='Inspect some blocks.')
parser.add_argument('-block_number', metavar='b', type=int, nargs='+',
help='the block number you are targetting, eventually this will need to be changed')
parser.add_argument('-rpc', metavar='r', help='rpc endpoint, this needs to have parity style traces')
args = parser.parse_args()
## Set up the base provider, but don't wrap it in web3 so we can make requests to it with make_request()
base_provider = Web3.HTTPProvider(args.rpc)
## Get block data that we need
block_data = block.createFromBlockNumber(args.block_number[0], base_provider)
## Build a Uniswap inspector
uniswap_inspector = UniswapInspector(base_provider)
## Create a processor, pass in an ARRAY of inspects
processor = Processor(base_provider, [uniswap_inspector, uniswap_inspector])
processor.get_transaction_evaluations(block_data)

0
tests/__init__.py Normal file
View File

18135
tests/blocks/11930296.json Normal file

File diff suppressed because one or more lines are too long

17348
tests/blocks/11931272.json Normal file

File diff suppressed because one or more lines are too long

10832
tests/blocks/11935012.json Normal file

File diff suppressed because one or more lines are too long

23
tests/liquidation_test.py Normal file
View File

@ -0,0 +1,23 @@
import unittest
# Fails precommit because these inspectors don't exist yet
# from mev_inspect import inspector_compound
# from mev_inspect import inspector_aave
#
#
# class TestLiquidations(unittest.TestCase):
# def test_compound_liquidation(self):
# tx_hash = "0x0ec6d5044a47feb3ceb647bf7ea4ffc87d09244d629eeced82ba17ec66605012"
# block_no = 11338848
# res = inspector_compound.get_profit(tx_hash, block_no)
# # self.assertEqual(res['profit'], 0)
#
# def test_aave_liquidation(self):
# tx_hash = "0xc8d2501d28800b1557eb64c5d0e08fd6070c15b6c04c39ca05631f641d19ffb2"
# block_no = 10803840
# res = inspector_aave.get_profit(tx_hash, block_no)
# # self.assertEqual(res['profit'], 0)
if __name__ == "__main__":
unittest.main()

49
tests/tokenflow_test.py Normal file
View File

@ -0,0 +1,49 @@
import json
import os
import unittest
from mev_inspect import tokenflow
from mev_inspect.schemas.blocks import Block
THIS_FILE_DIRECTORY = os.path.dirname(__file__)
TEST_BLOCKS_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "blocks")
class TestTokenFlow(unittest.TestCase):
def test_simple_arb(self):
tx_hash = "0x4121ce805d33e952b2e6103a5024f70c118432fd0370128d6d7845f9b2987922"
block_no = 11930296
block = load_test_block(block_no)
res = tokenflow.run_tokenflow(tx_hash, block)
self.assertEqual(res["ether_flows"], [3547869861992962562, 3499859860420296704])
self.assertEqual(res["dollar_flows"], [0, 0])
def test_arb_with_stable_flow(self):
tx_hash = "0x496836e0bd1520388e36c79d587a31d4b3306e4f25352164178ca0667c7f9c29"
block_no = 11935012
block = load_test_block(block_no)
res = tokenflow.run_tokenflow(tx_hash, block)
self.assertEqual(res["ether_flows"], [597044987302243493, 562445964778930176])
self.assertEqual(res["dollar_flows"], [871839781, 871839781])
def test_complex_cross_arb(self):
tx_hash = "0x5ab21bfba50ad3993528c2828c63e311aafe93b40ee934790e545e150cb6ca73"
block_no = 11931272
block = load_test_block(block_no)
res = tokenflow.run_tokenflow(tx_hash, block)
self.assertEqual(res["ether_flows"], [3636400213125714803, 3559576672903063566])
self.assertEqual(res["dollar_flows"], [0, 0])
def load_test_block(block_number):
block_path = f"{TEST_BLOCKS_DIRECTORY}/{block_number}.json"
with open(block_path, "r") as block_file:
block_json = json.load(block_file)
return Block(**block_json)
if __name__ == "__main__":
unittest.main()

103
tests/trace_test.py Normal file
View File

@ -0,0 +1,103 @@
import unittest
from typing import List
from mev_inspect.schemas import Trace, TraceType, NestedTrace
from mev_inspect.traces import as_nested_traces
DEFAULT_BLOCK_NUMBER = 123
class TestTraces(unittest.TestCase):
def test_nested_traces(self):
trace_hash_address_pairs = [
("abc", [0, 2]),
("abc", []),
("abc", [2]),
("abc", [0]),
("abc", [0, 0]),
("abc", [0, 1]),
("abc", [1]),
("efg", []),
("abc", [1, 0]),
("abc", [0, 1, 0]),
("efg", [0]),
]
traces = [
build_trace_at_address(hash, address)
for (hash, address) in trace_hash_address_pairs
]
nested_traces = as_nested_traces(traces)
assert len(nested_traces) == 2
abc_trace = nested_traces[0]
efg_trace = nested_traces[1]
# abc
assert abc_trace.trace.transaction_hash == "abc"
assert_trace_address(abc_trace, [])
assert len(abc_trace.subtraces) == 3
[trace_0, trace_1, trace_2] = abc_trace.subtraces
assert_trace_address(trace_0, [0])
assert_trace_address(trace_1, [1])
assert_trace_address(trace_2, [2])
assert len(trace_0.subtraces) == 3
assert len(trace_1.subtraces) == 1
assert len(trace_2.subtraces) == 0
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
[trace_1_0] = trace_1.subtraces
assert_trace_address(trace_0_0, [0, 0])
assert_trace_address(trace_0_1, [0, 1])
assert_trace_address(trace_0_2, [0, 2])
assert_trace_address(trace_1_0, [1, 0])
assert len(trace_0_0.subtraces) == 0
assert len(trace_0_1.subtraces) == 1
assert len(trace_0_2.subtraces) == 0
assert len(trace_1_0.subtraces) == 0
[trace_0_1_0] = trace_0_1.subtraces
assert_trace_address(trace_0_1_0, [0, 1, 0])
assert len(trace_0_1_0.subtraces) == 0
# efg
assert efg_trace.trace.transaction_hash == "efg"
assert_trace_address(efg_trace, [])
assert len(efg_trace.subtraces) == 1
[efg_subtrace] = efg_trace.subtraces
assert_trace_address(efg_subtrace, [0])
assert len(efg_subtrace.subtraces) == 0
def build_trace_at_address(
transaction_hash: str,
trace_address: List[int],
) -> Trace:
return Trace(
# real values
transaction_hash=transaction_hash,
trace_address=trace_address,
# placeholders
action={},
block_hash="",
block_number=DEFAULT_BLOCK_NUMBER,
result=None,
subtraces=0,
transaction_position=None,
type=TraceType.call,
error=None,
)
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
assert nested_trace.trace.trace_address == trace_address

View File

@ -1,21 +0,0 @@
from hexbytes.main import HexBytes
def check_call_for_signature(call, signatures):
if (call['action']['input'] == None):
return False
## By default set this to False
signature_present_boolean = False
## Iterate over all signatures, and if our call matches any of them set it to True
for signature in signatures:
# print("Desired signature:", str(signature))
# print("Actual", HexBytes(call['action']['input']))
if HexBytes(call['action']['input']).startswith((signature)):
## Note that we are turning the input into hex bytes here, which seems to be fine
## Working with strings was doing weird things
print("hit")
signature_present_boolean = True
return signature_present_boolean

File diff suppressed because one or more lines are too long