81 lines
2.0 KiB
Python
81 lines
2.0 KiB
Python
from itertools import groupby
|
|
from typing import Iterable, List
|
|
|
|
from mev_inspect.schemas import Trace, NestedTrace
|
|
|
|
|
|
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]:
    """
    Nest a flat collection of Traces, one transaction at a time.

    The traces are sorted by transaction hash so that groupby sees each
    transaction's traces as one consecutive run. Every run is nested on
    its own and the resulting NestedTraces are concatenated in
    transaction-hash order.
    """
    ordered = sorted(traces, key=_get_transaction_hash)
    per_transaction = groupby(ordered, key=_get_transaction_hash)

    return [
        nested
        for _, transaction_traces in per_transaction
        for nested in _as_nested_traces_by_transaction(transaction_traces)
    ]
|
|
|
|
|
|
def _get_transaction_hash(trace) -> str:
|
|
return trace.transaction_hash
|
|
|
|
|
|
def _as_nested_traces_by_transaction(traces: Iterable[Trace]) -> List[NestedTrace]:
    """
    Turns a list of Traces into a tree of NestedTraces
    using their trace addresses

    The traces are ordered by trace_address and walked once. The first
    trace becomes the current root; every following trace that is a
    subtrace of that root is collected as a descendant. The first trace
    that is not a subtrace closes the current root (its descendants are
    nested via as_nested_traces) and becomes the next root.

    Each level of nesting re-sorts and rescans the descendants handed to
    it, so a trace is revisited once for every ancestor above it

    TODO to write a better implementation if it becomes a bottleneck
    Should be doable in a single pass
    """
    ordered = sorted(traces, key=lambda t: t.trace_address)
    if not ordered:
        return []

    result = []
    root = ordered[0]
    descendants: List[Trace] = []

    for candidate in ordered[1:]:
        if _is_subtrace(candidate, root):
            descendants.append(candidate)
            continue

        # candidate lies outside the current root's subtree:
        # emit the finished root and start a new one
        result.append(
            NestedTrace(
                trace=root,
                subtraces=as_nested_traces(descendants),
            )
        )
        root = candidate
        descendants = []

    # the last root is still open once the walk ends
    result.append(
        NestedTrace(
            trace=root,
            subtraces=as_nested_traces(descendants),
        )
    )

    return result
|
|
|
|
|
|
def _is_subtrace(trace: Trace, parent: Trace):
    """
    Report whether `trace` sits strictly below `parent`.

    True only when trace.trace_address is longer than
    parent.trace_address and starts with it; a trace is never a
    subtrace of itself or of a trace with an equally long address.
    """
    parent_address = parent.trace_address
    address = trace.trace_address

    # an address that is not longer than the parent's cannot extend it
    if len(address) <= len(parent_address):
        return False

    return address[: len(parent_address)] == parent_address
|