diff --git a/mev_inspect/schemas/__init__.py b/mev_inspect/schemas/__init__.py index 442724e..b4319d2 100644 --- a/mev_inspect/schemas/__init__.py +++ b/mev_inspect/schemas/__init__.py @@ -1,2 +1,2 @@ from .abi import ABI -from .blocks import Block, Trace, TraceType +from .blocks import Block, NestedTrace, Trace, TraceType diff --git a/mev_inspect/traces.py b/mev_inspect/traces.py new file mode 100644 index 0000000..29c8acd --- /dev/null +++ b/mev_inspect/traces.py @@ -0,0 +1,63 @@ +from typing import Iterable, List + +from mev_inspect.schemas import Trace, NestedTrace + + +def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]: + """ + Turns a list of Traces into a a tree of NestedTraces + using their trace addresses + + Right now this has an exponential (?) runtime because we rescan + most traces at each level of tree depth + + TODO to write a better implementation if it becomes a bottleneck + Should be doable in linear time + """ + + nested_traces = [] + + parent = None + children: List[Trace] = [] + + sorted_traces = sorted(traces, key=lambda t: t.trace_address) + + for trace in sorted_traces: + if parent is None: + parent = trace + children = [] + continue + + elif not _is_subtrace(trace, parent): + nested_traces.append( + NestedTrace( + trace=parent, + subtraces=as_nested_traces(children), + ) + ) + + parent = trace + children = [] + + else: + children.append(trace) + + if parent is not None: + nested_traces.append( + NestedTrace( + trace=parent, + subtraces=as_nested_traces(children), + ) + ) + + return nested_traces + + +def _is_subtrace(trace: Trace, parent: Trace): + parent_trace_length = len(parent.trace_address) + + if len(trace.trace_address) > parent_trace_length: + prefix = trace.trace_address[:parent_trace_length] + return prefix == parent.trace_address + + return False diff --git a/tests/trace_test.py b/tests/trace_test.py new file mode 100644 index 0000000..24af48f --- /dev/null +++ b/tests/trace_test.py @@ -0,0 +1,83 @@ +import unittest +from typing import List + +from mev_inspect.schemas import Trace, TraceType, NestedTrace +from mev_inspect.traces import as_nested_traces + + +DEFAULT_BLOCK_NUMBER = 123 + + +class TestTraces(unittest.TestCase): + def test_nested_traces(self): + trace_addresses = [ + [0, 2], + [], + [2], + [0], + [0, 0], + [0, 1], + [1], + [1, 0], + [0, 1, 0], + ] + + traces = [build_trace_at_address(address) for address in trace_addresses] + + nested_traces = as_nested_traces(traces) + + assert len(nested_traces) == 1 + root_trace = nested_traces[0] + + assert_trace_address(root_trace, []) + assert len(root_trace.subtraces) == 3 + + [trace_0, trace_1, trace_2] = root_trace.subtraces + + assert_trace_address(trace_0, [0]) + assert_trace_address(trace_1, [1]) + assert_trace_address(trace_2, [2]) + + assert len(trace_0.subtraces) == 3 + assert len(trace_1.subtraces) == 1 + assert len(trace_2.subtraces) == 0 + + [trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces + [trace_1_0] = trace_1.subtraces + + assert_trace_address(trace_0_0, [0, 0]) + assert_trace_address(trace_0_1, [0, 1]) + assert_trace_address(trace_0_2, [0, 2]) + assert_trace_address(trace_1_0, [1, 0]) + + assert len(trace_0_0.subtraces) == 0 + assert len(trace_0_1.subtraces) == 1 + assert len(trace_0_2.subtraces) == 0 + assert len(trace_1_0.subtraces) == 0 + + [trace_0_1_0] = trace_0_1.subtraces + assert_trace_address(trace_0_1_0, [0, 1, 0]) + assert len(trace_0_1_0.subtraces) == 0 + + +def build_trace_at_address( + trace_address: List[int], +) -> Trace: + return Trace( + # real values + trace_address=trace_address, + # placeholders + transaction_hash="", + action={}, + block_hash="", + block_number=DEFAULT_BLOCK_NUMBER, + result=None, + subtraces=0, + transaction_position=None, + type=TraceType.call, + error=None, + ) + + +def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]): + assert nested_trace.trace.trace_address == trace_address