From f303d98c1da2cc7d3547f193bb0fe732beb6f7b9 Mon Sep 17 00:00:00 2001 From: Luke Van Seters Date: Fri, 5 Nov 2021 16:57:04 -0400 Subject: [PATCH] Save progress moving sqlalchmy to async --- cli.py | 23 +------ listener.py | 16 +++-- mev_inspect/block.py | 88 ++----------------------- mev_inspect/concurrency.py | 22 +++++++ mev_inspect/crud/arbitrages.py | 14 ++-- mev_inspect/crud/blocks.py | 88 +++++++++++++++++++++++++ mev_inspect/crud/latest_block_update.py | 8 +-- mev_inspect/crud/liquidations.py | 12 ++-- mev_inspect/crud/miner_payments.py | 12 ++-- mev_inspect/crud/swaps.py | 12 ++-- mev_inspect/crud/traces.py | 24 +++---- mev_inspect/crud/transfers.py | 4 +- mev_inspect/db.py | 26 ++++---- mev_inspect/inspect_block.py | 32 ++++----- mev_inspect/inspector.py | 66 +++++++++++++++---- poetry.lock | 49 ++++++++------ pyproject.toml | 2 +- 17 files changed, 286 insertions(+), 212 deletions(-) create mode 100644 mev_inspect/concurrency.py create mode 100644 mev_inspect/crud/blocks.py diff --git a/cli.py b/cli.py index e535c4f..2849204 100644 --- a/cli.py +++ b/cli.py @@ -1,10 +1,8 @@ -import asyncio import os -import signal -from functools import wraps import click +from mev_inspect.concurrency import coro from mev_inspect.inspector import MEVInspector RPC_URL_ENV = "RPC_URL" @@ -15,25 +13,6 @@ def cli(): pass -def coro(f): - @wraps(f) - def wrapper(*args, **kwargs): - loop = asyncio.get_event_loop() - - def cancel_task_callback(): - for task in asyncio.all_tasks(): - task.cancel() - - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler(sig, cancel_task_callback) - try: - loop.run_until_complete(f(*args, **kwargs)) - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - - return wrapper - - @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) diff --git a/listener.py b/listener.py index 5c1e386..f8fcad5 100644 --- a/listener.py +++ b/listener.py @@ -5,12 +5,13 @@ import time from web3 import Web3 from mev_inspect.block import get_latest_block_number +from mev_inspect.concurrency import coro from mev_inspect.crud.latest_block_update import ( find_latest_block_update, update_latest_block, ) from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker from mev_inspect.inspect_block import inspect_block from mev_inspect.provider import get_base_provider from mev_inspect.signal_handler import GracefulKiller @@ -23,7 +24,8 @@ logger = logging.getLogger(__name__) BLOCK_NUMBER_LAG = 5 -def run(): +@coro +async def run(): rpc = os.getenv("RPC_URL") if rpc is None: raise RuntimeError("Missing environment variable RPC_URL") @@ -32,8 +34,14 @@ def run(): killer = GracefulKiller() - inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() + inspect_db_sessionmaker = get_inspect_sessionmaker() + trace_db_sessionmaker = get_trace_sessionmaker() + + inspect_db_session = inspect_db_sessionmaker() + trace_db_session = ( + trace_db_sessionmaker() if trace_db_sessionmaker is not None else None + ) + trace_classifier = TraceClassifier() base_provider = get_base_provider(rpc) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 69c4b79..bab5764 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -4,9 +4,10 @@ import sys from pathlib import Path from typing import List, Optional -from sqlalchemy import orm +from sqlalchemy.ext.asyncio import AsyncSession from web3 import Web3 +from mev_inspect.crud.blocks import find_block from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.receipts import Receipt @@ -26,12 +27,12 @@ async def create_from_block_number( base_provider, w3: Web3, block_number: int, - trace_db_session: Optional[orm.Session], + trace_db_session: Optional[AsyncSession], ) -> Block: block: Optional[Block] = None if trace_db_session is not None: - block = _find_block(trace_db_session, block_number) + block = await find_block(trace_db_session, block_number) if block is None: block = await _fetch_block(w3, base_provider, block_number) @@ -72,87 +73,6 @@ async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) - ) -def _find_block( - trace_db_session: orm.Session, - block_number: int, -) -> Optional[Block]: - traces = _find_traces(trace_db_session, block_number) - receipts = _find_receipts(trace_db_session, block_number) - base_fee_per_gas = _find_base_fee(trace_db_session, block_number) - - if traces is None or receipts is None or base_fee_per_gas is None: - return None - - miner_address = _get_miner_address_from_traces(traces) - - if miner_address is None: - return None - - return Block( - block_number=block_number, - miner=miner_address, - base_fee_per_gas=base_fee_per_gas, - traces=traces, - receipts=receipts, - ) - - -def _find_traces( - trace_db_session: orm.Session, - block_number: int, -) -> Optional[List[Trace]]: - result = trace_db_session.execute( - "SELECT raw_traces FROM block_traces WHERE block_number = :block_number", - params={"block_number": block_number}, - ).one_or_none() - - if result is None: - return None - else: - (traces_json,) = result - return [Trace(**trace_json) for trace_json in traces_json] - - -def _find_receipts( - trace_db_session: orm.Session, - block_number: int, -) -> Optional[List[Receipt]]: - result = trace_db_session.execute( - "SELECT raw_receipts FROM block_receipts WHERE block_number = :block_number", - params={"block_number": block_number}, - ).one_or_none() - - if result is None: - return None - else: - (receipts_json,) = result - return [Receipt(**receipt) for receipt in receipts_json] - - -def _find_base_fee( - trace_db_session: orm.Session, - block_number: int, -) -> Optional[int]: - result = trace_db_session.execute( - "SELECT base_fee_in_wei FROM base_fee WHERE block_number = :block_number", - params={"block_number": block_number}, - ).one_or_none() - - if result is None: - return None - else: - (base_fee,) = result - return base_fee - - -def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]: - for trace in traces: - if trace.type == TraceType.reward: - return trace.action["author"] - - return None - - def get_transaction_hashes(calls: List[Trace]) -> List[str]: result = [] diff --git a/mev_inspect/concurrency.py b/mev_inspect/concurrency.py new file mode 100644 index 0000000..46ac147 --- /dev/null +++ b/mev_inspect/concurrency.py @@ -0,0 +1,22 @@ +import asyncio +import signal +from functools import wraps + + +def coro(f): + @wraps(f) + def wrapper(*args, **kwargs): + loop = asyncio.get_event_loop() + + def cancel_task_callback(): + for task in asyncio.all_tasks(): + task.cancel() + + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, cancel_task_callback) + try: + loop.run_until_complete(f(*args, **kwargs)) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + + return wrapper diff --git a/mev_inspect/crud/arbitrages.py b/mev_inspect/crud/arbitrages.py index 8d23e1d..1bbc970 100644 --- a/mev_inspect/crud/arbitrages.py +++ b/mev_inspect/crud/arbitrages.py @@ -5,20 +5,20 @@ from mev_inspect.models.arbitrages import ArbitrageModel from mev_inspect.schemas.arbitrages import Arbitrage -def delete_arbitrages_for_block( +async def delete_arbitrages_for_block( db_session, block_number: int, ) -> None: - ( + await ( db_session.query(ArbitrageModel) .filter(ArbitrageModel.block_number == block_number) .delete() ) - db_session.commit() + await db_session.commit() -def write_arbitrages( +async def write_arbitrages( db_session, arbitrages: List[Arbitrage], ) -> None: @@ -50,8 +50,8 @@ def write_arbitrages( ) if len(arbitrage_models) > 0: - db_session.bulk_save_objects(arbitrage_models) - db_session.execute( + await db_session.bulk_save_objects(arbitrage_models) + await db_session.execute( """ INSERT INTO arbitrage_swaps (arbitrage_id, swap_transaction_hash, swap_trace_address) @@ -61,4 +61,4 @@ def write_arbitrages( params=swap_arbitrage_ids, ) - db_session.commit() + await db_session.commit() diff --git a/mev_inspect/crud/blocks.py b/mev_inspect/crud/blocks.py new file mode 100644 index 0000000..c427fa3 --- /dev/null +++ b/mev_inspect/crud/blocks.py @@ -0,0 +1,88 @@ +from typing import List, Optional + +from sqlalchemy.ext.asyncio import AsyncSession + +from mev_inspect.schemas.blocks import Block +from mev_inspect.schemas.receipts import Receipt +from mev_inspect.schemas.traces import Trace, TraceType + + +async def find_block( + trace_db_session: AsyncSession, + block_number: int, +) -> Optional[Block]: + traces = await _find_traces(trace_db_session, block_number) + receipts = await _find_receipts(trace_db_session, block_number) + base_fee_per_gas = await _find_base_fee(trace_db_session, block_number) + + if traces is None or receipts is None or base_fee_per_gas is None: + return None + + miner_address = _get_miner_address_from_traces(traces) + + if miner_address is None: + return None + + return Block( + block_number=block_number, + miner=miner_address, + base_fee_per_gas=base_fee_per_gas, + traces=traces, + receipts=receipts, + ) + + +async def _find_traces( + trace_db_session: AsyncSession, + block_number: int, +) -> Optional[List[Trace]]: + result = await trace_db_session.execute( + "SELECT raw_traces FROM block_traces WHERE block_number = :block_number", + params={"block_number": block_number}, + ).one_or_none() + + if result is None: + return None + else: + (traces_json,) = result + return [Trace(**trace_json) for trace_json in traces_json] + + +async def _find_receipts( + trace_db_session: AsyncSession, + block_number: int, +) -> Optional[List[Receipt]]: + result = await trace_db_session.execute( + "SELECT raw_receipts FROM block_receipts WHERE block_number = :block_number", + params={"block_number": block_number}, + ).one_or_none() + + if result is None: + return None + else: + (receipts_json,) = result + return [Receipt(**receipt) for receipt in receipts_json] + + +async def _find_base_fee( + trace_db_session: AsyncSession, + block_number: int, +) -> Optional[int]: + result = await trace_db_session.execute( + "SELECT base_fee_in_wei FROM base_fee WHERE block_number = :block_number", + params={"block_number": block_number}, + ).one_or_none() + + if result is None: + return None + else: + (base_fee,) = result + return base_fee + + +def _get_miner_address_from_traces(traces: List[Trace]) -> Optional[str]: + for trace in traces: + if trace.type == TraceType.reward: + return trace.action["author"] + + return None diff --git a/mev_inspect/crud/latest_block_update.py b/mev_inspect/crud/latest_block_update.py index a918d93..a6860fc 100644 --- a/mev_inspect/crud/latest_block_update.py +++ b/mev_inspect/crud/latest_block_update.py @@ -1,8 +1,8 @@ from typing import Optional -def find_latest_block_update(db_session) -> Optional[int]: - result = db_session.execute( +async def find_latest_block_update(db_session) -> Optional[int]: + result = await db_session.execute( "SELECT block_number FROM latest_block_update LIMIT 1" ).one_or_none() if result is None: @@ -11,8 +11,8 @@ def find_latest_block_update(db_session) -> Optional[int]: return int(result[0]) -def update_latest_block(db_session, block_number) -> None: - db_session.execute( +async def update_latest_block(db_session, block_number) -> None: + await db_session.execute( """ UPDATE latest_block_update SET block_number = :block_number, updated_at = current_timestamp; diff --git a/mev_inspect/crud/liquidations.py b/mev_inspect/crud/liquidations.py index 2a58ead..aadbeea 100644 --- a/mev_inspect/crud/liquidations.py +++ b/mev_inspect/crud/liquidations.py @@ -5,20 +5,20 @@ from mev_inspect.models.liquidations import LiquidationModel from mev_inspect.schemas.liquidations import Liquidation -def delete_liquidations_for_block( +async def delete_liquidations_for_block( db_session, block_number: int, ) -> None: - ( + await ( db_session.query(LiquidationModel) .filter(LiquidationModel.block_number == block_number) .delete() ) - db_session.commit() + await db_session.commit() -def write_liquidations( +async def write_liquidations( db_session, liquidations: List[Liquidation], ) -> None: @@ -27,5 +27,5 @@ def write_liquidations( for liquidation in liquidations ] - db_session.bulk_save_objects(models) - db_session.commit() + await db_session.bulk_save_objects(models) + await db_session.commit() diff --git a/mev_inspect/crud/miner_payments.py b/mev_inspect/crud/miner_payments.py index e82bf96..46f2508 100644 --- a/mev_inspect/crud/miner_payments.py +++ b/mev_inspect/crud/miner_payments.py @@ -5,20 +5,20 @@ from mev_inspect.models.miner_payments import MinerPaymentModel from mev_inspect.schemas.miner_payments import MinerPayment -def delete_miner_payments_for_block( +async def delete_miner_payments_for_block( db_session, block_number: int, ) -> None: - ( + await ( db_session.query(MinerPaymentModel) .filter(MinerPaymentModel.block_number == block_number) .delete() ) - db_session.commit() + await db_session.commit() -def write_miner_payments( +async def write_miner_payments( db_session, miner_payments: List[MinerPayment], ) -> None: @@ -27,5 +27,5 @@ def write_miner_payments( for miner_payment in miner_payments ] - db_session.bulk_save_objects(models) - db_session.commit() + await db_session.bulk_save_objects(models) + await db_session.commit() diff --git a/mev_inspect/crud/swaps.py b/mev_inspect/crud/swaps.py index 4c51cd1..99a230b 100644 --- a/mev_inspect/crud/swaps.py +++ b/mev_inspect/crud/swaps.py @@ -5,24 +5,24 @@ from mev_inspect.models.swaps import SwapModel from mev_inspect.schemas.swaps import Swap -def delete_swaps_for_block( +async def delete_swaps_for_block( db_session, block_number: int, ) -> None: - ( + await ( db_session.query(SwapModel) .filter(SwapModel.block_number == block_number) .delete() ) - db_session.commit() + await db_session.commit() -def write_swaps( +async def write_swaps( db_session, swaps: List[Swap], ) -> None: models = [SwapModel(**json.loads(swap.json())) for swap in swaps] - db_session.bulk_save_objects(models) - db_session.commit() + await db_session.bulk_save_objects(models) + await db_session.commit() diff --git a/mev_inspect/crud/traces.py b/mev_inspect/crud/traces.py index aa6f32c..c0251e1 100644 --- a/mev_inspect/crud/traces.py +++ b/mev_inspect/crud/traces.py @@ -1,25 +1,25 @@ import json from typing import List +from sqlalchemy import delete + from mev_inspect.models.traces import ClassifiedTraceModel from mev_inspect.schemas.traces import ClassifiedTrace -def delete_classified_traces_for_block( - db_session, +async def delete_classified_traces_for_block( + inspect_db_session, block_number: int, ) -> None: - ( - db_session.query(ClassifiedTraceModel) - .filter(ClassifiedTraceModel.block_number == block_number) - .delete() + statement = delete(ClassifiedTraceModel).where( + ClassifiedTraceModel.block_number == block_number ) - - db_session.commit() + await inspect_db_session.execute(statement) + await inspect_db_session.commit() -def write_classified_traces( - db_session, +async def write_classified_traces( + inspect_db_session, classified_traces: List[ClassifiedTrace], ) -> None: models = [] @@ -46,5 +46,5 @@ def write_classified_traces( ) ) - db_session.bulk_save_objects(models) - db_session.commit() + inspect_db_session.add_all(models) + await inspect_db_session.commit() diff --git a/mev_inspect/crud/transfers.py b/mev_inspect/crud/transfers.py index 7aa5adb..e44cd74 100644 --- a/mev_inspect/crud/transfers.py +++ b/mev_inspect/crud/transfers.py @@ -5,7 +5,7 @@ from mev_inspect.models.transfers import TransferModel from mev_inspect.schemas.transfers import Transfer -def delete_transfers_for_block( +async def delete_transfers_for_block( db_session, block_number: int, ) -> None: @@ -18,7 +18,7 @@ def delete_transfers_for_block( db_session.commit() -def write_transfers( +async def write_transfers( db_session, transfers: List[Transfer], ) -> None: diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 9cdaa48..1b9bd2c 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -1,7 +1,8 @@ import os from typing import Optional -from sqlalchemy import create_engine, orm +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import sessionmaker @@ -12,7 +13,7 @@ def get_trace_database_uri() -> Optional[str]: db_name = "trace_db" if all(field is not None for field in [username, password, host]): - return f"postgresql://{username}:{password}@{host}/{db_name}" + return f"postgresql+asyncpg://{username}:{password}@{host}/{db_name}" return None @@ -22,27 +23,30 @@ def get_inspect_database_uri(): password = os.getenv("POSTGRES_PASSWORD") host = os.getenv("POSTGRES_HOST") db_name = "mev_inspect" - return f"postgresql://{username}:{password}@{host}/{db_name}" + return f"postgresql+asyncpg://{username}:{password}@{host}/{db_name}" def _get_engine(uri: str): - return create_engine(uri) + return create_async_engine(uri) -def _get_session(uri: str): - Session = sessionmaker(bind=_get_engine(uri)) - return Session() +def _get_sessionmaker(uri: str): + return sessionmaker( + _get_engine(uri), + class_=AsyncSession, + expire_on_commit=False, + ) -def get_inspect_session() -> orm.Session: +def get_inspect_sessionmaker(): uri = get_inspect_database_uri() - return _get_session(uri) + return _get_sessionmaker(uri) -def get_trace_session() -> Optional[orm.Session]: +def get_trace_sessionmaker(): uri = get_trace_database_uri() if uri is not None: - return _get_session(uri) + return _get_sessionmaker(uri) return None diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index c517923..a067747 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -1,7 +1,7 @@ import logging from typing import Optional -from sqlalchemy import orm +from sqlalchemy.ext.asyncio import AsyncSession from web3 import Web3 from mev_inspect.arbitrages import get_arbitrages @@ -36,14 +36,15 @@ logger = logging.getLogger(__name__) async def inspect_block( - inspect_db_session: orm.Session, + inspect_db_session: AsyncSession, base_provider, w3: Web3, trace_clasifier: TraceClassifier, block_number: int, - trace_db_session: Optional[orm.Session], + trace_db_session: Optional[AsyncSession], should_write_classified_traces: bool = True, ): + logger.info(f"Block: {block_number} -- Entering") block = await create_from_block_number( base_provider, w3, @@ -64,36 +65,37 @@ async def inspect_block( ) if should_write_classified_traces: - delete_classified_traces_for_block(inspect_db_session, block_number) - write_classified_traces(inspect_db_session, classified_traces) + await delete_classified_traces_for_block(inspect_db_session, block_number) + await write_classified_traces(inspect_db_session, classified_traces) transfers = get_transfers(classified_traces) logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") - delete_transfers_for_block(inspect_db_session, block_number) - write_transfers(inspect_db_session, transfers) + await delete_transfers_for_block(inspect_db_session, block_number) + await write_transfers(inspect_db_session, transfers) swaps = get_swaps(classified_traces) logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") - delete_swaps_for_block(inspect_db_session, block_number) - write_swaps(inspect_db_session, swaps) + await delete_swaps_for_block(inspect_db_session, block_number) + await write_swaps(inspect_db_session, swaps) arbitrages = get_arbitrages(swaps) logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") - delete_arbitrages_for_block(inspect_db_session, block_number) - write_arbitrages(inspect_db_session, arbitrages) + await delete_arbitrages_for_block(inspect_db_session, block_number) + await write_arbitrages(inspect_db_session, arbitrages) liquidations = get_liquidations(classified_traces) logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") - delete_liquidations_for_block(inspect_db_session, block_number) - write_liquidations(inspect_db_session, liquidations) + await delete_liquidations_for_block(inspect_db_session, block_number) + await write_liquidations(inspect_db_session, liquidations) miner_payments = get_miner_payments( block.miner, block.base_fee_per_gas, classified_traces, block.receipts ) - delete_miner_payments_for_block(inspect_db_session, block_number) - write_miner_payments(inspect_db_session, miner_payments) + await delete_miner_payments_for_block(inspect_db_session, block_number) + await write_miner_payments(inspect_db_session, miner_payments) + logger.info(f"Block: {block_number} -- Exiting") diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index ca4e996..92a105c 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -9,7 +9,7 @@ from web3.eth import AsyncEth from mev_inspect.block import create_from_block_number from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.db import get_inspect_sessionmaker, get_trace_sessionmaker from mev_inspect.inspect_block import inspect_block from mev_inspect.provider import get_base_provider @@ -24,40 +24,68 @@ class MEVInspector: max_concurrency: int = 1, request_timeout: int = 300, ): - self.inspect_db_session = get_inspect_session() - self.trace_db_session = get_trace_session() self.base_provider = get_base_provider(rpc, request_timeout=request_timeout) self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) self.trace_classifier = TraceClassifier() self.max_concurrency = asyncio.Semaphore(max_concurrency) async def create_from_block(self, block_number: int): + trace_db_sessionmaker = await get_trace_sessionmaker() + trace_db_session = ( + trace_db_sessionmaker() if trace_db_sessionmaker is not None else None + ) + return await create_from_block_number( base_provider=self.base_provider, w3=self.w3, block_number=block_number, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) + if trace_db_session is not None: + await trace_db_session.close() + async def inspect_single_block(self, block: int): - return await inspect_block( - self.inspect_db_session, + inspect_db_sessionmaker = await get_inspect_sessionmaker() + trace_db_sessionmaker = await get_trace_sessionmaker() + + inspect_db_session = inspect_db_sessionmaker() + trace_db_session = ( + trace_db_sessionmaker() if trace_db_sessionmaker is not None else None + ) + + await inspect_block( + inspect_db_session, self.base_provider, self.w3, self.trace_classifier, block, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) + await inspect_db_session.close() + if trace_db_session is not None: + await trace_db_session.close() + async def inspect_many_blocks(self, after_block: int, before_block: int): + inspect_db_sessionmaker = get_inspect_sessionmaker() + trace_db_sessionmaker = get_trace_sessionmaker() + tasks = [] + for block_number in range(after_block, before_block): tasks.append( asyncio.ensure_future( - self.safe_inspect_block(block_number=block_number) + self.safe_inspect_block( + inspect_db_sessionmaker, + block_number, + trace_db_sessionmaker, + ) ) ) + logger.info(f"Gathered {len(tasks)} blocks to inspect") + try: await asyncio.gather(*tasks) except CancelledError: @@ -66,13 +94,27 @@ class MEVInspector: logger.error(f"Existed due to {type(e)}") traceback.print_exc() - async def safe_inspect_block(self, block_number: int): + async def safe_inspect_block( + self, + inspect_db_sessionmaker, + block_number: int, + trace_db_sessionmaker, + ): async with self.max_concurrency: - return await inspect_block( - self.inspect_db_session, + inspect_db_session = inspect_db_sessionmaker() + trace_db_session = ( + trace_db_sessionmaker() if trace_db_sessionmaker is not None else None + ) + + await inspect_block( + inspect_db_session, self.base_provider, self.w3, self.trace_classifier, block_number, - trace_db_session=self.trace_db_session, + trace_db_session=trace_db_session, ) + + await inspect_db_session.close() + if trace_db_session is not None: + await trace_db_session.close() diff --git a/poetry.lock b/poetry.lock index ccd2565..acf0ba3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -51,6 +51,19 @@ category = "main" optional = false python-versions = ">=3.5.3" +[[package]] +name = "asyncpg" +version = "0.24.0" +description = "An asyncio PostgreSQL driver" +category = "main" +optional = false +python-versions = ">=3.6.0" + +[package.extras] +dev = ["Cython (>=0.29.24,<0.30.0)", "pytest (>=6.0)", "Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "pycodestyle (>=2.7.0,<2.8.0)", "flake8 (>=3.9.2,<3.10.0)", "uvloop (>=0.15.3)"] +docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"] +test = ["pycodestyle (>=2.7.0,<2.8.0)", "flake8 (>=3.9.2,<3.10.0)", "uvloop (>=0.15.3)"] + [[package]] name = "atomicwrites" version = "1.4.0" @@ -645,14 +658,6 @@ python-versions = "*" [package.dependencies] six = ">=1.9" -[[package]] -name = "psycopg2" -version = "2.9.1" -description = "psycopg2 - Python-PostgreSQL Database Adapter" -category = "main" -optional = false -python-versions = ">=3.6" - [[package]] name = "py" version = "1.10.0" @@ -1017,7 +1022,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "baade6f62f3adaff192b2c85b4f602f4990b9b99d6fcce904aeb5087b6fa1921" +content-hash = "61d28ab2afc95db3df7b96c56850ceb640113dde1ff62a782fac9ba52d9b49a7" [metadata.files] aiohttp = [ @@ -1071,6 +1076,21 @@ async-timeout = [ {file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"}, {file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"}, ] +asyncpg = [ + {file = "asyncpg-0.24.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c4fc0205fe4ddd5aeb3dfdc0f7bafd43411181e1f5650189608e5971cceacff1"}, + {file = "asyncpg-0.24.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:a7095890c96ba36f9f668eb552bb020dddb44f8e73e932f8573efc613ee83843"}, + {file = "asyncpg-0.24.0-cp310-cp310-win_amd64.whl", hash = "sha256:8ff5073d4b654e34bd5eaadc01dc4d68b8a9609084d835acd364cd934190a08d"}, + {file = "asyncpg-0.24.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:e36c6806883786b19551bb70a4882561f31135dc8105a59662e0376cf5b2cbc5"}, + {file = "asyncpg-0.24.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:ddffcb85227bf39cd1bedd4603e0082b243cf3b14ced64dce506a15b05232b83"}, + {file = "asyncpg-0.24.0-cp37-cp37m-win_amd64.whl", hash = "sha256:41704c561d354bef01353835a7846e5606faabbeb846214dfcf666cf53319f18"}, + {file = "asyncpg-0.24.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:29ef6ae0a617fc13cc2ac5dc8e9b367bb83cba220614b437af9b67766f4b6b20"}, + {file = "asyncpg-0.24.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:eed43abc6ccf1dc02e0d0efc06ce46a411362f3358847c6b0ec9a43426f91ece"}, + {file = "asyncpg-0.24.0-cp38-cp38-win_amd64.whl", hash = "sha256:129d501f3d30616afd51eb8d3142ef51ba05374256bd5834cec3ef4956a9b317"}, + {file = "asyncpg-0.24.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a458fc69051fbb67d995fdda46d75a012b5d6200f91e17d23d4751482640ed4c"}, + {file = "asyncpg-0.24.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:556b0e92e2b75dc028b3c4bc9bd5162ddf0053b856437cf1f04c97f9c6837d03"}, + {file = "asyncpg-0.24.0-cp39-cp39-win_amd64.whl", hash = "sha256:a738f4807c853623d3f93f0fea11f61be6b0e5ca16ea8aeb42c2c7ee742aa853"}, + {file = "asyncpg-0.24.0.tar.gz", hash = "sha256:dd2fa063c3344823487d9ddccb40802f02622ddf8bf8a6cc53885ee7a2c1c0c6"}, +] atomicwrites = [ {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, @@ -1537,17 +1557,6 @@ protobuf = [ {file = "protobuf-3.17.3-py2.py3-none-any.whl", hash = "sha256:2bfb815216a9cd9faec52b16fd2bfa68437a44b67c56bee59bc3926522ecb04e"}, {file = "protobuf-3.17.3.tar.gz", hash = "sha256:72804ea5eaa9c22a090d2803813e280fb273b62d5ae497aaf3553d141c4fdd7b"}, ] -psycopg2 = [ - {file = "psycopg2-2.9.1-cp36-cp36m-win32.whl", hash = "sha256:7f91312f065df517187134cce8e395ab37f5b601a42446bdc0f0d51773621854"}, - {file = "psycopg2-2.9.1-cp36-cp36m-win_amd64.whl", hash = "sha256:830c8e8dddab6b6716a4bf73a09910c7954a92f40cf1d1e702fb93c8a919cc56"}, - {file = "psycopg2-2.9.1-cp37-cp37m-win32.whl", hash = "sha256:89409d369f4882c47f7ea20c42c5046879ce22c1e4ea20ef3b00a4dfc0a7f188"}, - {file = "psycopg2-2.9.1-cp37-cp37m-win_amd64.whl", hash = "sha256:7640e1e4d72444ef012e275e7b53204d7fab341fb22bc76057ede22fe6860b25"}, - {file = "psycopg2-2.9.1-cp38-cp38-win32.whl", hash = "sha256:079d97fc22de90da1d370c90583659a9f9a6ee4007355f5825e5f1c70dffc1fa"}, - {file = "psycopg2-2.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:2c992196719fadda59f72d44603ee1a2fdcc67de097eea38d41c7ad9ad246e62"}, - {file = "psycopg2-2.9.1-cp39-cp39-win32.whl", hash = "sha256:2087013c159a73e09713294a44d0c8008204d06326006b7f652bef5ace66eebb"}, - {file = "psycopg2-2.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:bf35a25f1aaa8a3781195595577fcbb59934856ee46b4f252f56ad12b8043bcf"}, - {file = "psycopg2-2.9.1.tar.gz", hash = "sha256:de5303a6f1d0a7a34b9d40e4d3bef684ccc44a49bbe3eb85e3c0bffb4a131b7c"}, -] py = [ {file = "py-1.10.0-py2.py3-none-any.whl", hash = "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"}, {file = "py-1.10.0.tar.gz", hash = "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3"}, diff --git a/pyproject.toml b/pyproject.toml index b593a95..cc87801 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ web3 = "^5.23.0" pydantic = "^1.8.2" hexbytes = "^0.2.1" click = "^8.0.1" -psycopg2 = "^2.9.1" +asyncpg = "^0.24.0" [tool.poetry.dev-dependencies] pre-commit = "^2.13.0"