mev-inspect-py/liquidation_inspector.py
Guilherme Peyrelongue Heise 790fbee002 Added data types
2021-08-03 16:37:35 -04:00

67 lines
1.7 KiB
Python

import json
from typing import Optional
from web3 import Web3
from typing import List, Optional
from mev_inspect import utils
from mev_inspect.config import load_config
from mev_inspect.schemas.blocks import NestedTrace, TraceType
from mev_inspect.schemas.classifications import Classification
from mev_inspect.schemas.strategy import Strategy, Liquidation
from .base import StrategyInspector
class LiquidationInspector(StrategyInspector):
def __init__(self):
self.result = []
# Inspect list of classified traces and identify liquidation
def inspect(self, traces):
event = []
# For each trace
for i in range(1, len(traces)):
trace = traces[i]
next = traces[i+1]
# Liquidation condition
if trace.classification == Classification.liquidate:
# Collateral data from the liquidation.
# The inputs will differ by DEX, this is AAVE
for i in trace.inputs:
if(i["name"] == '_purchaseAmount'):
liquidation_amount = trace.inputs[i]
elif (i["name"] == '_collateral'):
collateral_type = trace.inputs[i]
elif (i["name"] == '_reserve'):
collateral_source = trace.inputs[i]
# Define the address of the liquidator
liquidator = trace.from_address
# Find a transfer before liquidation with a to_address corresponding to the liquidator
for tx in traces:
if (tx.classification == Classification.transfer & tx.to_address == liquidator):
#Calculate profit
amount_sent = tx.value
amount_received = next.value
profit = amount_received - amount_sent
# Add gas used*price and coinbase check
# Tag liquidation
event.append(Strategy(strategy=StategyType.liquidation,
traces=[trace],
protocols=[trace.protocol]))
return result