diff --git a/alembic/versions/2116e2f36a19_create_swaps_table.py b/alembic/versions/2116e2f36a19_create_swaps_table.py new file mode 100644 index 0000000..8f4b677 --- /dev/null +++ b/alembic/versions/2116e2f36a19_create_swaps_table.py @@ -0,0 +1,39 @@ +"""Create swaps table + +Revision ID: 2116e2f36a19 +Revises: c5da44eb072c +Create Date: 2021-08-05 21:06:33.340456 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "2116e2f36a19" +down_revision = "c5da44eb072c" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "swaps", + sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()), + sa.Column("abi_name", sa.String(1024), nullable=False), + sa.Column("transaction_hash", sa.String(66), nullable=False), + sa.Column("block_number", sa.Numeric, nullable=False), + sa.Column("trace_address", sa.String(256), nullable=False), + sa.Column("protocol", sa.String(256), nullable=True), + sa.Column("pool_address", sa.String(256), nullable=False), + sa.Column("from_address", sa.String(256), nullable=False), + sa.Column("to_address", sa.String(256), nullable=False), + sa.Column("token_in_address", sa.String(256), nullable=False), + sa.Column("token_in_amount", sa.Numeric, nullable=False), + sa.Column("token_out_address", sa.String(256), nullable=False), + sa.Column("token_out_amount", sa.Numeric, nullable=False), + sa.PrimaryKeyConstraint("transaction_hash", "trace_address"), + ) + + +def downgrade(): + op.drop_table("swaps") diff --git a/alembic/versions/7eec417a4f3e_change_trace_addresses_to_array_types.py b/alembic/versions/7eec417a4f3e_change_trace_addresses_to_array_types.py new file mode 100644 index 0000000..b3d4f29 --- /dev/null +++ b/alembic/versions/7eec417a4f3e_change_trace_addresses_to_array_types.py @@ -0,0 +1,50 @@ +"""Change trace addresses to array types + +Revision ID: 7eec417a4f3e +Revises: 9d8c69b3dccb +Create Date: 2021-08-06 15:58:04.556762 + +""" +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "7eec417a4f3e" +down_revision = "9d8c69b3dccb" +branch_labels = None +depends_on = None + + +def upgrade(): + op.drop_constraint("swaps_pkey", "swaps") + op.drop_column("swaps", "trace_address") + op.add_column("swaps", sa.Column("trace_address", sa.ARRAY(sa.Integer))) + op.create_primary_key("swaps_pkey", "swaps", ["transaction_hash", "trace_address"]) + + op.drop_constraint("classified_traces_pkey", "classified_traces") + op.drop_column("classified_traces", "trace_address") + op.add_column("classified_traces", sa.Column("trace_address", sa.ARRAY(sa.Integer))) + op.create_primary_key( + "classified_traces_pkey", + "classified_traces", + ["transaction_hash", "trace_address"], + ) + + +def downgrade(): + op.drop_constraint("swaps_pkey", "swaps") + op.drop_column("swaps", "trace_address") + op.add_column("swaps", sa.Column("trace_address", sa.String)) + + op.create_primary_key("swaps_pkey", "swaps", ["transaction_hash", "trace_address"]) + + op.drop_constraint("classified_traces_pkey", "classified_traces") + op.drop_column("classified_traces", "trace_address") + op.add_column("classified_traces", sa.Column("trace_address", sa.String)) + + op.create_primary_key( + "classified_traces_pkey", + "classified_traces", + ["transaction_hash", "trace_address"], + ) diff --git a/alembic/versions/9b8ae51c5d56_add_swap_arbitrage_join_table.py b/alembic/versions/9b8ae51c5d56_add_swap_arbitrage_join_table.py new file mode 100644 index 0000000..390c7a7 --- /dev/null +++ b/alembic/versions/9b8ae51c5d56_add_swap_arbitrage_join_table.py @@ -0,0 +1,38 @@ +"""Add swap arbitrage join table + +Revision ID: 9b8ae51c5d56 +Revises: 7eec417a4f3e +Create Date: 2021-08-06 17:06:55.364516 + +""" +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "9b8ae51c5d56" +down_revision = "7eec417a4f3e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "arbitrage_swaps", + sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()), + sa.Column("arbitrage_id", sa.String(1024), primary_key=True), + sa.Column("swap_transaction_hash", sa.String(66), primary_key=True), + sa.Column("swap_trace_address", sa.ARRAY(sa.Integer), primary_key=True), + sa.ForeignKeyConstraint( + ["arbitrage_id"], ["arbitrages.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint( + ["swap_transaction_hash", "swap_trace_address"], + ["swaps.transaction_hash", "swaps.trace_address"], + ondelete="CASCADE", + ), + ) + + +def downgrade(): + op.drop_table("arbitrage_swaps") diff --git a/alembic/versions/9d8c69b3dccb_add_arbitrages_and_swap_join_table.py b/alembic/versions/9d8c69b3dccb_add_arbitrages_and_swap_join_table.py new file mode 100644 index 0000000..69d42a1 --- /dev/null +++ b/alembic/versions/9d8c69b3dccb_add_arbitrages_and_swap_join_table.py @@ -0,0 +1,35 @@ +"""Add arbitrages and swap join table + +Revision ID: 9d8c69b3dccb +Revises: 2116e2f36a19 +Create Date: 2021-08-05 21:46:35.209199 + +""" +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "9d8c69b3dccb" +down_revision = "2116e2f36a19" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "arbitrages", + sa.Column("id", sa.String(256), primary_key=True), + sa.Column("created_at", sa.TIMESTAMP, server_default=sa.func.now()), + sa.Column("account_address", sa.String(256), nullable=False), + sa.Column("profit_token_address", sa.String(256), nullable=False), + sa.Column("block_number", sa.Numeric, nullable=False), + sa.Column("transaction_hash", sa.String(256), nullable=False), + sa.Column("start_amount", sa.Numeric, nullable=False), + sa.Column("end_amount", sa.Numeric, nullable=False), + sa.Column("profit_amount", sa.Numeric, nullable=False), + ) + + +def downgrade(): + op.drop_table("arbitrages") diff --git a/mev_inspect/arbitrage.py b/mev_inspect/arbitrages.py similarity index 93% rename from mev_inspect/arbitrage.py rename to mev_inspect/arbitrages.py index f2fe761..08c13f4 100644 --- a/mev_inspect/arbitrage.py +++ b/mev_inspect/arbitrages.py @@ -1,7 +1,7 @@ from itertools import groupby from typing import List, Optional -from mev_inspect.schemas.arbitrage import Arbitrage +from mev_inspect.schemas.arbitrages import Arbitrage from mev_inspect.schemas.swaps import Swap @@ -70,6 +70,8 @@ def _get_arbitrage_starting_with_swap( return Arbitrage( swaps=swap_path, + block_number=start_swap.block_number, + transaction_hash=start_swap.transaction_hash, account_address=start_swap.from_address, profit_token_address=start_swap.token_in_address, start_amount=start_amount, diff --git a/mev_inspect/block.py b/mev_inspect/block.py index fed8b82..b8f26dc 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -12,7 +12,13 @@ cache_directory = "./cache" ## Creates a block object, either from the cache or from the chain itself ## Note that you need to pass in the provider, not the web3 wrapped provider object! ## This is because only the provider allows you to make json rpc requests -def create_from_block_number(block_number: int, base_provider) -> Block: +def create_from_block_number( + block_number: int, base_provider, should_cache: bool +) -> Block: + if not should_cache: + w3 = Web3(base_provider) + return fetch_block(w3, base_provider, block_number) + cache_path = _get_cache_path(block_number) if cache_path.is_file(): diff --git a/mev_inspect/crud/arbitrages.py b/mev_inspect/crud/arbitrages.py new file mode 100644 index 0000000..8d23e1d --- /dev/null +++ b/mev_inspect/crud/arbitrages.py @@ -0,0 +1,64 @@ +from typing import List +from uuid import uuid4 + +from mev_inspect.models.arbitrages import ArbitrageModel +from mev_inspect.schemas.arbitrages import Arbitrage + + +def delete_arbitrages_for_block( + db_session, + block_number: int, +) -> None: + ( + db_session.query(ArbitrageModel) + .filter(ArbitrageModel.block_number == block_number) + .delete() + ) + + db_session.commit() + + +def write_arbitrages( + db_session, + arbitrages: List[Arbitrage], +) -> None: + arbitrage_models = [] + swap_arbitrage_ids = [] + + for arbitrage in arbitrages: + arbitrage_id = str(uuid4()) + arbitrage_models.append( + ArbitrageModel( + id=arbitrage_id, + block_number=arbitrage.block_number, + transaction_hash=arbitrage.transaction_hash, + account_address=arbitrage.account_address, + profit_token_address=arbitrage.profit_token_address, + start_amount=arbitrage.start_amount, + end_amount=arbitrage.end_amount, + profit_amount=arbitrage.profit_amount, + ) + ) + + for swap in arbitrage.swaps: + swap_arbitrage_ids.append( + { + "arbitrage_id": arbitrage_id, + "swap_transaction_hash": swap.transaction_hash, + "swap_trace_address": swap.trace_address, + } + ) + + if len(arbitrage_models) > 0: + db_session.bulk_save_objects(arbitrage_models) + db_session.execute( + """ + INSERT INTO arbitrage_swaps + (arbitrage_id, swap_transaction_hash, swap_trace_address) + VALUES + (:arbitrage_id, :swap_transaction_hash, :swap_trace_address) + """, + params=swap_arbitrage_ids, + ) + + db_session.commit() diff --git a/mev_inspect/crud/swaps.py b/mev_inspect/crud/swaps.py new file mode 100644 index 0000000..4c51cd1 --- /dev/null +++ b/mev_inspect/crud/swaps.py @@ -0,0 +1,28 @@ +import json +from typing import List + +from mev_inspect.models.swaps import SwapModel +from mev_inspect.schemas.swaps import Swap + + +def delete_swaps_for_block( + db_session, + block_number: int, +) -> None: + ( + db_session.query(SwapModel) + .filter(SwapModel.block_number == block_number) + .delete() + ) + + db_session.commit() + + +def write_swaps( + db_session, + swaps: List[Swap], +) -> None: + models = [SwapModel(**json.loads(swap.json())) for swap in swaps] + + db_session.bulk_save_objects(models) + db_session.commit() diff --git a/mev_inspect/decode.py b/mev_inspect/decode.py index fc0a1ec..4fa2b57 100644 --- a/mev_inspect/decode.py +++ b/mev_inspect/decode.py @@ -2,6 +2,7 @@ from typing import Dict, Optional from hexbytes import HexBytes from eth_abi import decode_abi +from eth_abi.exceptions import InsufficientDataBytes, NonEmptyPaddingBytes from mev_inspect.schemas.abi import ABI, ABIFunctionDescription from mev_inspect.schemas.call_data import CallData @@ -27,7 +28,10 @@ class ABIDecoder: names = [input.name for input in func.inputs] types = [input.type for input in func.inputs] - decoded = decode_abi(types, params) + try: + decoded = decode_abi(types, params) + except (InsufficientDataBytes, NonEmptyPaddingBytes): + return None return CallData( function_name=func.name, diff --git a/mev_inspect/models/arbitrages.py b/mev_inspect/models/arbitrages.py new file mode 100644 index 0000000..a4dafb6 --- /dev/null +++ b/mev_inspect/models/arbitrages.py @@ -0,0 +1,16 @@ +from sqlalchemy import Column, Numeric, String + +from .base import Base + + +class ArbitrageModel(Base): + __tablename__ = "arbitrages" + + id = Column(String, primary_key=True) + block_number = Column(Numeric, nullable=False) + transaction_hash = Column(String, nullable=False) + account_address = Column(String, nullable=False) + profit_token_address = Column(String, nullable=False) + start_amount = Column(Numeric, nullable=False) + end_amount = Column(Numeric, nullable=False) + profit_amount = Column(Numeric, nullable=False) diff --git a/mev_inspect/models/classified_traces.py b/mev_inspect/models/classified_traces.py index bf898e0..529bf25 100644 --- a/mev_inspect/models/classified_traces.py +++ b/mev_inspect/models/classified_traces.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, JSON, Numeric, String +from sqlalchemy import Column, JSON, Numeric, String, ARRAY, Integer from .base import Base @@ -10,7 +10,7 @@ class ClassifiedTraceModel(Base): block_number = Column(Numeric, nullable=False) classification = Column(String, nullable=False) trace_type = Column(String, nullable=False) - trace_address = Column(String, nullable=False) + trace_address = Column(ARRAY(Integer), nullable=False) protocol = Column(String, nullable=True) abi_name = Column(String, nullable=True) function_name = Column(String, nullable=True) diff --git a/mev_inspect/models/swaps.py b/mev_inspect/models/swaps.py new file mode 100644 index 0000000..318aa24 --- /dev/null +++ b/mev_inspect/models/swaps.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, Numeric, String, ARRAY, Integer + +from .base import Base + + +class SwapModel(Base): + __tablename__ = "swaps" + + abi_name = Column(String, nullable=False) + transaction_hash = Column(String, primary_key=True) + block_number = Column(Numeric, nullable=False) + trace_address = Column(ARRAY(Integer), nullable=False) + protocol = Column(String, nullable=True) + pool_address = Column(String, nullable=False) + from_address = Column(String, nullable=False) + to_address = Column(String, nullable=False) + token_in_address = Column(String, nullable=False) + token_in_amount = Column(Numeric, nullable=False) + token_out_address = Column(String, nullable=False) + token_out_amount = Column(Numeric, nullable=False) diff --git a/mev_inspect/schemas/arbitrage.py b/mev_inspect/schemas/arbitrages.py similarity index 84% rename from mev_inspect/schemas/arbitrage.py rename to mev_inspect/schemas/arbitrages.py index 484ed9d..0c39917 100644 --- a/mev_inspect/schemas/arbitrage.py +++ b/mev_inspect/schemas/arbitrages.py @@ -7,6 +7,8 @@ from .swaps import Swap class Arbitrage(BaseModel): swaps: List[Swap] + block_number: int + transaction_hash: str account_address: str profit_token_address: str start_amount: int diff --git a/pyproject.toml b/pyproject.toml index b8ab469..4c5be19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ build = 'scripts.poetry.docker:build' attach = 'scripts.poetry.docker:attach' exec = 'scripts.poetry.docker:exec' inspect = 'scripts.poetry.inspect:inspect' +inspect-many = 'scripts.poetry.inspect:inspect_many' [tool.black] exclude = ''' diff --git a/scripts/inspect_block.py b/scripts/inspect_block.py index 8536415..513fae0 100644 --- a/scripts/inspect_block.py +++ b/scripts/inspect_block.py @@ -4,24 +4,83 @@ import click from web3 import Web3 from mev_inspect import block +from mev_inspect.crud.arbitrages import ( + delete_arbitrages_for_block, + write_arbitrages, +) from mev_inspect.crud.classified_traces import ( delete_classified_traces_for_block, write_classified_traces, ) -from mev_inspect.arbitrage import get_arbitrages + +from mev_inspect.arbitrages import get_arbitrages from mev_inspect.classifiers.specs import ALL_CLASSIFIER_SPECS from mev_inspect.classifiers.trace import TraceClassifier +from mev_inspect.crud.swaps import delete_swaps_for_block, write_swaps from mev_inspect.db import get_session from mev_inspect.swaps import get_swaps -@click.command() +@click.group() +def cli(): + pass + + +@cli.command() @click.argument("block_number", type=int) @click.argument("rpc") -def inspect_block(block_number: int, rpc: str): +@click.option("--cache/--no-cache", default=True) +def inspect_block(block_number: int, rpc: str, cache: bool): base_provider = Web3.HTTPProvider(rpc) - block_data = block.create_from_block_number(block_number, base_provider) - print(f"Total traces: {len(block_data.traces)}") + + if not cache: + click.echo("Skipping cache") + + _inspect_block(base_provider, block_number, should_cache=cache) + + +@cli.command() +@click.argument("after_block", type=int) +@click.argument("before_block", type=int) +@click.argument("rpc") +@click.option("--cache/--no-cache", default=True) +def inspect_many_blocks(after_block: int, before_block: int, rpc: str, cache: bool): + base_provider = Web3.HTTPProvider(rpc) + + if not cache: + click.echo("Skipping cache") + + for block_number in range(after_block + 1, before_block): + _inspect_block( + base_provider, + block_number, + should_print_stats=False, + should_write_classified_traces=False, + should_cache=cache, + ) + + +def _inspect_block( + base_provider, + block_number: int, + should_cache: bool, + should_print_stats: bool = True, + should_write_classified_traces: bool = True, + should_write_swaps: bool = True, + should_write_arbitrages: bool = True, +): + + block_message = f"Running for {block_number}" + dashes = "-" * len(block_message) + click.echo(dashes) + click.echo(block_message) + click.echo(dashes) + + block_data = block.create_from_block_number( + block_number, base_provider, should_cache + ) + + click.echo(f"Total traces: {len(block_data.traces)}") total_transactions = len( set( @@ -30,44 +89,57 @@ def inspect_block(block_number: int, rpc: str): if t.transaction_hash is not None ) ) - print(f"Total transactions: {total_transactions}") + click.echo(f"Total transactions: {total_transactions}") trace_clasifier = TraceClassifier(ALL_CLASSIFIER_SPECS) classified_traces = trace_clasifier.classify(block_data.traces) - print(f"Returned {len(classified_traces)} classified traces") + click.echo(f"Returned {len(classified_traces)} classified traces") db_session = get_session() - delete_classified_traces_for_block(db_session, block_number) - write_classified_traces(db_session, classified_traces) - db_session.close() + + if should_write_classified_traces: + delete_classified_traces_for_block(db_session, block_number) + write_classified_traces(db_session, classified_traces) swaps = get_swaps(classified_traces) - print(f"Found {len(swaps)} swaps") + click.echo(f"Found {len(swaps)} swaps") + + if should_write_swaps: + delete_swaps_for_block(db_session, block_number) + write_swaps(db_session, swaps) arbitrages = get_arbitrages(swaps) - print(f"Found {len(arbitrages)} arbitrages") + click.echo(f"Found {len(arbitrages)} arbitrages") - stats = get_stats(classified_traces) - print(json.dumps(stats, indent=4)) + if should_write_arbitrages: + delete_arbitrages_for_block(db_session, block_number) + write_arbitrages(db_session, arbitrages) + + if should_print_stats: + stats = get_stats(classified_traces) + click.echo(json.dumps(stats, indent=4)) def get_stats(classified_traces) -> dict: stats: dict = {} for trace in classified_traces: + protocol = str(trace.protocol) abi_name = trace.abi_name classification = trace.classification.value signature = trace.function_signature - abi_name_stats = stats.get(abi_name, {}) + protocol_stats = stats.get(protocol, {}) + abi_name_stats = protocol_stats.get(abi_name, {}) class_stats = abi_name_stats.get(classification, {}) signature_count = class_stats.get(signature, 0) class_stats[signature] = signature_count + 1 abi_name_stats[classification] = class_stats - stats[abi_name] = abi_name_stats + protocol_stats[abi_name] = abi_name_stats + stats[protocol] = protocol_stats return stats if __name__ == "__main__": - inspect_block() + cli() diff --git a/scripts/poetry/inspect.py b/scripts/poetry/inspect.py index 1128925..df6979b 100644 --- a/scripts/poetry/inspect.py +++ b/scripts/poetry/inspect.py @@ -10,7 +10,12 @@ import click @click.option( "-r", "--rpc", help="rpc endpoint, this needs to have parity style traces" ) -def inspect(block_number: str, rpc: str): +@click.option( + "--cache/--no-cache", + help="whether to read / write to the cache", + default=True, +) +def inspect(block_number: str, rpc: str, cache: bool): check_call( [ "docker", @@ -19,7 +24,36 @@ def inspect(block_number: str, rpc: str): "mev-inspect", "python", "./scripts/inspect_block.py", + "inspect-block", block_number, rpc, + "--cache" if cache else "--no-cache", + ] + ) + + +@click.command() +@click.argument("after_block", type=str) +@click.argument("before_block", type=str) +@click.argument("rpc") +@click.option( + "--cache/--no-cache", + help="whether to read / write to the cache", + default=True, +) +def inspect_many(after_block: str, before_block: str, rpc: str, cache: bool): + check_call( + [ + "docker", + "compose", + "exec", + "mev-inspect", + "python", + "./scripts/inspect_block.py", + "inspect-many-blocks", + after_block, + before_block, + rpc, + "--cache" if cache else "--no-cache", ] ) diff --git a/tests/test_arbitrage_integration.py b/tests/test_arbitrage_integration.py index 210428f..14f9f77 100644 --- a/tests/test_arbitrage_integration.py +++ b/tests/test_arbitrage_integration.py @@ -1,6 +1,6 @@ +from mev_inspect.arbitrages import get_arbitrages from mev_inspect.classifiers.specs import ALL_CLASSIFIER_SPECS from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.arbitrage import get_arbitrages from mev_inspect.swaps import get_swaps from .utils import load_test_block diff --git a/tests/test_arbitrages.py b/tests/test_arbitrages.py index 1eaa40a..18af3bd 100644 --- a/tests/test_arbitrages.py +++ b/tests/test_arbitrages.py @@ -1,4 +1,4 @@ -from mev_inspect.arbitrage import get_arbitrages +from mev_inspect.arbitrages import get_arbitrages from mev_inspect.schemas.swaps import Swap from mev_inspect.swaps import ( UNISWAP_V2_PAIR_ABI_NAME,