Merge branch 'geth_additions' of https://github.com/marlinprotocol/mev-inspect-py into main

This commit is contained in:
ZigaMr 2022-05-07 18:29:45 +02:00
commit 22d98afa1c
11 changed files with 343 additions and 24 deletions

View File

@ -45,6 +45,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546"
```
**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth additions have been added to translate geth traces and receipts to Erigon ones and can be accessed using `--geth` flag.
Next, start all services with:
@ -71,17 +72,19 @@ And load prices data
### Inspect a single block
Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node.
```
./mev inspect 12914944
./mev inspect 12914944 parity
```
### Inspect many blocks
Inspecting blocks 12914944 to 12914954:
**Note**: Add `geth` at the end instead of `parity` if RPC_URL points to a geth / geth like node.
```
./mev inspect-many 12914944 12914954
./mev inspect-many 12914944 12914954 parity
```
### Inspect all incoming blocks

33
cli.py
View File

@ -20,6 +20,8 @@ from mev_inspect.queue.tasks import (
inspect_many_blocks_task,
)
from mev_inspect.s3_export import export_block
from mev_inspect.utils import RPCType
#from mev_inspect.prices import fetch_all_supported_prices
RPC_URL_ENV = "RPC_URL"
@ -35,12 +37,18 @@ def cli():
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@coro
async def inspect_block_command(block_number: int, rpc: str):
async def inspect_block_command(block_number: int, rpc: str, type: str):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, type_e)
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
@ -48,6 +56,13 @@ async def inspect_block_command(block_number: int, rpc: str):
block=block_number,
)
def convert_str_to_enum(type: str) -> RPCType:
if type == "parity":
return RPCType.parity
elif type == "geth":
return RPCType.geth
raise ValueError
@cli.command()
@click.argument("block_number", type=int)
@ -56,12 +71,11 @@ async def inspect_block_command(block_number: int, rpc: str):
async def fetch_block_command(block_number: int, rpc: str):
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc)
inspector = MEVInspector(rpc, RPCType.parity)
block = await inspector.create_from_block(
block_number=block_number,
trace_db_session=trace_db_session,
)
print(block.json())
@ -69,6 +83,11 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@click.option(
"--max-concurrency",
type=int,
@ -85,12 +104,14 @@ async def inspect_many_blocks_command(
rpc: str,
max_concurrency: int,
request_timeout: int,
type: str,
):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(
rpc,
rpc=rpc,
type=type_e,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)

12
mev
View File

@ -58,15 +58,17 @@ case "$1" in
;;
inspect)
block_number=$2
rpc_type=$3
echo "Inspecting block $block_number"
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number
kubectl exec -ti deploy/mev-inspect -- poetry run inspect-block $block_number --type $rpc_type
;;
inspect-many)
after_block_number=$2
before_block_number=$3
echo "Inspecting from block $after_block_number to $before_block_number"
start_block_number=$2
end_block_number=$3
rpc_type=$4
echo "Inspecting from block $start_block_number to $end_block_number"
kubectl exec -ti deploy/mev-inspect -- \
poetry run inspect-many-blocks $after_block_number $before_block_number
poetry run inspect-many-blocks $start_block_number $end_block_number --type $rpc_type
;;
test)
shift

View File

@ -1,6 +1,7 @@
import asyncio
import logging
from typing import List, Optional
from aiohttp import TraceRequestStartParams
from sqlalchemy import orm
from web3 import Web3
@ -9,9 +10,17 @@ from mev_inspect.fees import fetch_base_fee_per_gas
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
from mev_inspect.utils import RPCType, hex_to_int
logger = logging.getLogger(__name__)
_calltype_mapping = {
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"CREATE2": "create2",
"SUICIDE": "suicide",
"REWARD": "reward",
}
async def get_latest_block_number(base_provider) -> int:
@ -25,13 +34,20 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
w3: Web3,
type: RPCType,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
if type == RPCType.geth:
block_json = await asyncio.gather(w3.eth.get_block(block_number))
else:
block_json = []
block_timestamp, receipts, traces, base_fee_per_gas = await asyncio.gather(
_find_or_fetch_block_timestamp(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session),
_find_or_fetch_block_traces(w3, block_number, trace_db_session),
_find_or_fetch_block_receipts(w3, block_number, trace_db_session, type, block_json),
_find_or_fetch_block_traces(w3, block_number, trace_db_session, type, block_json),
_find_or_fetch_base_fee_per_gas(w3, block_number, trace_db_session),
)
@ -56,20 +72,28 @@ async def _find_or_fetch_block_timestamp(
existing_block_timestamp = _find_block_timestamp(trace_db_session, block_number)
if existing_block_timestamp is not None:
return existing_block_timestamp
return await _fetch_block_timestamp(w3, block_number)
return await _fetch_block_timestamp(w3, block_number)
async def _find_or_fetch_block_receipts(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
type: RPCType,
block_json: list = []
) -> List[Receipt]:
if trace_db_session is not None:
existing_block_receipts = _find_block_receipts(trace_db_session, block_number)
if existing_block_receipts is not None:
return existing_block_receipts
if type == RPCType.geth:
geth_tx_receipts = await geth_get_tx_receipts_async(
w3.provider, block_json[0]["transactions"]
)
receipts = geth_receipts_translator(block_json[0], geth_tx_receipts)
return await _fetch_block_receipts(w3, block_number)
@ -77,12 +101,19 @@ async def _find_or_fetch_block_traces(
w3,
block_number: int,
trace_db_session: Optional[orm.Session],
type: RPCType,
block_json: list = []
) -> List[Trace]:
if trace_db_session is not None:
existing_block_traces = _find_block_traces(trace_db_session, block_number)
if existing_block_traces is not None:
return existing_block_traces
if type == RPCType.geth:
# Translate to parity format
traces = await geth_get_tx_traces_parity_format(w3.provider, block_json[0])
return traces
return await _fetch_block_traces(w3, block_number)
@ -200,3 +231,120 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result.append(call.transaction_hash)
return result
# Geth specific additions
async def geth_get_tx_traces_parity_format(base_provider, block_json: dict):
# print(block_json['hash'].hex())
block_hash = block_json["hash"]
block_trace = await geth_get_tx_traces(base_provider, block_hash)
# print(block_trace)
parity_traces = []
for idx, trace in enumerate(block_trace["result"]):
if "result" in trace:
parity_traces.extend(
unwrap_tx_trace_for_parity(block_json, idx, trace["result"])
)
return parity_traces
async def geth_get_tx_traces(base_provider, block_hash):
block_trace = await base_provider.make_request(
"debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]
)
return block_trace
def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
try:
if tx_trace["type"] == "STATICCALL":
return []
action_dict = dict()
action_dict["callType"] = _calltype_mapping[tx_trace["type"]]
if action_dict["callType"] == "call":
action_dict["value"] = tx_trace["value"]
for key in ["from", "to", "gas", "input"]:
action_dict[key] = tx_trace[key]
result_dict = dict()
for key in ["gasUsed", "output"]:
result_dict[key] = tx_trace[key]
response_list.append(
Trace(
action=action_dict,
block_hash=str(block_json["hash"]),
block_number=int(block_json["number"]),
result=result_dict,
subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0,
trace_address=position,
transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
transaction_position=tx_pos_in_block,
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
except Exception as e:
logger.warn(f"error while unwraping tx trace for parity {e}")
return []
if "calls" in tx_trace.keys():
for idx, subcall in enumerate(tx_trace["calls"]):
response_list.extend(
unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, subcall, position + [idx]
)
)
return response_list
async def geth_get_tx_receipts_task(base_provider, tx):
receipt = await base_provider.make_request("eth_getTransactionReceipt", [tx.hex()])
return receipt
async def geth_get_tx_receipts_async(base_provider, transactions):
geth_tx_receipts = []
tasks = [
asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx))
for tx in transactions
]
geth_tx_receipts = await asyncio.gather(*tasks)
# return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts]
return geth_tx_receipts
def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
json_decoded_receipts = [
tx_receipt["result"]
if tx_receipt != None and ("result" in tx_receipt.keys())
else None
for tx_receipt in geth_tx_receipts
]
results = []
for idx, tx_receipt in enumerate(json_decoded_receipts):
if tx_receipt != None:
results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt))
return results
def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
logger.info(
"Alert the position of transaction in block is mismatched ",
tx_pos_in_block,
tx_receipt["transactionIndex"],
)
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
)

View File

@ -0,0 +1,101 @@
"""
Modified asynchronous geth_poa_middleware which mirrors functionality of
https://github.com/ethereum/web3.py/blob/master/web3/middleware/geth_poa.py
"""
from typing import (
Any,
Callable,
)
from hexbytes import (
HexBytes,
)
from eth_utils.curried import (
apply_formatter_if,
apply_formatters_to_dict,
apply_key_map,
is_null,
)
from eth_utils.toolz import (
complement,
compose,
assoc,
)
from web3._utils.rpc_abi import (
RPC,
)
from web3.types import (
Formatters,
RPCEndpoint,
RPCResponse,
)
from web3 import Web3 # noqa: F401
async def get_geth_poa_middleware(
make_request: Callable[[RPCEndpoint, Any], RPCResponse],
request_formatters: Formatters = {},
result_formatters: Formatters = {},
error_formatters: Formatters = {},
) -> RPCResponse:
async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
if method in request_formatters:
formatter = request_formatters[method]
formatted_params = formatter(params)
response = await make_request(method, formatted_params)
else:
response = await make_request(method, params)
if "result" in response and method in result_formatters:
formatter = result_formatters[method]
formatted_response = assoc(
response,
"result",
formatter(response["result"]),
)
return formatted_response
elif "error" in response and method in error_formatters:
formatter = error_formatters[method]
formatted_response = assoc(
response,
"error",
formatter(response["error"]),
)
return formatted_response
else:
return response
return middleware
is_not_null = complement(is_null)
remap_geth_poa_fields = apply_key_map(
{
"extraData": "proofOfAuthorityData",
}
)
pythonic_geth_poa = apply_formatters_to_dict(
{
"proofOfAuthorityData": HexBytes,
}
)
geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields)
async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], _: Web3):
return await get_geth_poa_middleware(
make_request=make_request,
request_formatters={},
result_formatters={
RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup),
RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup),
},
error_formatters={},
)

View File

@ -53,6 +53,9 @@ from mev_inspect.schemas.traces import ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.utils import RPCType
logger = logging.getLogger(__name__)
@ -60,6 +63,7 @@ logger = logging.getLogger(__name__)
async def inspect_block(
inspect_db_session: orm.Session,
w3: Web3,
type: RPCType,
trace_classifier: TraceClassifier,
block_number: int,
trace_db_session: Optional[orm.Session],
@ -69,6 +73,7 @@ async def inspect_block(
inspect_db_session,
w3,
trace_classifier,
type,
block_number,
block_number + 1,
trace_db_session,
@ -80,6 +85,7 @@ async def inspect_many_blocks(
inspect_db_session: orm.Session,
w3: Web3,
trace_classifier: TraceClassifier,
type: RPCType,
after_block_number: int,
before_block_number: int,
trace_db_session: Optional[orm.Session],
@ -103,9 +109,10 @@ async def inspect_many_blocks(
for block_number in range(after_block_number, before_block_number):
block = await create_from_block_number(
w3,
block_number,
trace_db_session,
w3=w3,
block_number=block_number,
trace_db_session=trace_db_session,
type=type
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")

View File

@ -13,6 +13,7 @@ from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.inspect_block import inspect_block, inspect_many_blocks
from mev_inspect.methods import get_block_receipts, trace_block
from mev_inspect.provider import get_base_provider
from mev_inspect.utils import RPCType
logger = logging.getLogger(__name__)
@ -27,11 +28,13 @@ class MEVInspector:
def __init__(
self,
rpc: str,
type: RPCType = RPCType.parity,
max_concurrency: int = 1,
request_timeout: int = 300,
):
base_provider = get_base_provider(rpc, request_timeout=request_timeout)
base_provider = get_base_provider(rpc, request_timeout, type)
self.w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
self.type = type
self.trace_classifier = TraceClassifier()
self.max_concurrency = asyncio.Semaphore(max_concurrency)
@ -43,6 +46,7 @@ class MEVInspector:
):
return await create_from_block_number(
w3=self.w3,
type=self.type,
block_number=block_number,
trace_db_session=trace_db_session,
)
@ -56,6 +60,7 @@ class MEVInspector:
return await inspect_block(
inspect_db_session,
self.w3,
self.type,
self.trace_classifier,
block,
trace_db_session=trace_db_session,
@ -105,6 +110,7 @@ class MEVInspector:
return await inspect_many_blocks(
inspect_db_session,
self.w3,
self.type,
self.trace_classifier,
after_block_number,
before_block_number,

View File

@ -1,9 +1,19 @@
from web3 import AsyncHTTPProvider, Web3
from mev_inspect.retry import http_retry_with_backoff_request_middleware
from mev_inspect.geth_poa_middleware import geth_poa_middleware
from mev_inspect.utils import RPCType
def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider:
def get_base_provider(
rpc: str, request_timeout: int = 500, type: RPCType = RPCType.parity
) -> Web3.AsyncHTTPProvider:
base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout})
base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
if type is RPCType.geth:
base_provider.middlewares += (
geth_poa_middleware,
http_retry_with_backoff_request_middleware,
)
else:
base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
return base_provider

View File

@ -1,6 +1,12 @@
from enum import Enum
from hexbytes._utils import hexstr_to_bytes
class RPCType(Enum):
parity = 0
geth = 1
def hex_to_int(value: str) -> int:
return int.from_bytes(hexstr_to_bytes(value), byteorder="big")

14
poetry.lock generated
View File

@ -77,6 +77,14 @@ python-versions = ">=3.6"
[package.dependencies]
typing-extensions = ">=3.6.5"
[[package]]
name = "asyncio"
version = "3.4.3"
description = "reference implementation of PEP 3156"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "atomicwrites"
version = "1.4.0"
@ -1289,6 +1297,12 @@ async-timeout = [
{file = "async-timeout-4.0.0.tar.gz", hash = "sha256:7d87a4e8adba8ededb52e579ce6bc8276985888913620c935094c2276fd83382"},
{file = "async_timeout-4.0.0-py3-none-any.whl", hash = "sha256:f3303dddf6cafa748a92747ab6c2ecf60e0aeca769aee4c151adfce243a05d9b"},
]
asyncio = [
{file = "asyncio-3.4.3-cp33-none-win32.whl", hash = "sha256:b62c9157d36187eca799c378e572c969f0da87cd5fc42ca372d92cdb06e7e1de"},
{file = "asyncio-3.4.3-cp33-none-win_amd64.whl", hash = "sha256:c46a87b48213d7464f22d9a497b9eef8c1928b68320a2fa94240f969f6fec08c"},
{file = "asyncio-3.4.3-py3-none-any.whl", hash = "sha256:c4d18b22701821de07bd6aea8b53d21449ec0ec5680645e5317062ea21817d2d"},
{file = "asyncio-3.4.3.tar.gz", hash = "sha256:83360ff8bc97980e4ff25c964c7bd3923d333d177aa4f7fb736b019f26c7cb41"},
]
atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},

View File

@ -16,6 +16,7 @@ dramatiq = {extras = ["redis"], version = "^1.12.1"}
pycoingecko = "^2.2.0"
boto3 = "^1.20.48"
aiohttp-retry = "^2.4.6"
asyncio = "^3.4.3"
[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"