From 4f20c540e6167bda7e75e9a47b880c4489d5d8f3 Mon Sep 17 00:00:00 2001 From: carlomazzaferro Date: Wed, 20 Oct 2021 17:12:21 +0100 Subject: [PATCH 1/4] asyncio-based concurrent backfilling --- Tiltfile | 6 +++ cli.py | 97 +++++++++++++++++++++++++++++------- mev_inspect/block.py | 20 ++++---- mev_inspect/fees.py | 9 ++-- mev_inspect/inspect_block.py | 20 ++++---- mev_inspect/provider.py | 14 ++---- mev_inspect/retry.py | 6 +-- 7 files changed, 117 insertions(+), 55 deletions(-) diff --git a/Tiltfile b/Tiltfile index 6675eca..01b4ca1 100644 --- a/Tiltfile +++ b/Tiltfile @@ -36,3 +36,9 @@ docker_build_with_restart("mev-inspect-py", ".", ) k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect')) k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"]) + +local_resource( + 'pg-port-forward', + serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432', + resource_deps=["postgresql-postgresql"] +) diff --git a/cli.py b/cli.py index fb8a080..a62f3b5 100644 --- a/cli.py +++ b/cli.py @@ -1,43 +1,62 @@ -import os +import asyncio import logging +import os import sys +from functools import wraps import click from web3 import Web3 +from web3.eth import AsyncEth from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspect_block import inspect_block from mev_inspect.provider import get_base_provider - +from mev_inspect.retry import http_retry_with_backoff_request_middleware RPC_URL_ENV = "RPC_URL" logging.basicConfig(stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) +semaphore: asyncio.Semaphore + @click.group() def cli(): pass +def coro(f): + @wraps(f) + def wrapper(*args, **kwargs): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(f(*args, **kwargs)) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + return wrapper + + @cli.command() @click.argument("block_number", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--cache/--no-cache", default=True) -def inspect_block_command(block_number: int, rpc: str, cache: bool): +@coro +async def inspect_block_command(block_number: int, rpc: str, cache: bool): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() base_provider = get_base_provider(rpc) - w3 = Web3(base_provider) + w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) trace_classifier = TraceClassifier() if not cache: logger.info("Skipping cache") - inspect_block( + await inspect_block( inspect_db_session, base_provider, w3, @@ -52,30 +71,70 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool): @click.argument("before_block", type=int) @click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, "")) @click.option("--cache/--no-cache", default=True) -def inspect_many_blocks_command( - after_block: int, before_block: int, rpc: str, cache: bool +@click.option( + "--max-concurrency", + type=int, + help="maximum number of concurrent connections", + default=5, +) +@click.option( + "--request-timeout", type=int, help="timeout for requests to nodes", default=500 +) +@coro +async def inspect_many_blocks_command( + after_block: int, + before_block: int, + rpc: str, + cache: bool, + max_concurrency: int, + request_timeout: int, ): - + global semaphore # pylint: disable=global-statement + semaphore = asyncio.Semaphore(max_concurrency) inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() - base_provider = get_base_provider(rpc) - w3 = Web3(base_provider) + base_provider = get_base_provider(rpc, request_timeout=request_timeout) + w3 = Web3( + base_provider, + modules={"eth": (AsyncEth,)}, + middlewares=[http_retry_with_backoff_request_middleware], + ) + trace_classifier = TraceClassifier() if not cache: logger.info("Skipping cache") - for i, block_number in enumerate(range(after_block, before_block)): - block_message = ( - f"Running for {block_number} ({i+1}/{before_block - after_block})" - ) - dashes = "-" * len(block_message) - logger.info(dashes) - logger.info(block_message) - logger.info(dashes) + tasks = [] - inspect_block( + for block_number in range(after_block, before_block): + tasks.append( + asyncio.ensure_future( + safe_inspect_block( + inspect_db_session, + base_provider, + w3, + trace_classifier, + block_number, + trace_db_session, + ) + ) + ) + logger.info(f"Gathered {len(tasks)} blocks to inspect") + await asyncio.gather(*tasks) + + +async def safe_inspect_block( + inspect_db_session, + base_provider, + w3, + trace_classifier, + block_number, + trace_db_session, +): + async with semaphore: + return await inspect_block( inspect_db_session, base_provider, w3, diff --git a/mev_inspect/block.py b/mev_inspect/block.py index ab4afe4..6f47246 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -16,7 +16,7 @@ def get_latest_block_number(w3: Web3) -> int: return int(w3.eth.get_block("latest")["number"]) -def create_from_block_number( +async def create_from_block_number( base_provider, w3: Web3, block_number: int, @@ -28,25 +28,27 @@ def create_from_block_number( block = _find_block(trace_db_session, block_number) if block is None: - return _fetch_block(w3, base_provider, block_number) + block = await _fetch_block(w3, base_provider, block_number) + return block else: return block -def _fetch_block( +async def _fetch_block( w3, base_provider, block_number: int, ) -> Block: - block_json = w3.eth.get_block(block_number) - receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number]) - traces_json = w3.parity.trace_block(block_number) - + block_json = await w3.eth.get_block(block_number) + receipts_json = await base_provider.make_request( + "eth_getBlockReceipts", [block_number] + ) + traces_json = await base_provider.make_request("trace_block", [block_number]) receipts: List[Receipt] = [ Receipt(**receipt) for receipt in receipts_json["result"] ] - traces = [Trace(**trace_json) for trace_json in traces_json] - base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number) + traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + base_fee_per_gas = await fetch_base_fee_per_gas(w3, block_number) return Block( block_number=block_number, diff --git a/mev_inspect/fees.py b/mev_inspect/fees.py index 19b7c4b..2b4d35a 100644 --- a/mev_inspect/fees.py +++ b/mev_inspect/fees.py @@ -1,9 +1,10 @@ from web3 import Web3 -def fetch_base_fee_per_gas(w3: Web3, block_number: int) -> int: - base_fees = w3.eth.fee_history(1, block_number)["baseFeePerGas"] - if len(base_fees) == 0: +async def fetch_base_fee_per_gas(w3: Web3, block_number: int) -> int: + base_fees = await w3.eth.fee_history(1, block_number) + base_fees_per_gas = base_fees["baseFeePerGas"] + if len(base_fees_per_gas) == 0: raise RuntimeError("Unexpected error - no fees returned") - return base_fees[0] + return base_fees_per_gas[0] diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index d3ab75b..b78e3fe 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -35,7 +35,7 @@ from mev_inspect.liquidations import get_liquidations logger = logging.getLogger(__name__) -def inspect_block( +async def inspect_block( inspect_db_session: orm.Session, base_provider, w3: Web3, @@ -44,47 +44,49 @@ def inspect_block( trace_db_session: Optional[orm.Session], should_write_classified_traces: bool = True, ): - block = create_from_block_number( + block = await create_from_block_number( base_provider, w3, block_number, trace_db_session, ) - logger.info(f"Total traces: {len(block.traces)}") + logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}") total_transactions = len( set(t.transaction_hash for t in block.traces if t.transaction_hash is not None) ) - logger.info(f"Total transactions: {total_transactions}") + logger.info(f"Block: {block_number} -- Total transactions: {total_transactions}") classified_traces = trace_clasifier.classify(block.traces) - logger.info(f"Returned {len(classified_traces)} classified traces") + logger.info( + f"Block: {block_number} -- Returned {len(classified_traces)} classified traces" + ) if should_write_classified_traces: delete_classified_traces_for_block(inspect_db_session, block_number) write_classified_traces(inspect_db_session, classified_traces) transfers = get_transfers(classified_traces) - logger.info(f"Found {len(transfers)} transfers") + logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") delete_transfers_for_block(inspect_db_session, block_number) write_transfers(inspect_db_session, transfers) swaps = get_swaps(classified_traces) - logger.info(f"Found {len(swaps)} swaps") + logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") delete_swaps_for_block(inspect_db_session, block_number) write_swaps(inspect_db_session, swaps) arbitrages = get_arbitrages(swaps) - logger.info(f"Found {len(arbitrages)} arbitrages") + logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") delete_arbitrages_for_block(inspect_db_session, block_number) write_arbitrages(inspect_db_session, arbitrages) liquidations = get_liquidations(classified_traces) - logger.info(f"Found {len(liquidations)} liquidations") + logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") delete_liquidations_for_block(inspect_db_session, block_number) write_liquidations(inspect_db_session, liquidations) diff --git a/mev_inspect/provider.py b/mev_inspect/provider.py index 1bdc68d..45d996b 100644 --- a/mev_inspect/provider.py +++ b/mev_inspect/provider.py @@ -1,14 +1,6 @@ -from web3 import Web3 - -from mev_inspect.retry import http_retry_with_backoff_request_middleware +from web3 import Web3, AsyncHTTPProvider -def get_base_provider(rpc: str) -> Web3.HTTPProvider: - base_provider = Web3.HTTPProvider(rpc) - base_provider.middlewares.remove("http_retry_request") - base_provider.middlewares.add( - http_retry_with_backoff_request_middleware, - "http_retry_with_backoff", - ) - +def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider: + base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout}) return base_provider diff --git a/mev_inspect/retry.py b/mev_inspect/retry.py index c85acdf..63d2e7a 100644 --- a/mev_inspect/retry.py +++ b/mev_inspect/retry.py @@ -20,7 +20,7 @@ from web3.types import ( ) -def exception_retry_with_backoff_middleware( +async def exception_retry_with_backoff_middleware( make_request: Callable[[RPCEndpoint, Any], RPCResponse], web3: Web3, # pylint: disable=unused-argument errors: Collection[Type[BaseException]], @@ -51,9 +51,9 @@ def exception_retry_with_backoff_middleware( return middleware -def http_retry_with_backoff_request_middleware( +async def http_retry_with_backoff_request_middleware( make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3 ) -> Callable[[RPCEndpoint, Any], Any]: - return exception_retry_with_backoff_middleware( + return await exception_retry_with_backoff_middleware( make_request, web3, (ConnectionError, HTTPError, Timeout, TooManyRedirects) ) From e15eef49c1f59d81caddf4d100a08263c9d7592d Mon Sep 17 00:00:00 2001 From: carlomazzaferro Date: Fri, 22 Oct 2021 13:58:00 +0100 Subject: [PATCH 2/4] async based middleware, better logging and async requests --- cli.py | 26 ++++++++++++++++++-------- mev_inspect/block.py | 12 +++++++----- mev_inspect/provider.py | 3 +++ mev_inspect/retry.py | 37 ++++++++++++++++++++++++++++--------- 4 files changed, 56 insertions(+), 22 deletions(-) diff --git a/cli.py b/cli.py index a62f3b5..5146e84 100644 --- a/cli.py +++ b/cli.py @@ -1,7 +1,10 @@ import asyncio import logging import os +import signal import sys +import traceback +from asyncio import CancelledError from functools import wraps import click @@ -12,7 +15,6 @@ from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspect_block import inspect_block from mev_inspect.provider import get_base_provider -from mev_inspect.retry import http_retry_with_backoff_request_middleware RPC_URL_ENV = "RPC_URL" @@ -31,11 +33,17 @@ def coro(f): @wraps(f) def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() + + def cancel_task_callback(): + for task in asyncio.all_tasks(): + task.cancel() + + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, cancel_task_callback) try: loop.run_until_complete(f(*args, **kwargs)) finally: loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() return wrapper @@ -95,11 +103,7 @@ async def inspect_many_blocks_command( trace_db_session = get_trace_session() base_provider = get_base_provider(rpc, request_timeout=request_timeout) - w3 = Web3( - base_provider, - modules={"eth": (AsyncEth,)}, - middlewares=[http_retry_with_backoff_request_middleware], - ) + w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) trace_classifier = TraceClassifier() @@ -122,7 +126,13 @@ async def inspect_many_blocks_command( ) ) logger.info(f"Gathered {len(tasks)} blocks to inspect") - await asyncio.gather(*tasks) + try: + await asyncio.gather(*tasks) + except CancelledError: + logger.info("Requested to exit, cleaning up...") + except Exception as e: + logger.error(f"Existed due to {type(e)}") + traceback.print_exc() async def safe_inspect_block( diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 6f47246..7ab3f0b 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,3 +1,4 @@ +import asyncio from pathlib import Path from typing import List, Optional @@ -39,16 +40,17 @@ async def _fetch_block( base_provider, block_number: int, ) -> Block: - block_json = await w3.eth.get_block(block_number) - receipts_json = await base_provider.make_request( - "eth_getBlockReceipts", [block_number] + block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( + w3.eth.get_block(block_number), + base_provider.make_request("eth_getBlockReceipts", [block_number]), + base_provider.make_request("trace_block", [block_number]), + fetch_base_fee_per_gas(w3, block_number), ) - traces_json = await base_provider.make_request("trace_block", [block_number]) + receipts: List[Receipt] = [ Receipt(**receipt) for receipt in receipts_json["result"] ] traces = [Trace(**trace_json) for trace_json in traces_json["result"]] - base_fee_per_gas = await fetch_base_fee_per_gas(w3, block_number) return Block( block_number=block_number, diff --git a/mev_inspect/provider.py b/mev_inspect/provider.py index 45d996b..3b930ea 100644 --- a/mev_inspect/provider.py +++ b/mev_inspect/provider.py @@ -1,6 +1,9 @@ from web3 import Web3, AsyncHTTPProvider +from mev_inspect.retry import http_retry_with_backoff_request_middleware + def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider: base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout}) + base_provider.middlewares += (http_retry_with_backoff_request_middleware,) return base_provider diff --git a/mev_inspect/retry.py b/mev_inspect/retry.py index 63d2e7a..51d6892 100644 --- a/mev_inspect/retry.py +++ b/mev_inspect/retry.py @@ -1,10 +1,15 @@ -import time +import asyncio +import logging +import random +import sys from typing import ( Any, Callable, Collection, Type, + Coroutine, ) +from asyncio.exceptions import TimeoutError from requests.exceptions import ( ConnectionError, @@ -20,40 +25,54 @@ from web3.types import ( ) +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) + + async def exception_retry_with_backoff_middleware( - make_request: Callable[[RPCEndpoint, Any], RPCResponse], + make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3, # pylint: disable=unused-argument errors: Collection[Type[BaseException]], retries: int = 5, backoff_time_seconds: float = 0.1, -) -> Callable[[RPCEndpoint, Any], RPCResponse]: +) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]: """ Creates middleware that retries failed HTTP requests. Is a default middleware for HTTPProvider. """ - def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: + async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: + if check_if_retry_on_failure(method): for i in range(retries): try: - return make_request(method, params) + return await make_request(method, params) # https://github.com/python/mypy/issues/5349 except errors: # type: ignore + logger.error( + f"Request for method {method}, block: {int(params[0], 16)}, retrying: {i}/{retries}" + ) if i < retries - 1: - time.sleep(backoff_time_seconds) + backoff_time = backoff_time_seconds * ( + random.uniform(5, 10) ** i + ) + await asyncio.sleep(backoff_time) continue + else: raise return None else: - return make_request(method, params) + return await make_request(method, params) return middleware async def http_retry_with_backoff_request_middleware( make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3 -) -> Callable[[RPCEndpoint, Any], Any]: +) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]: return await exception_retry_with_backoff_middleware( - make_request, web3, (ConnectionError, HTTPError, Timeout, TooManyRedirects) + make_request, + web3, + (ConnectionError, HTTPError, Timeout, TooManyRedirects, TimeoutError), ) From c3475bbd8f6ae556be744af580ad177a2d680ad0 Mon Sep 17 00:00:00 2001 From: carlomazzaferro Date: Thu, 28 Oct 2021 11:04:24 +0100 Subject: [PATCH 3/4] Use inspector class -- remove global Semaphore and improve error handling --- cli.py | 95 +++++----------------------------------- mev_inspect/block.py | 19 ++++++-- mev_inspect/inspector.py | 73 ++++++++++++++++++++++++++++++ mev_inspect/retry.py | 16 ++++++- 4 files changed, 115 insertions(+), 88 deletions(-) create mode 100644 mev_inspect/inspector.py diff --git a/cli.py b/cli.py index 5146e84..7472530 100644 --- a/cli.py +++ b/cli.py @@ -3,26 +3,17 @@ import logging import os import signal import sys -import traceback -from asyncio import CancelledError from functools import wraps import click -from web3 import Web3 -from web3.eth import AsyncEth -from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.db import get_inspect_session, get_trace_session -from mev_inspect.inspect_block import inspect_block -from mev_inspect.provider import get_base_provider +from mev_inspect.inspector import MEVInspector RPC_URL_ENV = "RPC_URL" logging.basicConfig(stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) -semaphore: asyncio.Semaphore - @click.group() def cli(): @@ -54,24 +45,8 @@ def coro(f): @click.option("--cache/--no-cache", default=True) @coro async def inspect_block_command(block_number: int, rpc: str, cache: bool): - inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() - - base_provider = get_base_provider(rpc) - w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) - trace_classifier = TraceClassifier() - - if not cache: - logger.info("Skipping cache") - - await inspect_block( - inspect_db_session, - base_provider, - w3, - trace_classifier, - block_number, - trace_db_session=trace_db_session, - ) + inspector = MEVInspector(rpc=rpc, cache=cache) + await inspector.inspect_single_block(block=block_number) @cli.command() @@ -97,61 +72,15 @@ async def inspect_many_blocks_command( max_concurrency: int, request_timeout: int, ): - global semaphore # pylint: disable=global-statement - semaphore = asyncio.Semaphore(max_concurrency) - inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() - - base_provider = get_base_provider(rpc, request_timeout=request_timeout) - w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) - - trace_classifier = TraceClassifier() - - if not cache: - logger.info("Skipping cache") - - tasks = [] - - for block_number in range(after_block, before_block): - tasks.append( - asyncio.ensure_future( - safe_inspect_block( - inspect_db_session, - base_provider, - w3, - trace_classifier, - block_number, - trace_db_session, - ) - ) - ) - logger.info(f"Gathered {len(tasks)} blocks to inspect") - try: - await asyncio.gather(*tasks) - except CancelledError: - logger.info("Requested to exit, cleaning up...") - except Exception as e: - logger.error(f"Existed due to {type(e)}") - traceback.print_exc() - - -async def safe_inspect_block( - inspect_db_session, - base_provider, - w3, - trace_classifier, - block_number, - trace_db_session, -): - async with semaphore: - return await inspect_block( - inspect_db_session, - base_provider, - w3, - trace_classifier, - block_number, - trace_db_session=trace_db_session, - ) + inspector = MEVInspector( + rpc=rpc, + cache=cache, + max_concurrency=max_concurrency, + request_timeout=request_timeout, + ) + await inspector.inspect_many_blocks( + after_block=after_block, before_block=before_block + ) def get_rpc_url() -> str: diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 7ab3f0b..073b37c 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,4 +1,6 @@ import asyncio +import logging +import sys from pathlib import Path from typing import List, Optional @@ -11,6 +13,8 @@ from mev_inspect.schemas.receipts import Receipt cache_directory = "./cache" +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) def get_latest_block_number(w3: Web3) -> int: @@ -47,10 +51,17 @@ async def _fetch_block( fetch_base_fee_per_gas(w3, block_number), ) - receipts: List[Receipt] = [ - Receipt(**receipt) for receipt in receipts_json["result"] - ] - traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + try: + receipts: List[Receipt] = [ + Receipt(**receipt) for receipt in receipts_json["result"] + ] + traces = [Trace(**trace_json) for trace_json in traces_json["result"]] + except KeyError as e: + logger.warning( + f"Failed to create objects from block: {block_number}: {e}, retrying in 5..." + ) + await asyncio.sleep(5) + return await _fetch_block(w3, base_provider, block_number) return Block( block_number=block_number, diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py new file mode 100644 index 0000000..244807a --- /dev/null +++ b/mev_inspect/inspector.py @@ -0,0 +1,73 @@ +import asyncio +import logging +import sys +import traceback +from asyncio import CancelledError + +from web3 import Web3 +from web3.eth import AsyncEth + +from mev_inspect.classifiers.trace import TraceClassifier +from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.inspect_block import inspect_block +from mev_inspect.provider import get_base_provider + +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) + + +class MEVInspector: + def __init__( + self, + rpc: str, + cache: bool, + max_concurrency: int = 1, + request_timeout: int = 300, + ): + if not cache: + logger.info("Skipping cache") + + self.inspect_db_session = get_inspect_session() + self.trace_db_session = get_trace_session() + self.base_provider = get_base_provider(rpc, request_timeout=request_timeout) + self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[]) + self.trace_classifier = TraceClassifier() + self.max_concurrency = asyncio.Semaphore(max_concurrency) + + async def inspect_single_block(self, block: int): + return await inspect_block( + self.inspect_db_session, + self.base_provider, + self.w3, + self.trace_classifier, + block, + trace_db_session=self.trace_db_session, + ) + + async def inspect_many_blocks(self, after_block: int, before_block: int): + tasks = [] + for block_number in range(after_block, before_block): + tasks.append( + asyncio.ensure_future( + self.safe_inspect_block(block_number=block_number) + ) + ) + logger.info(f"Gathered {len(tasks)} blocks to inspect") + try: + await asyncio.gather(*tasks) + except CancelledError: + logger.info("Requested to exit, cleaning up...") + except Exception as e: + logger.error(f"Existed due to {type(e)}") + traceback.print_exc() + + async def safe_inspect_block(self, block_number: int): + async with self.max_concurrency: + return await inspect_block( + self.inspect_db_session, + self.base_provider, + self.w3, + self.trace_classifier, + block_number, + trace_db_session=self.trace_db_session, + ) diff --git a/mev_inspect/retry.py b/mev_inspect/retry.py index 51d6892..81cd06d 100644 --- a/mev_inspect/retry.py +++ b/mev_inspect/retry.py @@ -11,6 +11,12 @@ from typing import ( ) from asyncio.exceptions import TimeoutError +from aiohttp.client_exceptions import ( + ClientOSError, + ServerDisconnectedError, + ServerTimeoutError, + ClientResponseError, +) from requests.exceptions import ( ConnectionError, HTTPError, @@ -25,6 +31,14 @@ from web3.types import ( ) +request_exceptions = (ConnectionError, HTTPError, Timeout, TooManyRedirects) +aiohttp_exceptions = ( + ClientOSError, + ServerDisconnectedError, + ServerTimeoutError, + ClientResponseError, +) + logging.basicConfig(stream=sys.stdout, level=logging.INFO) logger = logging.getLogger(__name__) @@ -74,5 +88,5 @@ async def http_retry_with_backoff_request_middleware( return await exception_retry_with_backoff_middleware( make_request, web3, - (ConnectionError, HTTPError, Timeout, TooManyRedirects, TimeoutError), + (request_exceptions + aiohttp_exceptions + (TimeoutError,)), ) From a6bf834e7687a30ffc23f509ecf8ee30c9aa8c8b Mon Sep 17 00:00:00 2001 From: carlomazzaferro Date: Sat, 30 Oct 2021 00:15:23 +0100 Subject: [PATCH 4/4] address PR comments --- mev_inspect/block.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 005fd4f..69c4b79 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -40,11 +40,7 @@ async def create_from_block_number( return block -async def _fetch_block( - w3, - base_provider, - block_number: int, -) -> Block: +async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block: block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather( w3.eth.get_block(block_number), base_provider.make_request("eth_getBlockReceipts", [block_number]), @@ -59,10 +55,13 @@ async def _fetch_block( traces = [Trace(**trace_json) for trace_json in traces_json["result"]] except KeyError as e: logger.warning( - f"Failed to create objects from block: {block_number}: {e}, retrying in 5..." + f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3" ) - await asyncio.sleep(5) - return await _fetch_block(w3, base_provider, block_number) + if retries < 3: + await asyncio.sleep(5) + return await _fetch_block(w3, base_provider, block_number, retries) + else: + raise return Block( block_number=block_number,