Merge pull request #46 from flashbots/transfers-and-swaps

Find arbitrages in a block
This commit is contained in:
Luke Van Seters 2021-08-13 13:44:12 -04:00 committed by GitHub
commit 1ee3203231
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1028 additions and 199 deletions

3
.gitignore vendored
View File

@ -13,3 +13,6 @@ __pycache__
# coverage # coverage
htmlcov htmlcov
.coverage* .coverage*
# don't commit cache
cache

View File

@ -0,0 +1 @@
[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]

90
mev_inspect/arbitrage.py Normal file
View File

@ -0,0 +1,90 @@
from itertools import groupby
from typing import List, Optional
from mev_inspect.schemas.arbitrage import Arbitrage
from mev_inspect.schemas.swaps import Swap
def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]:
get_transaction_hash = lambda swap: swap.transaction_hash
swaps_by_transaction = groupby(
sorted(swaps, key=get_transaction_hash),
key=get_transaction_hash,
)
all_arbitrages = []
for _, transaction_swaps in swaps_by_transaction:
all_arbitrages += _get_arbitrages_from_swaps(
list(transaction_swaps),
)
return all_arbitrages
def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
pool_addresses = {swap.pool_address for swap in swaps}
all_arbitrages = []
for index, first_swap in enumerate(swaps):
other_swaps = swaps[:index] + swaps[index + 1 :]
if first_swap.from_address not in pool_addresses:
arbitrage = _get_arbitrage_starting_with_swap(first_swap, other_swaps)
if arbitrage is not None:
all_arbitrages.append(arbitrage)
return all_arbitrages
def _get_arbitrage_starting_with_swap(
start_swap: Swap,
other_swaps: List[Swap],
) -> Optional[Arbitrage]:
swap_path = [start_swap]
current_swap: Swap = start_swap
while True:
next_swap = _get_swap_from_address(
current_swap.to_address,
current_swap.token_out_address,
other_swaps,
)
if next_swap is None:
return None
swap_path.append(next_swap)
current_swap = next_swap
if (
current_swap.to_address == start_swap.from_address
and current_swap.token_out_address == start_swap.token_in_address
):
start_amount = start_swap.token_in_amount
end_amount = current_swap.token_out_amount
profit_amount = end_amount - start_amount
return Arbitrage(
swaps=swap_path,
account_address=start_swap.from_address,
profit_token_address=start_swap.token_in_address,
start_amount=start_amount,
end_amount=end_amount,
profit_amount=profit_amount,
)
return None
def _get_swap_from_address(
address: str, token_address: str, swaps: List[Swap]
) -> Optional[Swap]:
for swap in swaps:
if swap.pool_address == address and swap.token_in_address == token_address:
return swap
return None

View File

@ -101,16 +101,27 @@ ERC20_SPEC = ClassifierSpec(
AAVE_SPEC = ClassifierSpec( AAVE_SPEC = ClassifierSpec(
abi_name="AaveLendingPool", abi_name="AaveLendingPool",
protocol= Protocol.aave, protocol=Protocol.aave,
classifications={ classifications={
"liquidationCall(address,address,address,uint256,bool)": Classification.liquidate, "liquidationCall(address,address,address,uint256,bool)": Classification.liquidate,
}, },
) )
WETH_SPEC = ClassifierSpec(
abi_name="WETH9",
protocol=Protocol.weth,
valid_contract_addresses=["0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"],
classifications={
"transferFrom(address,address,uint256)": Classification.transfer,
"transfer(address,uint256)": Classification.transfer,
},
)
CLASSIFIER_SPECS = [ CLASSIFIER_SPECS = [
*UNISWAP_V3_CONTRACT_SPECS, *UNISWAP_V3_CONTRACT_SPECS,
*UNISWAPPY_V2_CONTRACT_SPECS, *UNISWAPPY_V2_CONTRACT_SPECS,
WETH_SPEC,
AAVE_SPEC, AAVE_SPEC,
ERC20_SPEC, ERC20_SPEC,
UNISWAP_V3_POOL_SPEC, UNISWAP_V3_POOL_SPEC,

View File

@ -1,2 +1,2 @@
from .abi import ABI from .abi import ABI
from .blocks import Block, NestedTrace, Trace, TraceType from .blocks import Block, Trace, TraceType

View File

@ -0,0 +1,14 @@
from typing import List
from pydantic import BaseModel
from .swaps import Swap
class Arbitrage(BaseModel):
swaps: List[Swap]
account_address: str
profit_token_address: str
start_amount: int
end_amount: int
profit_amount: int

View File

@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, validator from pydantic import validator
from mev_inspect.utils import hex_to_int from mev_inspect.utils import hex_to_int
from .utils import CamelModel, Web3Model from .utils import CamelModel, Web3Model
@ -66,11 +66,3 @@ class Block(Web3Model):
def get_filtered_traces(self, hash: str) -> List[Trace]: def get_filtered_traces(self, hash: str) -> List[Trace]:
return [trace for trace in self.traces if trace.transaction_hash == hash] return [trace for trace in self.traces if trace.transaction_hash == hash]
class NestedTrace(BaseModel):
trace: Trace
subtraces: List["NestedTrace"]
NestedTrace.update_forward_refs()

View File

@ -19,6 +19,7 @@ class Protocol(Enum):
uniswap_v3 = "uniswap_v3" uniswap_v3 = "uniswap_v3"
sushiswap = "sushiswap" sushiswap = "sushiswap"
aave = "aave" aave = "aave"
weth = "weth"
class ClassifiedTrace(BaseModel): class ClassifiedTrace(BaseModel):

View File

@ -0,0 +1,20 @@
from typing import List, Optional
from pydantic import BaseModel
from mev_inspect.schemas.classified_traces import Protocol
class Swap(BaseModel):
abi_name: str
transaction_hash: str
block_number: int
trace_address: List[int]
protocol: Optional[Protocol]
pool_address: str
from_address: str
to_address: str
token_in_address: str
token_in_amount: int
token_out_address: str
token_out_amount: int

View File

@ -0,0 +1,38 @@
from typing import List
from pydantic import BaseModel
from .classified_traces import Classification, ClassifiedTrace, Protocol
class Transfer(BaseModel):
transaction_hash: str
trace_address: List[int]
from_address: str
to_address: str
amount: int
token_address: str
@classmethod
def from_trace(cls, trace: ClassifiedTrace) -> "Transfer":
if trace.classification != Classification.transfer or trace.inputs is None:
raise ValueError("Invalid transfer")
if trace.protocol == Protocol.weth:
return cls(
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
amount=trace.inputs["wad"],
to_address=trace.inputs["dst"],
from_address=trace.from_address,
token_address=trace.to_address,
)
else:
return cls(
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
amount=trace.inputs["amount"],
to_address=trace.inputs["recipient"],
from_address=trace.inputs.get("sender", trace.from_address),
token_address=trace.to_address,
)

123
mev_inspect/swaps.py Normal file
View File

@ -0,0 +1,123 @@
from itertools import groupby
from typing import List, Optional
from mev_inspect.schemas.classified_traces import (
ClassifiedTrace,
Classification,
)
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.transfers import (
get_child_transfers,
filter_transfers,
remove_child_transfers_of_transfers,
)
UNISWAP_V2_PAIR_ABI_NAME = "UniswapV2Pair"
UNISWAP_V3_POOL_ABI_NAME = "UniswapV3Pool"
def get_swaps(traces: List[ClassifiedTrace]) -> List[Swap]:
get_transaction_hash = lambda t: t.transaction_hash
traces_by_transaction = groupby(
sorted(traces, key=get_transaction_hash),
key=get_transaction_hash,
)
swaps = []
for _, transaction_traces in traces_by_transaction:
swaps += _get_swaps_for_transaction(list(transaction_traces))
return swaps
def _get_swaps_for_transaction(traces: List[ClassifiedTrace]) -> List[Swap]:
ordered_traces = list(sorted(traces, key=lambda t: t.trace_address))
swaps: List[Swap] = []
prior_transfers: List[Transfer] = []
for trace in ordered_traces:
if trace.classification == Classification.transfer:
prior_transfers.append(Transfer.from_trace(trace))
elif trace.classification == Classification.swap:
child_transfers = get_child_transfers(
trace.transaction_hash,
trace.trace_address,
traces,
)
swap = _parse_swap(
trace,
remove_child_transfers_of_transfers(prior_transfers),
remove_child_transfers_of_transfers(child_transfers),
)
if swap is not None:
swaps.append(swap)
return swaps
def _parse_swap(
trace: ClassifiedTrace,
prior_transfers: List[Transfer],
child_transfers: List[Transfer],
) -> Optional[Swap]:
pool_address = trace.to_address
recipient_address = _get_recipient_address(trace)
if recipient_address is None:
return None
transfers_to_pool = filter_transfers(prior_transfers, to_address=pool_address)
if len(transfers_to_pool) == 0:
transfers_to_pool = filter_transfers(child_transfers, to_address=pool_address)
if len(transfers_to_pool) == 0:
return None
transfers_from_pool_to_recipient = filter_transfers(
child_transfers, to_address=recipient_address, from_address=pool_address
)
if len(transfers_from_pool_to_recipient) != 1:
return None
transfer_in = transfers_to_pool[-1]
transfer_out = transfers_from_pool_to_recipient[0]
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
block_number=trace.block_number,
trace_address=trace.trace_address,
pool_address=pool_address,
from_address=transfer_in.from_address,
to_address=transfer_out.to_address,
token_in_address=transfer_in.token_address,
token_in_amount=transfer_in.amount,
token_out_address=transfer_out.token_address,
token_out_amount=transfer_out.amount,
)
def _get_recipient_address(trace: ClassifiedTrace) -> Optional[str]:
if trace.abi_name == UNISWAP_V3_POOL_ABI_NAME:
return (
trace.inputs["recipient"]
if trace.inputs is not None and "recipient" in trace.inputs
else trace.from_address
)
elif trace.abi_name == UNISWAP_V2_PAIR_ABI_NAME:
return (
trace.inputs["to"]
if trace.inputs is not None and "to" in trace.inputs
else trace.from_address
)
else:
return None

View File

@ -1,80 +1,33 @@
from itertools import groupby from typing import List
from typing import Iterable, List
from mev_inspect.schemas import Trace, NestedTrace from mev_inspect.schemas.classified_traces import ClassifiedTrace
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]: def is_child_trace_address(
nested_traces = [] child_trace_address: List[int],
parent_trace_address: List[int],
) -> bool:
parent_trace_length = len(parent_trace_address)
sorted_by_transaction_hash = sorted(traces, key=_get_transaction_hash) return (
for _, transaction_traces in groupby( len(child_trace_address) > parent_trace_length
sorted_by_transaction_hash, _get_transaction_hash and child_trace_address[:parent_trace_length] == parent_trace_address
): )
nested_traces += _as_nested_traces_by_transaction(transaction_traces)
return nested_traces
def _get_transaction_hash(trace) -> str: def get_child_traces(
return trace.transaction_hash transaction_hash: str,
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[ClassifiedTrace]:
ordered_traces = sorted(traces, key=lambda t: t.trace_address)
child_traces = []
for trace in ordered_traces:
if trace.transaction_hash == transaction_hash and is_child_trace_address(
trace.trace_address,
parent_trace_address,
):
child_traces.append(trace)
def _as_nested_traces_by_transaction(traces: Iterable[Trace]) -> List[NestedTrace]: return child_traces
"""
Turns a list of Traces into a a tree of NestedTraces
using their trace addresses
Right now this has an exponential (?) runtime because we rescan
most traces at each level of tree depth
TODO to write a better implementation if it becomes a bottleneck
Should be doable in linear time
"""
nested_traces = []
parent = None
children: List[Trace] = []
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
for trace in sorted_traces:
if parent is None:
parent = trace
children = []
continue
elif not _is_subtrace(trace, parent):
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
parent = trace
children = []
else:
children.append(trace)
if parent is not None:
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
return nested_traces
def _is_subtrace(trace: Trace, parent: Trace):
parent_trace_length = len(parent.trace_address)
if len(trace.trace_address) > parent_trace_length:
prefix = trace.trace_address[:parent_trace_length]
return prefix == parent.trace_address
return False

62
mev_inspect/transfers.py Normal file
View File

@ -0,0 +1,62 @@
from typing import Dict, List, Optional
from mev_inspect.schemas.classified_traces import Classification, ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import is_child_trace_address, get_child_traces
def get_child_transfers(
transaction_hash: str,
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[Transfer]:
child_transfers = []
for child_trace in get_child_traces(transaction_hash, parent_trace_address, traces):
if child_trace.classification == Classification.transfer:
child_transfers.append(Transfer.from_trace(child_trace))
return child_transfers
def filter_transfers(
transfers: List[Transfer],
to_address: Optional[str] = None,
from_address: Optional[str] = None,
) -> List[Transfer]:
filtered_transfers = []
for transfer in transfers:
if to_address is not None and transfer.to_address != to_address:
continue
if from_address is not None and transfer.from_address != from_address:
continue
filtered_transfers.append(transfer)
return filtered_transfers
def remove_child_transfers_of_transfers(transfers: List[Transfer]) -> List[Transfer]:
updated_transfers = []
transfer_addresses_by_transaction: Dict[str, List[List[int]]] = {}
sorted_transfers = sorted(transfers, key=lambda t: t.trace_address)
for transfer in sorted_transfers:
existing_addresses = transfer_addresses_by_transaction.get(
transfer.transaction_hash, []
)
if not any(
is_child_trace_address(transfer.trace_address, parent_address)
for parent_address in existing_addresses
):
updated_transfers.append(transfer)
transfer_addresses_by_transaction[
transfer.transaction_hash
] = existing_addresses + [transfer.trace_address]
return updated_transfers

0
scripts/__init__.py Normal file
View File

View File

@ -11,6 +11,8 @@ from mev_inspect.crud.classified_traces import (
from mev_inspect.db import get_session from mev_inspect.db import get_session
from mev_inspect.classifier_specs import CLASSIFIER_SPECS from mev_inspect.classifier_specs import CLASSIFIER_SPECS
from mev_inspect.trace_classifier import TraceClassifier from mev_inspect.trace_classifier import TraceClassifier
from mev_inspect.arbitrage import get_arbitrages
from mev_inspect.swaps import get_swaps
@click.command() @click.command()
@ -39,6 +41,12 @@ def inspect_block(block_number: int, rpc: str):
write_classified_traces(db_session, classified_traces) write_classified_traces(db_session, classified_traces)
db_session.close() db_session.close()
swaps = get_swaps(classified_traces)
print(f"Found {len(swaps)} swaps")
arbitrages = get_arbitrages(swaps)
print(f"Found {len(arbitrages)} arbitrages")
stats = get_stats(classified_traces) stats = get_stats(classified_traces)
print(json.dumps(stats, indent=4)) print(json.dumps(stats, indent=4))

File diff suppressed because one or more lines are too long

26
tests/conftest.py Normal file
View File

@ -0,0 +1,26 @@
from hashlib import sha3_256
from typing import List
import pytest
@pytest.fixture(name="get_transaction_hashes")
def fixture_get_transaction_hashes():
def _get_transaction_hashes(n: int):
return _hash_with_prefix(n, "transaction_hash")
return _get_transaction_hashes
@pytest.fixture(name="get_addresses")
def fixture_get_addresses():
def _get_addresses(n: int):
return [f"0x{hash_value[:40]}" for hash_value in _hash_with_prefix(n, "addr")]
return _get_addresses
def _hash_with_prefix(n_hashes: int, prefix: str) -> List[str]:
return [
sha3_256(f"{prefix}{i}".encode("utf-8")).hexdigest() for i in range(n_hashes)
]

81
tests/helpers.py Normal file
View File

@ -0,0 +1,81 @@
from typing import List
from mev_inspect.schemas.blocks import TraceType
from mev_inspect.schemas.classified_traces import Classification, ClassifiedTrace
def make_transfer_trace(
block_number: int,
transaction_hash: str,
trace_address: List[int],
from_address: str,
to_address: str,
token_address: str,
amount: int,
):
return ClassifiedTrace(
transaction_hash=transaction_hash,
block_number=block_number,
trace_type=TraceType.call,
trace_address=trace_address,
classification=Classification.transfer,
from_address=from_address,
to_address=token_address,
inputs={
"recipient": to_address,
"amount": amount,
},
)
def make_swap_trace(
block_number: int,
transaction_hash: str,
trace_address: List[int],
from_address: str,
pool_address: str,
abi_name: str,
recipient_address: str,
recipient_input_key: str,
):
return ClassifiedTrace(
transaction_hash=transaction_hash,
block_number=block_number,
trace_type=TraceType.call,
trace_address=trace_address,
classification=Classification.swap,
from_address=from_address,
to_address=pool_address,
inputs={recipient_input_key: recipient_address},
abi_name=abi_name,
)
def make_unknown_trace(
block_number,
transaction_hash,
trace_address,
):
return ClassifiedTrace(
transaction_hash=transaction_hash,
block_number=block_number,
trace_type=TraceType.call,
trace_address=trace_address,
classification=Classification.unknown,
)
def make_many_unknown_traces(
block_number,
transaction_hash,
trace_addresses,
) -> List[ClassifiedTrace]:
return [
make_unknown_trace(
block_number,
transaction_hash,
trace_address,
)
for trace_address in trace_addresses
]

View File

@ -0,0 +1,26 @@
from mev_inspect.classifier_specs import CLASSIFIER_SPECS
from mev_inspect.trace_classifier import TraceClassifier
from mev_inspect.arbitrage import get_arbitrages
from mev_inspect.swaps import get_swaps
from .utils import load_test_block
def test_arbitrage_real_block():
block = load_test_block(12914994)
trace_clasifier = TraceClassifier(CLASSIFIER_SPECS)
classified_traces = trace_clasifier.classify(block.traces)
swaps = get_swaps(classified_traces)
arbitrages = get_arbitrages(swaps)
assert len(arbitrages) == 1
arbitrage = arbitrages[0]
assert len(arbitrage.swaps) == 3
assert (
arbitrage.profit_token_address == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
)
assert arbitrage.profit_amount == 53560707941943273628

160
tests/test_arbitrages.py Normal file
View File

@ -0,0 +1,160 @@
from mev_inspect.arbitrage import get_arbitrages
from mev_inspect.schemas.swaps import Swap
from mev_inspect.swaps import (
UNISWAP_V2_PAIR_ABI_NAME,
UNISWAP_V3_POOL_ABI_NAME,
)
def test_two_pool_arbitrage(get_transaction_hashes, get_addresses):
block_number = 123
[transaction_hash] = get_transaction_hashes(1)
[
account_address,
first_pool_address,
second_pool_address,
unrelated_pool_address,
first_token_address,
second_token_address,
] = get_addresses(6)
first_token_in_amount = 10
first_token_out_amount = 10
second_token_amount = 15
arb_swaps = [
Swap(
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[0],
pool_address=first_pool_address,
from_address=account_address,
to_address=second_pool_address,
token_in_address=first_token_address,
token_in_amount=first_token_in_amount,
token_out_address=second_token_address,
token_out_amount=second_token_amount,
),
Swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[1],
pool_address=second_pool_address,
from_address=first_pool_address,
to_address=account_address,
token_in_address=second_token_address,
token_in_amount=second_token_amount,
token_out_address=first_token_address,
token_out_amount=first_token_out_amount,
),
]
unrelated_swap = Swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[2, 0],
pool_address=unrelated_pool_address,
from_address=account_address,
to_address=account_address,
token_in_address=second_token_address,
token_in_amount=first_token_in_amount,
token_out_address=first_token_address,
token_out_amount=first_token_out_amount,
)
swaps = [
unrelated_swap,
*arb_swaps,
]
arbitrages = get_arbitrages(swaps)
assert len(arbitrages) == 1
arbitrage = arbitrages[0]
assert arbitrage.swaps == arb_swaps
assert arbitrage.account_address == account_address
assert arbitrage.profit_token_address == first_token_address
assert arbitrage.start_amount == first_token_in_amount
assert arbitrage.end_amount == first_token_out_amount
assert arbitrage.profit_amount == first_token_out_amount - first_token_in_amount
def test_three_pool_arbitrage(get_transaction_hashes, get_addresses):
block_number = 123
[transaction_hash] = get_transaction_hashes(1)
[
account_address,
first_pool_address,
second_pool_address,
third_pool_address,
first_token_address,
second_token_address,
third_token_address,
] = get_addresses(7)
first_token_in_amount = 10
first_token_out_amount = 10
second_token_amount = 15
third_token_amount = 40
swaps = [
Swap(
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[0],
pool_address=first_pool_address,
from_address=account_address,
to_address=second_pool_address,
token_in_address=first_token_address,
token_in_amount=first_token_in_amount,
token_out_address=second_token_address,
token_out_amount=second_token_amount,
),
Swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[1],
pool_address=second_pool_address,
from_address=first_pool_address,
to_address=third_pool_address,
token_in_address=second_token_address,
token_in_amount=second_token_amount,
token_out_address=third_token_address,
token_out_amount=third_token_amount,
),
Swap(
abi_name=UNISWAP_V3_POOL_ABI_NAME,
transaction_hash=transaction_hash,
block_number=block_number,
trace_address=[2],
pool_address=third_pool_address,
from_address=second_pool_address,
to_address=account_address,
token_in_address=third_token_address,
token_in_amount=third_token_amount,
token_out_address=first_token_address,
token_out_amount=first_token_out_amount,
),
]
arbitrages = get_arbitrages(swaps)
assert len(arbitrages) == 1
arbitrage = arbitrages[0]
assert arbitrage.swaps == swaps
assert arbitrage.account_address == account_address
assert arbitrage.profit_token_address == first_token_address
assert arbitrage.start_amount == first_token_in_amount
assert arbitrage.end_amount == first_token_out_amount
assert arbitrage.profit_amount == first_token_out_amount - first_token_in_amount

135
tests/test_swaps.py Normal file
View File

@ -0,0 +1,135 @@
from mev_inspect.swaps import (
get_swaps,
UNISWAP_V2_PAIR_ABI_NAME,
UNISWAP_V3_POOL_ABI_NAME,
)
from .helpers import (
make_unknown_trace,
make_transfer_trace,
make_swap_trace,
)
def test_swaps(
get_transaction_hashes,
get_addresses,
):
block_number = 123
[
first_transaction_hash,
second_transaction_hash,
] = get_transaction_hashes(2)
[
alice_address,
bob_address,
carl_address,
first_token_in_address,
first_token_out_address,
first_pool_address,
second_token_in_address,
second_token_out_address,
second_pool_address,
] = get_addresses(9)
first_token_in_amount = 10
first_token_out_amount = 20
second_token_in_amount = 30
second_token_out_amount = 40
traces = [
make_unknown_trace(block_number, first_transaction_hash, []),
make_transfer_trace(
block_number,
first_transaction_hash,
trace_address=[0],
from_address=alice_address,
to_address=first_pool_address,
token_address=first_token_in_address,
amount=first_token_in_amount,
),
make_swap_trace(
block_number,
first_transaction_hash,
trace_address=[1],
from_address=alice_address,
pool_address=first_pool_address,
abi_name=UNISWAP_V2_PAIR_ABI_NAME,
recipient_address=bob_address,
recipient_input_key="to",
),
make_transfer_trace(
block_number,
first_transaction_hash,
trace_address=[1, 0],
from_address=first_pool_address,
to_address=bob_address,
token_address=first_token_out_address,
amount=first_token_out_amount,
),
make_swap_trace(
block_number,
second_transaction_hash,
trace_address=[],
from_address=bob_address,
pool_address=second_pool_address,
abi_name=UNISWAP_V3_POOL_ABI_NAME,
recipient_address=carl_address,
recipient_input_key="recipient",
),
make_transfer_trace(
block_number,
second_transaction_hash,
trace_address=[0],
from_address=second_pool_address,
to_address=carl_address,
token_address=second_token_out_address,
amount=second_token_out_amount,
),
make_transfer_trace(
block_number,
second_transaction_hash,
trace_address=[1],
from_address=bob_address,
to_address=second_pool_address,
token_address=second_token_in_address,
amount=second_token_in_amount,
),
]
swaps = get_swaps(traces)
assert len(swaps) == 2
if swaps[0].abi_name == UNISWAP_V2_PAIR_ABI_NAME:
[first_swap, second_swap] = swaps # pylint: disable=unbalanced-tuple-unpacking
else:
[second_swap, first_swap] = swaps # pylint: disable=unbalanced-tuple-unpacking
assert first_swap.abi_name == UNISWAP_V2_PAIR_ABI_NAME
assert first_swap.transaction_hash == first_transaction_hash
assert first_swap.block_number == block_number
assert first_swap.trace_address == [1]
assert first_swap.protocol is None
assert first_swap.pool_address == first_pool_address
assert first_swap.from_address == alice_address
assert first_swap.to_address == bob_address
assert first_swap.token_in_address == first_token_in_address
assert first_swap.token_in_amount == first_token_in_amount
assert first_swap.token_out_address == first_token_out_address
assert first_swap.token_out_amount == first_token_out_amount
assert second_swap.abi_name == UNISWAP_V3_POOL_ABI_NAME
assert second_swap.transaction_hash == second_transaction_hash
assert second_swap.block_number == block_number
assert second_swap.trace_address == []
assert second_swap.protocol is None
assert second_swap.pool_address == second_pool_address
assert second_swap.from_address == bob_address
assert second_swap.to_address == carl_address
assert second_swap.token_in_address == second_token_in_address
assert second_swap.token_in_amount == second_token_in_amount
assert second_swap.token_out_address == second_token_out_address
assert second_swap.token_out_amount == second_token_out_amount

112
tests/test_traces.py Normal file
View File

@ -0,0 +1,112 @@
from typing import List
from mev_inspect.schemas.classified_traces import ClassifiedTrace
from mev_inspect.traces import is_child_trace_address, get_child_traces
from .helpers import make_many_unknown_traces
def test_is_child_trace_address():
assert is_child_trace_address([0], [])
assert is_child_trace_address([0, 0], [])
assert is_child_trace_address([0, 0], [0])
assert is_child_trace_address([100, 1, 10], [100])
assert is_child_trace_address([100, 1, 10], [100, 1])
assert not is_child_trace_address([0], [1])
assert not is_child_trace_address([1], [0])
assert not is_child_trace_address([1, 0], [0])
assert not is_child_trace_address([100, 2, 10], [100, 1])
def test_get_child_traces(get_transaction_hashes):
block_number = 123
[first_hash, second_hash] = get_transaction_hashes(2)
traces = []
first_hash_trace_addresses = [
[],
[0],
[0, 0],
[1],
[1, 0],
[1, 0, 0],
[1, 0, 1],
[1, 1],
[1, 2],
]
second_hash_trace_addresses = [[], [0], [1], [1, 0], [2]]
traces += make_many_unknown_traces(
block_number,
first_hash,
first_hash_trace_addresses,
)
traces += make_many_unknown_traces(
block_number,
second_hash,
second_hash_trace_addresses,
)
assert has_expected_child_traces(
first_hash,
[],
traces,
first_hash_trace_addresses[1:],
)
assert has_expected_child_traces(
first_hash,
[0],
traces,
[
[0, 0],
],
)
assert has_expected_child_traces(
second_hash,
[2],
traces,
[],
)
def has_expected_child_traces(
transaction_hash: str,
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
expected_trace_addresses: List[List[int]],
):
child_traces = get_child_traces(
transaction_hash,
parent_trace_address,
traces,
)
distinct_trace_addresses = distinct_lists(expected_trace_addresses)
if len(child_traces) != len(distinct_trace_addresses):
return False
for trace in child_traces:
if trace.transaction_hash != transaction_hash:
return False
if trace.trace_address not in distinct_trace_addresses:
return False
return True
def distinct_lists(list_of_lists: List[List[int]]) -> List[List[int]]:
distinct_so_far = []
for list_of_values in list_of_lists:
if list_of_values not in distinct_so_far:
distinct_so_far.append(list_of_values)
return distinct_so_far

71
tests/test_transfers.py Normal file
View File

@ -0,0 +1,71 @@
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.transfers import remove_child_transfers_of_transfers
def test_remove_child_transfers_of_transfers(get_transaction_hashes, get_addresses):
[transaction_hash, other_transaction_hash] = get_transaction_hashes(2)
[
alice_address,
bob_address,
first_token_address,
second_token_address,
third_token_address,
] = get_addresses(5)
outer_transfer = Transfer(
transaction_hash=transaction_hash,
trace_address=[0],
from_address=alice_address,
to_address=bob_address,
amount=10,
token_address=first_token_address,
)
inner_transfer = Transfer(
**{
**outer_transfer.dict(),
**dict(
trace_address=[0, 0],
token_address=second_token_address,
),
}
)
other_transfer = Transfer(
transaction_hash=transaction_hash,
trace_address=[1],
from_address=bob_address,
to_address=alice_address,
amount=10,
token_address=third_token_address,
)
separate_transaction_transfer = Transfer(
**{
**inner_transfer.dict(),
**dict(transaction_hash=other_transaction_hash),
}
)
transfers = [
outer_transfer,
inner_transfer,
other_transfer,
separate_transaction_transfer,
]
expected_transfers = [
outer_transfer,
other_transfer,
separate_transaction_transfer,
]
removed_transfers = remove_child_transfers_of_transfers(transfers)
assert _equal_ignoring_order(removed_transfers, expected_transfers)
def _equal_ignoring_order(first_list, second_list) -> bool:
return all(first in second_list for first in first_list) and all(
second in first_list for second in second_list
)

View File

@ -1,12 +1,8 @@
import json
import os
import unittest import unittest
from mev_inspect import tokenflow from mev_inspect import tokenflow
from mev_inspect.schemas.blocks import Block
THIS_FILE_DIRECTORY = os.path.dirname(__file__) from .utils import load_test_block
TEST_BLOCKS_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "blocks")
class TestTokenFlow(unittest.TestCase): class TestTokenFlow(unittest.TestCase):
@ -37,13 +33,5 @@ class TestTokenFlow(unittest.TestCase):
self.assertEqual(res["dollar_flows"], [0, 0]) self.assertEqual(res["dollar_flows"], [0, 0])
def load_test_block(block_number):
block_path = f"{TEST_BLOCKS_DIRECTORY}/{block_number}.json"
with open(block_path, "r") as block_file:
block_json = json.load(block_file)
return Block(**block_json)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -1,103 +0,0 @@
import unittest
from typing import List
from mev_inspect.schemas import Trace, TraceType, NestedTrace
from mev_inspect.traces import as_nested_traces
DEFAULT_BLOCK_NUMBER = 123
class TestTraces(unittest.TestCase):
def test_nested_traces(self):
trace_hash_address_pairs = [
("abc", [0, 2]),
("abc", []),
("abc", [2]),
("abc", [0]),
("abc", [0, 0]),
("abc", [0, 1]),
("abc", [1]),
("efg", []),
("abc", [1, 0]),
("abc", [0, 1, 0]),
("efg", [0]),
]
traces = [
build_trace_at_address(hash, address)
for (hash, address) in trace_hash_address_pairs
]
nested_traces = as_nested_traces(traces)
assert len(nested_traces) == 2
abc_trace = nested_traces[0]
efg_trace = nested_traces[1]
# abc
assert abc_trace.trace.transaction_hash == "abc"
assert_trace_address(abc_trace, [])
assert len(abc_trace.subtraces) == 3
[trace_0, trace_1, trace_2] = abc_trace.subtraces
assert_trace_address(trace_0, [0])
assert_trace_address(trace_1, [1])
assert_trace_address(trace_2, [2])
assert len(trace_0.subtraces) == 3
assert len(trace_1.subtraces) == 1
assert len(trace_2.subtraces) == 0
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
[trace_1_0] = trace_1.subtraces
assert_trace_address(trace_0_0, [0, 0])
assert_trace_address(trace_0_1, [0, 1])
assert_trace_address(trace_0_2, [0, 2])
assert_trace_address(trace_1_0, [1, 0])
assert len(trace_0_0.subtraces) == 0
assert len(trace_0_1.subtraces) == 1
assert len(trace_0_2.subtraces) == 0
assert len(trace_1_0.subtraces) == 0
[trace_0_1_0] = trace_0_1.subtraces
assert_trace_address(trace_0_1_0, [0, 1, 0])
assert len(trace_0_1_0.subtraces) == 0
# efg
assert efg_trace.trace.transaction_hash == "efg"
assert_trace_address(efg_trace, [])
assert len(efg_trace.subtraces) == 1
[efg_subtrace] = efg_trace.subtraces
assert_trace_address(efg_subtrace, [0])
assert len(efg_subtrace.subtraces) == 0
def build_trace_at_address(
transaction_hash: str,
trace_address: List[int],
) -> Trace:
return Trace(
# real values
transaction_hash=transaction_hash,
trace_address=trace_address,
# placeholders
action={},
block_hash="",
block_number=DEFAULT_BLOCK_NUMBER,
result=None,
subtraces=0,
transaction_position=None,
type=TraceType.call,
error=None,
)
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
assert nested_trace.trace.trace_address == trace_address

16
tests/utils.py Normal file
View File

@ -0,0 +1,16 @@
import json
import os
from mev_inspect.schemas.blocks import Block
THIS_FILE_DIRECTORY = os.path.dirname(__file__)
TEST_BLOCKS_DIRECTORY = os.path.join(THIS_FILE_DIRECTORY, "blocks")
def load_test_block(block_number: int) -> Block:
block_path = f"{TEST_BLOCKS_DIRECTORY}/{block_number}.json"
with open(block_path, "r") as block_file:
block_json = json.load(block_file)
return Block(**block_json)