Merge ce047e1f2addd375d04958a166948c5b47959fc8 into 26aa190b03cfff865382818a641a125d2118b92f

This commit is contained in:
ZigaMr 2023-06-01 00:58:20 -07:00 committed by GitHub
commit 79320ea4c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 359 additions and 31 deletions

View File

@ -45,6 +45,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546"
```
**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth is also supported: geth traces and receipts are translated into the Erigon format when you pass `--type geth` to the CLI commands (or `geth` as the last argument to `./mev`).
Next, start all services with:
@ -71,17 +72,19 @@ And load prices data
### Inspect a single block
Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Pass `geth` instead of `parity` as the last argument if RPC_URL points to a geth or geth-like node.
```
./mev inspect 12914944
./mev inspect 12914944 parity
```
### Inspect many blocks
Inspecting blocks 12914944 to 12914954:
**Note**: Pass `geth` instead of `parity` as the last argument if RPC_URL points to a geth or geth-like node.
```
./mev inspect-many 12914944 12914954
./mev inspect-many 12914944 12914954 parity
```
### Inspect all incoming blocks

View File

@ -27,7 +27,7 @@ k8s_yaml(secret_from_dict("mev-inspect-db-credentials", inputs = {
"password": "password",
"host": "postgresql",
}))
# if using https://github.com/taarushv/trace-db
# k8s_yaml(secret_from_dict("trace-db-credentials", inputs = {
# "username" : "username",
@ -79,6 +79,8 @@ k8s_resource(
resource_deps=["postgresql", "redis-master"],
)
# k8s_resource(workload='mev-inspect', port_forwards='8101')
k8s_resource(
workload="mev-inspect-workers",
resource_deps=["postgresql", "redis-master"],
@ -102,17 +104,17 @@ local_resource(
# "export-aws-secret-access-key": "foobar",
#}))
#helm_remote(
# helm_remote(
# "localstack",
# repo_name="localstack-charts",
# repo_url="https://localstack.github.io/helm-charts",
#)
#
#local_resource(
# )
# local_resource(
# 'localstack-port-forward',
# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566',
# resource_deps=["localstack"]
#)
# )
#
#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = {
# "services": "s3",

33
cli.py
View File

@ -20,6 +20,7 @@ from mev_inspect.queue.tasks import (
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
from mev_inspect.utils import RPCType
RPC_URL_ENV = "RPC_URL"
@ -35,12 +36,18 @@ def cli():
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@coro
async def inspect_block_command(block_number: int, rpc: str):
async def inspect_block_command(block_number: int, rpc: str, type: str):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
inspector = MEVInspector(rpc, type_e)
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
@ -49,6 +56,14 @@ async def inspect_block_command(block_number: int, rpc: str):
)
def convert_str_to_enum(type: str) -> RPCType:
    """Translate an RPC type name into the matching ``RPCType`` member.

    The lookup is driven by the enum itself, so every member name is accepted
    without being listed here, and it ignores case so that it agrees with the
    ``--type`` options, which are declared with ``case_sensitive=False``.

    Args:
        type: Name of an ``RPCType`` member, e.g. ``"parity"`` or ``"geth"``.

    Returns:
        The ``RPCType`` member with that name.

    Raises:
        ValueError: If ``type`` does not name an ``RPCType`` member.
    """
    try:
        return RPCType[type.lower()]
    except (AttributeError, KeyError):
        # AttributeError covers non-string input such as None.
        valid = ", ".join(member.name for member in RPCType)
        raise ValueError(
            f"Unknown RPC type {type!r}; expected one of: {valid}"
        ) from None
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@ -56,12 +71,11 @@ async def inspect_block_command(block_number: int, rpc: str):
async def fetch_block_command(block_number: int, rpc: str):
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
inspector = MEVInspector(rpc, RPCType.parity)
block = await inspector.create_from_block(
block_number=block_number,
trace_db_session=trace_db_session,
)
print(block.json())
@ -69,6 +83,11 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@click.option(
"--max-concurrency",
type=int,
@ -85,12 +104,14 @@ async def inspect_many_blocks_command(
rpc: str,
max_concurrency: int,
request_timeout: int,
type: str,
):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(
rpc,
rpc=rpc,
type=type_e,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)

View File

@ -4,6 +4,7 @@ import os
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
from web3 import HTTPProvider, Web3
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
@ -21,6 +22,7 @@ from mev_inspect.queue.tasks import (
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
from mev_inspect.utils import RPCType
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
logger = logging.getLogger(__name__)
@ -29,6 +31,14 @@ logger = logging.getLogger(__name__)
BLOCK_NUMBER_LAG = 5
def convert_str_to_enum(type: str) -> RPCType:
    """Translate an RPC type name into the matching ``RPCType`` member.

    The lookup is driven by the enum itself and ignores case, so every member
    name is accepted without being hard-coded here.

    Args:
        type: Name of an ``RPCType`` member, e.g. ``"parity"`` or ``"geth"``.

    Returns:
        The ``RPCType`` member with that name.

    Raises:
        ValueError: If ``type`` does not name an ``RPCType`` member.
    """
    try:
        return RPCType[type.lower()]
    except (AttributeError, KeyError):
        # AttributeError covers non-string input such as None.
        valid = ", ".join(member.name for member in RPCType)
        raise ValueError(
            f"Unknown RPC type {type!r}; expected one of: {valid}"
        ) from None
@coro
async def run():
rpc = os.getenv("RPC_URL")
@ -52,8 +62,19 @@ async def run():
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
w3 = Web3(HTTPProvider(rpc))
res = w3.provider.make_request("trace_block", ["earliest"])
if (
"error" in res
and res["error"]["message"]
== "the method trace_block does not exist/is not available"
):
type_e = RPCType.geth
else:
type_e = RPCType.parity
base_provider = get_base_provider(rpc, type=type_e)
# type_e = convert_str_to_enum(sys.argv[1])
inspector = MEVInspector(rpc, type_e)
while not killer.kill_now:
await inspect_next_block(

12
mev
View File

@ -58,15 +58,17 @@ case "$1" in
;;
inspect)
block_number=$2
rpc_type=$3
echo "Inspecting block $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number --type $rpc_type
;;
inspect-many)
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
start_block_number=$2
end_block_number=$3
rpc_type=$4
echo "Inspecting from block $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $after_block_number $before_block_number
poetry run inspect-many-blocks $start_block_number $end_block_number --type $rpc_type
;;
test)
shift

View File

@ -6,12 +6,21 @@ from sqlalchemy import orm
from web3 import Web3
from mev_inspect.fees import fetch_base_fee_per_gas
from mev_inspect.geth_poa_middleware import geth_poa_middleware
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
from mev_inspect.utils import RPCType, hex_to_int
logger = logging.getLogger(__name__)
# Maps the upper-case ``type`` of a geth call frame to the parity-style name.
# unwrap_tx_trace_for_parity uses the mapped value both as the action's
# ``callType`` and as the ``TraceType`` value of the resulting trace.
# "STATICCALL" has no entry: unwrap_tx_trace_for_parity returns early for
# those frames before consulting this table.
_calltype_mapping = {
    "CALL": "call",
    "DELEGATECALL": "delegateCall",
    "CREATE": "create",
    "CREATE2": "create2",
    "SUICIDE": "suicide",
    "REWARD": "reward",
}
async def get_latest_block_number(base_provider) -> int:
@ -28,14 +37,32 @@ async def create_from_block_number(
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
type = (
RPCType.geth
if geth_poa_middleware in w3.provider.middlewares
else RPCType.parity
)
if type == RPCType.geth:
block_json = await w3.eth.get_block(block_number)
else:
block_json = dict()
block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session),
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(
w3, block_number, trace_db_session, type, block_json
),
_find_or_fetch_block_traces(
w3, block_number, trace_db_session, type, block_json
),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
)
miner_address = _get_miner_address_from_traces(traces)
miner_address = (
_get_miner_address_from_traces(traces)
if type == RPCType.parity
else block_json.miner
)
return Block(
block_number=block_number,
@ -64,12 +91,21 @@ async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
type: RPCType,
block_json: dict,
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
if type == RPCType.geth:
geth_tx_receipts = await geth_get_tx_receipts_async(
w3.provider, block_json["transactions"]
)
receipts = geth_receipts_translator(block_json, geth_tx_receipts)
return receipts
return await _fetch_block_receipts(w3, block_number)
@ -77,12 +113,19 @@ async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
type: RPCType,
block_json: dict,
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
if type == RPCType.geth:
# Translate to parity format
traces = await geth_get_tx_traces_parity_format(w3.provider, block_json)
return traces
return await _fetch_block_traces(w3, block_number)
@ -200,3 +243,121 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result.append(call.transaction_hash)
return result
# Geth specific additions
async def geth_get_tx_traces_parity_format(base_provider, block_json: dict):
    """Fetch the geth traces of a block and flatten them into parity-style traces.

    The block is traced by the hash stored under ``block_json["hash"]``. Each
    entry of the response's ``"result"`` list is matched to a transaction by its
    position in that list; entries without their own ``"result"`` key are
    skipped.

    Returns:
        A flat list with the traces produced by ``unwrap_tx_trace_for_parity``
        for every usable entry, in response order.
    """
    response = await geth_get_tx_traces(base_provider, block_json["hash"])

    flattened = []
    for tx_index, tx_entry in enumerate(response["result"]):
        if "result" not in tx_entry:
            continue
        flattened += unwrap_tx_trace_for_parity(
            block_json, tx_index, tx_entry["result"]
        )
    return flattened
async def geth_get_tx_traces(base_provider, block_hash):
    """Request the ``callTracer`` trace of a whole block from the node.

    Args:
        base_provider: Object exposing an awaitable ``make_request(method, params)``.
        block_hash: Block hash object; its ``.hex()`` form is sent to the node.

    Returns:
        Whatever ``make_request`` returns for ``debug_traceBlockByHash``.
    """
    return await base_provider.make_request(
        "debug_traceBlockByHash",
        [
            block_hash.hex(),
            {"tracer": "callTracer"},
        ],
    )
def unwrap_tx_trace_for_parity(
    block_json, tx_pos_in_block, tx_trace, position=None
) -> List[Trace]:
    """Flatten one geth call frame, and its nested calls, into parity-style traces.

    Args:
        block_json: Block data; ``"hash"``, ``"number"`` and ``"transactions"``
            are read from it.
        tx_pos_in_block: Index of the transaction inside the block.
        tx_trace: The geth call frame (a mapping with ``"type"``, ``"from"``,
            ``"to"``, ``"gas"``, ``"input"``, ``"gasUsed"`` and optionally
            ``"value"``, ``"output"`` and ``"calls"``).
        position: Trace address of this frame; omitted for the root frame.

    Returns:
        The trace for this frame followed by the traces of its sub-calls,
        depth first. ``STATICCALL`` frames yield an empty list, sub-calls
        included. If building the trace fails for any reason, the error is
        logged and an empty list is returned for the whole frame.
    """
    # Build a fresh list per root call: a shared ``[]`` default would be handed
    # to every root Trace as its trace_address and could be aliased between them.
    if position is None:
        position = []

    response_list = []
    try:
        if tx_trace["type"] == "STATICCALL":
            return []

        call_type = _calltype_mapping[tx_trace["type"]]

        action_dict = {"callType": call_type}
        if call_type == "call":
            action_dict["value"] = tx_trace["value"]
        for key in ("from", "to", "gas", "input"):
            action_dict[key] = tx_trace[key]

        result_dict = {"gasUsed": tx_trace["gasUsed"]}
        if "output" in tx_trace:
            result_dict["output"] = tx_trace["output"]

        response_list.append(
            Trace(
                action=action_dict,
                block_hash=str(block_json["hash"]),
                block_number=int(block_json["number"]),
                result=result_dict,
                subtraces=len(tx_trace["calls"]) if "calls" in tx_trace else 0,
                trace_address=position,
                transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
                transaction_position=tx_pos_in_block,
                type=TraceType(call_type),
            )
        )
    except Exception as e:
        # Deliberate best effort: a frame that cannot be translated is dropped
        # (together with its sub-calls) instead of failing the whole block.
        logger.warning(f"error while unwraping tx trace for parity {e}")
        return []

    if "calls" in tx_trace:
        for idx, subcall in enumerate(tx_trace["calls"]):
            # ``position + [idx]`` creates a new list, so siblings never share
            # a trace address.
            response_list.extend(
                unwrap_tx_trace_for_parity(
                    block_json, tx_pos_in_block, subcall, position + [idx]
                )
            )
    return response_list
async def geth_get_tx_receipts_task(base_provider, tx):
    """Request the receipt of a single transaction from the node.

    Args:
        base_provider: Object exposing an awaitable ``make_request(method, params)``.
        tx: Transaction hash object; its ``.hex()`` form is sent to the node.

    Returns:
        Whatever ``make_request`` returns for ``eth_getTransactionReceipt``.
    """
    return await base_provider.make_request(
        "eth_getTransactionReceipt",
        [tx.hex()],
    )
async def geth_get_tx_receipts_async(base_provider, transactions):
    """Fetch the receipts of several transactions concurrently.

    One task running ``geth_get_tx_receipts_task`` is scheduled per
    transaction and all of them are awaited together.

    Returns:
        The list produced by ``asyncio.gather``: one response per transaction,
        in the same order as ``transactions``.
    """
    pending = []
    for tx in transactions:
        pending.append(
            asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx))
        )
    return await asyncio.gather(*pending)
def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
    """Convert raw geth receipt responses into ``Receipt`` objects.

    Args:
        block_json: Block data forwarded to ``unwrap_tx_receipt_for_parity``.
        geth_tx_receipts: One RPC response per transaction, in block order.
            A response that is ``None``, has no ``"result"`` key, or whose
            ``"result"`` is ``None`` is skipped.

    Returns:
        The translated receipts. Each one is built with the index of its
        response in ``geth_tx_receipts``, so skipped responses do not shift
        the positions of the remaining ones.
    """
    results = []
    for idx, response in enumerate(geth_tx_receipts):
        if response is None or "result" not in response:
            continue
        tx_receipt = response["result"]
        if tx_receipt is None:
            continue
        results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt))
    return results
def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
    """Build a ``Receipt`` from a geth transaction receipt.

    Args:
        block_json: Block data; only ``"number"`` is read.
        tx_pos_in_block: Position of the transaction in the block, used as the
            receipt's ``transaction_index``.
        tx_receipt: The geth receipt mapping. ``"transactionIndex"`` is parsed
            as a base-16 string and compared with ``tx_pos_in_block``; a
            mismatch is only logged, the receipt is still built.

    Returns:
        The populated ``Receipt``.
    """
    if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
        # The message needs one placeholder per argument; without them the
        # logging module fails to format the record and the alert is lost.
        logger.info(
            "Alert the position of transaction in block is mismatched: %s != %s",
            tx_pos_in_block,
            tx_receipt["transactionIndex"],
        )

    return Receipt(
        block_number=block_json["number"],
        transaction_hash=tx_receipt["transactionHash"],
        transaction_index=tx_pos_in_block,
        gas_used=tx_receipt["gasUsed"],
        effective_gas_price=tx_receipt["effectiveGasPrice"],
        cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
        to=tx_receipt["to"],
    )

View File

@ -0,0 +1,82 @@
"""
Modified asynchronous geth_poa_middleware which mirrors functionality of
https://github.com/ethereum/web3.py/blob/master/web3/middleware/geth_poa.py
"""
from typing import Any, Callable
from eth_utils.curried import (
apply_formatter_if,
apply_formatters_to_dict,
apply_key_map,
is_null,
)
from eth_utils.toolz import assoc, complement, compose
from hexbytes import HexBytes
from web3 import Web3 # noqa: F401
from web3._utils.rpc_abi import RPC
from web3.types import Formatters, RPCEndpoint, RPCResponse
async def get_geth_poa_middleware(
    make_request: Callable[[RPCEndpoint, Any], RPCResponse],
    request_formatters: Formatters = {},
    result_formatters: Formatters = {},
    error_formatters: Formatters = {},
) -> RPCResponse:
    """Build an async middleware that applies per-method formatters.

    Args:
        make_request: Awaitable callable that performs the actual RPC request.
        request_formatters: Per-method formatter applied to the params before
            the request is sent.
        result_formatters: Per-method formatter applied to ``response["result"]``.
        error_formatters: Per-method formatter applied to ``response["error"]``;
            only consulted when no result formatter was applied.

    Returns:
        The inner ``middleware`` coroutine function.
        NOTE(review): the return annotation says ``RPCResponse`` although a
        callable is returned.
    """

    async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
        # Format the outgoing params only when a formatter is registered.
        outgoing = (
            request_formatters[method](params)
            if method in request_formatters
            else params
        )
        response = await make_request(method, outgoing)

        if "result" in response and method in result_formatters:
            format_result = result_formatters[method]
            return assoc(response, "result", format_result(response["result"]))

        if "error" in response and method in error_formatters:
            format_error = error_formatters[method]
            return assoc(response, "error", format_error(response["error"]))

        return response

    return middleware
# Negation of ``is_null``, used below to leave null block results untouched.
is_not_null = complement(is_null)

# Key map for block results: "extraData" is exposed as "proofOfAuthorityData".
remap_geth_poa_fields = apply_key_map(
    {
        "extraData": "proofOfAuthorityData",
    }
)

# Passes the "proofOfAuthorityData" value through ``HexBytes``.
pythonic_geth_poa = apply_formatters_to_dict(
    {
        "proofOfAuthorityData": HexBytes,
    }
)

# ``compose`` runs right to left: rename the key first, then convert its value.
geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields)
async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], _: Web3):
    """Create the middleware that cleans up block results for geth-style nodes.

    Non-null results of ``eth_getBlockByHash`` and ``eth_getBlockByNumber``
    are passed through ``geth_poa_cleanup``; requests and errors are left
    unformatted. The second positional argument is unused.
    """
    block_result_formatters = {
        RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup),
        RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup),
    }
    return await get_geth_poa_middleware(
        make_request=make_request,
        request_formatters={},
        result_formatters=block_result_formatters,
        error_formatters={},
    )

View File

@ -103,9 +103,9 @@ async def inspect_many_blocks(
for block_number in range(after_block_number, before_block_number):
block = await create_from_block_number(
w3,
block_number,
trace_db_session,
w3=w3,
block_number=block_number,
trace_db_session=trace_db_session,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")

View File

@ -13,6 +13,7 @@ from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.inspect_block import inspect_block, inspect_many_blocks
from mev_inspect.methods import get_block_receipts, trace_block
from mev_inspect.provider import get_base_provider
from mev_inspect.utils import RPCType
logger = logging.getLogger(__name__)
@ -27,11 +28,13 @@ class MEVInspector:
def __init__(
self,
rpc: str,
type: RPCType = RPCType.parity,
max_concurrency: int = 1,
request_timeout: int = 300,
):
base_provider = get_base_provider(rpc, request_timeout=request_timeout)
base_provider = get_base_provider(rpc, request_timeout, type)
self.w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
self.type = type
self.trace_classifier = TraceClassifier()
self.max_concurrency = asyncio.Semaphore(max_concurrency)

View File

@ -1,9 +1,19 @@
from web3 import AsyncHTTPProvider, Web3
from mev_inspect.geth_poa_middleware import geth_poa_middleware
from mev_inspect.retry import http_retry_with_backoff_request_middleware
from mev_inspect.utils import RPCType
def get_base_provider(
    rpc: str, request_timeout: int = 500, type: RPCType = RPCType.parity
) -> Web3.AsyncHTTPProvider:
    """Create the async HTTP provider used to talk to the node.

    Args:
        rpc: URL of the RPC endpoint.
        request_timeout: Timeout passed to the provider as the ``"timeout"``
            request kwarg.
        type: Kind of node behind ``rpc``. For ``RPCType.geth`` the
            ``geth_poa_middleware`` is installed ahead of the retry middleware.

    Returns:
        The configured ``AsyncHTTPProvider``. The retry middleware is
        registered exactly once, whatever the node type.
    """
    base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout})

    if type is RPCType.geth:
        # create_from_block_number checks for this middleware to decide
        # whether it is talking to a geth node.
        base_provider.middlewares += (geth_poa_middleware,)

    base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
    return base_provider

View File

@ -7,6 +7,7 @@ from .utils import CamelModel
class TraceType(Enum):
call = "call"
create = "create"
create2 = "create2"
delegate_call = "delegateCall"
reward = "reward"
suicide = "suicide"

View File

@ -1,6 +1,13 @@
from enum import Enum
from hexbytes._utils import hexstr_to_bytes
class RPCType(Enum):
    """Kind of node behind the RPC endpoint."""

    # Selected by default throughout the CLI and provider setup.
    parity = 0
    # Selects the geth translation path for traces and receipts.
    geth = 1
def hex_to_int(value: str) -> int:
    """Decode a hex string into an int, reading its bytes as big-endian."""
    raw_bytes = hexstr_to_bytes(value)
    return int.from_bytes(raw_bytes, byteorder="big")

14
poetry.lock generated
View File

@ -77,6 +77,14 @@ python-versions = ">=3.6"
[package.dependencies]
typing-extensions = ">=3.6.5"
[[package]]
name = "asyncio"
version = "3.4.3"
description = "reference implementation of PEP 3156"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "atomicwrites"
version = "1.4.0"
@ -1289,6 +1297,12 @@ async-timeout = [
{file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"},
{file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"},
]
asyncio = [
{file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"},
{file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"},
{file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"},
{file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"},
]
atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},

View File

@ -16,6 +16,7 @@ dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
aiohttp-retry = "^2.4.6"
asyncio = "^3.4.3"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"