Compare commits

...

12 Commits

Author SHA1 Message Date
Luke Van Seters
381c04f3cc Write swaps to the DB. Replace existing ones 2021-08-05 17:45:08 -04:00
Luke Van Seters
55b162ab88 Add swap model 2021-08-05 17:42:52 -04:00
Luke Van Seters
ed23024bfd Add table for swaps 2021-08-05 17:41:59 -04:00
Luke Van Seters
af4922d210 Pass swaps into arbitrage instead of computing inside 2021-08-05 17:32:22 -04:00
Luke Van Seters
d4ebd75eb4 Merge swap parsing into a shared function 2021-08-05 13:05:14 -04:00
Luke Van Seters
a9ba92af3e Add printing arbitrage from inspect_block 2021-08-05 12:41:24 -04:00
Luke Van Seters
860e41c079 Add Arbitrage + finding arbitrage from swaps 2021-08-05 12:40:05 -04:00
Luke Van Seters
8df83912a3 Add Swaps + parsing swaps from traces 2021-08-05 12:37:10 -04:00
Luke Van Seters
680da8763c Add Transfer schema + parse from trace 2021-08-05 12:36:37 -04:00
Luke Van Seters
677f0b3475 Remove nested traces 2021-08-05 12:35:38 -04:00
Luke Van Seters
2d9a47d05e Add WETH ABI 2021-08-05 12:28:48 -04:00
Luke Van Seters
3e08e696d0 Skip committing cache 2021-08-05 12:27:33 -04:00
19 changed files with 478 additions and 185 deletions

3
.gitignore vendored
View File

@ -13,3 +13,6 @@ __pycache__
# coverage # coverage
htmlcov htmlcov
.coverage* .coverage*
# don't commit cache
cache

View File

@ -0,0 +1,39 @@
"""Create swaps table
Revision ID: 2116e2f36a19
Revises: c5da44eb072c
Create Date: 2021-08-05 21:06:33.340456
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "2116e2f36a19"
down_revision = "c5da44eb072c"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"swaps",
sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()),
sa.Column("abi_name", sa.String(1024), nullable=False),
sa.Column("transaction_hash", sa.String(66), nullable=False),
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("trace_address", sa.String(256), nullable=False),
sa.Column("protocol", sa.String(256), nullable=True),
sa.Column("pool_address", sa.String(256), nullable=False),
sa.Column("from_address", sa.String(256), nullable=False),
sa.Column("to_address", sa.String(256), nullable=False),
sa.Column("token_in_address", sa.String(256), nullable=False),
sa.Column("token_in_amount", sa.Numeric, nullable=False),
sa.Column("token_out_address", sa.String(256), nullable=False),
sa.Column("token_out_amount", sa.Numeric, nullable=False),
sa.PrimaryKeyConstraint("transaction_hash", "trace_address"),
)
def downgrade():
op.drop_table("swaps")

View File

@ -0,0 +1 @@
[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]

90
mev_inspect/arbitrage.py Normal file
View File

@ -0,0 +1,90 @@
from itertools import groupby
from typing import List, Optional
from mev_inspect.schemas.arbitrage import Arbitrage
from mev_inspect.schemas.swaps import Swap
def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]:
get_transaction_hash = lambda swap: swap.transaction_hash
swaps_by_transaction = groupby(
sorted(swaps, key=get_transaction_hash),
key=get_transaction_hash,
)
all_arbitrages = []
for _, transaction_swaps in swaps_by_transaction:
all_arbitrages += _get_arbitrages_from_swaps(
list(transaction_swaps),
)
return all_arbitrages
def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
pool_addresses = {swap.pool_address for swap in swaps}
all_arbitrages = []
for index, first_swap in enumerate(swaps):
other_swaps = swaps[:index] + swaps[index + 1 :]
if first_swap.from_address not in pool_addresses:
arbitrage = _get_arbitrage_starting_with_swap(first_swap, other_swaps)
if arbitrage is not None:
all_arbitrages.append(arbitrage)
return all_arbitrages
def _get_arbitrage_starting_with_swap(
start_swap: Swap,
other_swaps: List[Swap],
) -> Optional[Arbitrage]:
swap_path = [start_swap]
current_swap: Swap = start_swap
while True:
next_swap = _get_swap_from_address(
current_swap.to_address,
current_swap.token_out_address,
other_swaps,
)
if next_swap is None:
return None
swap_path.append(next_swap)
current_swap = next_swap
if (
current_swap.to_address == start_swap.from_address
and current_swap.token_out_address == start_swap.token_in_address
):
start_amount = start_swap.token_in_amount
end_amount = current_swap.token_out_amount
profit_amount = end_amount - start_amount
return Arbitrage(
swaps=swap_path,
account_address=start_swap.from_address,
profit_token_address=start_swap.token_in_address,
start_amount=start_amount,
end_amount=end_amount,
profit_amount=profit_amount,
)
return None
def _get_swap_from_address(
address: str, token_address: str, swaps: List[Swap]
) -> Optional[Swap]:
for swap in swaps:
if swap.pool_address == address and swap.token_in_address == token_address:
return swap
return None

View File

@ -99,10 +99,21 @@ ERC20_SPEC = ClassifierSpec(
}, },
) )
WETH_SPEC = ClassifierSpec(
abi_name="WETH9",
protocol=Protocol.weth,
valid_contract_addresses=["0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"],
classifications={
"transferFrom(address,address,uint256)": Classification.transfer,
"transfer(address,uint256)": Classification.transfer,
},
)
CLASSIFIER_SPECS = [ CLASSIFIER_SPECS = [
*UNISWAP_V3_CONTRACT_SPECS, *UNISWAP_V3_CONTRACT_SPECS,
*UNISWAPPY_V2_CONTRACT_SPECS, *UNISWAPPY_V2_CONTRACT_SPECS,
WETH_SPEC,
ERC20_SPEC, ERC20_SPEC,
UNISWAP_V3_POOL_SPEC, UNISWAP_V3_POOL_SPEC,
UNISWAPPY_V2_PAIR_SPEC, UNISWAPPY_V2_PAIR_SPEC,

28
mev_inspect/crud/swaps.py Normal file
View File

@ -0,0 +1,28 @@
import json
from typing import List
from mev_inspect.models.swaps import SwapModel
from mev_inspect.schemas.swaps import Swap
def delete_swaps_for_block(
db_session,
block_number: int,
) -> None:
(
db_session.query(SwapModel)
.filter(SwapModel.block_number == block_number)
.delete()
)
db_session.commit()
def write_swaps(
db_session,
swaps: List[Swap],
) -> None:
models = [SwapModel(**json.loads(swap.json())) for swap in swaps]
db_session.bulk_save_objects(models)
db_session.commit()

View File

@ -0,0 +1,20 @@
from sqlalchemy import Column, Numeric, String
from .base import Base
class SwapModel(Base):
__tablename__ = "swaps"
abi_name = Column(String, nullable=False)
transaction_hash = Column(String, primary_key=True)
block_number = Column(Numeric, nullable=False)
trace_address = Column(String, primary_key=True)
protocol = Column(String, nullable=True)
pool_address = Column(String, nullable=False)
from_address = Column(String, nullable=False)
to_address = Column(String, nullable=False)
token_in_address = Column(String, nullable=False)
token_in_amount = Column(Numeric, nullable=False)
token_out_address = Column(String, nullable=False)
token_out_amount = Column(Numeric, nullable=False)

View File

@ -1,2 +1,2 @@
from .abi import ABI from .abi import ABI
from .blocks import Block, NestedTrace, Trace, TraceType from .blocks import Block, Trace, TraceType

View File

@ -0,0 +1,14 @@
from typing import List
from pydantic import BaseModel
from .swaps import Swap
class Arbitrage(BaseModel):
swaps: List[Swap]
account_address: str
profit_token_address: str
start_amount: int
end_amount: int
profit_amount: int

View File

@ -1,7 +1,7 @@
from enum import Enum from enum import Enum
from typing import Dict, List, Optional from typing import Dict, List, Optional
from pydantic import BaseModel, validator from pydantic import validator
from mev_inspect.utils import hex_to_int from mev_inspect.utils import hex_to_int
from .utils import CamelModel, Web3Model from .utils import CamelModel, Web3Model
@ -66,11 +66,3 @@ class Block(Web3Model):
def get_filtered_traces(self, hash: str) -> List[Trace]: def get_filtered_traces(self, hash: str) -> List[Trace]:
return [trace for trace in self.traces if trace.transaction_hash == hash] return [trace for trace in self.traces if trace.transaction_hash == hash]
class NestedTrace(BaseModel):
trace: Trace
subtraces: List["NestedTrace"]
NestedTrace.update_forward_refs()

View File

@ -17,6 +17,7 @@ class Protocol(Enum):
uniswap_v2 = "uniswap_v2" uniswap_v2 = "uniswap_v2"
uniswap_v3 = "uniswap_v3" uniswap_v3 = "uniswap_v3"
sushiswap = "sushiswap" sushiswap = "sushiswap"
weth = "weth"
class ClassifiedTrace(BaseModel): class ClassifiedTrace(BaseModel):

View File

@ -0,0 +1,20 @@
from typing import List, Optional
from pydantic import BaseModel
from mev_inspect.schemas.classified_traces import Protocol
class Swap(BaseModel):
abi_name: str
transaction_hash: str
block_number: int
trace_address: List[int]
protocol: Optional[Protocol]
pool_address: str
from_address: str
to_address: str
token_in_address: str
token_in_amount: int
token_out_address: str
token_out_amount: int

View File

@ -0,0 +1,38 @@
from typing import List
from pydantic import BaseModel
from .classified_traces import Classification, ClassifiedTrace, Protocol
class Transfer(BaseModel):
transaction_hash: str
trace_address: List[int]
from_address: str
to_address: str
amount: int
token_address: str
@classmethod
def from_trace(cls, trace: ClassifiedTrace) -> "Transfer":
if trace.classification != Classification.transfer or trace.inputs is None:
raise ValueError("Invalid transfer")
if trace.protocol == Protocol.weth:
return cls(
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
amount=trace.inputs["wad"],
to_address=trace.inputs["dst"],
from_address=trace.from_address,
token_address=trace.to_address,
)
else:
return cls(
transaction_hash=trace.transaction_hash,
trace_address=trace.trace_address,
amount=trace.inputs["amount"],
to_address=trace.inputs["recipient"],
from_address=trace.inputs.get("sender", trace.from_address),
token_address=trace.to_address,
)

118
mev_inspect/swaps.py Normal file
View File

@ -0,0 +1,118 @@
from itertools import groupby
from typing import List, Optional
from mev_inspect.schemas.classified_traces import (
ClassifiedTrace,
Classification,
)
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.transfers import (
get_child_transfers,
filter_transfers,
remove_inner_transfers,
)
UNISWAP_V2_PAIR_ABI_NAME = "UniswapV2Pair"
UNISWAP_V3_POOL_ABI_NAME = "UniswapV3Pool"
def get_swaps(traces: List[ClassifiedTrace]) -> List[Swap]:
get_transaction_hash = lambda t: t.transaction_hash
traces_by_transaction = groupby(
sorted(traces, key=get_transaction_hash),
key=get_transaction_hash,
)
swaps = []
for _, transaction_traces in traces_by_transaction:
swaps += _get_swaps_for_transaction(list(transaction_traces))
return swaps
def _get_swaps_for_transaction(traces: List[ClassifiedTrace]) -> List[Swap]:
ordered_traces = list(sorted(traces, key=lambda t: t.trace_address))
swaps: List[Swap] = []
prior_transfers: List[Transfer] = []
for trace in ordered_traces:
if trace.classification == Classification.transfer:
prior_transfers.append(Transfer.from_trace(trace))
elif trace.classification == Classification.swap:
child_transfers = get_child_transfers(trace.trace_address, traces)
swap = _parse_swap(
trace,
remove_inner_transfers(prior_transfers),
remove_inner_transfers(child_transfers),
)
if swap is not None:
swaps.append(swap)
return swaps
def _parse_swap(
trace: ClassifiedTrace,
prior_transfers: List[Transfer],
child_transfers: List[Transfer],
) -> Optional[Swap]:
pool_address = trace.to_address
recipient_address = _get_recipient_address(trace)
if recipient_address is None:
return None
transfers_to_pool = filter_transfers(prior_transfers, to_address=pool_address)
if len(transfers_to_pool) == 0:
transfers_to_pool = filter_transfers(child_transfers, to_address=pool_address)
if len(transfers_to_pool) == 0:
return None
transfers_from_pool_to_recipient = filter_transfers(
child_transfers, to_address=recipient_address, from_address=pool_address
)
if len(transfers_from_pool_to_recipient) != 1:
return None
transfer_in = transfers_to_pool[-1]
transfer_out = transfers_from_pool_to_recipient[0]
return Swap(
abi_name=trace.abi_name,
transaction_hash=trace.transaction_hash,
block_number=trace.block_number,
trace_address=trace.trace_address,
pool_address=pool_address,
from_address=transfer_in.from_address,
to_address=transfer_out.to_address,
token_in_address=transfer_in.token_address,
token_in_amount=transfer_in.amount,
token_out_address=transfer_out.token_address,
token_out_amount=transfer_out.amount,
)
def _get_recipient_address(trace: ClassifiedTrace) -> Optional[str]:
if trace.abi_name == UNISWAP_V3_POOL_ABI_NAME:
return (
trace.inputs["recipient"]
if trace.inputs is not None and "recipient" in trace.inputs
else trace.from_address
)
elif trace.abi_name == UNISWAP_V2_PAIR_ABI_NAME:
return (
trace.inputs["to"]
if trace.inputs is not None and "to" in trace.inputs
else trace.from_address
)
else:
return None

View File

@ -1,80 +1,32 @@
from itertools import groupby from typing import List
from typing import Iterable, List
from mev_inspect.schemas import Trace, NestedTrace from mev_inspect.schemas.classified_traces import ClassifiedTrace
def as_nested_traces(traces: Iterable[Trace]) -> List[NestedTrace]: def is_child_trace_address(
nested_traces = [] child_trace_address: List[int],
parent_trace_address: List[int],
) -> bool:
parent_trace_length = len(parent_trace_address)
sorted_by_transaction_hash = sorted(traces, key=_get_transaction_hash) return (
for _, transaction_traces in groupby( len(child_trace_address) > parent_trace_length
sorted_by_transaction_hash, _get_transaction_hash and child_trace_address[:parent_trace_length] == parent_trace_address
): )
nested_traces += _as_nested_traces_by_transaction(transaction_traces)
return nested_traces
def _get_transaction_hash(trace) -> str: def get_child_traces(
return trace.transaction_hash parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[ClassifiedTrace]:
ordered_traces = sorted(traces, key=lambda t: t.trace_address)
child_traces = []
for trace in ordered_traces:
if is_child_trace_address(
trace.trace_address,
parent_trace_address,
):
child_traces.append(trace)
def _as_nested_traces_by_transaction(traces: Iterable[Trace]) -> List[NestedTrace]: return child_traces
"""
Turns a list of Traces into a a tree of NestedTraces
using their trace addresses
Right now this has an exponential (?) runtime because we rescan
most traces at each level of tree depth
TODO to write a better implementation if it becomes a bottleneck
Should be doable in linear time
"""
nested_traces = []
parent = None
children: List[Trace] = []
sorted_traces = sorted(traces, key=lambda t: t.trace_address)
for trace in sorted_traces:
if parent is None:
parent = trace
children = []
continue
elif not _is_subtrace(trace, parent):
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
parent = trace
children = []
else:
children.append(trace)
if parent is not None:
nested_traces.append(
NestedTrace(
trace=parent,
subtraces=as_nested_traces(children),
)
)
return nested_traces
def _is_subtrace(trace: Trace, parent: Trace):
parent_trace_length = len(parent.trace_address)
if len(trace.trace_address) > parent_trace_length:
prefix = trace.trace_address[:parent_trace_length]
return prefix == parent.trace_address
return False

55
mev_inspect/transfers.py Normal file
View File

@ -0,0 +1,55 @@
from typing import List, Optional
from mev_inspect.schemas.classified_traces import Classification, ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.traces import is_child_trace_address, get_child_traces
def get_child_transfers(
parent_trace_address: List[int],
traces: List[ClassifiedTrace],
) -> List[Transfer]:
child_transfers = []
for child_trace in get_child_traces(parent_trace_address, traces):
if child_trace.classification == Classification.transfer:
child_transfers.append(Transfer.from_trace(child_trace))
return child_transfers
def filter_transfers(
transfers: List[Transfer],
to_address: Optional[str] = None,
from_address: Optional[str] = None,
) -> List[Transfer]:
filtered_transfers = []
for transfer in transfers:
if to_address is not None and transfer.to_address != to_address:
continue
if from_address is not None and transfer.from_address != from_address:
continue
filtered_transfers.append(transfer)
return filtered_transfers
def remove_inner_transfers(transfers: List[Transfer]) -> List[Transfer]:
updated_transfers = []
transfer_trace_addresses: List[List[int]] = []
sorted_transfers = sorted(transfers, key=lambda t: t.trace_address)
for transfer in sorted_transfers:
if not any(
is_child_trace_address(transfer.trace_address, parent_address)
for parent_address in transfer_trace_addresses
):
updated_transfers.append(transfer)
transfer_trace_addresses.append(transfer.trace_address)
return updated_transfers

0
scripts/__init__.py Normal file
View File

View File

@ -8,9 +8,12 @@ from mev_inspect.crud.classified_traces import (
delete_classified_traces_for_block, delete_classified_traces_for_block,
write_classified_traces, write_classified_traces,
) )
from mev_inspect.crud.swaps import delete_swaps_for_block, write_swaps
from mev_inspect.db import get_session from mev_inspect.db import get_session
from mev_inspect.classifier_specs import CLASSIFIER_SPECS from mev_inspect.classifier_specs import CLASSIFIER_SPECS
from mev_inspect.trace_classifier import TraceClassifier from mev_inspect.trace_classifier import TraceClassifier
from mev_inspect.arbitrage import get_arbitrages
from mev_inspect.swaps import get_swaps
@click.command() @click.command()
@ -35,10 +38,21 @@ def inspect_block(block_number: int, rpc: str):
print(f"Returned {len(classified_traces)} classified traces") print(f"Returned {len(classified_traces)} classified traces")
db_session = get_session() db_session = get_session()
delete_classified_traces_for_block(db_session, block_number) delete_classified_traces_for_block(db_session, block_number)
write_classified_traces(db_session, classified_traces) write_classified_traces(db_session, classified_traces)
swaps = get_swaps(classified_traces)
print(f"Found {len(swaps)} swaps")
delete_swaps_for_block(db_session, block_number)
write_swaps(db_session, swaps)
db_session.close() db_session.close()
arbitrages = get_arbitrages(swaps)
print(f"Found {len(arbitrages)} arbitrages")
stats = get_stats(classified_traces) stats = get_stats(classified_traces)
print(json.dumps(stats, indent=4)) print(json.dumps(stats, indent=4))

View File

@ -1,103 +0,0 @@
import unittest
from typing import List
from mev_inspect.schemas import Trace, TraceType, NestedTrace
from mev_inspect.traces import as_nested_traces
DEFAULT_BLOCK_NUMBER = 123
class TestTraces(unittest.TestCase):
def test_nested_traces(self):
trace_hash_address_pairs = [
("abc", [0, 2]),
("abc", []),
("abc", [2]),
("abc", [0]),
("abc", [0, 0]),
("abc", [0, 1]),
("abc", [1]),
("efg", []),
("abc", [1, 0]),
("abc", [0, 1, 0]),
("efg", [0]),
]
traces = [
build_trace_at_address(hash, address)
for (hash, address) in trace_hash_address_pairs
]
nested_traces = as_nested_traces(traces)
assert len(nested_traces) == 2
abc_trace = nested_traces[0]
efg_trace = nested_traces[1]
# abc
assert abc_trace.trace.transaction_hash == "abc"
assert_trace_address(abc_trace, [])
assert len(abc_trace.subtraces) == 3
[trace_0, trace_1, trace_2] = abc_trace.subtraces
assert_trace_address(trace_0, [0])
assert_trace_address(trace_1, [1])
assert_trace_address(trace_2, [2])
assert len(trace_0.subtraces) == 3
assert len(trace_1.subtraces) == 1
assert len(trace_2.subtraces) == 0
[trace_0_0, trace_0_1, trace_0_2] = trace_0.subtraces
[trace_1_0] = trace_1.subtraces
assert_trace_address(trace_0_0, [0, 0])
assert_trace_address(trace_0_1, [0, 1])
assert_trace_address(trace_0_2, [0, 2])
assert_trace_address(trace_1_0, [1, 0])
assert len(trace_0_0.subtraces) == 0
assert len(trace_0_1.subtraces) == 1
assert len(trace_0_2.subtraces) == 0
assert len(trace_1_0.subtraces) == 0
[trace_0_1_0] = trace_0_1.subtraces
assert_trace_address(trace_0_1_0, [0, 1, 0])
assert len(trace_0_1_0.subtraces) == 0
# efg
assert efg_trace.trace.transaction_hash == "efg"
assert_trace_address(efg_trace, [])
assert len(efg_trace.subtraces) == 1
[efg_subtrace] = efg_trace.subtraces
assert_trace_address(efg_subtrace, [0])
assert len(efg_subtrace.subtraces) == 0
def build_trace_at_address(
transaction_hash: str,
trace_address: List[int],
) -> Trace:
return Trace(
# real values
transaction_hash=transaction_hash,
trace_address=trace_address,
# placeholders
action={},
block_hash="",
block_number=DEFAULT_BLOCK_NUMBER,
result=None,
subtraces=0,
transaction_position=None,
type=TraceType.call,
error=None,
)
def assert_trace_address(nested_trace: NestedTrace, trace_address: List[int]):
assert nested_trace.trace.trace_address == trace_address