226 lines
7.6 KiB
Python

from typing import Dict, List, Optional, Sequence
from mev_inspect.classifiers.specs import get_classifier
from mev_inspect.schemas.classifiers import TransferClassifier
from mev_inspect.schemas.prices import ETH_TOKEN_ADDRESS
from mev_inspect.schemas.traces import Classification, ClassifiedTrace, DecodedCallTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import get_child_traces, is_child_trace_address
def get_transfers(traces: List[ClassifiedTrace]) -> List[Transfer]:
transfers = []
for trace in traces:
transfer = get_transfer(trace)
if transfer is not None:
transfers.append(transfer)
return transfers
def get_eth_transfers(traces: List[ClassifiedTrace]) -> List[Transfer]:
transfers = get_transfers(traces)
return [
transfer
for transfer in transfers
if transfer.token_address == ETH_TOKEN_ADDRESS
]
def get_transfer(trace: ClassifiedTrace) -> Optional[Transfer]:
if _is_simple_eth_transfer(trace):
return build_eth_transfer(trace)
if isinstance(trace, DecodedCallTrace):
return _build_erc20_transfer(trace)
return None
def _is_simple_eth_transfer(trace: ClassifiedTrace) -> bool:
return (
trace.value is not None
and trace.value > 0
and "input" in trace.action
and trace.action["input"] == "0x"
)
def build_eth_transfer(trace: ClassifiedTrace) -> Transfer:
return Transfer(
block_number=trace.block_number,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
amount=trace.value,
to_address=trace.to_address,
from_address=trace.from_address,
token_address=ETH_TOKEN_ADDRESS,
)
def _build_erc20_transfer(trace: DecodedCallTrace) -> Optional[Transfer]:
classifier = get_classifier(trace)
if classifier is not None and issubclass(classifier, TransferClassifier):
return classifier.get_transfer(trace)
return None
def get_child_transfers(
transaction_hash: str,
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[Transfer]:
child_transfers = []
for child_trace in get_child_traces(transaction_hash, parent_trace_address, traces):
transfer = get_transfer(child_trace)
if transfer is not None:
child_transfers.append(transfer)
return child_transfers
def filter_transfers(
transfers: Sequence[Transfer],
to_address: Optional[str] = None,
from_address: Optional[str] = None,
) -> List[Transfer]:
filtered_transfers = []
for transfer in transfers:
if to_address is not None and transfer.to_address != to_address:
continue
if from_address is not None and transfer.from_address != from_address:
continue
filtered_transfers.append(transfer)
return filtered_transfers
def remove_child_transfers_of_transfers(
transfers: List[Transfer],
) -> List[Transfer]:
updated_transfers = []
transfer_addresses_by_transaction: Dict[str, List[List[int]]] = {}
sorted_transfers = sorted(transfers, key=lambda t: t.trace_address)
for transfer in sorted_transfers:
existing_addresses = transfer_addresses_by_transaction.get(
transfer.transaction_hash, []
)
if not any(
is_child_trace_address(transfer.trace_address, parent_address)
for parent_address in existing_addresses
):
updated_transfers.append(transfer)
transfer_addresses_by_transaction[
transfer.transaction_hash
] = existing_addresses + [transfer.trace_address]
return updated_transfers
def get_net_transfers(
classified_traces: List[ClassifiedTrace],
) -> List[Transfer]:
"""
Returns the net transfers per transaction from a list of Classified Traces.
If A transfers 200WETH to B ,and later in the transaction, B transfers 50WETH to A,
the following transfer would be returned (from_address=A, to_address=B, amount=150)
If B transferred 300WETH to A, the following would be returned
(from_address=contract, to_address=bot, amount=100)
If B transferred 200WETH to A, no transfer would be returned
@param classified_traces:
@return: List of Transfer objects representing the net movement of tokens
"""
found_transfers: List[list] = []
return_transfers: List[Transfer] = []
for trace in classified_traces:
if not isinstance(trace, DecodedCallTrace):
continue
if trace.classification == Classification.transfer:
if trace.from_address in [
t.token_address for t in return_transfers
]: # Proxy Case
continue
if trace.function_signature == "transfer(address,uint256)":
net_search_info = {
"to_address": trace.inputs["recipient"],
"token_address": trace.to_address,
"from_address": trace.from_address,
}
elif trace.function_signature == "transferFrom(address,address,uint256)":
net_search_info = {
"to_address": trace.inputs["recipient"],
"token_address": trace.to_address,
"from_address": trace.inputs["sender"],
}
else:
continue
if sorted(list(net_search_info.values())) in found_transfers:
for index, transfer in enumerate(return_transfers):
if (
transfer.token_address != net_search_info["token_address"]
or transfer.transaction_hash != trace.transaction_hash
):
continue
if (
transfer.from_address == net_search_info["from_address"]
and transfer.to_address == net_search_info["to_address"]
):
return_transfers[index].amount += trace.inputs["amount"]
return_transfers[index].trace_address = [-1]
if (
transfer.from_address == net_search_info["to_address"]
and transfer.to_address == net_search_info["from_address"]
):
return_transfers[index].amount -= trace.inputs["amount"]
return_transfers[index].trace_address = [-1]
else:
return_transfers.append(
Transfer(
block_number=trace.block_number,
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
from_address=net_search_info["from_address"],
to_address=net_search_info["to_address"],
amount=trace.inputs["amount"],
token_address=net_search_info["token_address"],
)
)
found_transfers.append(sorted(list(net_search_info.values())))
process_index = -1
while True:
process_index += 1
try:
transfer = return_transfers[process_index]
except IndexError:
break
if transfer.amount < 0:
return_transfers[process_index].from_address = transfer.to_address
return_transfers[process_index].to_address = transfer.from_address
return_transfers[process_index].amount = transfer.amount * -1
if transfer.amount == 0:
return_transfers.pop(process_index)
process_index -= 1
return return_transfers