Merge branch 'geth_additions' into main-gethmerge

This commit is contained in:
Supragya Raj 2021-11-23 14:43:18 +05:30 committed by GitHub
commit 54bd2e68d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 213 additions and 37 deletions

View File

@ -45,6 +45,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546" export RPC_URL="http://111.111.111.111:8546"
``` ```
**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth additions have been added to translate geth traces and receipts to Erigon ones and can be accessed using `--geth` flag.
Next, start all services with: Next, start all services with:
@ -65,6 +66,7 @@ On first startup, you'll need to apply database migrations with:
### Inspect a single block ### Inspect a single block
Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185): Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node.
``` ```
./mev inspect 12914944 ./mev inspect 12914944
@ -73,6 +75,7 @@ Inspecting block [12914944](https://twitter.com/mevalphaleak/status/142041643757
### Inspect many blocks ### Inspect many blocks
Inspecting blocks 12914944 to 12914954: Inspecting blocks 12914944 to 12914954:
**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node.
``` ```
./mev inspect-many 12914944 12914954 ./mev inspect-many 12914944 12914954

15
cli.py
View File

@ -3,6 +3,8 @@ import os
import sys import sys
import click import click
from web3 import Web3
from web3.middleware import geth_poa_middleware
from mev_inspect.concurrency import coro from mev_inspect.concurrency import coro
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session, get_trace_session
@ -21,12 +23,13 @@ def cli():
@cli.command() @cli.command()
@click.argument("block_number", type=int) @click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
@coro @coro
async def inspect_block_command(block_number: int, rpc: str): async def inspect_block_command(block_number: int, rpc: str, geth: bool):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session() trace_db_session = get_trace_session()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, geth)
await inspector.inspect_single_block(block=block_number) await inspector.inspect_single_block(block=block_number)
@ -38,7 +41,7 @@ async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session() trace_db_session = get_trace_session()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session) inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, false)
block = await inspector.create_from_block(block_number=block_number) block = await inspector.create_from_block(block_number=block_number)
print(block.json()) print(block.json())
@ -47,6 +50,8 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int) @click.argument("after_block", type=int)
@click.argument("before_block", type=int) @click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
@click.option( @click.option(
"--max-concurrency", "--max-concurrency",
type=int, type=int,
@ -63,22 +68,22 @@ async def inspect_many_blocks_command(
rpc: str, rpc: str,
max_concurrency: int, max_concurrency: int,
request_timeout: int, request_timeout: int,
geth: bool
): ):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session() trace_db_session = get_trace_session()
inspector = MEVInspector( inspector = MEVInspector(
rpc, rpc,
inspect_db_session, inspect_db_session,
trace_db_session, trace_db_session,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
request_timeout=request_timeout, request_timeout=request_timeout,
geth
) )
await inspector.inspect_many_blocks( await inspector.inspect_many_blocks(
after_block=after_block, before_block=before_block after_block=after_block, before_block=before_block
) )
def get_rpc_url() -> str: def get_rpc_url() -> str:
return os.environ["RPC_URL"] return os.environ["RPC_URL"]

View File

@ -1,6 +1,9 @@
import asyncio import asyncio
import logging import logging
from typing import List, Optional from typing import List, Optional
import json
import asyncio
import aiohttp
from sqlalchemy import orm from sqlalchemy import orm
from web3 import Web3 from web3 import Web3
@ -27,6 +30,7 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number( async def create_from_block_number(
base_provider, base_provider,
w3: Web3, w3: Web3,
geth: bool,
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session], trace_db_session: Optional[orm.Session],
) -> Block: ) -> Block:
@ -42,38 +46,46 @@ async def create_from_block_number(
return block return block
async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block: async def _fetch_block(w3, base_provider, geth, block_number: int, retries: int = 0) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( if not geth:
w3.eth.get_block(block_number), block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
base_provider.make_request("eth_getBlockReceipts", [block_number]), w3.eth.get_block(block_number),
base_provider.make_request("trace_block", [block_number]), base_provider.make_request("eth_getBlockReceipts", [block_number]),
fetch_base_fee_per_gas(w3, block_number), base_provider.make_request("trace_block", [block_number]),
) fetch_base_fee_per_gas(w3, block_number),
try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
) )
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
else:
raise
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
else:
raise
else:
traces = geth_get_tx_traces_parity_format(base_provider, block_json)
geth_tx_receipts = geth_get_tx_receipts(
base_provider, block_json["transactions"]
)
receipts = geth_receipts_translator(block_json, geth_tx_receipts)
base_fee_per_gas = 0
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_block( def _find_block(
trace_db_session: orm.Session, trace_db_session: orm.Session,
@ -106,7 +118,6 @@ def _find_block(
receipts=receipts, receipts=receipts,
) )
def _find_block_timestamp( def _find_block_timestamp(
trace_db_session: orm.Session, trace_db_session: orm.Session,
block_number: int, block_number: int,
@ -191,3 +202,143 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result.append(call.transaction_hash) result.append(call.transaction_hash)
return result return result
# Geth specific additions
def geth_get_tx_traces_parity_format(base_provider, block_json):
block_hash = block_json["hash"]
block_trace = geth_get_tx_traces(base_provider, block_hash)
parity_traces = []
for idx, trace in enumerate(block_trace["result"]):
if "result" in trace:
parity_traces.extend(
unwrap_tx_trace_for_parity(block_json, idx, trace["result"])
)
return parity_traces
def geth_get_tx_traces(base_provider, block_hash):
block_trace = base_provider.make_request(
"debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]
)
return block_trace
def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
_calltype_mapping = {
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}
try:
if tx_trace["type"] == "STATICCALL":
return []
action_dict = dict()
action_dict["callType"] = _calltype_mapping[tx_trace["type"]]
if action_dict["callType"] == "call":
action_dict["value"] = tx_trace["value"]
for key in ["from", "to", "gas", "input"]:
action_dict[key] = tx_trace[key]
result_dict = dict()
for key in ["gasUsed", "output"]:
result_dict[key] = tx_trace[key]
response_list.append(
Trace(
action=action_dict,
block_hash=str(block_json["hash"]),
block_number=int(block_json["number"]),
result=result_dict,
subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0,
trace_address=position,
transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
transaction_position=tx_pos_in_block,
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
except Exception:
return []
if "calls" in tx_trace.keys():
for idx, subcall in enumerate(tx_trace["calls"]):
response_list.extend(
unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, subcall, position + [idx]
)
)
return response_list
async def geth_get_tx_receipts_task(session, endpoint_uri, tx):
data = {
"jsonrpc": "2.0",
"id": "0",
"method": "eth_getTransactionReceipt",
"params": [tx.hex()],
}
async with session.post(endpoint_uri, json=data) as response:
if response.status != 200:
response.raise_for_status()
return await response.text()
async def geth_get_tx_receipts_async(endpoint_uri, transactions):
geth_tx_receipts = []
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx))
for tx in transactions
]
geth_tx_receipts = await asyncio.gather(*tasks)
return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts]
def geth_get_tx_receipts(base_provider, transactions):
return asyncio.run(
geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions)
)
def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
json_decoded_receipts = [
tx_receipt["result"]
if tx_receipt != None and ("result" in tx_receipt.keys())
else None
for tx_receipt in geth_tx_receipts
]
results = []
for idx, tx_receipt in enumerate(json_decoded_receipts):
if tx_receipt != None:
results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt))
return results
def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
try:
if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
print(
"Alert the position of transaction in block is mismatched ",
tx_pos_in_block,
tx_receipt["transactionIndex"],
)
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
)
except Exception as e:
print("error while decoding receipt", tx_receipt, e)
return Receipt()

View File

@ -43,7 +43,8 @@ async def inspect_block(
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
base_provider, base_provider,
w3: Web3, w3: Web3,
trace_classifier: TraceClassifier, geth: bool,
trace_clasifier: TraceClassifier,
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session], trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True, should_write_classified_traces: bool = True,
@ -51,6 +52,7 @@ async def inspect_block(
block = await create_from_block_number( block = await create_from_block_number(
base_provider, base_provider,
w3, w3,
geth,
block_number, block_number,
trace_db_session, trace_db_session,
) )

14
poetry.lock generated
View File

@ -66,6 +66,14 @@ python-versions = ">=3.6"
[package.dependencies] [package.dependencies]
typing-extensions = ">=3.6.5" typing-extensions = ">=3.6.5"
[[package]]
name = "asyncio"
version = "3.4.3"
description = "reference implementation of PEP 3156"
category = "main"
optional = false
python-versions = "*"
[[package]] [[package]]
name = "atomicwrites" name = "atomicwrites"
version = "1.4.0" version = "1.4.0"
@ -1125,6 +1133,12 @@ async-timeout = [
{file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"}, {file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"},
{file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"}, {file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"},
] ]
asyncio = [
{file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"},
{file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"},
{file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"},
{file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"},
]
atomicwrites = [ atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},

View File

@ -12,6 +12,7 @@ hexbytes = "^0.2.1"
click = "^8.0.1" click = "^8.0.1"
psycopg2 = "^2.9.1" psycopg2 = "^2.9.1"
aiohttp = "^3.8.0" aiohttp = "^3.8.0"
asyncio = "^3.4.3"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pre-commit = "^2.13.0" pre-commit = "^2.13.0"