From 75ac0ea618f64701ecffa0ab50aaf5b6b1175862 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Mon, 25 Oct 2021 05:56:23 +0200 Subject: [PATCH 1/9] geth additions --- cli.py | 13 ++- mev_inspect/block.py | 170 +++++++++++++++++++++++++++++++---- mev_inspect/inspect_block.py | 2 + poetry.lock | 16 +++- pyproject.toml | 2 + 5 files changed, 185 insertions(+), 18 deletions(-) diff --git a/cli.py b/cli.py index fb8a080..27f61b1 100644 --- a/cli.py +++ b/cli.py @@ -4,6 +4,7 @@ import sys import click from web3 import Web3 +from web3.middleware import geth_poa_middleware from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.db import get_inspect_session, get_trace_session @@ -26,12 +27,15 @@ def cli(): @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--cache/--no-cache", default=True) -def inspect_block_command(block_number: int, rpc: str, cache: bool): +@click.option("--geth/--no-geth", default=False) +def inspect_block_command(block_number: int, rpc: str, cache: bool, geth: bool): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() base_provider = get_base_provider(rpc) w3 = Web3(base_provider) + if geth: + w3.middleware_onion.inject(geth_poa_middleware, layer=0) trace_classifier = TraceClassifier() if not cache: @@ -41,6 +45,7 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool): inspect_db_session, base_provider, w3, + geth, trace_classifier, block_number, trace_db_session=trace_db_session, @@ -52,8 +57,9 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool): @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--cache/--no-cache", default=True) +@click.option("--geth/--no-geth", default=False) def inspect_many_blocks_command( - after_block: int, before_block: int, rpc: str, cache: bool + after_block: int, before_block: int, 
rpc: str, cache: bool, geth: bool ): inspect_db_session = get_inspect_session() @@ -61,6 +67,8 @@ def inspect_many_blocks_command( base_provider = get_base_provider(rpc) w3 = Web3(base_provider) + if geth: + w3.middleware_onion.inject(geth_poa_middleware, layer=0) trace_classifier = TraceClassifier() if not cache: @@ -79,6 +87,7 @@ def inspect_many_blocks_command( inspect_db_session, base_provider, w3, + geth, trace_classifier, block_number, trace_db_session=trace_db_session, diff --git a/mev_inspect/block.py b/mev_inspect/block.py index ab4afe4..29d90b0 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,5 +1,8 @@ from pathlib import Path from typing import List, Optional +import asyncio +import aiohttp +import json from sqlalchemy import orm from web3 import Web3 @@ -19,6 +22,7 @@ def get_latest_block_number(w3: Web3) -> int: def create_from_block_number( base_provider, w3: Web3, + geth: bool, block_number: int, trace_db_session: Optional[orm.Session], ) -> Block: @@ -28,7 +32,7 @@ def create_from_block_number( block = _find_block(trace_db_session, block_number) if block is None: - return _fetch_block(w3, base_provider, block_number) + return _fetch_block(w3, base_provider, geth, block_number) else: return block @@ -36,26 +40,33 @@ def create_from_block_number( def _fetch_block( w3, base_provider, + geth, block_number: int, ) -> Block: block_json = w3.eth.get_block(block_number) - receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number]) - traces_json = w3.parity.trace_block(block_number) - receipts: List[Receipt] = [ - Receipt(**receipt) for receipt in receipts_json["result"] - ] - traces = [Trace(**trace_json) for trace_json in traces_json] - base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number) + if not geth: + receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number]) + traces_json = w3.parity.trace_block(block_number) + + receipts: List[Receipt] = [ + Receipt(**receipt) for receipt in 
receipts_json["result"] + ] + traces = [Trace(**trace_json) for trace_json in traces_json] + base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number) + else: + traces = geth_get_tx_traces_parity_format(base_provider, block_json) + geth_tx_receipts = geth_get_tx_receipts(base_provider, block_json["transactions"]) + receipts = geth_receipts_translator(block_json, geth_tx_receipts) + base_fee_per_gas = 0 return Block( - block_number=block_number, - miner=block_json["miner"], - base_fee_per_gas=base_fee_per_gas, - traces=traces, - receipts=receipts, - ) - + block_number=block_number, + miner=block_json["miner"], + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) def _find_block( trace_db_session: orm.Session, @@ -164,3 +175,132 @@ def cache_block(cache_path: Path, block: Block): def _get_cache_path(block_number: int) -> Path: cache_directory_path = Path(cache_directory) return cache_directory_path / f"{block_number}.json" + +# Geth specific additions + +def geth_get_tx_traces_parity_format(base_provider, block_json): + block_hash = block_json['hash'] + block_trace = geth_get_tx_traces(base_provider, block_hash) + parity_traces = [] + for idx, trace in enumerate(block_trace['result']): + if 'result' in trace: + parity_traces.extend(unwrap_tx_trace_for_parity(block_json, idx, trace['result'])) + return parity_traces + + +def geth_get_tx_traces(base_provider, block_hash): + block_trace = base_provider.make_request("debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]) + return block_trace + + +def unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, tx_trace, position=[] +) -> List[Trace]: + response_list = [] + _calltype_mapping = { + "CALL": "call", + "DELEGATECALL": "delegateCall", + "CREATE": "create", + "SUICIDE": "suicide", + "REWARD": "reward", + } + try: + if tx_trace["type"] == "STATICCALL": + return [] + action_dict = dict() + action_dict["callType"] = _calltype_mapping[tx_trace["type"]] + if 
action_dict["callType"] == "call": + action_dict["value"] = tx_trace["value"] + for key in ["from", "to", "gas", "input"]: + action_dict[key] = tx_trace[key] + + result_dict = dict() + for key in ["gasUsed", "output"]: + result_dict[key] = tx_trace[key] + + response_list.append( + Trace( + action=action_dict, + block_hash = str(block_json['hash']), + block_number=int(block_json["number"]), + result=result_dict, + subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0, + trace_address=position, + transaction_hash=block_json["transactions"][tx_pos_in_block].hex(), + transaction_position=tx_pos_in_block, + type=TraceType(_calltype_mapping[tx_trace["type"]]), + ) + ) + except Exception: + return [] + + if "calls" in tx_trace.keys(): + for idx, subcall in enumerate(tx_trace["calls"]): + response_list.extend( + unwrap_tx_trace_for_parity( + block_json, tx_pos_in_block, subcall, position + [idx] + ) + ) + return response_list + +async def geth_get_tx_receipts_task(session, endpoint_uri, tx): + data = { + "jsonrpc": "2.0", + "id": "0", + "method": "eth_getTransactionReceipt", + "params": [tx.hex()], + } + async with session.post(endpoint_uri, json=data) as response: + if response.status != 200: + response.raise_for_status() + return await response.text() + +async def geth_get_tx_receipts_async(endpoint_uri, transactions): + geth_tx_receipts = [] + async with aiohttp.ClientSession() as session: + tasks = [ + asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx)) + for tx in transactions + ] + geth_tx_receipts = await asyncio.gather(*tasks) + return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + +def geth_get_tx_receipts(base_provider, transactions): + return asyncio.run(geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions)) + + +def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: + json_decoded_receipts = [ + tx_receipt["result"] + if tx_receipt != None and ("result" in 
tx_receipt.keys()) + else None + for tx_receipt in geth_tx_receipts + ] + results = [] + for idx, tx_receipt in enumerate(json_decoded_receipts): + if tx_receipt != None: + results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt)) + return results + +def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt: + try: + if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): + print( + "Alert the position of transaction in block is mismatched ", + tx_pos_in_block, + tx_receipt["transactionIndex"], + ) + return Receipt( + block_number=block_json["number"], + transaction_hash=tx_receipt["transactionHash"], + transaction_index=tx_pos_in_block, + gas_used=tx_receipt["gasUsed"], + effective_gas_price=tx_receipt["effectiveGasPrice"], + cumulative_gas_used=tx_receipt["cumulativeGasUsed"], + to=tx_receipt["to"], + ) + + except Exception as e: + print("error while decoding receipt", tx_receipt, e) + + return Receipt() \ No newline at end of file diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index d3ab75b..67750ff 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -39,6 +39,7 @@ def inspect_block( inspect_db_session: orm.Session, base_provider, w3: Web3, + geth: bool, trace_clasifier: TraceClassifier, block_number: int, trace_db_session: Optional[orm.Session], @@ -47,6 +48,7 @@ def inspect_block( block = create_from_block_number( base_provider, w3, + geth, block_number, trace_db_session, ) diff --git a/poetry.lock b/poetry.lock index ccd2565..3ee7eab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -51,6 +51,14 @@ category = "main" optional = false python-versions = ">=3.5.3" +[[package]] +name = "asyncio" +version = "3.4.3" +description = "reference implementation of PEP 3156" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "atomicwrites" version = "1.4.0" @@ -1017,7 +1025,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" 
python-versions = "^3.9" -content-hash = "baade6f62f3adaff192b2c85b4f602f4990b9b99d6fcce904aeb5087b6fa1921" +content-hash = "c9435e8660dcaddeb63b19f26dddb70287b0f3b4e43ca4ad6168d5f919f0089d" [metadata.files] aiohttp = [ @@ -1071,6 +1079,12 @@ async-timeout = [ {file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"}, {file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"}, ] +asyncio = [ + {file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"}, + {file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"}, + {file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"}, + {file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"}, +] atomicwrites = [ {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, diff --git a/pyproject.toml b/pyproject.toml index 797a99c..9fe58e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,8 @@ pydantic = "^1.8.2" hexbytes = "^0.2.1" click = "^8.0.1" psycopg2 = "^2.9.1" +aiohttp = "^3.7.4" +asyncio = "^3.4.3" [tool.poetry.dev-dependencies] pre-commit = "^2.13.0" From 2f1d8262aed8a0c1b43bb650571b69af58701837 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Mon, 25 Oct 2021 14:39:24 +0200 Subject: [PATCH 2/9] Readme updates --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bef5711..1b97e2d 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Example: export 
RPC_URL="http://111.111.111.111:8546" ``` -**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts (not geth 😔). +**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth nodes are also supported: pass the `--geth` flag to translate geth traces and receipts into the Erigon format. Next, start all services with: @@ -62,6 +62,7 @@ kubectl exec deploy/mev-inspect -- alembic upgrade head ### Inspect a single block Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185): +**Note**: Add `--geth` at the end of the command if RPC_URL points to a geth or geth-like node. ``` kubectl exec deploy/mev-inspect -- poetry run inspect-block 12914944 @@ -70,6 +71,7 @@ kubectl exec deploy/mev-inspect -- poetry run inspect-block 12914944 ### Inspect many blocks Inspecting blocks 12914944 to 12914954: +**Note**: Add `--geth` at the end of the command if RPC_URL points to a geth or geth-like node. 
``` kubectl exec deploy/mev-inspect -- poetry run inspect-many-blocks 12914944 12914954 From c8413130ec713afaf8f1c3f21e37722d464c6f29 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Wed, 27 Oct 2021 15:47:49 +0200 Subject: [PATCH 3/9] fixed lint issue --- mev_inspect/block.py | 51 +++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 29d90b0..a46e72a 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,8 +1,8 @@ from pathlib import Path from typing import List, Optional +import json import asyncio import aiohttp -import json from sqlalchemy import orm from web3 import Web3 @@ -46,7 +46,9 @@ def _fetch_block( block_json = w3.eth.get_block(block_number) if not geth: - receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number]) + receipts_json = base_provider.make_request( + "eth_getBlockReceipts", [block_number] + ) traces_json = w3.parity.trace_block(block_number) receipts: List[Receipt] = [ @@ -56,17 +58,20 @@ def _fetch_block( base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number) else: traces = geth_get_tx_traces_parity_format(base_provider, block_json) - geth_tx_receipts = geth_get_tx_receipts(base_provider, block_json["transactions"]) + geth_tx_receipts = geth_get_tx_receipts( + base_provider, block_json["transactions"] + ) receipts = geth_receipts_translator(block_json, geth_tx_receipts) base_fee_per_gas = 0 return Block( - block_number=block_number, - miner=block_json["miner"], - base_fee_per_gas=base_fee_per_gas, - traces=traces, - receipts=receipts, - ) + block_number=block_number, + miner=block_json["miner"], + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) + def _find_block( trace_db_session: orm.Session, @@ -176,20 +181,26 @@ def _get_cache_path(block_number: int) -> Path: cache_directory_path = Path(cache_directory) return cache_directory_path / f"{block_number}.json" + # Geth 
specific additions + def geth_get_tx_traces_parity_format(base_provider, block_json): - block_hash = block_json['hash'] + block_hash = block_json["hash"] block_trace = geth_get_tx_traces(base_provider, block_hash) parity_traces = [] - for idx, trace in enumerate(block_trace['result']): - if 'result' in trace: - parity_traces.extend(unwrap_tx_trace_for_parity(block_json, idx, trace['result'])) + for idx, trace in enumerate(block_trace["result"]): + if "result" in trace: + parity_traces.extend( + unwrap_tx_trace_for_parity(block_json, idx, trace["result"]) + ) return parity_traces def geth_get_tx_traces(base_provider, block_hash): - block_trace = base_provider.make_request("debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]) + block_trace = base_provider.make_request( + "debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}] + ) return block_trace @@ -221,7 +232,7 @@ def unwrap_tx_trace_for_parity( response_list.append( Trace( action=action_dict, - block_hash = str(block_json['hash']), + block_hash=str(block_json["hash"]), block_number=int(block_json["number"]), result=result_dict, subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0, @@ -243,6 +254,7 @@ def unwrap_tx_trace_for_parity( ) return response_list + async def geth_get_tx_receipts_task(session, endpoint_uri, tx): data = { "jsonrpc": "2.0", @@ -255,6 +267,7 @@ async def geth_get_tx_receipts_task(session, endpoint_uri, tx): response.raise_for_status() return await response.text() + async def geth_get_tx_receipts_async(endpoint_uri, transactions): geth_tx_receipts = [] async with aiohttp.ClientSession() as session: @@ -265,8 +278,11 @@ async def geth_get_tx_receipts_async(endpoint_uri, transactions): geth_tx_receipts = await asyncio.gather(*tasks) return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + def geth_get_tx_receipts(base_provider, transactions): - return asyncio.run(geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions)) 
+ return asyncio.run( + geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions) + ) def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: @@ -282,6 +298,7 @@ def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt)) return results + def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt: try: if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): @@ -303,4 +320,4 @@ def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Rec except Exception as e: print("error while decoding receipt", tx_receipt, e) - return Receipt() \ No newline at end of file + return Receipt() From 8504ac5ccaec2b67a60272d9fa4866c5894fef54 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 25 Nov 2021 10:46:00 +0100 Subject: [PATCH 4/9] updates for latest master --- cli.py | 11 +++++------ mev_inspect/block.py | 32 +++++++++++++++++++++++--------- mev_inspect/inspect_block.py | 2 +- mev_inspect/inspector.py | 17 ++++++++++++++++- 4 files changed, 45 insertions(+), 17 deletions(-) diff --git a/cli.py b/cli.py index 4206570..86f9594 100644 --- a/cli.py +++ b/cli.py @@ -3,8 +3,6 @@ import os import sys import click -from web3 import Web3 -from web3.middleware import geth_poa_middleware from mev_inspect.concurrency import coro from mev_inspect.db import get_inspect_session, get_trace_session @@ -26,6 +24,7 @@ def cli(): @click.option("--geth/--no-geth", default=False) @coro async def inspect_block_command(block_number: int, rpc: str, geth: bool): + print("geth", geth) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() @@ -41,7 +40,7 @@ async def fetch_block_command(block_number: int, rpc: str): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, false) + inspector = 
MEVInspector(rpc, inspect_db_session, trace_db_session, False) block = await inspector.create_from_block(block_number=block_number) print(block.json()) @@ -51,7 +50,6 @@ async def fetch_block_command(block_number: int, rpc: str): @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--geth/--no-geth", default=False) - @click.option( "--max-concurrency", type=int, @@ -68,7 +66,7 @@ async def inspect_many_blocks_command( rpc: str, max_concurrency: int, request_timeout: int, - geth: bool + geth: bool, ): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() @@ -76,14 +74,15 @@ async def inspect_many_blocks_command( rpc, inspect_db_session, trace_db_session, + geth, max_concurrency=max_concurrency, request_timeout=request_timeout, - geth ) await inspector.inspect_many_blocks( after_block=after_block, before_block=before_block ) + def get_rpc_url() -> str: return os.environ["RPC_URL"] diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 592600c..b207187 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -2,7 +2,6 @@ import asyncio import logging from typing import List, Optional import json -import asyncio import aiohttp from sqlalchemy import orm @@ -40,13 +39,14 @@ async def create_from_block_number( block = _find_block(trace_db_session, block_number) if block is None: - block = await _fetch_block(w3, base_provider, block_number) - return block - else: + block = await _fetch_block(w3, base_provider, geth, block_number) return block + return block -async def _fetch_block(w3, base_provider, geth, block_number: int, retries: int = 0) -> Block: +async def _fetch_block( + w3, base_provider, geth: bool, block_number: int, retries: int = 0 +) -> Block: if not geth: block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( w3.eth.get_block(block_number), @@ -60,23 +60,35 @@ async def _fetch_block(w3, base_provider, geth, 
block_number: int, retries: int Receipt(**receipt) for receipt in receipts_json["result"] ] traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + return Block( + block_number=block_number, + block_timestamp=block_json["timestamp"], + miner=block_json["miner"], + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) except KeyError as e: logger.warning( f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" ) if retries < 3: await asyncio.sleep(5) - return await _fetch_block(w3, base_provider, block_number, retries) + return await _fetch_block( + w3, base_provider, geth, block_number, retries + ) else: raise else: + block_json = await asyncio.gather(w3.eth.get_block(block_number)) + print(block_json) traces = geth_get_tx_traces_parity_format(base_provider, block_json) geth_tx_receipts = geth_get_tx_receipts( base_provider, block_json["transactions"] ) receipts = geth_receipts_translator(block_json, geth_tx_receipts) base_fee_per_gas = 0 - + return Block( block_number=block_number, block_timestamp=block_json["timestamp"], @@ -85,7 +97,7 @@ async def _fetch_block(w3, base_provider, geth, block_number: int, retries: int traces=traces, receipts=receipts, ) - + def _find_block( trace_db_session: orm.Session, @@ -118,6 +130,7 @@ def _find_block( receipts=receipts, ) + def _find_block_timestamp( trace_db_session: orm.Session, block_number: int, @@ -203,10 +216,11 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]: return result + # Geth specific additions -def geth_get_tx_traces_parity_format(base_provider, block_json): +def geth_get_tx_traces_parity_format(base_provider, block_json: dict): block_hash = block_json["hash"] block_trace = geth_get_tx_traces(base_provider, block_hash) parity_traces = [] diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index 572394a..53ad8a4 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -44,7 +44,7 @@ 
async def inspect_block( base_provider, w3: Web3, geth: bool, - trace_clasifier: TraceClassifier, + trace_classifier: TraceClassifier, block_number: int, trace_db_session: Optional[orm.Session], should_write_classified_traces: bool = True, diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index 4fb8160..0fe97f6 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -7,6 +7,7 @@ from typing import Optional from sqlalchemy import orm from web3 import Web3 from web3.eth import AsyncEth +from web3.middleware import geth_poa_middleware from mev_inspect.block import create_from_block_number from mev_inspect.classifiers.trace import TraceClassifier @@ -22,13 +23,24 @@ class MEVInspector: rpc: str, inspect_db_session: orm.Session, trace_db_session: Optional[orm.Session], + geth: bool = False, max_concurrency: int = 1, request_timeout: int = 300, ): self.inspect_db_session = inspect_db_session self.trace_db_session = trace_db_session self.base_provider = get_base_provider(rpc, request_timeout=request_timeout) - self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) + self.geth = geth + if geth: + self.w3 = Web3( + self.base_provider, + modules={"eth": (AsyncEth,)}, + middlewares=[geth_poa_middleware], + ) + else: + self.w3 = Web3( + self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[] + ) self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) @@ -36,6 +48,7 @@ class MEVInspector: return await create_from_block_number( base_provider=self.base_provider, w3=self.w3, + geth=self.geth, block_number=block_number, trace_db_session=self.trace_db_session, ) @@ -45,6 +58,7 @@ class MEVInspector: self.inspect_db_session, self.base_provider, self.w3, + self.geth, self.trace_classifier, block, trace_db_session=self.trace_db_session, @@ -73,6 +87,7 @@ class MEVInspector: self.inspect_db_session, self.base_provider, self.w3, + self.geth, self.trace_classifier, block_number, 
trace_db_session=self.trace_db_session, From 0895a0f1cdac4c288e319a2ae9804cb1e2bca862 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 25 Nov 2021 10:46:54 +0100 Subject: [PATCH 5/9] await getblock --- mev_inspect/block.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index b207187..df156df 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -80,7 +80,7 @@ async def _fetch_block( else: raise else: - block_json = await asyncio.gather(w3.eth.get_block(block_number)) + block_json = await w3.eth.get_block(block_number) print(block_json) traces = geth_get_tx_traces_parity_format(base_provider, block_json) geth_tx_receipts = geth_get_tx_receipts( From d1a1a531015b1aad98761ebc169b5b53450200d8 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 2 Dec 2021 15:09:13 +0100 Subject: [PATCH 6/9] async middleware - geth poa --- mev_inspect/block.py | 32 +++++----- mev_inspect/geth_poa_middleware.py | 99 ++++++++++++++++++++++++++++++ mev_inspect/inspector.py | 24 ++++---- mev_inspect/provider.py | 13 +++- 4 files changed, 136 insertions(+), 32 deletions(-) create mode 100644 mev_inspect/geth_poa_middleware.py diff --git a/mev_inspect/block.py b/mev_inspect/block.py index df156df..dcd3554 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -80,19 +80,19 @@ async def _fetch_block( else: raise else: - block_json = await w3.eth.get_block(block_number) - print(block_json) - traces = geth_get_tx_traces_parity_format(base_provider, block_json) - geth_tx_receipts = geth_get_tx_receipts( - base_provider, block_json["transactions"] + # print(block_number) + block_json = await asyncio.gather(w3.eth.get_block(block_number)) + traces = await geth_get_tx_traces_parity_format(base_provider, block_json[0]) + geth_tx_receipts = await geth_get_tx_receipts_async( + base_provider.endpoint_uri, block_json[0]["transactions"] ) - receipts = geth_receipts_translator(block_json, geth_tx_receipts) + receipts 
= geth_receipts_translator(block_json[0], geth_tx_receipts) base_fee_per_gas = 0 return Block( block_number=block_number, - block_timestamp=block_json["timestamp"], - miner=block_json["miner"], + block_timestamp=block_json[0]["timestamp"], + miner=block_json[0]["miner"], base_fee_per_gas=base_fee_per_gas, traces=traces, receipts=receipts, @@ -220,9 +220,11 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]: # Geth specific additions -def geth_get_tx_traces_parity_format(base_provider, block_json: dict): +async def geth_get_tx_traces_parity_format(base_provider, block_json: dict): + # print(block_json['hash'].hex()) block_hash = block_json["hash"] - block_trace = geth_get_tx_traces(base_provider, block_hash) + block_trace = await geth_get_tx_traces(base_provider, block_hash) + # print(block_trace) parity_traces = [] for idx, trace in enumerate(block_trace["result"]): if "result" in trace: @@ -232,8 +234,8 @@ def geth_get_tx_traces_parity_format(base_provider, block_json: dict): return parity_traces -def geth_get_tx_traces(base_provider, block_hash): - block_trace = base_provider.make_request( +async def geth_get_tx_traces(base_provider, block_hash): + block_trace = await base_provider.make_request( "debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}] ) return block_trace @@ -314,12 +316,6 @@ async def geth_get_tx_receipts_async(endpoint_uri, transactions): return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] -def geth_get_tx_receipts(base_provider, transactions): - return asyncio.run( - geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions) - ) - - def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: json_decoded_receipts = [ tx_receipt["result"] diff --git a/mev_inspect/geth_poa_middleware.py b/mev_inspect/geth_poa_middleware.py new file mode 100644 index 0000000..712ac95 --- /dev/null +++ b/mev_inspect/geth_poa_middleware.py @@ -0,0 +1,99 @@ +from typing import ( + Any, + Callable, 
+) + +from hexbytes import ( + HexBytes, +) + +from eth_utils.curried import ( + apply_formatter_if, + apply_formatters_to_dict, + apply_key_map, + is_null, +) +from eth_utils.toolz import ( + complement, + compose, + assoc, +) + +from web3._utils.rpc_abi import ( + RPC, +) + +from web3.types import ( + Formatters, + RPCEndpoint, + RPCResponse, +) + +from web3 import Web3 # noqa: F401 + + +async def get_geth_poa_middleware( + make_request: Callable[[RPCEndpoint, Any], RPCResponse], + request_formatters: Formatters = {}, + result_formatters: Formatters = {}, + error_formatters: Formatters = {}, +) -> RPCResponse: + async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: + if method in request_formatters: + formatter = request_formatters[method] + formatted_params = formatter(params) + response = await make_request(method, formatted_params) + else: + response = await make_request(method, params) + + if "result" in response and method in result_formatters: + formatter = result_formatters[method] + formatted_response = assoc( + response, + "result", + formatter(response["result"]), + ) + return formatted_response + elif "error" in response and method in error_formatters: + formatter = error_formatters[method] + formatted_response = assoc( + response, + "error", + formatter(response["error"]), + ) + return formatted_response + else: + return response + + return middleware + + +is_not_null = complement(is_null) + +remap_geth_poa_fields = apply_key_map( + { + "extraData": "proofOfAuthorityData", + } +) + +pythonic_geth_poa = apply_formatters_to_dict( + { + "proofOfAuthorityData": HexBytes, + } +) + +geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields) + + +async def geth_poa_middleware( + make_request: Callable[[RPCEndpoint, Any], Any], *_: Web3 +): + return await get_geth_poa_middleware( + make_request=make_request, + request_formatters={}, + result_formatters={ + RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup), + 
RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup), + }, + error_formatters={}, + ) diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index 0fe97f6..51ea585 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -7,7 +7,6 @@ from typing import Optional from sqlalchemy import orm from web3 import Web3 from web3.eth import AsyncEth -from web3.middleware import geth_poa_middleware from mev_inspect.block import create_from_block_number from mev_inspect.classifiers.trace import TraceClassifier @@ -29,18 +28,19 @@ class MEVInspector: ): self.inspect_db_session = inspect_db_session self.trace_db_session = trace_db_session - self.base_provider = get_base_provider(rpc, request_timeout=request_timeout) + self.base_provider = get_base_provider(rpc, request_timeout, geth) self.geth = geth - if geth: - self.w3 = Web3( - self.base_provider, - modules={"eth": (AsyncEth,)}, - middlewares=[geth_poa_middleware], - ) - else: - self.w3 = Web3( - self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[] - ) + self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) + # if geth: + # self.w3 = Web3( + # self.base_provider, + # modules={"eth": (AsyncEth,)}, + # middlewares=[], + # ) + # else: + # self.w3 = Web3( + # self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[] + # ) self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) diff --git a/mev_inspect/provider.py b/mev_inspect/provider.py index 3b930ea..9fb20eb 100644 --- a/mev_inspect/provider.py +++ b/mev_inspect/provider.py @@ -1,9 +1,18 @@ from web3 import Web3, AsyncHTTPProvider from mev_inspect.retry import http_retry_with_backoff_request_middleware +from mev_inspect.geth_poa_middleware import geth_poa_middleware -def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider: +def get_base_provider( + rpc: str, request_timeout: int = 500, geth: bool = False +) -> 
Web3.AsyncHTTPProvider: base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout}) - base_provider.middlewares += (http_retry_with_backoff_request_middleware,) + if geth: + base_provider.middlewares += ( + geth_poa_middleware, + http_retry_with_backoff_request_middleware, + ) + else: + base_provider.middlewares += (http_retry_with_backoff_request_middleware,) return base_provider From f705bb9f2b1c5453d97e6f4172a00fdbb4cd300b Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 2 Dec 2021 15:09:46 +0100 Subject: [PATCH 7/9] async middleware - geth poa --- mev_inspect/geth_poa_middleware.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mev_inspect/geth_poa_middleware.py b/mev_inspect/geth_poa_middleware.py index 712ac95..d5ae8c2 100644 --- a/mev_inspect/geth_poa_middleware.py +++ b/mev_inspect/geth_poa_middleware.py @@ -85,9 +85,7 @@ pythonic_geth_poa = apply_formatters_to_dict( geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields) -async def geth_poa_middleware( - make_request: Callable[[RPCEndpoint, Any], Any], *_: Web3 -): +async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], _: Web3): return await get_geth_poa_middleware( make_request=make_request, request_formatters={}, From b31f5d750e033529db1579551a9904a7dcf6ec4b Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 9 Dec 2021 13:17:34 +0100 Subject: [PATCH 8/9] added RPCType, fixes --- cli.py | 34 ++++-- mev_inspect/block.py | 184 +++++++++++++++-------------- mev_inspect/geth_poa_middleware.py | 4 + mev_inspect/inspect_block.py | 5 +- mev_inspect/inspector.py | 23 ++-- mev_inspect/provider.py | 5 +- mev_inspect/utils.py | 6 + 7 files changed, 142 insertions(+), 119 deletions(-) diff --git a/cli.py b/cli.py index 86f9594..7bea3f5 100644 --- a/cli.py +++ b/cli.py @@ -7,6 +7,7 @@ import click from mev_inspect.concurrency import coro from mev_inspect.db import get_inspect_session, get_trace_session from 
mev_inspect.inspector import MEVInspector +from mev_inspect.utils import RPCType RPC_URL_ENV = "RPC_URL" @@ -21,17 +22,29 @@ def cli(): @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) -@click.option("--geth/--no-geth", default=False) +@click.option( + "--type", + type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False), + default=RPCType.parity.name, +) @coro -async def inspect_block_command(block_number: int, rpc: str, geth: bool): - print("geth", geth) +async def inspect_block_command(block_number: int, rpc: str, type: str): + type_e = convert_str_to_enum(type) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, geth) + inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, type_e) await inspector.inspect_single_block(block=block_number) +def convert_str_to_enum(type: str) -> RPCType: + if type == "parity": + return RPCType.parity + elif type == "geth": + return RPCType.geth + raise ValueError + + @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @@ -40,7 +53,7 @@ async def fetch_block_command(block_number: int, rpc: str): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, False) + inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, RPCType.parity) block = await inspector.create_from_block(block_number=block_number) print(block.json()) @@ -49,7 +62,11 @@ async def fetch_block_command(block_number: int, rpc: str): @click.argument("after_block", type=int) @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) -@click.option("--geth/--no-geth", default=False) +@click.option( + "--type", + 
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False), + default=RPCType.parity.name, +) @click.option( "--max-concurrency", type=int, @@ -66,15 +83,16 @@ async def inspect_many_blocks_command( rpc: str, max_concurrency: int, request_timeout: int, - geth: bool, + type: str, ): + type_e = convert_str_to_enum(type) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() inspector = MEVInspector( rpc, inspect_db_session, trace_db_session, - geth, + type_e, max_concurrency=max_concurrency, request_timeout=request_timeout, ) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index dcd3554..5f6a3fa 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,8 +1,6 @@ import asyncio import logging from typing import List, Optional -import json -import aiohttp from sqlalchemy import orm from web3 import Web3 @@ -11,10 +9,17 @@ from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.receipts import Receipt from mev_inspect.schemas.traces import Trace, TraceType -from mev_inspect.utils import hex_to_int +from mev_inspect.utils import RPCType, hex_to_int logger = logging.getLogger(__name__) +_calltype_mapping = { + "CALL": "call", + "DELEGATECALL": "delegateCall", + "CREATE": "create", + "SUICIDE": "suicide", + "REWARD": "reward", +} async def get_latest_block_number(base_provider) -> int: @@ -29,7 +34,7 @@ async def get_latest_block_number(base_provider) -> int: async def create_from_block_number( base_provider, w3: Web3, - geth: bool, + type: RPCType, block_number: int, trace_db_session: Optional[orm.Session], ) -> Block: @@ -39,55 +44,63 @@ async def create_from_block_number( block = _find_block(trace_db_session, block_number) if block is None: - block = await _fetch_block(w3, base_provider, geth, block_number) - return block + if type is RPCType.parity: + block = await _fetch_block_parity(w3, base_provider, block_number) + elif type is 
RPCType.geth: + block = await _fetch_block_geth(w3, base_provider, block_number) + else: + logger.error(f"RPCType not known - {type}") + raise ValueError return block -async def _fetch_block( - w3, base_provider, geth: bool, block_number: int, retries: int = 0 +async def _fetch_block_parity( + w3, base_provider, block_number: int, retries: int = 0 ) -> Block: - if not geth: - block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( - w3.eth.get_block(block_number), - base_provider.make_request("eth_getBlockReceipts", [block_number]), - base_provider.make_request("trace_block", [block_number]), - fetch_base_fee_per_gas(w3, block_number), - ) + block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( + w3.eth.get_block(block_number), + base_provider.make_request("eth_getBlockReceipts", [block_number]), + base_provider.make_request("trace_block", [block_number]), + fetch_base_fee_per_gas(w3, block_number), + ) - try: - receipts: List[Receipt] = [ - Receipt(**receipt) for receipt in receipts_json["result"] - ] - traces = [Trace(**trace_json) for trace_json in traces_json["result"]] - return Block( - block_number=block_number, - block_timestamp=block_json["timestamp"], - miner=block_json["miner"], - base_fee_per_gas=base_fee_per_gas, - traces=traces, - receipts=receipts, - ) - except KeyError as e: - logger.warning( - f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" - ) - if retries < 3: - await asyncio.sleep(5) - return await _fetch_block( - w3, base_provider, geth, block_number, retries - ) - else: - raise - else: - # print(block_number) - block_json = await asyncio.gather(w3.eth.get_block(block_number)) + try: + receipts: List[Receipt] = [ + Receipt(**receipt) for receipt in receipts_json["result"] + ] + traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + return Block( + block_number=block_number, + block_timestamp=block_json["timestamp"], + 
miner=block_json["miner"], + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) + except KeyError as e: + logger.warning( + f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" + ) + if retries < 3: + await asyncio.sleep(5) + return await _fetch_block_parity(w3, base_provider, block_number, retries) + else: + raise + + +async def _fetch_block_geth( + w3, base_provider, block_number: int, retries: int = 0 +) -> Block: + block_json = await asyncio.gather(w3.eth.get_block(block_number)) + + try: + # Separate calls to help with load during block tracing traces = await geth_get_tx_traces_parity_format(base_provider, block_json[0]) geth_tx_receipts = await geth_get_tx_receipts_async( - base_provider.endpoint_uri, block_json[0]["transactions"] + base_provider, block_json[0]["transactions"] ) receipts = geth_receipts_translator(block_json[0], geth_tx_receipts) - base_fee_per_gas = 0 + base_fee_per_gas = 0 # Polygon specific, TODO for other chains return Block( block_number=block_number, @@ -97,6 +110,15 @@ async def _fetch_block( traces=traces, receipts=receipts, ) + except KeyError as e: + logger.warning( + f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" + ) + if retries < 3: + await asyncio.sleep(5) + return await _fetch_block_geth(w3, base_provider, block_number, retries) + else: + raise def _find_block( @@ -245,13 +267,6 @@ def unwrap_tx_trace_for_parity( block_json, tx_pos_in_block, tx_trace, position=[] ) -> List[Trace]: response_list = [] - _calltype_mapping = { - "CALL": "call", - "DELEGATECALL": "delegateCall", - "CREATE": "create", - "SUICIDE": "suicide", - "REWARD": "reward", - } try: if tx_trace["type"] == "STATICCALL": return [] @@ -279,7 +294,8 @@ def unwrap_tx_trace_for_parity( type=TraceType(_calltype_mapping[tx_trace["type"]]), ) ) - except Exception: + except Exception as e: + logger.warn(f"error while unwraping tx trace for parity {e}") return [] if 
"calls" in tx_trace.keys(): @@ -292,28 +308,20 @@ def unwrap_tx_trace_for_parity( return response_list -async def geth_get_tx_receipts_task(session, endpoint_uri, tx): - data = { - "jsonrpc": "2.0", - "id": "0", - "method": "eth_getTransactionReceipt", - "params": [tx.hex()], - } - async with session.post(endpoint_uri, json=data) as response: - if response.status != 200: - response.raise_for_status() - return await response.text() +async def geth_get_tx_receipts_task(base_provider, tx): + receipt = await base_provider.make_request("eth_getTransactionReceipt", [tx.hex()]) + return receipt -async def geth_get_tx_receipts_async(endpoint_uri, transactions): +async def geth_get_tx_receipts_async(base_provider, transactions): geth_tx_receipts = [] - async with aiohttp.ClientSession() as session: - tasks = [ - asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx)) - for tx in transactions - ] - geth_tx_receipts = await asyncio.gather(*tasks) - return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + tasks = [ + asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx)) + for tx in transactions + ] + geth_tx_receipts = await asyncio.gather(*tasks) + # return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts] + return geth_tx_receipts def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: @@ -331,24 +339,18 @@ def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]: def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt: - try: - if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): - print( - "Alert the position of transaction in block is mismatched ", - tx_pos_in_block, - tx_receipt["transactionIndex"], - ) - return Receipt( - block_number=block_json["number"], - transaction_hash=tx_receipt["transactionHash"], - transaction_index=tx_pos_in_block, - gas_used=tx_receipt["gasUsed"], - effective_gas_price=tx_receipt["effectiveGasPrice"], - 
cumulative_gas_used=tx_receipt["cumulativeGasUsed"], - to=tx_receipt["to"], + if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16): + logger.info( + "Alert the position of transaction in block is mismatched ", + tx_pos_in_block, + tx_receipt["transactionIndex"], ) - - except Exception as e: - print("error while decoding receipt", tx_receipt, e) - - return Receipt() + return Receipt( + block_number=block_json["number"], + transaction_hash=tx_receipt["transactionHash"], + transaction_index=tx_pos_in_block, + gas_used=tx_receipt["gasUsed"], + effective_gas_price=tx_receipt["effectiveGasPrice"], + cumulative_gas_used=tx_receipt["cumulativeGasUsed"], + to=tx_receipt["to"], + ) diff --git a/mev_inspect/geth_poa_middleware.py b/mev_inspect/geth_poa_middleware.py index d5ae8c2..6eb103d 100644 --- a/mev_inspect/geth_poa_middleware.py +++ b/mev_inspect/geth_poa_middleware.py @@ -1,3 +1,7 @@ +""" +Modified asynchronous geth_poa_middleware which mirrors functionality of +https://github.com/ethereum/web3.py/blob/master/web3/middleware/geth_poa.py +""" from typing import ( Any, Callable, diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index 53ad8a4..11c5625 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -34,6 +34,7 @@ from mev_inspect.miner_payments import get_miner_payments from mev_inspect.swaps import get_swaps from mev_inspect.transfers import get_transfers from mev_inspect.liquidations import get_liquidations +from mev_inspect.utils import RPCType logger = logging.getLogger(__name__) @@ -43,7 +44,7 @@ async def inspect_block( inspect_db_session: orm.Session, base_provider, w3: Web3, - geth: bool, + type: RPCType, trace_classifier: TraceClassifier, block_number: int, trace_db_session: Optional[orm.Session], @@ -52,7 +53,7 @@ async def inspect_block( block = await create_from_block_number( base_provider, w3, - geth, + type, block_number, trace_db_session, ) diff --git a/mev_inspect/inspector.py 
b/mev_inspect/inspector.py index 51ea585..cfc2df1 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -12,6 +12,7 @@ from mev_inspect.block import create_from_block_number from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.inspect_block import inspect_block from mev_inspect.provider import get_base_provider +from mev_inspect.utils import RPCType logger = logging.getLogger(__name__) @@ -22,25 +23,15 @@ class MEVInspector: rpc: str, inspect_db_session: orm.Session, trace_db_session: Optional[orm.Session], - geth: bool = False, + type: RPCType = RPCType.parity, max_concurrency: int = 1, request_timeout: int = 300, ): self.inspect_db_session = inspect_db_session self.trace_db_session = trace_db_session - self.base_provider = get_base_provider(rpc, request_timeout, geth) - self.geth = geth + self.base_provider = get_base_provider(rpc, request_timeout, type) + self.type = type self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) - # if geth: - # self.w3 = Web3( - # self.base_provider, - # modules={"eth": (AsyncEth,)}, - # middlewares=[], - # ) - # else: - # self.w3 = Web3( - # self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[] - # ) self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) @@ -48,7 +39,7 @@ class MEVInspector: return await create_from_block_number( base_provider=self.base_provider, w3=self.w3, - geth=self.geth, + type=self.type, block_number=block_number, trace_db_session=self.trace_db_session, ) @@ -58,7 +49,7 @@ class MEVInspector: self.inspect_db_session, self.base_provider, self.w3, - self.geth, + self.type, self.trace_classifier, block, trace_db_session=self.trace_db_session, @@ -87,7 +78,7 @@ class MEVInspector: self.inspect_db_session, self.base_provider, self.w3, - self.geth, + self.type, self.trace_classifier, block_number, trace_db_session=self.trace_db_session, diff --git a/mev_inspect/provider.py 
b/mev_inspect/provider.py index 9fb20eb..836d51e 100644 --- a/mev_inspect/provider.py +++ b/mev_inspect/provider.py @@ -2,13 +2,14 @@ from web3 import Web3, AsyncHTTPProvider from mev_inspect.retry import http_retry_with_backoff_request_middleware from mev_inspect.geth_poa_middleware import geth_poa_middleware +from mev_inspect.utils import RPCType def get_base_provider( - rpc: str, request_timeout: int = 500, geth: bool = False + rpc: str, request_timeout: int = 500, type: RPCType = RPCType.parity ) -> Web3.AsyncHTTPProvider: base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout}) - if geth: + if type is RPCType.geth: base_provider.middlewares += ( geth_poa_middleware, http_retry_with_backoff_request_middleware, diff --git a/mev_inspect/utils.py b/mev_inspect/utils.py index 922fada..eb78413 100644 --- a/mev_inspect/utils.py +++ b/mev_inspect/utils.py @@ -1,5 +1,11 @@ +from enum import Enum from hexbytes._utils import hexstr_to_bytes +class RPCType(Enum): + parity = 0 + geth = 1 + + def hex_to_int(value: str) -> int: return int.from_bytes(hexstr_to_bytes(value), byteorder="big") From 935d0c9866e0a21b38bc1ece5cb6d0b42809f540 Mon Sep 17 00:00:00 2001 From: Supragya Raj Date: Thu, 9 Dec 2021 15:33:20 +0100 Subject: [PATCH 9/9] added translation for create2, README updates, shell script updates --- README.md | 8 ++++---- mev | 6 ++++-- mev_inspect/block.py | 1 + 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index bf9eb5e..d215af8 100644 --- a/README.md +++ b/README.md @@ -66,19 +66,19 @@ On first startup, you'll need to apply database migrations with: ### Inspect a single block Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185): -**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node. +**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node. 
``` -./mev inspect 12914944 +./mev inspect 12914944 parity ``` ### Inspect many blocks Inspecting blocks 12914944 to 12914954: -**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node. +**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node. ``` -./mev inspect-many 12914944 12914954 +./mev inspect-many 12914944 12914954 parity ``` ### Inspect all incoming blocks diff --git a/mev b/mev index d85015a..186ea79 100755 --- a/mev +++ b/mev @@ -37,15 +37,17 @@ case "$1" in ;; inspect) block_number=$2 + rpc_type=$3 echo "Inspecting block $block_number" - kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number + kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number --type $rpc_type ;; inspect-many) start_block_number=$2 end_block_number=$3 + rpc_type=$4 echo "Inspecting from block $start_block_number to $end_block_number" kubectl exec -ti deploy/mev-inspect -- \ - poetry run inspect-many-blocks $start_block_number $end_block_number + poetry run inspect-many-blocks $start_block_number $end_block_number --type $rpc_type ;; test) echo "Running tests" diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 5f6a3fa..0b2a759 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -17,6 +17,7 @@ _calltype_mapping = { "CALL": "call", "DELEGATECALL": "delegateCall", "CREATE": "create", + "CREATE2": "create2", "SUICIDE": "suicide", "REWARD": "reward", }