Add inspect many blocks - use for single inspect too

This commit is contained in:
Luke Van Seters 2021-12-21 14:58:39 -05:00
parent 82c167d842
commit fcfb40c864
12 changed files with 281 additions and 133 deletions

View File

@ -4,17 +4,20 @@ from uuid import uuid4
from mev_inspect.models.arbitrages import ArbitrageModel
from mev_inspect.schemas.arbitrages import Arbitrage
from .shared import delete_by_block_range
def delete_arbitrages_for_block(
def delete_arbitrages_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(ArbitrageModel)
.filter(ArbitrageModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
ArbitrageModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -3,13 +3,22 @@ from datetime import datetime
from mev_inspect.schemas.blocks import Block
def delete_block(
def delete_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
db_session.execute(
"DELETE FROM blocks WHERE block_number = :block_number",
params={"block_number": block_number},
"""
DELETE FROM blocks
WHERE
block_number >= :after_block_number AND
block_number < :before_block_number
""",
params={
"after_block_number": after_block_number,
"before_block_number": before_block_number,
},
)
db_session.commit()

View File

@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.liquidations import LiquidationModel
from mev_inspect.schemas.liquidations import Liquidation
from .shared import delete_by_block_range
def delete_liquidations_for_block(
def delete_liquidations_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(LiquidationModel)
.filter(LiquidationModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
LiquidationModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.miner_payments import MinerPaymentModel
from mev_inspect.schemas.miner_payments import MinerPayment
from .shared import delete_by_block_range
def delete_miner_payments_for_block(
def delete_miner_payments_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(MinerPaymentModel)
.filter(MinerPaymentModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
MinerPaymentModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -10,17 +10,20 @@ from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from .shared import delete_by_block_range
def delete_punk_bid_acceptances_for_block(
def delete_punk_bid_acceptances_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkBidAcceptanceModel)
.filter(PunkBidAcceptanceModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkBidAcceptanceModel,
after_block_number,
before_block_number,
)
db_session.commit()
@ -37,16 +40,17 @@ def write_punk_bid_acceptances(
db_session.commit()
def delete_punk_bids_for_block(
def delete_punk_bids_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkBidModel)
.filter(PunkBidModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkBidModel,
after_block_number,
before_block_number,
)
db_session.commit()
@ -60,16 +64,17 @@ def write_punk_bids(
db_session.commit()
def delete_punk_snipes_for_block(
def delete_punk_snipes_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(PunkSnipeModel)
.filter(PunkSnipeModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
PunkSnipeModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -4,17 +4,20 @@ from uuid import uuid4
from mev_inspect.models.sandwiches import SandwichModel
from mev_inspect.schemas.sandwiches import Sandwich
from .shared import delete_by_block_range
def delete_sandwiches_for_block(
def delete_sandwiches_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(SandwichModel)
.filter(SandwichModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
SandwichModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -0,0 +1,20 @@
from typing import Type
from mev_inspect.models.base import Base
def delete_by_block_range(
db_session,
model_class: Type[Base],
after_block_number,
before_block_number,
) -> None:
(
db_session.query(model_class)
.filter(model_class.block_number >= after_block_number)
.filter(model_class.block_number < before_block_number)
.delete()
)
db_session.commit()

View File

@ -4,17 +4,20 @@ from typing import List
from mev_inspect.models.swaps import SwapModel
from mev_inspect.schemas.swaps import Swap
from .shared import delete_by_block_range
def delete_swaps_for_block(
def delete_swaps_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(SwapModel)
.filter(SwapModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
SwapModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -4,15 +4,19 @@ from typing import List
from mev_inspect.models.traces import ClassifiedTraceModel
from mev_inspect.schemas.traces import ClassifiedTrace
from .shared import delete_by_block_range
def delete_classified_traces_for_block(
def delete_classified_traces_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(ClassifiedTraceModel)
.filter(ClassifiedTraceModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
ClassifiedTraceModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -4,15 +4,19 @@ from typing import List
from mev_inspect.models.transfers import TransferModel
from mev_inspect.schemas.transfers import Transfer
from .shared import delete_by_block_range
def delete_transfers_for_block(
def delete_transfers_for_blocks(
db_session,
block_number: int,
after_block_number: int,
before_block_number: int,
) -> None:
(
db_session.query(TransferModel)
.filter(TransferModel.block_number == block_number)
.delete()
delete_by_block_range(
db_session,
TransferModel,
after_block_number,
before_block_number,
)
db_session.commit()

View File

@ -12,7 +12,7 @@ def get_trace_database_uri() -> Optional[str]:
db_name = "trace_db"
if all(field is not None for field in [username, password, host]):
return f"postgresql://{username}:{password}@{host}/{db_name}"
return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
return None
@ -22,11 +22,16 @@ def get_inspect_database_uri():
password = os.getenv("POSTGRES_PASSWORD")
host = os.getenv("POSTGRES_HOST")
db_name = "mev_inspect"
return f"postgresql://{username}:{password}@{host}/{db_name}"
return f"postgresql+psycopg2://{username}:{password}@{host}/{db_name}"
def _get_engine(uri: str):
return create_engine(uri)
return create_engine(
uri,
executemany_mode="values",
executemany_values_page_size=10000,
executemany_batch_page_size=500,
)
def _get_session(uri: str):

View File

@ -1,5 +1,5 @@
import logging
from typing import Optional
from typing import List, Optional
from sqlalchemy import orm
from web3 import Web3
@ -7,35 +7,46 @@ from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages
from mev_inspect.block import create_from_block_number
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.crud.arbitrages import delete_arbitrages_for_block, write_arbitrages
from mev_inspect.crud.blocks import delete_block, write_block
from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages
from mev_inspect.crud.blocks import delete_blocks, write_block
from mev_inspect.crud.liquidations import (
delete_liquidations_for_block,
delete_liquidations_for_blocks,
write_liquidations,
)
from mev_inspect.crud.miner_payments import (
delete_miner_payments_for_block,
delete_miner_payments_for_blocks,
write_miner_payments,
)
from mev_inspect.crud.punks import (
delete_punk_bid_acceptances_for_block,
delete_punk_bids_for_block,
delete_punk_snipes_for_block,
delete_punk_bid_acceptances_for_blocks,
delete_punk_bids_for_blocks,
delete_punk_snipes_for_blocks,
write_punk_bid_acceptances,
write_punk_bids,
write_punk_snipes,
)
from mev_inspect.crud.sandwiches import delete_sandwiches_for_block, write_sandwiches
from mev_inspect.crud.swaps import delete_swaps_for_block, write_swaps
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_block,
delete_classified_traces_for_blocks,
write_classified_traces,
)
from mev_inspect.crud.transfers import delete_transfers_for_block, write_transfers
from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.miner_payments import get_miner_payments
from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes
from mev_inspect.sandwiches import get_sandwiches
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.miner_payments import MinerPayment
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
@ -51,79 +62,154 @@ async def inspect_block(
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
block = await create_from_block_number(
await inspect_many_blocks(
inspect_db_session,
base_provider,
w3,
trace_classifier,
block_number,
block_number + 1,
trace_db_session,
should_write_classified_traces,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
delete_block(inspect_db_session, block_number)
async def inspect_many_blocks(
inspect_db_session: orm.Session,
base_provider,
w3: Web3,
trace_classifier: TraceClassifier,
after_block_number: int,
before_block_number: int,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
all_blocks: List[Block] = []
all_classified_traces: List[ClassifiedTrace] = []
all_transfers: List[Transfer] = []
all_swaps: List[Swap] = []
all_arbitrages: List[Arbitrage] = []
all_liqudations: List[Liquidation] = []
all_sandwiches: List[Sandwich] = []
all_punk_bids: List[PunkBid] = []
all_punk_bid_acceptances: List[PunkBidAcceptance] = []
all_punk_snipes: List[PunkSnipe] = []
all_miner_payments: List[MinerPayment] = []
for block_number in range(after_block_number, before_block_number):
block = await create_from_block_number(
base_provider,
w3,
block_number,
trace_db_session,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
total_transactions = len(
set(
t.transaction_hash
for t in block.traces
if t.transaction_hash is not None
)
)
logger.info(
f"Block: {block_number} -- Total transactions: {total_transactions}"
)
classified_traces = trace_classifier.classify(block.traces)
logger.info(
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
)
transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
liquidations = get_liquidations(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
sandwiches = get_sandwiches(swaps)
logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches")
punk_bids = get_punk_bids(classified_traces)
punk_bid_acceptances = get_punk_bid_acceptances(classified_traces)
punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances)
logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes")
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
)
all_blocks.append(block)
all_classified_traces.extend(classified_traces)
all_transfers.extend(transfers)
all_swaps.extend(swaps)
all_arbitrages.extend(arbitrages)
all_liqudations.extend(liquidations)
all_sandwiches.extend(sandwiches)
all_punk_bids.extend(punk_bids)
all_punk_bid_acceptances.extend(punk_bid_acceptances)
all_punk_snipes.extend(punk_snipes)
all_miner_payments.extend(miner_payments)
delete_blocks(inspect_db_session, after_block_number, before_block_number)
write_block(inspect_db_session, block)
total_transactions = len(
set(t.transaction_hash for t in block.traces if t.transaction_hash is not None)
)
logger.info(f"Block: {block_number} -- Total transactions: {total_transactions}")
classified_traces = trace_classifier.classify(block.traces)
logger.info(
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
)
if should_write_classified_traces:
delete_classified_traces_for_block(inspect_db_session, block_number)
delete_classified_traces_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_classified_traces(inspect_db_session, classified_traces)
transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
delete_transfers_for_block(inspect_db_session, block_number)
delete_transfers_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_transfers(inspect_db_session, transfers)
swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
delete_swaps_for_block(inspect_db_session, block_number)
delete_swaps_for_blocks(inspect_db_session, after_block_number, before_block_number)
write_swaps(inspect_db_session, swaps)
arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
delete_arbitrages_for_block(inspect_db_session, block_number)
delete_arbitrages_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_arbitrages(inspect_db_session, arbitrages)
liquidations = get_liquidations(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
delete_liquidations_for_block(inspect_db_session, block_number)
delete_liquidations_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_liquidations(inspect_db_session, liquidations)
sandwiches = get_sandwiches(swaps)
logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches")
delete_sandwiches_for_block(inspect_db_session, block_number)
delete_sandwiches_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_sandwiches(inspect_db_session, sandwiches)
punk_bids = get_punk_bids(classified_traces)
delete_punk_bids_for_block(inspect_db_session, block_number)
delete_punk_bids_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bids(inspect_db_session, punk_bids)
punk_bid_acceptances = get_punk_bid_acceptances(classified_traces)
delete_punk_bid_acceptances_for_block(inspect_db_session, block_number)
delete_punk_bid_acceptances_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bid_acceptances(inspect_db_session, punk_bid_acceptances)
punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances)
logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes")
delete_punk_snipes_for_block(inspect_db_session, block_number)
delete_punk_snipes_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_snipes(inspect_db_session, punk_snipes)
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
delete_miner_payments_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
delete_miner_payments_for_block(inspect_db_session, block_number)
write_miner_payments(inspect_db_session, miner_payments)