diff --git a/README.md b/README.md index 9b299c3..bf9eb5e 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ Example: export RPC_URL="http://111.111.111.111:8546" ``` +**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth additions have been added to translate geth traces and receipts to Erigon ones and can be accessed using `--geth` flag. Next, start all services with: @@ -65,6 +66,7 @@ On first startup, you'll need to apply database migrations with: ### Inspect a single block Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185): +**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node. ``` ./mev inspect 12914944 @@ -73,6 +75,7 @@ Inspecting block [12914944](https://twitter.com/mevalphaleak/status/142041643757 ### Inspect many blocks Inspecting blocks 12914944 to 12914954: +**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node. ``` ./mev inspect-many 12914944 12914954 diff --git a/cli.py b/cli.py index 12d62c7..4206570 100644 --- a/cli.py +++ b/cli.py @@ -3,6 +3,8 @@ import os import sys import click +from web3 import Web3 +from web3.middleware import geth_poa_middleware from mev_inspect.concurrency import coro from mev_inspect.db import get_inspect_session, get_trace_session @@ -21,12 +23,13 @@ def cli(): @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) +@click.option("--geth/--no-geth", default=False) @coro -async def inspect_block_command(block_number: int, rpc: str): +async def inspect_block_command(block_number: int, rpc: str, geth: bool): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) + inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, geth) await inspector.inspect_single_block(block=block_number) @@ -38,7 +41,7 @@ async def fetch_block_command(block_number: int, rpc: str): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) + inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, false) block = await inspector.create_from_block(block_number=block_number) print(block.json()) @@ -47,6 +50,8 @@ async def fetch_block_command(block_number: int, rpc: str): @click.argument("after_block", type=int) @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) +@click.option("--geth/--no-geth", default=False) + @click.option( "--max-concurrency", type=int, @@ -63,22 +68,22 @@ async def inspect_many_blocks_command( rpc: str, max_concurrency: int, request_timeout: int, + geth: bool ): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector( rpc, inspect_db_session, trace_db_session, max_concurrency=max_concurrency, request_timeout=request_timeout, + geth ) await inspector.inspect_many_blocks( after_block=after_block, before_block=before_block ) - def get_rpc_url() -> str: return os.environ["RPC_URL"] diff --git a/mev_inspect/block.py b/mev_inspect/block.py index ab61919..592600c 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,6 +1,9 @@ import asyncio import logging from typing import List, Optional +import json +import asyncio +import aiohttp from sqlalchemy import orm from web3 import Web3 @@ -27,6 +30,7 @@ async def get_latest_block_number(base_provider) -> int: async def create_from_block_number( base_provider, w3: Web3, + geth: bool, block_number: int, trace_db_session: Optional[orm.Session], ) -> Block: @@ -42,38 +46,46 @@ async def create_from_block_number( return block -async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block: - block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( - w3.eth.get_block(block_number), - base_provider.make_request("eth_getBlockReceipts", [block_number]), - base_provider.make_request("trace_block", [block_number]), - fetch_base_fee_per_gas(w3, block_number), - ) - - try: - receipts: List[Receipt] = [ - Receipt(**receipt) for receipt in receipts_json["result"] - ] - traces = [Trace(**trace_json) for trace_json in traces_json["result"]] - except KeyError as e: - logger.warning( - f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" +async def _fetch_block(w3, base_provider, geth, block_number: int, retries: int = 0) -> Block: + if not geth: + block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( + w3.eth.get_block(block_number), + base_provider.make_request("eth_getBlockReceipts", [block_number]), + base_provider.make_request("trace_block", [block_number]), + fetch_base_fee_per_gas(w3, block_number), ) - if retries < 3: - await asyncio.sleep(5) - return await _fetch_block(w3, base_provider, block_number, retries) - else: - raise - - return Block( - block_number=block_number, - block_timestamp=block_json["timestamp"], - miner=block_json["miner"], - base_fee_per_gas=base_fee_per_gas, - traces=traces, - receipts=receipts, - ) + try: + receipts: List[Receipt] = [ + Receipt(**receipt) for receipt in receipts_json["result"] + ] + traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + except KeyError as e: + logger.warning( + f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" + ) + if retries < 3: + await asyncio.sleep(5) + return await _fetch_block(w3, base_provider, block_number, retries) + else: + raise + else: + traces = geth_get_tx_traces_parity_format(base_provider, block_json) + geth_tx_receipts = geth_get_tx_receipts( + base_provider, block_json["transactions"] + ) + receipts = geth_receipts_translator(block_json, geth_tx_receipts) + base_fee_per_gas = 0 + + return Block( + block_number=block_number, + block_timestamp=block_json["timestamp"], + miner=block_json["miner"], + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) + def _find_block( trace_db_session: orm.Session, @@ -106,7 +118,6 @@ def _find_block( receipts=receipts, ) - def _find_block_timestamp( trace_db_session: orm.Session, block_number: int, @@ -191,3 +202,143 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]: result.append(call.transaction_hash) return result + +# Geth specific additions + + +def geth_get_tx_traces_parity_format(base_provider, block_json): + block_hash = block_json["hash"] + block_trace = geth_get_tx_traces(base_provider, block_hash) + parity_traces = [] + for idx, trace in enumerate(block_trace["result"]): + if "result" in trace: + parity_traces.extend( + unwrap_tx_trace_for_parity(block_json, idx, trace["result"]) + ) + return parity_traces + + +def geth_get_tx_traces(base_provider, block_hash): + block_trace = base_provider.make_request( + "debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}] + ) + return block_trace + + +def unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, tx_trace, position=[] +) -> List[Trace]: + response_list = [] + _calltype_mapping = { + "CALL": "call", + "DELEGATECALL": "delegateCall", + "CREATE": "create", + "SUICIDE": "suicide", + "REWARD": "reward", + } + try: + if tx_trace["type"] == "STATICCALL": + return [] + action_dict = dict() + action_dict["callType"] = _calltype_mapping[tx_trace["type"]] + if action_dict["callType"] == "call": + action_dict["value"] = tx_trace["value"] + for key in ["from", "to", "gas", "input"]: + action_dict[key] = tx_trace[key] + + result_dict = dict() + for key in ["gasUsed", "output"]: + result_dict[key] = tx_trace[key] + + response_list.append( + Trace( + action=action_dict, + block_hash=str(block_json["hash"]), + block_number=int(block_json["number"]), + result=result_dict, + subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0, + trace_address=position, + transaction_hash=block_json["transactions"][tx_pos_in_block].hex(), + transaction_position=tx_pos_in_block, + type=TraceType(_calltype_mapping[tx_trace["type"]]), + ) + ) + except Exception: + return [] + + if "calls" in tx_trace.keys(): + for idx, subcall in enumerate(tx_trace["calls"]): + response_list.extend( + unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, subcall, position + [idx] + ) + ) + return response_list + + +async def geth_get_tx_receipts_task(session, endpoint_uri, tx): + data = { + "jsonrpc": "2.0", + "id": "0", + "method": "eth_getTransactionReceipt", + "params": [tx.hex()], + } + async with session.post(endpoint_uri, json=data) as response: + if response.status != 200: + response.raise_for_status() + return await response.text() + + +async def geth_get_tx_receipts_async(endpoint_uri, transactions): + geth_tx_receipts = [] + async with aiohttp.ClientSession() as session: + tasks = [ + asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx)) + for tx in transactions + ] + geth_tx_receipts = await asyncio.gather(*tasks) + return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + + +def geth_get_tx_receipts(base_provider, transactions): + return asyncio.run( + geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions) + ) + + +def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: + json_decoded_receipts = [ + tx_receipt["result"] + if tx_receipt != None and ("result" in tx_receipt.keys()) + else None + for tx_receipt in geth_tx_receipts + ] + results = [] + for idx, tx_receipt in enumerate(json_decoded_receipts): + if tx_receipt != None: + results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt)) + return results + + +def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt: + try: + if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): + print( + "Alert the position of transaction in block is mismatched ", + tx_pos_in_block, + tx_receipt["transactionIndex"], + ) + return Receipt( + block_number=block_json["number"], + transaction_hash=tx_receipt["transactionHash"], + transaction_index=tx_pos_in_block, + gas_used=tx_receipt["gasUsed"], + effective_gas_price=tx_receipt["effectiveGasPrice"], + cumulative_gas_used=tx_receipt["cumulativeGasUsed"], + to=tx_receipt["to"], + ) + + except Exception as e: + print("error while decoding receipt", tx_receipt, e) + + return Receipt() diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index c5f849d..572394a 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -43,7 +43,8 @@ async def inspect_block( inspect_db_session: orm.Session, base_provider, w3: Web3, - trace_classifier: TraceClassifier, + geth: bool, + trace_clasifier: TraceClassifier, block_number: int, trace_db_session: Optional[orm.Session], should_write_classified_traces: bool = True, @@ -51,6 +52,7 @@ async def inspect_block( block = await create_from_block_number( base_provider, w3, + geth, block_number, trace_db_session, ) diff --git a/poetry.lock b/poetry.lock index 6a37a0a..c398671 100644 --- a/poetry.lock +++ b/poetry.lock @@ -66,6 +66,14 @@ python-versions = ">=3.6" [package.dependencies] typing-extensions = ">=3.6.5" +[[package]] +name = "asyncio" +version = "3.4.3" +description = "reference implementation of PEP 3156" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "atomicwrites" version = "1.4.0" @@ -1125,6 +1133,12 @@ async-timeout = [ {file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"}, {file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"}, ] +asyncio = [ + {file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"}, + {file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"}, + {file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"}, + {file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"}, +] atomicwrites = [ {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, diff --git a/pyproject.toml b/pyproject.toml index 9217449..61851d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ hexbytes = "^0.2.1" click = "^8.0.1" psycopg2 = "^2.9.1" aiohttp = "^3.8.0" +asyncio = "^3.4.3" [tool.poetry.dev-dependencies] pre-commit = "^2.13.0"