Compare commits

...

1 Commits

Author SHA1 Message Date
Luke Van Seters
f303d98c1d Save progress moving sqlalchmy to async 2021-11-05 16:57:04 -04:00
17 changed files with 286 additions and 212 deletions

23
cli.py
View File

@ -1,10 +1,8 @@
import asyncio
import os import os
import signal
from functools import wraps
import click import click
from mev_inspect.concurrency import coro
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
RPC_URL_ENV = "RPC_URL" RPC_URL_ENV = "RPC_URL"
@ -15,25 +13,6 @@ def cli():
pass pass
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
def cancel_task_callback():
for task in asyncio.all_tasks():
task.cancel()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, cancel_task_callback)
try:
loop.run_until_complete(f(*args, **kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
return wrapper
@cli.command() @cli.command()
@click.argument("block_number", type=int) @click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))

View File

@ -5,12 +5,13 @@ import time
from web3 import Web3 from web3 import Web3
from mev_inspect.block import get_latest_block_number from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
from mev_inspect.crud.latest_block_update import ( from mev_inspect.crud.latest_block_update import (
find_latest_block_update, find_latest_block_update,
update_latest_block, update_latest_block,
) )
from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspect_block import inspect_block from mev_inspect.inspect_block import inspect_block
from mev_inspect.provider import get_base_provider from mev_inspect.provider import get_base_provider
from mev_inspect.signal_handler import GracefulKiller from mev_inspect.signal_handler import GracefulKiller
@ -23,7 +24,8 @@ logger = logging.getLogger(__name__)
BLOCK_NUMBER_LAG = 5 BLOCK_NUMBER_LAG = 5
def run(): @coro
async def run():
rpc = os.getenv("RPC_URL") rpc = os.getenv("RPC_URL")
if rpc is None: if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL") raise RuntimeError("Missing environment variable RPC_URL")
@ -32,8 +34,14 @@ def run():
killer = GracefulKiller() killer = GracefulKiller()
inspect_db_session = get_inspect_session() inspect_db_sessionmaker = get_inspect_sessionmaker()
trace_db_session = get_trace_session() trace_db_sessionmaker = get_trace_sessionmaker()
inspect_db_session = inspect_db_sessionmaker()
trace_db_session = (
trace_db_sessionmaker() if trace_db_sessionmaker is not None else None
)
trace_classifier = TraceClassifier() trace_classifier = TraceClassifier()
base_provider = get_base_provider(rpc) base_provider = get_base_provider(rpc)

View File

@ -4,9 +4,10 @@ import sys
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from sqlalchemy import orm from sqlalchemy.ext.asyncio import AsyncSession
from web3 import Web3 from web3 import Web3
from mev_inspect.crud.blocks import find_block
from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.fees import fetch_base_fee_per_gas
from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt from mev_inspect.schemas.receipts import Receipt
@ -26,12 +27,12 @@ async def create_from_block_number(
base_provider, base_provider,
w3: Web3, w3: Web3,
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session], trace_db_session: Optional[AsyncSession],
) -> Block: ) -> Block:
block: Optional[Block] = None block: Optional[Block] = None
if trace_db_session is not None: if trace_db_session is not None:
block = _find_block(trace_db_session, block_number) block = await find_block(trace_db_session, block_number)
if block is None: if block is None:
block = await _fetch_block(w3, base_provider, block_number) block = await _fetch_block(w3, base_provider, block_number)
@ -72,87 +73,6 @@ async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -
) )
def _find_block(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[Block]:
traces = _find_traces(trace_db_session, block_number)
receipts = _find_receipts(trace_db_session, block_number)
base_fee_per_gas = _find_base_fee(trace_db_session, block_number)
if traces is None or receipts is None or base_fee_per_gas is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
miner=miner_address,
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
def _find_traces(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Trace]]:
result = trace_db_session.execute(
"SELECT raw_traces FROM block_traces WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(traces_json,) = result
return [Trace(**trace_json) for trace_json in traces_json]
def _find_receipts(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[List[Receipt]]:
result = trace_db_session.execute(
"SELECT raw_receipts FROM block_receipts WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(receipts_json,) = result
return [Receipt(**receipt) for receipt in receipts_json]
def _find_base_fee(
trace_db_session: orm.Session,
block_number: int,
) -> Optional[int]:
result = trace_db_session.execute(
"SELECT base_fee_in_wei FROM base_fee WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(base_fee,) = result
return base_fee
def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]:
for trace in traces:
if trace.type == TraceType.reward:
return trace.action["author"]
return None
def get_transaction_hashes(calls: List[Trace]) -> List[str]: def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result = [] result = []

View File

@ -0,0 +1,22 @@
import asyncio
import signal
from functools import wraps
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
def cancel_task_callback():
for task in asyncio.all_tasks():
task.cancel()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, cancel_task_callback)
try:
loop.run_until_complete(f(*args, **kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
return wrapper

View File

@ -5,20 +5,20 @@ from mev_inspect.models.arbitrages import ArbitrageModel
from mev_inspect.schemas.arbitrages import Arbitrage from mev_inspect.schemas.arbitrages import Arbitrage
def delete_arbitrages_for_block( async def delete_arbitrages_for_block(
db_session, db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
( await (
db_session.query(ArbitrageModel) db_session.query(ArbitrageModel)
.filter(ArbitrageModel.block_number == block_number) .filter(ArbitrageModel.block_number == block_number)
.delete() .delete()
) )
db_session.commit() await db_session.commit()
def write_arbitrages( async def write_arbitrages(
db_session, db_session,
arbitrages: List[Arbitrage], arbitrages: List[Arbitrage],
) -> None: ) -> None:
@ -50,8 +50,8 @@ def write_arbitrages(
) )
if len(arbitrage_models) > 0: if len(arbitrage_models) > 0:
db_session.bulk_save_objects(arbitrage_models) await db_session.bulk_save_objects(arbitrage_models)
db_session.execute( await db_session.execute(
""" """
INSERT INTO arbitrage_swaps INSERT INTO arbitrage_swaps
(arbitrage_id, swap_transaction_hash, swap_trace_address) (arbitrage_id, swap_transaction_hash, swap_trace_address)
@ -61,4 +61,4 @@ def write_arbitrages(
params=swap_arbitrage_ids, params=swap_arbitrage_ids,
) )
db_session.commit() await db_session.commit()

View File

@ -0,0 +1,88 @@
from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
async def find_block(
trace_db_session: AsyncSession,
block_number: int,
) -> Optional[Block]:
traces = await _find_traces(trace_db_session, block_number)
receipts = await _find_receipts(trace_db_session, block_number)
base_fee_per_gas = await _find_base_fee(trace_db_session, block_number)
if traces is None or receipts is None or base_fee_per_gas is None:
return None
miner_address = _get_miner_address_from_traces(traces)
if miner_address is None:
return None
return Block(
block_number=block_number,
miner=miner_address,
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
async def _find_traces(
trace_db_session: AsyncSession,
block_number: int,
) -> Optional[List[Trace]]:
result = await trace_db_session.execute(
"SELECT raw_traces FROM block_traces WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(traces_json,) = result
return [Trace(**trace_json) for trace_json in traces_json]
async def _find_receipts(
trace_db_session: AsyncSession,
block_number: int,
) -> Optional[List[Receipt]]:
result = await trace_db_session.execute(
"SELECT raw_receipts FROM block_receipts WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(receipts_json,) = result
return [Receipt(**receipt) for receipt in receipts_json]
async def _find_base_fee(
trace_db_session: AsyncSession,
block_number: int,
) -> Optional[int]:
result = await trace_db_session.execute(
"SELECT base_fee_in_wei FROM base_fee WHERE block_number = :block_number",
params={"block_number": block_number},
).one_or_none()
if result is None:
return None
else:
(base_fee,) = result
return base_fee
def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]:
for trace in traces:
if trace.type == TraceType.reward:
return trace.action["author"]
return None

View File

@ -1,8 +1,8 @@
from typing import Optional from typing import Optional
def find_latest_block_update(db_session) -> Optional[int]: async def find_latest_block_update(db_session) -> Optional[int]:
result = db_session.execute( result = await db_session.execute(
"SELECT block_number FROM latest_block_update LIMIT 1" "SELECT block_number FROM latest_block_update LIMIT 1"
).one_or_none() ).one_or_none()
if result is None: if result is None:
@ -11,8 +11,8 @@ def find_latest_block_update(db_session) -> Optional[int]:
return int(result[0]) return int(result[0])
def update_latest_block(db_session, block_number) -> None: async def update_latest_block(db_session, block_number) -> None:
db_session.execute( await db_session.execute(
""" """
UPDATE latest_block_update UPDATE latest_block_update
SET block_number = :block_number, updated_at = current_timestamp; SET block_number = :block_number, updated_at = current_timestamp;

View File

@ -5,20 +5,20 @@ from mev_inspect.models.liquidations import LiquidationModel
from mev_inspect.schemas.liquidations import Liquidation from mev_inspect.schemas.liquidations import Liquidation
def delete_liquidations_for_block( async def delete_liquidations_for_block(
db_session, db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
( await (
db_session.query(LiquidationModel) db_session.query(LiquidationModel)
.filter(LiquidationModel.block_number == block_number) .filter(LiquidationModel.block_number == block_number)
.delete() .delete()
) )
db_session.commit() await db_session.commit()
def write_liquidations( async def write_liquidations(
db_session, db_session,
liquidations: List[Liquidation], liquidations: List[Liquidation],
) -> None: ) -> None:
@ -27,5 +27,5 @@ def write_liquidations(
for liquidation in liquidations for liquidation in liquidations
] ]
db_session.bulk_save_objects(models) await db_session.bulk_save_objects(models)
db_session.commit() await db_session.commit()

View File

@ -5,20 +5,20 @@ from mev_inspect.models.miner_payments import MinerPaymentModel
from mev_inspect.schemas.miner_payments import MinerPayment from mev_inspect.schemas.miner_payments import MinerPayment
def delete_miner_payments_for_block( async def delete_miner_payments_for_block(
db_session, db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
( await (
db_session.query(MinerPaymentModel) db_session.query(MinerPaymentModel)
.filter(MinerPaymentModel.block_number == block_number) .filter(MinerPaymentModel.block_number == block_number)
.delete() .delete()
) )
db_session.commit() await db_session.commit()
def write_miner_payments( async def write_miner_payments(
db_session, db_session,
miner_payments: List[MinerPayment], miner_payments: List[MinerPayment],
) -> None: ) -> None:
@ -27,5 +27,5 @@ def write_miner_payments(
for miner_payment in miner_payments for miner_payment in miner_payments
] ]
db_session.bulk_save_objects(models) await db_session.bulk_save_objects(models)
db_session.commit() await db_session.commit()

View File

@ -5,24 +5,24 @@ from mev_inspect.models.swaps import SwapModel
from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.swaps import Swap
def delete_swaps_for_block( async def delete_swaps_for_block(
db_session, db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
( await (
db_session.query(SwapModel) db_session.query(SwapModel)
.filter(SwapModel.block_number == block_number) .filter(SwapModel.block_number == block_number)
.delete() .delete()
) )
db_session.commit() await db_session.commit()
def write_swaps( async def write_swaps(
db_session, db_session,
swaps: List[Swap], swaps: List[Swap],
) -> None: ) -> None:
models = [SwapModel(**json.loads(swap.json())) for swap in swaps] models = [SwapModel(**json.loads(swap.json())) for swap in swaps]
db_session.bulk_save_objects(models) await db_session.bulk_save_objects(models)
db_session.commit() await db_session.commit()

View File

@ -1,25 +1,25 @@
import json import json
from typing import List from typing import List
from sqlalchemy import delete
from mev_inspect.models.traces import ClassifiedTraceModel from mev_inspect.models.traces import ClassifiedTraceModel
from mev_inspect.schemas.traces import ClassifiedTrace from mev_inspect.schemas.traces import ClassifiedTrace
def delete_classified_traces_for_block( async def delete_classified_traces_for_block(
db_session, inspect_db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
( statement = delete(ClassifiedTraceModel).where(
db_session.query(ClassifiedTraceModel) ClassifiedTraceModel.block_number == block_number
.filter(ClassifiedTraceModel.block_number == block_number)
.delete()
) )
await inspect_db_session.execute(statement)
db_session.commit() await inspect_db_session.commit()
def write_classified_traces( async def write_classified_traces(
db_session, inspect_db_session,
classified_traces: List[ClassifiedTrace], classified_traces: List[ClassifiedTrace],
) -> None: ) -> None:
models = [] models = []
@ -46,5 +46,5 @@ def write_classified_traces(
) )
) )
db_session.bulk_save_objects(models) inspect_db_session.add_all(models)
db_session.commit() await inspect_db_session.commit()

View File

@ -5,7 +5,7 @@ from mev_inspect.models.transfers import TransferModel
from mev_inspect.schemas.transfers import Transfer from mev_inspect.schemas.transfers import Transfer
def delete_transfers_for_block( async def delete_transfers_for_block(
db_session, db_session,
block_number: int, block_number: int,
) -> None: ) -> None:
@ -18,7 +18,7 @@ def delete_transfers_for_block(
db_session.commit() db_session.commit()
def write_transfers( async def write_transfers(
db_session, db_session,
transfers: List[Transfer], transfers: List[Transfer],
) -> None: ) -> None:

View File

@ -1,7 +1,8 @@
import os import os
from typing import Optional from typing import Optional
from sqlalchemy import create_engine, orm from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -12,7 +13,7 @@ def get_trace_database_uri() -> Optional[str]:
db_name = "trace_db" db_name = "trace_db"
if all(field is not None for field in [username, password, host]): if all(field is not None for field in [username, password, host]):
return f"postgresql://{username}:{password}@{host}/{db_name}" return f"postgresql+asyncpg://{username}:{password}@{host}/{db_name}"
return None return None
@ -22,27 +23,30 @@ def get_inspect_database_uri():
password = os.getenv("POSTGRES_PASSWORD") password = os.getenv("POSTGRES_PASSWORD")
host = os.getenv("POSTGRES_HOST") host = os.getenv("POSTGRES_HOST")
db_name = "mev_inspect" db_name = "mev_inspect"
return f"postgresql://{username}:{password}@{host}/{db_name}" return f"postgresql+asyncpg://{username}:{password}@{host}/{db_name}"
def _get_engine(uri: str): def _get_engine(uri: str):
return create_engine(uri) return create_async_engine(uri)
def _get_session(uri: str): def _get_sessionmaker(uri: str):
Session = sessionmaker(bind=_get_engine(uri)) return sessionmaker(
return Session() _get_engine(uri),
class_=AsyncSession,
expire_on_commit=False,
)
def get_inspect_session() -> orm.Session: def get_inspect_sessionmaker():
uri = get_inspect_database_uri() uri = get_inspect_database_uri()
return _get_session(uri) return _get_sessionmaker(uri)
def get_trace_session() -> Optional[orm.Session]: def get_trace_sessionmaker():
uri = get_trace_database_uri() uri = get_trace_database_uri()
if uri is not None: if uri is not None:
return _get_session(uri) return _get_sessionmaker(uri)
return None return None

View File

@ -1,7 +1,7 @@
import logging import logging
from typing import Optional from typing import Optional
from sqlalchemy import orm from sqlalchemy.ext.asyncio import AsyncSession
from web3 import Web3 from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages from mev_inspect.arbitrages import get_arbitrages
@ -36,14 +36,15 @@ logger = logging.getLogger(__name__)
async def inspect_block( async def inspect_block(
inspect_db_session: orm.Session, inspect_db_session: AsyncSession,
base_provider, base_provider,
w3: Web3, w3: Web3,
trace_clasifier: TraceClassifier, trace_clasifier: TraceClassifier,
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session], trace_db_session: Optional[AsyncSession],
should_write_classified_traces: bool = True, should_write_classified_traces: bool = True,
): ):
logger.info(f"Block: {block_number} -- Entering")
block = await create_from_block_number( block = await create_from_block_number(
base_provider, base_provider,
w3, w3,
@ -64,36 +65,37 @@ async def inspect_block(
) )
if should_write_classified_traces: if should_write_classified_traces:
delete_classified_traces_for_block(inspect_db_session, block_number) await delete_classified_traces_for_block(inspect_db_session, block_number)
write_classified_traces(inspect_db_session, classified_traces) await write_classified_traces(inspect_db_session, classified_traces)
transfers = get_transfers(classified_traces) transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
delete_transfers_for_block(inspect_db_session, block_number) await delete_transfers_for_block(inspect_db_session, block_number)
write_transfers(inspect_db_session, transfers) await write_transfers(inspect_db_session, transfers)
swaps = get_swaps(classified_traces) swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
delete_swaps_for_block(inspect_db_session, block_number) await delete_swaps_for_block(inspect_db_session, block_number)
write_swaps(inspect_db_session, swaps) await write_swaps(inspect_db_session, swaps)
arbitrages = get_arbitrages(swaps) arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
delete_arbitrages_for_block(inspect_db_session, block_number) await delete_arbitrages_for_block(inspect_db_session, block_number)
write_arbitrages(inspect_db_session, arbitrages) await write_arbitrages(inspect_db_session, arbitrages)
liquidations = get_liquidations(classified_traces) liquidations = get_liquidations(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
delete_liquidations_for_block(inspect_db_session, block_number) await delete_liquidations_for_block(inspect_db_session, block_number)
write_liquidations(inspect_db_session, liquidations) await write_liquidations(inspect_db_session, liquidations)
miner_payments = get_miner_payments( miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts block.miner, block.base_fee_per_gas, classified_traces, block.receipts
) )
delete_miner_payments_for_block(inspect_db_session, block_number) await delete_miner_payments_for_block(inspect_db_session, block_number)
write_miner_payments(inspect_db_session, miner_payments) await write_miner_payments(inspect_db_session, miner_payments)
logger.info(f"Block: {block_number} -- Exiting")

View File

@ -9,7 +9,7 @@ from web3.eth import AsyncEth
from mev_inspect.block import create_from_block_number from mev_inspect.block import create_from_block_number
from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker
from mev_inspect.inspect_block import inspect_block from mev_inspect.inspect_block import inspect_block
from mev_inspect.provider import get_base_provider from mev_inspect.provider import get_base_provider
@ -24,40 +24,68 @@ class MEVInspector:
max_concurrency: int = 1, max_concurrency: int = 1,
request_timeout: int = 300, request_timeout: int = 300,
): ):
self.inspect_db_session = get_inspect_session()
self.trace_db_session = get_trace_session()
self.base_provider = get_base_provider(rpc, request_timeout=request_timeout) self.base_provider = get_base_provider(rpc, request_timeout=request_timeout)
self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
self.trace_classifier = TraceClassifier() self.trace_classifier = TraceClassifier()
self.max_concurrency = asyncio.Semaphore(max_concurrency) self.max_concurrency = asyncio.Semaphore(max_concurrency)
async def create_from_block(self, block_number: int): async def create_from_block(self, block_number: int):
trace_db_sessionmaker = await get_trace_sessionmaker()
trace_db_session = (
trace_db_sessionmaker() if trace_db_sessionmaker is not None else None
)
return await create_from_block_number( return await create_from_block_number(
base_provider=self.base_provider, base_provider=self.base_provider,
w3=self.w3, w3=self.w3,
block_number=block_number, block_number=block_number,
trace_db_session=self.trace_db_session, trace_db_session=trace_db_session,
) )
if trace_db_session is not None:
await trace_db_session.close()
async def inspect_single_block(self, block: int): async def inspect_single_block(self, block: int):
return await inspect_block( inspect_db_sessionmaker = await get_inspect_sessionmaker()
self.inspect_db_session, trace_db_sessionmaker = await get_trace_sessionmaker()
inspect_db_session = inspect_db_sessionmaker()
trace_db_session = (
trace_db_sessionmaker() if trace_db_sessionmaker is not None else None
)
await inspect_block(
inspect_db_session,
self.base_provider, self.base_provider,
self.w3, self.w3,
self.trace_classifier, self.trace_classifier,
block, block,
trace_db_session=self.trace_db_session, trace_db_session=trace_db_session,
) )
await inspect_db_session.close()
if trace_db_session is not None:
await trace_db_session.close()
async def inspect_many_blocks(self, after_block: int, before_block: int): async def inspect_many_blocks(self, after_block: int, before_block: int):
inspect_db_sessionmaker = get_inspect_sessionmaker()
trace_db_sessionmaker = get_trace_sessionmaker()
tasks = [] tasks = []
for block_number in range(after_block, before_block): for block_number in range(after_block, before_block):
tasks.append( tasks.append(
asyncio.ensure_future( asyncio.ensure_future(
self.safe_inspect_block(block_number=block_number) self.safe_inspect_block(
inspect_db_sessionmaker,
block_number,
trace_db_sessionmaker,
)
) )
) )
logger.info(f"Gathered {len(tasks)} blocks to inspect") logger.info(f"Gathered {len(tasks)} blocks to inspect")
try: try:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
except CancelledError: except CancelledError:
@ -66,13 +94,27 @@ class MEVInspector:
logger.error(f"Existed due to {type(e)}") logger.error(f"Existed due to {type(e)}")
traceback.print_exc() traceback.print_exc()
async def safe_inspect_block(self, block_number: int): async def safe_inspect_block(
self,
inspect_db_sessionmaker,
block_number: int,
trace_db_sessionmaker,
):
async with self.max_concurrency: async with self.max_concurrency:
return await inspect_block( inspect_db_session = inspect_db_sessionmaker()
self.inspect_db_session, trace_db_session = (
trace_db_sessionmaker() if trace_db_sessionmaker is not None else None
)
await inspect_block(
inspect_db_session,
self.base_provider, self.base_provider,
self.w3, self.w3,
self.trace_classifier, self.trace_classifier,
block_number, block_number,
trace_db_session=self.trace_db_session, trace_db_session=trace_db_session,
) )
await inspect_db_session.close()
if trace_db_session is not None:
await trace_db_session.close()

49
poetry.lock generated
View File

@ -51,6 +51,19 @@ category = "main"
optional = false optional = false
python-versions = ">=3.5.3" python-versions = ">=3.5.3"
[[package]]
name = "asyncpg"
version = "0.24.0"
description = "An asyncio PostgreSQL driver"
category = "main"
optional = false
python-versions = ">=3.6.0"
[package.extras]
dev = ["Cython (>=0.29.24,<0.30.0)", "pytest (>=6.0)", "Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "pycodestyle (>=2.7.0,<2.8.0)", "flake8 (>=3.9.2,<3.10.0)", "uvloop (>=0.15.3)"]
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"]
test = ["pycodestyle (>=2.7.0,<2.8.0)", "flake8 (>=3.9.2,<3.10.0)", "uvloop (>=0.15.3)"]
[[package]] [[package]]
name = "atomicwrites" name = "atomicwrites"
version = "1.4.0" version = "1.4.0"
@ -645,14 +658,6 @@ python-versions = "*"
[package.dependencies] [package.dependencies]
six = ">=1.9" six = ">=1.9"
[[package]]
name = "psycopg2"
version = "2.9.1"
description = "psycopg2 - Python-PostgreSQL Database Adapter"
category = "main"
optional = false
python-versions = ">=3.6"
[[package]] [[package]]
name = "py" name = "py"
version = "1.10.0" version = "1.10.0"
@ -1017,7 +1022,7 @@ multidict = ">=4.0"
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.9" python-versions = "^3.9"
content-hash = "baade6f62f3adaff192b2c85b4f602f4990b9b99d6fcce904aeb5087b6fa1921" content-hash = "61d28ab2afc95db3df7b96c56850ceb640113dde1ff62a782fac9ba52d9b49a7"
[metadata.files] [metadata.files]
aiohttp = [ aiohttp = [
@ -1071,6 +1076,21 @@ async-timeout = [
{file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"}, {file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"},
{file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"}, {file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"},
] ]
asyncpg = [
{file = "asyncpg-0.24.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c4fc0205fe4ddd5aeb3dfdc0f7bafd43411181e1f5650189608e5971cceacff1"},
{file = "asyncpg-0.24.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:a7095890c96ba36f9f668eb552bb020dddb44f8e73e932f8573efc613ee83843"},
{file = "asyncpg-0.24.0-cp310-cp310-win_amd64.whl", hash = "sha256:8ff5073d4b654e34bd5eaadc01dc4d68b8a9609084d835acd364cd934190a08d"},
{file = "asyncpg-0.24.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:e36c6806883786b19551bb70a4882561f31135dc8105a59662e0376cf5b2cbc5"},
{file = "asyncpg-0.24.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:ddffcb85227bf39cd1bedd4603e0082b243cf3b14ced64dce506a15b05232b83"},
{file = "asyncpg-0.24.0-cp37-cp37m-win_amd64.whl", hash = "sha256:41704c561d354bef01353835a7846e5606faabbeb846214dfcf666cf53319f18"},
{file = "asyncpg-0.24.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:29ef6ae0a617fc13cc2ac5dc8e9b367bb83cba220614b437af9b67766f4b6b20"},
{file = "asyncpg-0.24.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:eed43abc6ccf1dc02e0d0efc06ce46a411362f3358847c6b0ec9a43426f91ece"},
{file = "asyncpg-0.24.0-cp38-cp38-win_amd64.whl", hash = "sha256:129d501f3d30616afd51eb8d3142ef51ba05374256bd5834cec3ef4956a9b317"},
{file = "asyncpg-0.24.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a458fc69051fbb67d995fdda46d75a012b5d6200f91e17d23d4751482640ed4c"},
{file = "asyncpg-0.24.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:556b0e92e2b75dc028b3c4bc9bd5162ddf0053b856437cf1f04c97f9c6837d03"},
{file = "asyncpg-0.24.0-cp39-cp39-win_amd64.whl", hash = "sha256:a738f4807c853623d3f93f0fea11f61be6b0e5ca16ea8aeb42c2c7ee742aa853"},
{file = "asyncpg-0.24.0.tar.gz", hash = "sha256:dd2fa063c3344823487d9ddccb40802f02622ddf8bf8a6cc53885ee7a2c1c0c6"},
]
atomicwrites = [ atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},
@ -1537,17 +1557,6 @@ protobuf = [
{file = "protobuf-3.17.3-py2.py3-none-any.whl", hash = "sha256:2bfb815216a9cd9faec52b16fd2bfa68437a44b67c56bee59bc3926522ecb04e"}, {file = "protobuf-3.17.3-py2.py3-none-any.whl", hash = "sha256:2bfb815216a9cd9faec52b16fd2bfa68437a44b67c56bee59bc3926522ecb04e"},
{file = "protobuf-3.17.3.tar.gz", hash = "sha256:72804ea5eaa9c22a090d2803813e280fb273b62d5ae497aaf3553d141c4fdd7b"}, {file = "protobuf-3.17.3.tar.gz", hash = "sha256:72804ea5eaa9c22a090d2803813e280fb273b62d5ae497aaf3553d141c4fdd7b"},
] ]
psycopg2 = [
{file = "psycopg2-2.9.1-cp36-cp36m-win32.whl", hash = "sha256:7f91312f065df517187134cce8e395ab37f5b601a42446bdc0f0d51773621854"},
{file = "psycopg2-2.9.1-cp36-cp36m-win_amd64.whl", hash = "sha256:830c8e8dddab6b6716a4bf73a09910c7954a92f40cf1d1e702fb93c8a919cc56"},
{file = "psycopg2-2.9.1-cp37-cp37m-win32.whl", hash = "sha256:89409d369f4882c47f7ea20c42c5046879ce22c1e4ea20ef3b00a4dfc0a7f188"},
{file = "psycopg2-2.9.1-cp37-cp37m-win_amd64.whl", hash = "sha256:7640e1e4d72444ef012e275e7b53204d7fab341fb22bc76057ede22fe6860b25"},
{file = "psycopg2-2.9.1-cp38-cp38-win32.whl", hash = "sha256:079d97fc22de90da1d370c90583659a9f9a6ee4007355f5825e5f1c70dffc1fa"},
{file = "psycopg2-2.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:2c992196719fadda59f72d44603ee1a2fdcc67de097eea38d41c7ad9ad246e62"},
{file = "psycopg2-2.9.1-cp39-cp39-win32.whl", hash = "sha256:2087013c159a73e09713294a44d0c8008204d06326006b7f652bef5ace66eebb"},
{file = "psycopg2-2.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:bf35a25f1aaa8a3781195595577fcbb59934856ee46b4f252f56ad12b8043bcf"},
{file = "psycopg2-2.9.1.tar.gz", hash = "sha256:de5303a6f1d0a7a34b9d40e4d3bef684ccc44a49bbe3eb85e3c0bffb4a131b7c"},
]
py = [ py = [
{file = "py-1.10.0-py2.py3-none-any.whl", hash = "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"}, {file = "py-1.10.0-py2.py3-none-any.whl", hash = "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"},
{file = "py-1.10.0.tar.gz", hash = "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3"}, {file = "py-1.10.0.tar.gz", hash = "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3"},

View File

@ -10,7 +10,7 @@ web3 = "^5.23.0"
pydantic = "^1.8.2" pydantic = "^1.8.2"
hexbytes = "^0.2.1" hexbytes = "^0.2.1"
click = "^8.0.1" click = "^8.0.1"
psycopg2 = "^2.9.1" asyncpg = "^0.24.0"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pre-commit = "^2.13.0" pre-commit = "^2.13.0"