From fcfb40c86444708706a24ee4d2b463552c51577f Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Tue, 21 Dec 2021 14:58:39 -0500 Subject: [PATCH] Add inspect many blocks - use for single inspect too --- mev_inspect/crud/arbitrages.py | 17 ++- mev_inspect/crud/blocks.py | 17 ++- mev_inspect/crud/liquidations.py | 17 ++- mev_inspect/crud/miner_payments.py | 17 ++- mev_inspect/crud/punks.py | 47 ++++--- mev_inspect/crud/sandwiches.py | 17 ++- mev_inspect/crud/shared.py | 20 +++ mev_inspect/crud/swaps.py | 17 ++- mev_inspect/crud/traces.py | 16 ++- mev_inspect/crud/transfers.py | 16 ++- mev_inspect/db.py | 11 +- mev_inspect/inspect_block.py | 202 ++++++++++++++++++++--------- 12 files changed, 281 insertions(+), 133 deletions(-) create mode 100644 mev_inspect/crud/shared.py diff --git a/mev_inspect/crud/arbitrages.py b/mev_inspect/crud/arbitrages.py index 8d23e1d..4ad4881 100644 --- a/mev_inspect/crud/arbitrages.py +++ b/mev_inspect/crud/arbitrages.py @@ -4,17 +4,20 @@ from uuid import uuid4 from mev_inspect.models.arbitrages import ArbitrageModel from mev_inspect.schemas.arbitrages import Arbitrage +from .shared import delete_by_block_range -def delete_arbitrages_for_block( + +def delete_arbitrages_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(ArbitrageModel) - .filter(ArbitrageModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + ArbitrageModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/blocks.py b/mev_inspect/crud/blocks.py index 90ead13..0869612 100644 --- a/mev_inspect/crud/blocks.py +++ b/mev_inspect/crud/blocks.py @@ -3,13 +3,22 @@ from datetime import datetime from mev_inspect.schemas.blocks import Block -def delete_block( +def delete_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: db_session.execute( - "DELETE FROM blocks WHERE block_number = :block_number", - params={"block_number": block_number}, + """ + DELETE FROM blocks + WHERE + block_number >= :after_block_number AND + block_number < :before_block_number + """, + params={ + "after_block_number": after_block_number, + "before_block_number": before_block_number, + }, ) db_session.commit() diff --git a/mev_inspect/crud/liquidations.py b/mev_inspect/crud/liquidations.py index 2a58ead..c51c1b0 100644 --- a/mev_inspect/crud/liquidations.py +++ b/mev_inspect/crud/liquidations.py @@ -4,17 +4,20 @@ from typing import List from mev_inspect.models.liquidations import LiquidationModel from mev_inspect.schemas.liquidations import Liquidation +from .shared import delete_by_block_range -def delete_liquidations_for_block( + +def delete_liquidations_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(LiquidationModel) - .filter(LiquidationModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + LiquidationModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/miner_payments.py b/mev_inspect/crud/miner_payments.py index e82bf96..acf688f 100644 --- a/mev_inspect/crud/miner_payments.py +++ b/mev_inspect/crud/miner_payments.py @@ -4,17 +4,20 @@ from typing import List from mev_inspect.models.miner_payments import MinerPaymentModel from mev_inspect.schemas.miner_payments import MinerPayment +from .shared import delete_by_block_range -def delete_miner_payments_for_block( + +def delete_miner_payments_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(MinerPaymentModel) - .filter(MinerPaymentModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + MinerPaymentModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/punks.py b/mev_inspect/crud/punks.py index 0abaf32..88ad5f6 100644 --- a/mev_inspect/crud/punks.py +++ b/mev_inspect/crud/punks.py @@ -10,17 +10,20 @@ from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance from mev_inspect.schemas.punk_bid import PunkBid from mev_inspect.schemas.punk_snipe import PunkSnipe +from .shared import delete_by_block_range -def delete_punk_bid_acceptances_for_block( + +def delete_punk_bid_acceptances_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(PunkBidAcceptanceModel) - .filter(PunkBidAcceptanceModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + PunkBidAcceptanceModel, + after_block_number, + before_block_number, ) - db_session.commit() @@ -37,16 +40,17 @@ def write_punk_bid_acceptances( db_session.commit() -def delete_punk_bids_for_block( +def delete_punk_bids_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(PunkBidModel) - .filter(PunkBidModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + PunkBidModel, + after_block_number, + before_block_number, ) - db_session.commit() @@ -60,16 +64,17 @@ def write_punk_bids( db_session.commit() -def delete_punk_snipes_for_block( +def delete_punk_snipes_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(PunkSnipeModel) - .filter(PunkSnipeModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + PunkSnipeModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/sandwiches.py b/mev_inspect/crud/sandwiches.py index ce08760..6a85def 100644 --- a/mev_inspect/crud/sandwiches.py +++ b/mev_inspect/crud/sandwiches.py @@ -4,17 +4,20 @@ from uuid import uuid4 from mev_inspect.models.sandwiches import SandwichModel from mev_inspect.schemas.sandwiches import Sandwich +from .shared import delete_by_block_range -def delete_sandwiches_for_block( + +def delete_sandwiches_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(SandwichModel) - .filter(SandwichModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + SandwichModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/shared.py b/mev_inspect/crud/shared.py new file mode 100644 index 0000000..d7c958c --- /dev/null +++ b/mev_inspect/crud/shared.py @@ -0,0 +1,20 @@ +from typing import Type + +from mev_inspect.models.base import Base + + +def delete_by_block_range( + db_session, + model_class: Type[Base], + after_block_number, + before_block_number, +) -> None: + + ( + db_session.query(model_class) + .filter(model_class.block_number >= after_block_number) + .filter(model_class.block_number < before_block_number) + .delete() + ) + + db_session.commit() diff --git a/mev_inspect/crud/swaps.py b/mev_inspect/crud/swaps.py index 4c51cd1..c12e08d 100644 --- a/mev_inspect/crud/swaps.py +++ b/mev_inspect/crud/swaps.py @@ -4,17 +4,20 @@ from typing import List from mev_inspect.models.swaps import SwapModel from mev_inspect.schemas.swaps import Swap +from .shared import delete_by_block_range -def delete_swaps_for_block( + +def delete_swaps_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(SwapModel) - .filter(SwapModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + SwapModel, + after_block_number, + before_block_number, ) - db_session.commit() diff --git a/mev_inspect/crud/traces.py b/mev_inspect/crud/traces.py index 76e45a1..0f099f6 100644 --- a/mev_inspect/crud/traces.py +++ b/mev_inspect/crud/traces.py @@ -4,15 +4,19 @@ from typing import List from mev_inspect.models.traces import ClassifiedTraceModel from mev_inspect.schemas.traces import ClassifiedTrace +from .shared import delete_by_block_range -def delete_classified_traces_for_block( + +def delete_classified_traces_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(ClassifiedTraceModel) - .filter(ClassifiedTraceModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + ClassifiedTraceModel, + after_block_number, + before_block_number, ) db_session.commit() diff --git a/mev_inspect/crud/transfers.py b/mev_inspect/crud/transfers.py index 7aa5adb..5e61046 100644 --- a/mev_inspect/crud/transfers.py +++ b/mev_inspect/crud/transfers.py @@ -4,15 +4,19 @@ from typing import List from mev_inspect.models.transfers import TransferModel from mev_inspect.schemas.transfers import Transfer +from .shared import delete_by_block_range -def delete_transfers_for_block( + +def delete_transfers_for_blocks( db_session, - block_number: int, + after_block_number: int, + before_block_number: int, ) -> None: - ( - db_session.query(TransferModel) - .filter(TransferModel.block_number == block_number) - .delete() + delete_by_block_range( + db_session, + TransferModel, + after_block_number, + before_block_number, ) db_session.commit() diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 9cdaa48..4d87b49 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -12,7 +12,7 @@ def get_trace_database_uri() -> Optional[str]: db_name = "trace_db" if all(field is not None for field in [username, password, host]): - return f"postgresql://{username}:{password}@{host}/{db_name}" + return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}" return None @@ -22,11 +22,16 @@ def get_inspect_database_uri(): password = os.getenv("POSTGRES_PASSWORD") host = os.getenv("POSTGRES_HOST") db_name = "mev_inspect" - return f"postgresql://{username}:{password}@{host}/{db_name}" + return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}" def _get_engine(uri: str): - return create_engine(uri) + return create_engine( + uri, + executemany_mode="values", + executemany_values_page_size=10000, + executemany_batch_page_size=500, + ) def _get_session(uri: str): diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index d801315..0d33a4a 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import List, Optional from sqlalchemy import orm from web3 import Web3 @@ -7,35 +7,46 @@ from web3 import Web3 from mev_inspect.arbitrages import get_arbitrages from mev_inspect.block import create_from_block_number from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.crud.arbitrages import delete_arbitrages_for_block, write_arbitrages -from mev_inspect.crud.blocks import delete_block, write_block +from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages +from mev_inspect.crud.blocks import delete_blocks, write_block from mev_inspect.crud.liquidations import ( - delete_liquidations_for_block, + delete_liquidations_for_blocks, write_liquidations, ) from mev_inspect.crud.miner_payments import ( - delete_miner_payments_for_block, + delete_miner_payments_for_blocks, write_miner_payments, ) from mev_inspect.crud.punks import ( - delete_punk_bid_acceptances_for_block, - delete_punk_bids_for_block, - delete_punk_snipes_for_block, + delete_punk_bid_acceptances_for_blocks, + delete_punk_bids_for_blocks, + delete_punk_snipes_for_blocks, write_punk_bid_acceptances, write_punk_bids, write_punk_snipes, ) -from mev_inspect.crud.sandwiches import delete_sandwiches_for_block, write_sandwiches -from mev_inspect.crud.swaps import delete_swaps_for_block, write_swaps +from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches +from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps from mev_inspect.crud.traces import ( - delete_classified_traces_for_block, + delete_classified_traces_for_blocks, write_classified_traces, ) -from mev_inspect.crud.transfers import delete_transfers_for_block, write_transfers +from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers from mev_inspect.liquidations import get_liquidations from mev_inspect.miner_payments import get_miner_payments from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes from mev_inspect.sandwiches import get_sandwiches +from mev_inspect.schemas.arbitrages import Arbitrage +from mev_inspect.schemas.blocks import Block +from mev_inspect.schemas.liquidations import Liquidation +from mev_inspect.schemas.miner_payments import MinerPayment +from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance +from mev_inspect.schemas.punk_bid import PunkBid +from mev_inspect.schemas.punk_snipe import PunkSnipe +from mev_inspect.schemas.sandwiches import Sandwich +from mev_inspect.schemas.swaps import Swap +from mev_inspect.schemas.traces import ClassifiedTrace +from mev_inspect.schemas.transfers import Transfer from mev_inspect.swaps import get_swaps from mev_inspect.transfers import get_transfers @@ -51,79 +62,154 @@ async def inspect_block( trace_db_session: Optional[orm.Session], should_write_classified_traces: bool = True, ): - block = await create_from_block_number( + await inspect_many_blocks( + inspect_db_session, base_provider, w3, + trace_classifier, block_number, + block_number + 1, trace_db_session, + should_write_classified_traces, ) - logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}") - delete_block(inspect_db_session, block_number) +async def inspect_many_blocks( + inspect_db_session: orm.Session, + base_provider, + w3: Web3, + trace_classifier: TraceClassifier, + after_block_number: int, + before_block_number: int, + trace_db_session: Optional[orm.Session], + should_write_classified_traces: bool = True, +): + all_blocks: List[Block] = [] + all_classified_traces: List[ClassifiedTrace] = [] + all_transfers: List[Transfer] = [] + all_swaps: List[Swap] = [] + all_arbitrages: List[Arbitrage] = [] + all_liqudations: List[Liquidation] = [] + all_sandwiches: List[Sandwich] = [] + + all_punk_bids: List[PunkBid] = [] + all_punk_bid_acceptances: List[PunkBidAcceptance] = [] + all_punk_snipes: List[PunkSnipe] = [] + + all_miner_payments: List[MinerPayment] = [] + + for block_number in range(after_block_number, before_block_number): + block = await create_from_block_number( + base_provider, + w3, + block_number, + trace_db_session, + ) + + logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}") + + total_transactions = len( + set( + t.transaction_hash + for t in block.traces + if t.transaction_hash is not None + ) + ) + logger.info( + f"Block: {block_number} -- Total transactions: {total_transactions}" + ) + + classified_traces = trace_classifier.classify(block.traces) + logger.info( + f"Block: {block_number} -- Returned {len(classified_traces)} classified traces" + ) + + transfers = get_transfers(classified_traces) + logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") + + swaps = get_swaps(classified_traces) + logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") + + arbitrages = get_arbitrages(swaps) + logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") + + liquidations = get_liquidations(classified_traces) + logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") + + sandwiches = get_sandwiches(swaps) + logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches") + + punk_bids = get_punk_bids(classified_traces) + punk_bid_acceptances = get_punk_bid_acceptances(classified_traces) + punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances) + logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes") + + miner_payments = get_miner_payments( + block.miner, block.base_fee_per_gas, classified_traces, block.receipts + ) + + all_blocks.append(block) + all_classified_traces.extend(classified_traces) + all_transfers.extend(transfers) + all_swaps.extend(swaps) + all_arbitrages.extend(arbitrages) + all_liqudations.extend(liquidations) + all_sandwiches.extend(sandwiches) + + all_punk_bids.extend(punk_bids) + all_punk_bid_acceptances.extend(punk_bid_acceptances) + all_punk_snipes.extend(punk_snipes) + + all_miner_payments.extend(miner_payments) + + delete_blocks(inspect_db_session, after_block_number, before_block_number) write_block(inspect_db_session, block) - total_transactions = len( - set(t.transaction_hash for t in block.traces if t.transaction_hash is not None) - ) - logger.info(f"Block: {block_number} -- Total transactions: {total_transactions}") - - classified_traces = trace_classifier.classify(block.traces) - logger.info( - f"Block: {block_number} -- Returned {len(classified_traces)} classified traces" - ) - if should_write_classified_traces: - delete_classified_traces_for_block(inspect_db_session, block_number) + delete_classified_traces_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_classified_traces(inspect_db_session, classified_traces) - transfers = get_transfers(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") - - delete_transfers_for_block(inspect_db_session, block_number) + delete_transfers_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_transfers(inspect_db_session, transfers) - swaps = get_swaps(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") - - delete_swaps_for_block(inspect_db_session, block_number) + delete_swaps_for_blocks(inspect_db_session, after_block_number, before_block_number) write_swaps(inspect_db_session, swaps) - arbitrages = get_arbitrages(swaps) - logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") - - delete_arbitrages_for_block(inspect_db_session, block_number) + delete_arbitrages_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_arbitrages(inspect_db_session, arbitrages) - liquidations = get_liquidations(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") - - delete_liquidations_for_block(inspect_db_session, block_number) + delete_liquidations_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_liquidations(inspect_db_session, liquidations) - sandwiches = get_sandwiches(swaps) - logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches") - - delete_sandwiches_for_block(inspect_db_session, block_number) + delete_sandwiches_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_sandwiches(inspect_db_session, sandwiches) - punk_bids = get_punk_bids(classified_traces) - delete_punk_bids_for_block(inspect_db_session, block_number) + delete_punk_bids_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_punk_bids(inspect_db_session, punk_bids) - punk_bid_acceptances = get_punk_bid_acceptances(classified_traces) - delete_punk_bid_acceptances_for_block(inspect_db_session, block_number) + delete_punk_bid_acceptances_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_punk_bid_acceptances(inspect_db_session, punk_bid_acceptances) - punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances) - logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes") - - delete_punk_snipes_for_block(inspect_db_session, block_number) + delete_punk_snipes_for_blocks( + inspect_db_session, after_block_number, before_block_number + ) write_punk_snipes(inspect_db_session, punk_snipes) - miner_payments = get_miner_payments( - block.miner, block.base_fee_per_gas, classified_traces, block.receipts + delete_miner_payments_for_blocks( + inspect_db_session, after_block_number, before_block_number ) - - delete_miner_payments_for_block(inspect_db_session, block_number) write_miner_payments(inspect_db_session, miner_payments)