Remove nested traces

This commit is contained in:
Luke Van Seters 2021-08-05 12:35:38 -04:00
parent deea9e28ea
commit 162443efd8
4 changed files with 26 additions and 185 deletions

View File

@ -1,2 +1,2 @@
from .abi import ABI from .abi import ABI
from .blocks import Block, NestedTrace, Trace, TraceType from .blocks import Block, Trace, TraceType

View File

@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, validator from pydantic import validator
from mev_inspect.utils import hex_to_int from mev_inspect.utils import hex_to_int
from .utils import CamelModel, Web3Model from .utils import CamelModel, Web3Model
@ -66,11 +66,3 @@ class Block(Web3Model):
def get_filtered_traces(self, hash: str) -> List[Trace]: def get_filtered_traces(self, hash: str) -> List[Trace]:
return [trace for trace in self.traces if trace.transaction_hash == hash] return [trace for trace in self.traces if trace.transaction_hash == hash]
class NestedTrace(BaseModel):
trace: Trace
subtraces: List["NestedTrace"]
NestedTrace.update_forward_refs()

View File

@ -1,80 +1,32 @@
from itertools import groupby from typing import List
from typing import Iterable, List
from mev_inspect.schemas import Trace, NestedTrace from mev_inspect.schemas.classified_traces import ClassifiedTrace
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]: def is_child_trace_address(
nested_traces = [] child_trace_address: List[int],
parent_trace_address: List[int],
) -> bool:
parent_trace_length = len(parent_trace_address)
sorted_by_transaction_hash = sorted(traces, key=_get_transaction_hash) return (
for _, transaction_traces in groupby( len(child_trace_address) > parent_trace_length
sorted_by_transaction_hash, _get_transaction_hash and child_trace_address[:parent_trace_length] == parent_trace_address
)
def get_child_traces(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[ClassifiedTrace]:
ordered_traces = sorted(traces, key=lambda t: t.trace_address)
child_traces = []
for trace in ordered_traces:
if is_child_trace_address(
trace.trace_address,
parent_trace_address,
): ):
nested_traces += _as_nested_traces_by_transaction(transaction_traces) child_traces.append(trace)
return nested_traces return child_traces
def _get_transaction_hash(trace) -> str:
return trace.transaction_hash
def _as_nested_traces_by_transaction(traces: Iterable[Trace]) -> List[NestedTrace]:
"""
Turns a list of Traces into a a tree of NestedTraces
using their trace addresses
Right now this has an exponential (?) runtime because we rescan
most traces at each level of tree depth
TODO to write a better implementation if it becomes a bottleneck
Should be doable in linear time
"""
nested_traces = []
parent = None
children: List[Trace] = []
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
for trace in sorted_traces:
if parent is None:
parent = trace
children = []
continue
elif not _is_subtrace(trace, parent):
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
parent = trace
children = []
else:
children.append(trace)
if parent is not None:
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
return nested_traces
def _is_subtrace(trace: Trace, parent: Trace):
parent_trace_length = len(parent.trace_address)
if len(trace.trace_address) > parent_trace_length:
prefix = trace.trace_address[:parent_trace_length]
return prefix == parent.trace_address
return False

View File

@ -1,103 +0,0 @@
import unittest
from typing import List
from mev_inspect.schemas import Trace, TraceType, NestedTrace
from mev_inspect.traces import as_nested_traces
DEFAULT_BLOCK_NUMBER = 123
class TestTraces(unittest.TestCase):
def test_nested_traces(self):
trace_hash_address_pairs = [
("abc", [0, 2]),
("abc", []),
("abc", [2]),
("abc", [0]),
("abc", [0, 0]),
("abc", [0, 1]),
("abc", [1]),
("efg", []),
("abc", [1, 0]),
("abc", [0, 1, 0]),
("efg", [0]),
]
traces = [
build_trace_at_address(hash, address)
for (hash, address) in trace_hash_address_pairs
]
nested_traces = as_nested_traces(traces)
assert len(nested_traces) == 2
abc_trace = nested_traces[0]
efg_trace = nested_traces[1]
# abc
assert abc_trace.trace.transaction_hash == "abc"
assert_trace_address(abc_trace, [])
assert len(abc_trace.subtraces) == 3
[trace_0, trace_1, trace_2] = abc_trace.subtraces
assert_trace_address(trace_0, [0])
assert_trace_address(trace_1, [1])
assert_trace_address(trace_2, [2])
assert len(trace_0.subtraces) == 3
assert len(trace_1.subtraces) == 1
assert len(trace_2.subtraces) == 0
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
[trace_1_0] = trace_1.subtraces
assert_trace_address(trace_0_0, [0, 0])
assert_trace_address(trace_0_1, [0, 1])
assert_trace_address(trace_0_2, [0, 2])
assert_trace_address(trace_1_0, [1, 0])
assert len(trace_0_0.subtraces) == 0
assert len(trace_0_1.subtraces) == 1
assert len(trace_0_2.subtraces) == 0
assert len(trace_1_0.subtraces) == 0
[trace_0_1_0] = trace_0_1.subtraces
assert_trace_address(trace_0_1_0, [0, 1, 0])
assert len(trace_0_1_0.subtraces) == 0
# efg
assert efg_trace.trace.transaction_hash == "efg"
assert_trace_address(efg_trace, [])
assert len(efg_trace.subtraces) == 1
[efg_subtrace] = efg_trace.subtraces
assert_trace_address(efg_subtrace, [0])
assert len(efg_subtrace.subtraces) == 0
def build_trace_at_address(
transaction_hash: str,
trace_address: List[int],
) -> Trace:
return Trace(
# real values
transaction_hash=transaction_hash,
trace_address=trace_address,
# placeholders
action={},
block_hash="",
block_number=DEFAULT_BLOCK_NUMBER,
result=None,
subtraces=0,
transaction_position=None,
type=TraceType.call,
error=None,
)
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
assert nested_trace.trace.trace_address == trace_address