mev-inspect-py/mev_inspect/jit_liquidity.py
2022-05-10 14:43:07 -07:00

277 lines
9.4 KiB
Python

from typing import List, Tuple
from pydantic import BaseModel
from mev_inspect.schemas.jit_liquidity import JITLiquidity
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import (
Classification,
ClassifiedTrace,
DecodedCallTrace,
Protocol,
)
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import get_traces_by_transaction_hash, is_child_trace_address
from mev_inspect.transfers import get_net_transfers
LIQUIDITY_MINT_ROUTERS = [
"0xC36442b4a4522E871399CD717aBDD847Ab11FE88".lower(), # Uniswap V3 NFT Position Manager
]
ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"
class JITTransferInfo(BaseModel):
token0_address: str
token1_address: str
mint_token0: int
mint_token1: int
burn_token0: int
burn_token1: int
error: bool
def get_jit_liquidity(
classified_traces: List[ClassifiedTrace], swaps: List[Swap]
) -> List[JITLiquidity]:
jit_liquidity_instances: List[JITLiquidity] = []
for index, trace in enumerate(classified_traces):
if not isinstance(trace, DecodedCallTrace):
continue
if (
trace.classification == Classification.liquidity_mint
and trace.protocol == Protocol.uniswap_v3
):
for search_trace in classified_traces[index:]:
if (
search_trace.classification == Classification.liquidity_burn
and search_trace.to_address == trace.to_address
and search_trace.transaction_hash != trace.transaction_hash
):
if (search_trace.error is not None) or (trace.error is not None):
continue
bot_address = _get_bot_address(trace, classified_traces)
transfer_info: JITTransferInfo = _get_transfer_info(
classified_traces,
trace,
search_trace,
)
jit_swaps, token0_volume, token1_volume = _get_swap_info(
swaps, trace, search_trace, transfer_info.token0_address
)
# -- Error Checking Section --
if transfer_info.error or len(jit_swaps) == 0:
continue
jit_liquidity_instances.append(
JITLiquidity(
block_number=trace.block_number,
bot_address=bot_address,
pool_address=trace.to_address,
mint_transaction_hash=trace.transaction_hash,
mint_trace=trace.trace_address,
burn_transaction_hash=search_trace.transaction_hash,
burn_trace=search_trace.trace_address,
swaps=jit_swaps,
token0_address=transfer_info.token0_address,
token1_address=transfer_info.token1_address,
mint_token0_amount=transfer_info.mint_token0,
mint_token1_amount=transfer_info.mint_token1,
burn_token0_amount=transfer_info.burn_token0,
burn_token1_amount=transfer_info.burn_token1,
token0_swap_volume=token0_volume,
token1_swap_volume=token1_volume,
)
)
return jit_liquidity_instances
def _get_token_order(token_a: str, token_b: str) -> Tuple[str, str]:
token_order = True if int(token_a, 16) < int(token_b, 16) else False
return (token_a, token_b) if token_order else (token_b, token_a)
def _get_swap_info(
swaps: List[Swap],
mint_trace: ClassifiedTrace,
burn_trace: ClassifiedTrace,
token0_address: str,
) -> Tuple[List[Swap], int, int]:
jit_swaps: List[Swap] = []
token0_swap_volume, token1_swap_volume = 0, 0
ordered_swaps = sorted(
swaps, key=lambda s: (s.transaction_position, s.trace_address)
)
for swap in ordered_swaps:
if swap.transaction_position <= mint_trace.transaction_position:
continue
if swap.transaction_position >= burn_trace.transaction_position:
break
if swap.contract_address == mint_trace.to_address:
jit_swaps.append(swap)
token0_swap_volume += (
swap.token_in_amount if swap.token_in_address == token0_address else 0
)
token1_swap_volume += (
0 if swap.token_in_address == token0_address else swap.token_in_amount
)
return jit_swaps, token0_swap_volume, token1_swap_volume
def _get_transfer_info(
classified_traces: List[ClassifiedTrace],
mint_trace: ClassifiedTrace,
burn_trace: ClassifiedTrace,
) -> JITTransferInfo:
grouped_traces = get_traces_by_transaction_hash(classified_traces)
mint_net_transfers, burn_net_transfers = [], []
pool_address = mint_trace.to_address
for transfer in get_net_transfers(grouped_traces[mint_trace.transaction_hash]):
if transfer.to_address == pool_address:
mint_net_transfers.append(transfer)
for transfer in get_net_transfers(grouped_traces[burn_trace.transaction_hash]):
if transfer.from_address == pool_address:
burn_net_transfers.append(transfer)
mint_len, burn_len = len(mint_net_transfers), len(burn_net_transfers)
if mint_len == 2 and burn_len == 2:
return _parse_standard_liquidity(mint_net_transfers, burn_net_transfers)
elif (mint_len == 2 and burn_len == 1) or (mint_len == 1 and burn_len == 2):
return _parse_liquidity_limit_order(mint_net_transfers, burn_net_transfers)
else:
return JITTransferInfo(
token0_address=ZERO_ADDRESS,
token1_address=ZERO_ADDRESS,
mint_token0=0,
mint_token1=0,
burn_token0=0,
burn_token1=0,
error=True,
)
def _get_bot_address(
mint_trace: ClassifiedTrace,
classified_traces: List[ClassifiedTrace],
) -> str:
if "from_address" in mint_trace.dict().keys():
if mint_trace.from_address in LIQUIDITY_MINT_ROUTERS:
bot_trace = list(
filter(
lambda t: t.to_address == mint_trace.from_address
and t.transaction_hash == mint_trace.transaction_hash,
classified_traces,
)
)
if len(bot_trace) == 1 or is_child_trace_address(
bot_trace[1].trace_address, bot_trace[0].trace_address
):
return _get_bot_address(bot_trace[0], classified_traces)
else:
return ZERO_ADDRESS
elif type(mint_trace.from_address) == str:
return mint_trace.from_address
return ZERO_ADDRESS
def _parse_standard_liquidity(
mint_net_transfers: List[Transfer],
burn_net_transfers: List[Transfer],
) -> JITTransferInfo:
token0_address, token1_address = _get_token_order(
mint_net_transfers[0].token_address, mint_net_transfers[1].token_address
)
mint_token0, mint_token1 = _parse_token_amounts(token0_address, mint_net_transfers)
burn_token0, burn_token1 = _parse_token_amounts(token0_address, burn_net_transfers)
return JITTransferInfo(
token0_address=token0_address,
token1_address=token1_address,
mint_token0=mint_token0,
mint_token1=mint_token1,
burn_token0=burn_token0,
burn_token1=burn_token1,
error=False,
)
def _parse_liquidity_limit_order(
mint_net_transfers: List[Transfer],
burn_net_transfers: List[Transfer],
) -> JITTransferInfo:
try:
token0_address, token1_address = _get_token_order(
burn_net_transfers[0].token_address, burn_net_transfers[1].token_address
)
except IndexError:
token0_address, token1_address = _get_token_order(
mint_net_transfers[0].token_address, mint_net_transfers[1].token_address
)
if len(mint_net_transfers) < 2:
if token0_address == mint_net_transfers[0].token_address:
mint_token0 = mint_net_transfers[0].amount
mint_token1 = 0
else:
mint_token0 = 0
mint_token1 = mint_net_transfers[0].amount
burn_token0, burn_token1 = _parse_token_amounts(
token0_address, burn_net_transfers
)
else:
if token0_address == burn_net_transfers[0].token_address:
burn_token0 = burn_net_transfers[0].amount
burn_token1 = 0
else:
burn_token0 = 0
burn_token1 = burn_net_transfers[0].amount
mint_token0, mint_token1 = _parse_token_amounts(
token0_address, mint_net_transfers
)
return JITTransferInfo(
token0_address=token0_address,
token1_address=token1_address,
mint_token0=mint_token0,
mint_token1=mint_token1,
burn_token0=burn_token0,
burn_token1=burn_token1,
error=False,
)
def _parse_token_amounts(
token0_address: str, net_transfers: List[Transfer]
) -> Tuple[int, int]:
if token0_address == net_transfers[0].token_address:
token0_amount = net_transfers[0].amount
token1_amount = net_transfers[1].amount
else:
token0_amount = net_transfers[1].amount
token1_amount = net_transfers[0].amount
return token0_amount, token1_amount