diff --git a/README.md b/README.md index 5a4acef..d8c911c 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ Example: export RPC_URL="http://111.111.111.111:8546" ``` +**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth additions have been added to translate geth traces and receipts to Erigon ones and can be accessed using `--geth` flag. Next, start all services with: @@ -71,17 +72,19 @@ And load prices data ### Inspect a single block Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185): +**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node. ``` -./mev inspect 12914944 +./mev inspect 12914944 parity ``` ### Inspect many blocks Inspecting blocks 12914944 to 12914954: +**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node. ``` -./mev inspect-many 12914944 12914954 +./mev inspect-many 12914944 12914954 parity ``` ### Inspect all incoming blocks diff --git a/cli.py b/cli.py index b5a6674..cfa5a0b 100644 --- a/cli.py +++ b/cli.py @@ -20,6 +20,8 @@ from mev_inspect.queue.tasks import ( inspect_many_blocks_task, ) from mev_inspect.s3_export import export_block +from mev_inspect.utils import RPCType +#from mev_inspect.prices import fetch_all_supported_prices RPC_URL_ENV = "RPC_URL" @@ -35,12 +37,18 @@ def cli(): @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) +@click.option( + "--type", + type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False), + default=RPCType.parity.name, +) @coro -async def inspect_block_command(block_number: int, rpc: str): +async def inspect_block_command(block_number: int, rpc: str, type: str): + type_e = convert_str_to_enum(type) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc) + inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, type_e) await inspector.inspect_single_block( inspect_db_session=inspect_db_session, @@ -48,6 +56,13 @@ async def inspect_block_command(block_number: int, rpc: str): block=block_number, ) +def convert_str_to_enum(type: str) -> RPCType: + if type == "parity": + return RPCType.parity + elif type == "geth": + return RPCType.geth + raise ValueError + @cli.command() @click.argument("block_number", type=int) @@ -56,12 +71,11 @@ async def inspect_block_command(block_number: int, rpc: str): async def fetch_block_command(block_number: int, rpc: str): trace_db_session = get_trace_session() - inspector = MEVInspector(rpc) + inspector = MEVInspector(rpc, RPCType.parity) block = await inspector.create_from_block( block_number=block_number, trace_db_session=trace_db_session, ) - print(block.json()) @@ -69,6 +83,11 @@ async def fetch_block_command(block_number: int, rpc: str): @click.argument("after_block", type=int) @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) +@click.option( + "--type", + type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False), + default=RPCType.parity.name, +) @click.option( "--max-concurrency", type=int, @@ -85,12 +104,14 @@ async def inspect_many_blocks_command( rpc: str, max_concurrency: int, request_timeout: int, + type: str, ): + type_e = convert_str_to_enum(type) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector( - rpc, + rpc=rpc, + type=type_e, max_concurrency=max_concurrency, request_timeout=request_timeout, ) diff --git a/mev b/mev index 47ffb52..a2f09df 100755 --- a/mev +++ b/mev @@ -58,15 +58,17 @@ case "$1" in ;; inspect) block_number=$2 + rpc_type=$3 echo "Inspecting block $block_number" - kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number + kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number --type $rpc_type ;; inspect-many) - after_block_number=$2 - before_block_number=$3 - echo "Inspecting from block $after_block_number to $before_block_number" + start_block_number=$2 + end_block_number=$3 + rpc_type=$4 + echo "Inspecting from block $start_block_number to $end_block_number" kubectl exec -ti deploy/mev-inspect -- \ - poetry run inspect-many-blocks $after_block_number $before_block_number + poetry run inspect-many-blocks $start_block_number $end_block_number --type $rpc_type ;; test) shift diff --git a/mev_inspect/block.py b/mev_inspect/block.py index cf80fa5..8c62915 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,6 +1,7 @@ import asyncio import logging from typing import List, Optional +from aiohttp import TraceRequestStartParams from sqlalchemy import orm from web3 import Web3 @@ -9,9 +10,17 @@ from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.receipts import Receipt from mev_inspect.schemas.traces import Trace, TraceType -from mev_inspect.utils import hex_to_int +from mev_inspect.utils import RPCType, hex_to_int logger = logging.getLogger(__name__) +_calltype_mapping = { + "CALL": "call", + "DELEGATECALL": "delegateCall", + "CREATE": "create", + "CREATE2": "create2", + "SUICIDE": "suicide", + "REWARD": "reward", +} async def get_latest_block_number(base_provider) -> int: @@ -25,13 +34,20 @@ async def get_latest_block_number(base_provider) -> int: async def create_from_block_number( w3: Web3, + type: RPCType, block_number: int, trace_db_session: Optional[orm.Session], ) -> Block: + + if type == RPCType.geth: + block_json = await asyncio.gather(w3.eth.get_block(block_number)) + else: + block_json = [] + block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather( _find_or_fetch_block_timestamp(w3, block_number, trace_db_session), - _find_or_fetch_block_receipts(w3, block_number, trace_db_session), - _find_or_fetch_block_traces(w3, block_number, trace_db_session), + _find_or_fetch_block_receipts(w3, block_number, trace_db_session, type, block_json), + _find_or_fetch_block_traces(w3, block_number, trace_db_session, type, block_json), _find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session), ) @@ -56,20 +72,28 @@ async def _find_or_fetch_block_timestamp( existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number) if existing_block_timestamp is not None: return existing_block_timestamp - - return await _fetch_block_timestamp(w3, block_number) + + return await _fetch_block_timestamp(w3, block_number) async def _find_or_fetch_block_receipts( w3, block_number: int, trace_db_session: Optional[orm.Session], + type: RPCType, + block_json: list = [] ) -> List[Receipt]: if trace_db_session is not None: existing_block_receipts = _find_block_receipts(trace_db_session, block_number) if existing_block_receipts is not None: return existing_block_receipts + if type == RPCType.geth: + geth_tx_receipts = await geth_get_tx_receipts_async( + w3.provider, block_json[0]["transactions"] + ) + receipts = geth_receipts_translator(block_json[0], geth_tx_receipts) + return await _fetch_block_receipts(w3, block_number) @@ -77,12 +101,19 @@ async def _find_or_fetch_block_traces( w3, block_number: int, trace_db_session: Optional[orm.Session], + type: RPCType, + block_json: list = [] ) -> List[Trace]: if trace_db_session is not None: existing_block_traces = _find_block_traces(trace_db_session, block_number) if existing_block_traces is not None: return existing_block_traces + if type == RPCType.geth: + # Translate to parity format + traces = await geth_get_tx_traces_parity_format(w3.provider, block_json[0]) + return traces + return await _fetch_block_traces(w3, block_number) @@ -200,3 +231,120 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]: result.append(call.transaction_hash) return result + + +# Geth specific additions + + +async def geth_get_tx_traces_parity_format(base_provider, block_json: dict): + # print(block_json['hash'].hex()) + block_hash = block_json["hash"] + block_trace = await geth_get_tx_traces(base_provider, block_hash) + # print(block_trace) + parity_traces = [] + for idx, trace in enumerate(block_trace["result"]): + if "result" in trace: + parity_traces.extend( + unwrap_tx_trace_for_parity(block_json, idx, trace["result"]) + ) + return parity_traces + + +async def geth_get_tx_traces(base_provider, block_hash): + block_trace = await base_provider.make_request( + "debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}] + ) + return block_trace + + +def unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, tx_trace, position=[] +) -> List[Trace]: + response_list = [] + try: + if tx_trace["type"] == "STATICCALL": + return [] + action_dict = dict() + action_dict["callType"] = _calltype_mapping[tx_trace["type"]] + if action_dict["callType"] == "call": + action_dict["value"] = tx_trace["value"] + for key in ["from", "to", "gas", "input"]: + action_dict[key] = tx_trace[key] + + result_dict = dict() + for key in ["gasUsed", "output"]: + result_dict[key] = tx_trace[key] + + response_list.append( + Trace( + action=action_dict, + block_hash=str(block_json["hash"]), + block_number=int(block_json["number"]), + result=result_dict, + subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0, + trace_address=position, + transaction_hash=block_json["transactions"][tx_pos_in_block].hex(), + transaction_position=tx_pos_in_block, + type=TraceType(_calltype_mapping[tx_trace["type"]]), + ) + ) + except Exception as e: + logger.warn(f"error while unwraping tx trace for parity {e}") + return [] + + if "calls" in tx_trace.keys(): + for idx, subcall in enumerate(tx_trace["calls"]): + response_list.extend( + unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, subcall, position + [idx] + ) + ) + return response_list + + +async def geth_get_tx_receipts_task(base_provider, tx): + receipt = await base_provider.make_request("eth_getTransactionReceipt", [tx.hex()]) + return receipt + + +async def geth_get_tx_receipts_async(base_provider, transactions): + geth_tx_receipts = [] + tasks = [ + asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx)) + for tx in transactions + ] + geth_tx_receipts = await asyncio.gather(*tasks) + # return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + return geth_tx_receipts + + +def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: + json_decoded_receipts = [ + tx_receipt["result"] + if tx_receipt != None and ("result" in tx_receipt.keys()) + else None + for tx_receipt in geth_tx_receipts + ] + results = [] + for idx, tx_receipt in enumerate(json_decoded_receipts): + if tx_receipt != None: + results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt)) + return results + + +def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt: + if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): + logger.info( + "Alert the position of transaction in block is mismatched ", + tx_pos_in_block, + tx_receipt["transactionIndex"], + ) + return Receipt( + block_number=block_json["number"], + transaction_hash=tx_receipt["transactionHash"], + transaction_index=tx_pos_in_block, + gas_used=tx_receipt["gasUsed"], + effective_gas_price=tx_receipt["effectiveGasPrice"], + cumulative_gas_used=tx_receipt["cumulativeGasUsed"], + to=tx_receipt["to"], + ) diff --git a/mev_inspect/geth_poa_middleware.py b/mev_inspect/geth_poa_middleware.py new file mode 100644 index 0000000..6eb103d --- /dev/null +++ b/mev_inspect/geth_poa_middleware.py @@ -0,0 +1,101 @@ +""" +Modified asynchronous geth_poa_middleware which mirrors functionality of +https://github.com/ethereum/web3.py/blob/master/web3/middleware/geth_poa.py +""" +from typing import ( + Any, + Callable, +) + +from hexbytes import ( + HexBytes, +) + +from eth_utils.curried import ( + apply_formatter_if, + apply_formatters_to_dict, + apply_key_map, + is_null, +) +from eth_utils.toolz import ( + complement, + compose, + assoc, +) + +from web3._utils.rpc_abi import ( + RPC, +) + +from web3.types import ( + Formatters, + RPCEndpoint, + RPCResponse, +) + +from web3 import Web3 # noqa: F401 + + +async def get_geth_poa_middleware( + make_request: Callable[[RPCEndpoint, Any], RPCResponse], + request_formatters: Formatters = {}, + result_formatters: Formatters = {}, + error_formatters: Formatters = {}, +) -> RPCResponse: + async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: + if method in request_formatters: + formatter = request_formatters[method] + formatted_params = formatter(params) + response = await make_request(method, formatted_params) + else: + response = await make_request(method, params) + + if "result" in response and method in result_formatters: + formatter = result_formatters[method] + formatted_response = assoc( + response, + "result", + formatter(response["result"]), + ) + return formatted_response + elif "error" in response and method in error_formatters: + formatter = error_formatters[method] + formatted_response = assoc( + response, + "error", + formatter(response["error"]), + ) + return formatted_response + else: + return response + + return middleware + + +is_not_null = complement(is_null) + +remap_geth_poa_fields = apply_key_map( + { + "extraData": "proofOfAuthorityData", + } +) + +pythonic_geth_poa = apply_formatters_to_dict( + { + "proofOfAuthorityData": HexBytes, + } +) + +geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields) + + +async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], _: Web3): + return await get_geth_poa_middleware( + make_request=make_request, + request_formatters={}, + result_formatters={ + RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup), + RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup), + }, + error_formatters={}, + ) diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index db24347..01fb172 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -53,6 +53,9 @@ from mev_inspect.schemas.traces import ClassifiedTrace from mev_inspect.schemas.transfers import Transfer from mev_inspect.swaps import get_swaps from mev_inspect.transfers import get_transfers +from mev_inspect.liquidations import get_liquidations +from mev_inspect.utils import RPCType + logger = logging.getLogger(__name__) @@ -60,6 +63,7 @@ logger = logging.getLogger(__name__) async def inspect_block( inspect_db_session: orm.Session, w3: Web3, + type: RPCType, trace_classifier: TraceClassifier, block_number: int, trace_db_session: Optional[orm.Session], @@ -69,6 +73,7 @@ async def inspect_block( inspect_db_session, w3, trace_classifier, + type, block_number, block_number + 1, trace_db_session, @@ -80,6 +85,7 @@ async def inspect_many_blocks( inspect_db_session: orm.Session, w3: Web3, trace_classifier: TraceClassifier, + type: RPCType, after_block_number: int, before_block_number: int, trace_db_session: Optional[orm.Session], @@ -103,9 +109,10 @@ async def inspect_many_blocks( for block_number in range(after_block_number, before_block_number): block = await create_from_block_number( - w3, - block_number, - trace_db_session, + w3=w3, + block_number=block_number, + trace_db_session=trace_db_session, + type=type ) logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}") diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index 7878527..9b02034 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -13,6 +13,7 @@ from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.inspect_block import inspect_block, inspect_many_blocks from mev_inspect.methods import get_block_receipts, trace_block from mev_inspect.provider import get_base_provider +from mev_inspect.utils import RPCType logger = logging.getLogger(__name__) @@ -27,11 +28,13 @@ class MEVInspector: def __init__( self, rpc: str, + type: RPCType = RPCType.parity, max_concurrency: int = 1, request_timeout: int = 300, ): - base_provider = get_base_provider(rpc, request_timeout=request_timeout) + base_provider = get_base_provider(rpc, request_timeout, type) self.w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) + self.type = type self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) @@ -43,6 +46,7 @@ class MEVInspector: ): return await create_from_block_number( w3=self.w3, + type=self.type, block_number=block_number, trace_db_session=trace_db_session, ) @@ -56,6 +60,7 @@ class MEVInspector: return await inspect_block( inspect_db_session, self.w3, + self.type, self.trace_classifier, block, trace_db_session=trace_db_session, @@ -105,6 +110,7 @@ class MEVInspector: return await inspect_many_blocks( inspect_db_session, self.w3, + self.type, self.trace_classifier, after_block_number, before_block_number, diff --git a/mev_inspect/provider.py b/mev_inspect/provider.py index 1d59b79..890e9bb 100644 --- a/mev_inspect/provider.py +++ b/mev_inspect/provider.py @@ -1,9 +1,19 @@ from web3 import AsyncHTTPProvider, Web3 from mev_inspect.retry import http_retry_with_backoff_request_middleware +from mev_inspect.geth_poa_middleware import geth_poa_middleware +from mev_inspect.utils import RPCType -def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider: +def get_base_provider( + rpc: str, request_timeout: int = 500, type: RPCType = RPCType.parity +) -> Web3.AsyncHTTPProvider: base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout}) - base_provider.middlewares += (http_retry_with_backoff_request_middleware,) + if type is RPCType.geth: + base_provider.middlewares += ( + geth_poa_middleware, + http_retry_with_backoff_request_middleware, + ) + else: + base_provider.middlewares += (http_retry_with_backoff_request_middleware,) return base_provider diff --git a/mev_inspect/utils.py b/mev_inspect/utils.py index ec2b5b2..297ecf5 100644 --- a/mev_inspect/utils.py +++ b/mev_inspect/utils.py @@ -1,6 +1,12 @@ +from enum import Enum from hexbytes._utils import hexstr_to_bytes +class RPCType(Enum): + parity = 0 + geth = 1 + + def hex_to_int(value: str) -> int: return int.from_bytes(hexstr_to_bytes(value), byteorder="big") diff --git a/poetry.lock b/poetry.lock index 60c7399..199fdfd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -77,6 +77,14 @@ python-versions = ">=3.6" [package.dependencies] typing-extensions = ">=3.6.5" +[[package]] +name = "asyncio" +version = "3.4.3" +description = "reference implementation of PEP 3156" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "atomicwrites" version = "1.4.0" @@ -1289,6 +1297,12 @@ async-timeout = [ {file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"}, {file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"}, ] +asyncio = [ + {file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"}, + {file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"}, + {file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"}, + {file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"}, +] atomicwrites = [ {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, diff --git a/pyproject.toml b/pyproject.toml index a63a0f6..ddf8604 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dramatiq = {extras = ["redis"], version = "^1.12.1"} pycoingecko = "^2.2.0" boto3 = "^1.20.48" aiohttp-retry = "^2.4.6" +asyncio = "^3.4.3" [tool.poetry.dev-dependencies] pre-commit = "^2.13.0"