Break out transfer and trace functions

This commit is contained in:
Luke Van Seters 2021-08-05 11:50:18 -04:00
parent 5f4c018066
commit c73c4af901
3 changed files with 97 additions and 103 deletions

View File

@ -1,4 +1,5 @@
from typing import Dict, List, Optional
from itertools import groupby
from typing import List, Optional
from mev_inspect.schemas.arbitrage import Arbitrage
from mev_inspect.schemas.classified_traces import (
@ -7,7 +8,11 @@ from mev_inspect.schemas.classified_traces import (
)
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import is_child_trace_address
from mev_inspect.transfers import (
get_child_transfers,
filter_transfers,
remove_child_transfers,
)
UNISWAP_V2_PAIR_ABI_NAME = "UniswapV2Pair"
@ -15,30 +20,22 @@ UNISWAP_V3_POOL_ABI_NAME = "UniswapV3Pool"
def get_arbitrages(traces: List[ClassifiedTrace]) -> List[Arbitrage]:
all_arbitrages = []
traces_by_transaction = _group_traces_by_transaction(traces)
get_transaction_hash = lambda t: t.transaction_hash
traces_by_transaction = groupby(
sorted(traces, key=get_transaction_hash),
key=get_transaction_hash,
)
for transaction_traces in traces_by_transaction.values():
all_arbitrages = []
for _, transaction_traces in traces_by_transaction:
all_arbitrages += _get_arbitrages_for_transaction(
transaction_traces,
list(transaction_traces),
)
return all_arbitrages
def _group_traces_by_transaction(
traces: List[ClassifiedTrace],
) -> Dict[str, List[ClassifiedTrace]]:
grouped_traces: Dict[str, List[ClassifiedTrace]] = {}
for trace in traces:
hash = trace.transaction_hash
existing_traces = grouped_traces.get(hash, [])
grouped_traces[hash] = existing_traces + [trace]
return grouped_traces
def _get_arbitrages_for_transaction(
traces: List[ClassifiedTrace],
) -> List[Arbitrage]:
@ -130,11 +127,11 @@ def _get_swaps(traces: List[ClassifiedTrace]) -> List[Swap]:
prior_transfers.append(Transfer.from_trace(trace))
elif trace.classification == Classification.swap:
child_transfers = _get_child_transfers(trace.trace_address, traces)
child_transfers = get_child_transfers(trace.trace_address, traces)
swap = _build_swap(
trace,
_remove_inner_transfers(prior_transfers),
_remove_inner_transfers(child_transfers),
remove_child_transfers(prior_transfers),
remove_child_transfers(child_transfers),
)
if swap is not None:
@ -167,8 +164,8 @@ def _parse_uniswap_v3_swap(
else trace.from_address
)
transfers_to_pool = _filter_transfers(child_transfers, to_address=pool_address)
transfers_from_pool_to_recipient = _filter_transfers(
transfers_to_pool = filter_transfers(child_transfers, to_address=pool_address)
transfers_from_pool_to_recipient = filter_transfers(
child_transfers, to_address=recipient_address, from_address=pool_address
)
@ -208,8 +205,8 @@ def _parse_uniswap_v2_swap(
else trace.from_address
)
transfers_to_pool = _filter_transfers(prior_transfers, to_address=pool_address)
transfers_from_pool_to_recipient = _filter_transfers(
transfers_to_pool = filter_transfers(prior_transfers, to_address=pool_address)
transfers_from_pool_to_recipient = filter_transfers(
child_transfers, to_address=recipient_address, from_address=pool_address
)
@ -234,80 +231,3 @@ def _parse_uniswap_v2_swap(
token_out_address=transfer_out.token_address,
token_out_amount=transfer_out.amount,
)
def _get_child_transfers(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[Transfer]:
child_transfers = []
for child_trace in _get_child_traces(parent_trace_address, traces):
if child_trace.classification == Classification.transfer:
child_transfers.append(Transfer.from_trace(child_trace))
return child_transfers
def _get_child_traces(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[ClassifiedTrace]:
ordered_traces = sorted(traces, key=lambda t: t.trace_address)
child_traces = []
for trace in ordered_traces:
if _is_child_trace_address(
parent_trace_address,
trace.trace_address,
):
child_traces.append(trace)
return child_traces
def _is_child_trace_address(
parent_trace_address: List[int],
maybe_child_trace_address: List[int],
) -> bool:
parent_trace_length = len(parent_trace_address)
return (len(maybe_child_trace_address) > parent_trace_length) and (
maybe_child_trace_address[:parent_trace_length] == parent_trace_address
)
def _remove_inner_transfers(transfers: List[Transfer]) -> List[Transfer]:
updated_transfers = []
transfer_trace_addresses: List[List[int]] = []
sorted_transfers = sorted(transfers, key=lambda t: t.trace_address)
for transfer in sorted_transfers:
if not any(
is_child_trace_address(transfer.trace_address, parent_address)
for parent_address in transfer_trace_addresses
):
updated_transfers.append(transfer)
transfer_trace_addresses.append(transfer.trace_address)
return updated_transfers
def _filter_transfers(
transfers: List[Transfer],
to_address: Optional[str] = None,
from_address: Optional[str] = None,
) -> List[Transfer]:
filtered_transfers = []
for transfer in transfers:
if to_address is not None and transfer.to_address != to_address:
continue
if from_address is not None and transfer.from_address != from_address:
continue
filtered_transfers.append(transfer)
return filtered_transfers

View File

@ -1,5 +1,7 @@
from typing import List
from mev_inspect.schemas.classified_traces import ClassifiedTrace
def is_child_trace_address(
child_trace_address: List[int],
@ -11,3 +13,20 @@ def is_child_trace_address(
len(child_trace_address) > parent_trace_length
and child_trace_address[:parent_trace_length] == parent_trace_address
)
def get_child_traces(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[ClassifiedTrace]:
ordered_traces = sorted(traces, key=lambda t: t.trace_address)
child_traces = []
for trace in ordered_traces:
if is_child_trace_address(
trace.trace_address,
parent_trace_address,
):
child_traces.append(trace)
return child_traces

55
mev_inspect/transfers.py Normal file
View File

@ -0,0 +1,55 @@
from typing import List, Optional
from mev_inspect.schemas.classified_traces import Classification, ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import is_child_trace_address, get_child_traces
def get_child_transfers(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[Transfer]:
child_transfers = []
for child_trace in get_child_traces(parent_trace_address, traces):
if child_trace.classification == Classification.transfer:
child_transfers.append(Transfer.from_trace(child_trace))
return child_transfers
def filter_transfers(
transfers: List[Transfer],
to_address: Optional[str] = None,
from_address: Optional[str] = None,
) -> List[Transfer]:
filtered_transfers = []
for transfer in transfers:
if to_address is not None and transfer.to_address != to_address:
continue
if from_address is not None and transfer.from_address != from_address:
continue
filtered_transfers.append(transfer)
return filtered_transfers
def remove_child_transfers(transfers: List[Transfer]) -> List[Transfer]:
updated_transfers = []
transfer_trace_addresses: List[List[int]] = []
sorted_transfers = sorted(transfers, key=lambda t: t.trace_address)
for transfer in sorted_transfers:
if not any(
is_child_trace_address(transfer.trace_address, parent_address)
for parent_address in transfer_trace_addresses
):
updated_transfers.append(transfer)
transfer_trace_addresses.append(transfer.trace_address)
return updated_transfers