Break out child trace check to traces.py. Remove unused nested code
This commit is contained in:
parent
1e5c82eac1
commit
5f4c018066
@ -51,11 +51,3 @@ class ClassifierSpec(BaseModel):
|
||||
protocol: Optional[Protocol] = None
|
||||
valid_contract_addresses: Optional[List[str]] = None
|
||||
classifications: Dict[str, Classification] = {}
|
||||
|
||||
|
||||
class NestedTrace(BaseModel):
|
||||
trace: ClassifiedTrace
|
||||
subtraces: List["NestedTrace"]
|
||||
|
||||
|
||||
NestedTrace.update_forward_refs()
|
||||
|
@ -7,6 +7,7 @@ from mev_inspect.schemas.classified_traces import (
|
||||
)
|
||||
from mev_inspect.schemas.swaps import Swap
|
||||
from mev_inspect.schemas.transfers import Transfer
|
||||
from mev_inspect.traces import is_child_trace_address
|
||||
|
||||
|
||||
UNISWAP_V2_PAIR_ABI_NAME = "UniswapV2Pair"
|
||||
@ -283,7 +284,7 @@ def _remove_inner_transfers(transfers: List[Transfer]) -> List[Transfer]:
|
||||
|
||||
for transfer in sorted_transfers:
|
||||
if not any(
|
||||
_is_subtrace(parent_address, transfer.trace_address)
|
||||
is_child_trace_address(transfer.trace_address, parent_address)
|
||||
for parent_address in transfer_trace_addresses
|
||||
):
|
||||
updated_transfers.append(transfer)
|
||||
@ -293,13 +294,6 @@ def _remove_inner_transfers(transfers: List[Transfer]) -> List[Transfer]:
|
||||
return updated_transfers
|
||||
|
||||
|
||||
def _is_subtrace(parent_trace_address, child_trace_address) -> bool:
|
||||
return (
|
||||
len(child_trace_address) > len(parent_trace_address)
|
||||
and child_trace_address[: len(parent_trace_address)] == parent_trace_address
|
||||
)
|
||||
|
||||
|
||||
def _filter_transfers(
|
||||
transfers: List[Transfer],
|
||||
to_address: Optional[str] = None,
|
||||
|
@ -1,85 +1,13 @@
|
||||
from itertools import groupby
|
||||
from typing import Iterable, List
|
||||
|
||||
from mev_inspect.schemas.classified_traces import ClassifiedTrace, NestedTrace
|
||||
from typing import List
|
||||
|
||||
|
||||
def as_nested_traces(traces: Iterable[ClassifiedTrace]) -> List[NestedTrace]:
|
||||
nested_traces = []
|
||||
def is_child_trace_address(
|
||||
child_trace_address: List[int],
|
||||
parent_trace_address: List[int],
|
||||
) -> bool:
|
||||
parent_trace_length = len(parent_trace_address)
|
||||
|
||||
sorted_by_transaction_hash = sorted(traces, key=_get_transaction_hash)
|
||||
for _, transaction_traces in groupby(
|
||||
sorted_by_transaction_hash, _get_transaction_hash
|
||||
):
|
||||
nested_traces += _as_nested_traces_by_transaction(transaction_traces)
|
||||
|
||||
return nested_traces
|
||||
|
||||
|
||||
def _get_transaction_hash(trace) -> str:
|
||||
return trace.transaction_hash
|
||||
|
||||
|
||||
def _as_nested_traces_by_transaction(
|
||||
traces: Iterable[ClassifiedTrace],
|
||||
) -> List[NestedTrace]:
|
||||
"""
|
||||
Turns a list of Traces into a a tree of NestedTraces
|
||||
using their trace addresses
|
||||
|
||||
Right now this has an exponential (?) runtime because we rescan
|
||||
most traces at each level of tree depth
|
||||
|
||||
TODO to write a better implementation if it becomes a bottleneck
|
||||
Should be doable in linear time
|
||||
"""
|
||||
|
||||
nested_traces = []
|
||||
|
||||
parent = None
|
||||
children: List[ClassifiedTrace] = []
|
||||
|
||||
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
|
||||
|
||||
for trace in sorted_traces:
|
||||
if parent is None:
|
||||
parent = trace
|
||||
children = []
|
||||
continue
|
||||
|
||||
elif not is_subtrace(trace, parent):
|
||||
nested_traces.append(
|
||||
NestedTrace(
|
||||
trace=parent,
|
||||
subtraces=as_nested_traces(children),
|
||||
)
|
||||
)
|
||||
|
||||
parent = trace
|
||||
children = []
|
||||
|
||||
else:
|
||||
children.append(trace)
|
||||
|
||||
if parent is not None:
|
||||
nested_traces.append(
|
||||
NestedTrace(
|
||||
trace=parent,
|
||||
subtraces=as_nested_traces(children),
|
||||
)
|
||||
)
|
||||
|
||||
return nested_traces
|
||||
|
||||
|
||||
def is_subtrace(trace: ClassifiedTrace, parent: ClassifiedTrace):
|
||||
if trace.transaction_hash != parent.transaction_hash:
|
||||
return False
|
||||
|
||||
parent_trace_length = len(parent.trace_address)
|
||||
|
||||
if len(trace.trace_address) > parent_trace_length:
|
||||
prefix = trace.trace_address[:parent_trace_length]
|
||||
return prefix == parent.trace_address
|
||||
|
||||
return False
|
||||
return (
|
||||
len(child_trace_address) > parent_trace_length
|
||||
and child_trace_address[:parent_trace_length] == parent_trace_address
|
||||
)
|
||||
|
@ -1,97 +0,0 @@
|
||||
import unittest
|
||||
from typing import List
|
||||
|
||||
from mev_inspect.schemas import TraceType
|
||||
from mev_inspect.schemas.classified_traces import ClassifiedTrace, Classification, NestedTrace
|
||||
from mev_inspect.traces import as_nested_traces
|
||||
|
||||
|
||||
DEFAULT_BLOCK_NUMBER = 123
|
||||
|
||||
|
||||
class TestTraces(unittest.TestCase):
|
||||
def test_nested_traces(self):
|
||||
trace_hash_address_pairs = [
|
||||
("abc", [0, 2]),
|
||||
("abc", []),
|
||||
("abc", [2]),
|
||||
("abc", [0]),
|
||||
("abc", [0, 0]),
|
||||
("abc", [0, 1]),
|
||||
("abc", [1]),
|
||||
("efg", []),
|
||||
("abc", [1, 0]),
|
||||
("abc", [0, 1, 0]),
|
||||
("efg", [0]),
|
||||
]
|
||||
|
||||
traces = [
|
||||
build_trace_at_address(hash, address)
|
||||
for (hash, address) in trace_hash_address_pairs
|
||||
]
|
||||
|
||||
nested_traces = as_nested_traces(traces)
|
||||
|
||||
assert len(nested_traces) == 2
|
||||
|
||||
abc_trace = nested_traces[0]
|
||||
efg_trace = nested_traces[1]
|
||||
|
||||
# abc
|
||||
assert abc_trace.trace.transaction_hash == "abc"
|
||||
assert_trace_address(abc_trace, [])
|
||||
assert len(abc_trace.subtraces) == 3
|
||||
|
||||
[trace_0, trace_1, trace_2] = abc_trace.subtraces
|
||||
|
||||
assert_trace_address(trace_0, [0])
|
||||
assert_trace_address(trace_1, [1])
|
||||
assert_trace_address(trace_2, [2])
|
||||
|
||||
assert len(trace_0.subtraces) == 3
|
||||
assert len(trace_1.subtraces) == 1
|
||||
assert len(trace_2.subtraces) == 0
|
||||
|
||||
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
|
||||
[trace_1_0] = trace_1.subtraces
|
||||
|
||||
assert_trace_address(trace_0_0, [0, 0])
|
||||
assert_trace_address(trace_0_1, [0, 1])
|
||||
assert_trace_address(trace_0_2, [0, 2])
|
||||
assert_trace_address(trace_1_0, [1, 0])
|
||||
|
||||
assert len(trace_0_0.subtraces) == 0
|
||||
assert len(trace_0_1.subtraces) == 1
|
||||
assert len(trace_0_2.subtraces) == 0
|
||||
assert len(trace_1_0.subtraces) == 0
|
||||
|
||||
[trace_0_1_0] = trace_0_1.subtraces
|
||||
assert_trace_address(trace_0_1_0, [0, 1, 0])
|
||||
assert len(trace_0_1_0.subtraces) == 0
|
||||
|
||||
# efg
|
||||
assert efg_trace.trace.transaction_hash == "efg"
|
||||
assert_trace_address(efg_trace, [])
|
||||
assert len(efg_trace.subtraces) == 1
|
||||
|
||||
[efg_subtrace] = efg_trace.subtraces
|
||||
|
||||
assert_trace_address(efg_subtrace, [0])
|
||||
assert len(efg_subtrace.subtraces) == 0
|
||||
|
||||
|
||||
def build_trace_at_address(
|
||||
transaction_hash: str,
|
||||
trace_address: List[int],
|
||||
) -> ClassifiedTrace:
|
||||
return ClassifiedTrace(
|
||||
transaction_hash=transaction_hash,
|
||||
trace_address=trace_address,
|
||||
block_number=DEFAULT_BLOCK_NUMBER,
|
||||
trace_type=TraceType.call,
|
||||
classification=Classification.unknown,
|
||||
)
|
||||
|
||||
|
||||
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
|
||||
assert nested_trace.trace.trace_address == trace_address
|
Loading…
x
Reference in New Issue
Block a user