Merge pull request #106 from flashbots/traces-db-access

Use the trace DB for cached blocks
This commit is contained in:
Luke Van Seters 2021-10-18 13:06:54 -04:00 committed by GitHub
commit a5e4a2d1d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 201 additions and 120 deletions

View File

@ -19,6 +19,13 @@ k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
"host": "postgresql",
}))
# if using https://github.com/taarushv/trace-db
# k8s_yaml(secret_from_dict("trace-db-credentials", inputs = {
# "username" : "username",
# "password": "password",
# "host": "trace-db-postgresql",
# }))
docker_build_with_restart("mev-inspect-py", ".",
entrypoint="/app/entrypoint.sh",
live_update=[

View File

@ -5,12 +5,12 @@ from sqlalchemy import pool
from alembic import context
from mev_inspect.db import get_sqlalchemy_database_uri
from mev_inspect.db import get_inspect_database_uri
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.set_main_option("sqlalchemy.url", get_sqlalchemy_database_uri())
config.set_main_option("sqlalchemy.url", get_inspect_database_uri())
# Interpret the config file for Python logging.
# This line sets up loggers basically.

19
cli.py
View File

@ -6,7 +6,7 @@ import click
from web3 import Web3
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_session
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspect_block import inspect_block
from mev_inspect.provider import get_base_provider
@ -27,7 +27,9 @@ def cli():
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--cache/--no-cache", default=True)
def inspect_block_command(block_number: int, rpc: str, cache: bool):
db_session = get_session()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
trace_classifier = TraceClassifier()
@ -36,12 +38,12 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool):
logger.info("Skipping cache")
inspect_block(
db_session,
inspect_db_session,
base_provider,
w3,
trace_classifier,
block_number,
should_cache=cache,
trace_db_session=trace_db_session,
)
@ -54,7 +56,9 @@ def inspect_many_blocks_command(
after_block: int, before_block: int, rpc: str, cache: bool
):
db_session = get_session()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
trace_classifier = TraceClassifier()
@ -72,13 +76,12 @@ def inspect_many_blocks_command(
logger.info(dashes)
inspect_block(
db_session,
inspect_db_session,
base_provider,
w3,
trace_classifier,
block_number,
should_write_classified_traces=False,
should_cache=cache,
trace_db_session=trace_db_session,
)

View File

@ -55,6 +55,24 @@ spec:
secretKeyRef:
name: mev-inspect-db-credentials
key: password
- name: TRACE_DB_HOST
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: host
optional: true
- name: TRACE_DB_USER
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: username
optional: true
- name: TRACE_DB_PASSWORD
valueFrom:
secretKeyRef:
name: trace-db-credentials
key: password
optional: true
- name: RPC_URL
valueFrom:
configMapKeyRef:

View File

@ -9,7 +9,8 @@ from mev_inspect.crud.latest_block_update import (
find_latest_block_update,
update_latest_block,
)
from mev_inspect.db import get_session
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspect_block import inspect_block
from mev_inspect.provider import get_base_provider
from mev_inspect.signal_handler import GracefulKiller
@ -18,7 +19,6 @@ from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", level=logging.INFO)
logger = logging.getLogger(__name__)
# lag to make sure the blocks we see are settled
BLOCK_NUMBER_LAG = 5
@ -32,14 +32,17 @@ def run():
killer = GracefulKiller()
db_session = get_session()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
trace_classifier = TraceClassifier()
base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
latest_block_number = get_latest_block_number(w3)
while not killer.kill_now:
last_written_block = find_latest_block_update(db_session)
last_written_block = find_latest_block_update(inspect_db_session)
logger.info(f"Latest block: {latest_block_number}")
logger.info(f"Last written block: {last_written_block}")
@ -55,14 +58,14 @@ def run():
logger.info(f"Writing block: {block_number}")
inspect_block(
db_session,
inspect_db_session,
base_provider,
w3,
trace_classifier,
block_number,
should_write_classified_traces=False,
should_cache=False,
trace_db_session=trace_db_session,
)
update_latest_block(db_session, block_number)
update_latest_block(inspect_db_session, block_number)
else:
time.sleep(5)
latest_block_number = get_latest_block_number(w3)
@ -71,4 +74,7 @@ def run():
if __name__ == "__main__":
run()
try:
run()
except Exception as e:
logger.error(e)

View File

@ -1,6 +1,7 @@
from pathlib import Path
from typing import List
from typing import List, Optional
from sqlalchemy import orm
from web3 import Web3
from mev_inspect.fees import fetch_base_fee_per_gas
@ -16,28 +17,27 @@ def get_latest_block_number(w3: Web3) -> int:
def create_from_block_number(
base_provider, w3: Web3, block_number: int, should_cache: bool
base_provider,
w3: Web3,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
if not should_cache:
return fetch_block(w3, base_provider, block_number)
block: Optional[Block] = None
cache_path = _get_cache_path(block_number)
if trace_db_session is not None:
block = _find_block(trace_db_session, block_number)
if cache_path.is_file():
print(f"Cache for block {block_number} exists, " "loading data from cache")
return Block.parse_file(cache_path)
if block is None:
return _fetch_block(w3, base_provider, block_number)
else:
print(f"Cache for block {block_number} did not exist, getting data")
block = fetch_block(w3, base_provider, block_number)
cache_block(cache_path, block)
return block
def fetch_block(w3, base_provider, block_number: int) -> Block:
def _fetch_block(
w3,
base_provider,
block_number: int,
) -> Block:
block_json = w3.eth.get_block(block_number)
receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number])
traces_json = w3.parity.trace_block(block_number)
@ -57,6 +57,87 @@ def fetch_block(w3, base_provider, block_number: int) -> Block:
)
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
traces = _find_traces(trace_db_session, block_number)
receipts = _find_receipts(trace_db_session, block_number)
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if traces is None or receipts is None or base_fee_per_gas is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
miner=miner_address,
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_traces(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Trace]]:
result = trace_db_session.execute(
"SELECT raw_traces FROM block_traces WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(traces_json,) = result
return [Trace(**trace_json) for trace_json in traces_json]
def _find_receipts(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Receipt]]:
result = trace_db_session.execute(
"SELECT raw_receipts FROM block_receipts WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(receipts_json,) = result
return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[int]:
result = trace_db_session.execute(
"SELECT base_fee_in_wei FROM base_fee WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(base_fee,) = result
return base_fee
def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]:
for trace in traces:
if trace.type == TraceType.reward:
return trace.action["author"]
return None
def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result = []

View File

@ -1,10 +1,23 @@
import os
from typing import Optional
from sqlalchemy import create_engine
from sqlalchemy import create_engine, orm
from sqlalchemy.orm import sessionmaker
def get_sqlalchemy_database_uri():
def get_trace_database_uri() -> Optional[str]:
username = os.getenv("TRACE_DB_USER")
password = os.getenv("TRACE_DB_PASSWORD")
host = os.getenv("TRACE_DB_HOST")
db_name = "trace_db"
if all(field is not None for field in [username, password, host]):
return f"postgresql://{username}:{password}@{host}/{db_name}"
return None
def get_inspect_database_uri():
username = os.getenv("POSTGRES_USER")
password = os.getenv("POSTGRES_PASSWORD")
host = os.getenv("POSTGRES_HOST")
@ -12,10 +25,24 @@ def get_sqlalchemy_database_uri():
return f"postgresql://{username}:{password}@{host}/{db_name}"
def get_engine():
return create_engine(get_sqlalchemy_database_uri())
def _get_engine(uri: str):
return create_engine(uri)
def get_session():
Session = sessionmaker(bind=get_engine())
def _get_session(uri: str):
Session = sessionmaker(bind=_get_engine(uri))
return Session()
def get_inspect_session() -> orm.Session:
uri = get_inspect_database_uri()
return _get_session(uri)
def get_trace_session() -> Optional[orm.Session]:
uri = get_trace_database_uri()
if uri is not None:
return _get_session(uri)
return None

View File

@ -1,5 +1,7 @@
import logging
from typing import Optional
from sqlalchemy import orm
from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages
@ -34,19 +36,19 @@ logger = logging.getLogger(__name__)
def inspect_block(
db_session,
inspect_db_session: orm.Session,
base_provider,
w3: Web3,
trace_clasifier: TraceClassifier,
block_number: int,
should_cache: bool,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
block = create_from_block_number(
base_provider,
w3,
block_number,
should_cache,
trace_db_session,
)
logger.info(f"Total traces: {len(block.traces)}")
@ -60,36 +62,36 @@ def inspect_block(
logger.info(f"Returned {len(classified_traces)} classified traces")
if should_write_classified_traces:
delete_classified_traces_for_block(db_session, block_number)
write_classified_traces(db_session, classified_traces)
delete_classified_traces_for_block(inspect_db_session, block_number)
write_classified_traces(inspect_db_session, classified_traces)
transfers = get_transfers(classified_traces)
logger.info(f"Found {len(transfers)} transfers")
delete_transfers_for_block(db_session, block_number)
write_transfers(db_session, transfers)
delete_transfers_for_block(inspect_db_session, block_number)
write_transfers(inspect_db_session, transfers)
swaps = get_swaps(classified_traces)
logger.info(f"Found {len(swaps)} swaps")
delete_swaps_for_block(db_session, block_number)
write_swaps(db_session, swaps)
delete_swaps_for_block(inspect_db_session, block_number)
write_swaps(inspect_db_session, swaps)
arbitrages = get_arbitrages(swaps)
logger.info(f"Found {len(arbitrages)} arbitrages")
delete_arbitrages_for_block(db_session, block_number)
write_arbitrages(db_session, arbitrages)
delete_arbitrages_for_block(inspect_db_session, block_number)
write_arbitrages(inspect_db_session, arbitrages)
liquidations = get_liquidations(classified_traces)
logger.info(f"Found {len(liquidations)} liquidations")
delete_liquidations_for_block(db_session, block_number)
write_liquidations(db_session, liquidations)
delete_liquidations_for_block(inspect_db_session, block_number)
write_liquidations(inspect_db_session, liquidations)
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
)
delete_miner_payments_for_block(db_session, block_number)
write_miner_payments(db_session, miner_payments)
delete_miner_payments_for_block(inspect_db_session, block_number)
write_miner_payments(inspect_db_session, miner_payments)

64
poetry.lock generated
View File

@ -31,14 +31,6 @@ python-dateutil = "*"
python-editor = ">=0.3"
SQLAlchemy = ">=1.3.0"
[[package]]
name = "appdirs"
version = "1.4.4"
description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "astroid"
version = "2.7.2"
@ -112,28 +104,6 @@ category = "main"
optional = false
python-versions = "*"
[[package]]
name = "black"
version = "21.7b0"
description = "The uncompromising code formatter."
category = "dev"
optional = false
python-versions = ">=3.6.2"
[package.dependencies]
appdirs = "*"
click = ">=7.1.2"
mypy-extensions = ">=0.4.3"
pathspec = ">=0.8.1,<1"
regex = ">=2020.1.8"
tomli = ">=0.2.6,<2.0.0"
[package.extras]
colorama = ["colorama (>=0.4.3)"]
d = ["aiohttp (>=3.6.0)", "aiohttp-cors (>=0.4.0)"]
python2 = ["typed-ast (>=1.4.2)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "bottle"
version = "0.12.19"
@ -625,14 +595,6 @@ python-versions = "*"
[package.dependencies]
six = ">=1.9.0"
[[package]]
name = "pathspec"
version = "0.9.0"
description = "Utility library for gitignore style pattern matching of file paths."
category = "dev"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
[[package]]
name = "platformdirs"
version = "2.2.0"
@ -938,14 +900,6 @@ category = "dev"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "tomli"
version = "1.2.1"
description = "A lil' TOML parser"
category = "dev"
optional = false
python-versions = ">=3.6"
[[package]]
name = "toolz"
version = "0.11.1"
@ -1063,7 +1017,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "7b8511360be2f67f85c4e9e2d5ee73da2ce14cc691a44e8c6cb03552af6d75b5"
content-hash = "baade6f62f3adaff192b2c85b4f602f4990b9b99d6fcce904aeb5087b6fa1921"
[metadata.files]
aiohttp = [
@ -1109,10 +1063,6 @@ alembic = [
{file = "alembic-1.6.5-py2.py3-none-any.whl", hash = "sha256:e78be5b919f5bb184e3e0e2dd1ca986f2362e29a2bc933c446fe89f39dbe4e9c"},
{file = "alembic-1.6.5.tar.gz", hash = "sha256:a21fedebb3fb8f6bbbba51a11114f08c78709377051384c9c5ead5705ee93a51"},
]
appdirs = [
{file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"},
{file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"},
]
astroid = [
{file = "astroid-2.7.2-py3-none-any.whl", hash = "sha256:ecc50f9b3803ebf8ea19aa2c6df5622d8a5c31456a53c741d3be044d96ff0948"},
{file = "astroid-2.7.2.tar.gz", hash = "sha256:b6c2d75cd7c2982d09e7d41d70213e863b3ba34d3bd4014e08f167cee966e99e"},
@ -1140,10 +1090,6 @@ base58 = [
bitarray = [
{file = "bitarray-1.2.2.tar.gz", hash = "sha256:27a69ffcee3b868abab3ce8b17c69e02b63e722d4d64ffd91d659f81e9984954"},
]
black = [
{file = "black-21.7b0-py3-none-any.whl", hash = "sha256:1c7aa6ada8ee864db745b22790a32f94b2795c253a75d6d9b5e439ff10d23116"},
{file = "black-21.7b0.tar.gz", hash = "sha256:c8373c6491de9362e39271630b65b964607bc5c79c83783547d76c839b3aa219"},
]
bottle = [
{file = "bottle-0.12.19-py3-none-any.whl", hash = "sha256:f6b8a34fe9aa406f9813c02990db72ca69ce6a158b5b156d2c41f345016a723d"},
{file = "bottle-0.12.19.tar.gz", hash = "sha256:a9d73ffcbc6a1345ca2d7949638db46349f5b2b77dac65d6494d45c23628da2c"},
@ -1550,10 +1496,6 @@ packaging = [
parsimonious = [
{file = "parsimonious-0.8.1.tar.gz", hash = "sha256:3add338892d580e0cb3b1a39e4a1b427ff9f687858fdd61097053742391a9f6b"},
]
pathspec = [
{file = "pathspec-0.9.0-py2.py3-none-any.whl", hash = "sha256:7d15c4ddb0b5c802d161efc417ec1a2558ea2653c2e8ad9c19098201dc1c993a"},
{file = "pathspec-0.9.0.tar.gz", hash = "sha256:e564499435a2673d586f6b2130bb5b95f04a3ba06f81b8f895b651a3c76aabb1"},
]
platformdirs = [
{file = "platformdirs-2.2.0-py3-none-any.whl", hash = "sha256:4666d822218db6a262bdfdc9c39d21f23b4cfdb08af331a81e92751daf6c866c"},
{file = "platformdirs-2.2.0.tar.gz", hash = "sha256:632daad3ab546bd8e6af0537d09805cec458dce201bccfe23012df73332e181e"},
@ -1862,10 +1804,6 @@ toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
]
tomli = [
{file = "tomli-1.2.1-py3-none-any.whl", hash = "sha256:8dd0e9524d6f386271a36b41dbf6c57d8e32fd96fd22b6584679dc569d20899f"},
{file = "tomli-1.2.1.tar.gz", hash = "sha256:a5b75cb6f3968abb47af1b40c1819dc519ea82bcc065776a866e8d74c5ca9442"},
]
toolz = [
{file = "toolz-0.11.1-py3-none-any.whl", hash = "sha256:1bc473acbf1a1db4e72a1ce587be347450e8f08324908b8a266b486f408f04d5"},
{file = "toolz-0.11.1.tar.gz", hash = "sha256:c7a47921f07822fe534fb1c01c9931ab335a4390c782bd28c6bcc7c2f71f3fbf"},

View File

@ -22,7 +22,6 @@ pytest-sugar = "^0.9.4"
pytest-cov = "^2.12.1"
coverage = "^5.5"
alembic = "^1.6.5"
black = "^21.7b0"
CProfileV = "^1.0.7"
regex = "^2021.10.8"