Add implementation for building NestedTraces from Traces + tests

This commit is contained in:
Luke Van Seters 2021-07-20 18:25:21 -04:00
parent 06fce79512
commit 311f265d1b
3 changed files with 147 additions and 1 deletions

View File

@ -1,2 +1,2 @@
from .abi import ABI
from .blocks import Block, Trace, TraceType
from .blocks import Block, NestedTrace, Trace, TraceType

63
mev_inspect/traces.py Normal file
View File

@ -0,0 +1,63 @@
from typing import Iterable, List
from mev_inspect.schemas import Trace, NestedTrace
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]:
"""
Turns a list of Traces into a a tree of NestedTraces
using their trace addresses
Right now this has an exponential (?) runtime because we rescan
most traces at each level of tree depth
TODO to write a better implementation if it becomes a bottleneck
Should be doable in linear time
"""
nested_traces = []
parent = None
children: List[Trace] = []
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
for trace in sorted_traces:
if parent is None:
parent = trace
children = []
continue
elif not _is_subtrace(trace, parent):
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
parent = trace
children = []
else:
children.append(trace)
if parent is not None:
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
return nested_traces
def _is_subtrace(trace: Trace, parent: Trace):
parent_trace_length = len(parent.trace_address)
if len(trace.trace_address) > parent_trace_length:
prefix = trace.trace_address[:parent_trace_length]
return prefix == parent.trace_address
return False

83
tests/trace_test.py Normal file
View File

@ -0,0 +1,83 @@
import unittest
from typing import List
from mev_inspect.schemas import Trace, TraceType, NestedTrace
from mev_inspect.traces import as_nested_traces
DEFAULT_BLOCK_NUMBER = 123
class TestTraces(unittest.TestCase):
def test_nested_traces(self):
trace_addresses = [
[0, 2],
[],
[2],
[0],
[0, 0],
[0, 1],
[1],
[1, 0],
[0, 1, 0],
]
traces = [build_trace_at_address(address) for address in trace_addresses]
nested_traces = as_nested_traces(traces)
assert len(nested_traces) == 1
root_trace = nested_traces[0]
assert_trace_address(root_trace, [])
assert len(root_trace.subtraces) == 3
[trace_0, trace_1, trace_2] = root_trace.subtraces
assert_trace_address(trace_0, [0])
assert_trace_address(trace_1, [1])
assert_trace_address(trace_2, [2])
assert len(trace_0.subtraces) == 3
assert len(trace_1.subtraces) == 1
assert len(trace_2.subtraces) == 0
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
[trace_1_0] = trace_1.subtraces
assert_trace_address(trace_0_0, [0, 0])
assert_trace_address(trace_0_1, [0, 1])
assert_trace_address(trace_0_2, [0, 2])
assert_trace_address(trace_1_0, [1, 0])
assert len(trace_0_0.subtraces) == 0
assert len(trace_0_1.subtraces) == 1
assert len(trace_0_2.subtraces) == 0
assert len(trace_1_0.subtraces) == 0
[trace_0_1_0] = trace_0_1.subtraces
assert_trace_address(trace_0_1_0, [0, 1, 0])
assert len(trace_0_1_0.subtraces) == 0
def build_trace_at_address(
trace_address: List[int],
) -> Trace:
return Trace(
# real values
trace_address=trace_address,
# placeholders
transaction_hash="",
action={},
block_hash="",
block_number=DEFAULT_BLOCK_NUMBER,
result=None,
subtraces=0,
transaction_position=None,
type=TraceType.call,
error=None,
)
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
assert nested_trace.trace.trace_address == trace_address