From e15eef49c1f59d81caddf4d100a08263c9d7592d Mon Sep 17 00:00:00 2001
From: carlomazzaferro
Date: Fri, 22 Oct 2021 13:58:00 +0100
Subject: [PATCH] async based middleware, better logging and async requests

---
Reviewer notes (commentary only; text in this section is not part of the
commit and is skipped when the patch is applied).

Layout: this patch had been flattened onto a few long lines. Line breaks
were restored so that every hunk matches its "@@" header counts and the
diffstat below. Leading indentation inside hunks is inferred from Python
syntax - TODO confirm against the target tree before applying.

What the patch changes:
  * cli.py: registers SIGINT/SIGTERM handlers that cancel every task from
    asyncio.all_tasks(); stops closing the loop; builds Web3 with
    middlewares=[]; wraps asyncio.gather in try/except and logs on
    CancelledError or any other Exception.
  * block.py: replaces four sequential awaits with a single asyncio.gather.
  * provider.py: appends the retry middleware to base_provider.middlewares.
  * retry.py: inner middleware becomes async, awaits make_request, logs
    each failure, sleeps with asyncio.sleep for
    backoff_time_seconds * random.uniform(5, 10) ** i, and also retries on
    asyncio TimeoutError. logging.basicConfig now runs at import time.

Points to confirm in review:
  * retry.py logs int(params[0], 16), but block.py passes [block_number]
    with block_number annotated as int. int() with an explicit base needs
    a str/bytes argument, so if such a request reaches this handler the
    log call itself would raise TypeError - verify.
  * With the defaults (retries=5, backoff 0.1 s) the sleep before the last
    attempt (i=3) is between 12.5 s and 100 s - confirm this is intended.
  * cli.py logs "Existed due to ..."; presumably "Exited" was meant.
  * The retried error types still come from requests.exceptions; whether
    AsyncHTTPProvider raises them is not visible here - verify.

 cli.py                  | 26 ++++++++++++++++++--------
 mev_inspect/block.py    | 12 +++++++-----
 mev_inspect/provider.py |  3 +++
 mev_inspect/retry.py    | 37 ++++++++++++++++++++++++++++---------
 4 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/cli.py b/cli.py
index a62f3b5..5146e84 100644
--- a/cli.py
+++ b/cli.py
@@ -1,7 +1,10 @@
 import asyncio
 import logging
 import os
+import signal
 import sys
+import traceback
+from asyncio import CancelledError
 from functools import wraps
 
 import click
@@ -12,7 +15,6 @@ from mev_inspect.classifiers.trace import TraceClassifier
 from mev_inspect.db import get_inspect_session, get_trace_session
 from mev_inspect.inspect_block import inspect_block
 from mev_inspect.provider import get_base_provider
-from mev_inspect.retry import http_retry_with_backoff_request_middleware
 
 RPC_URL_ENV = "RPC_URL"
 
@@ -31,11 +33,17 @@ def coro(f):
     @wraps(f)
     def wrapper(*args, **kwargs):
         loop = asyncio.get_event_loop()
+
+        def cancel_task_callback():
+            for task in asyncio.all_tasks():
+                task.cancel()
+
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, cancel_task_callback)
         try:
             loop.run_until_complete(f(*args, **kwargs))
         finally:
             loop.run_until_complete(loop.shutdown_asyncgens())
-            loop.close()
 
     return wrapper
 
@@ -95,11 +103,7 @@ async def inspect_many_blocks_command(
     trace_db_session = get_trace_session()
 
     base_provider = get_base_provider(rpc, request_timeout=request_timeout)
-    w3 = Web3(
-        base_provider,
-        modules={"eth": (AsyncEth,)},
-        middlewares=[http_retry_with_backoff_request_middleware],
-    )
+    w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
 
     trace_classifier = TraceClassifier()
 
@@ -122,7 +126,13 @@ async def inspect_many_blocks_command(
             )
         )
     logger.info(f"Gathered {len(tasks)} blocks to inspect")
-    await asyncio.gather(*tasks)
+    try:
+        await asyncio.gather(*tasks)
+    except CancelledError:
+        logger.info("Requested to exit, cleaning up...")
+    except Exception as e:
+        logger.error(f"Existed due to {type(e)}")
+        traceback.print_exc()
 
 
 async def safe_inspect_block(
diff --git a/mev_inspect/block.py b/mev_inspect/block.py
index 6f47246..7ab3f0b 100644
--- a/mev_inspect/block.py
+++ b/mev_inspect/block.py
@@ -1,3 +1,4 @@
+import asyncio
 from pathlib import Path
 from typing import List, Optional
 
@@ -39,16 +40,17 @@ async def _fetch_block(
     base_provider,
     block_number: int,
 ) -> Block:
-    block_json = await w3.eth.get_block(block_number)
-    receipts_json = await base_provider.make_request(
-        "eth_getBlockReceipts", [block_number]
+    block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
+        w3.eth.get_block(block_number),
+        base_provider.make_request("eth_getBlockReceipts", [block_number]),
+        base_provider.make_request("trace_block", [block_number]),
+        fetch_base_fee_per_gas(w3, block_number),
     )
-    traces_json = await base_provider.make_request("trace_block", [block_number])
+
     receipts: List[Receipt] = [
         Receipt(**receipt) for receipt in receipts_json["result"]
     ]
     traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
-    base_fee_per_gas = await fetch_base_fee_per_gas(w3, block_number)
 
     return Block(
         block_number=block_number,
diff --git a/mev_inspect/provider.py b/mev_inspect/provider.py
index 45d996b..3b930ea 100644
--- a/mev_inspect/provider.py
+++ b/mev_inspect/provider.py
@@ -1,6 +1,9 @@
 from web3 import Web3, AsyncHTTPProvider
 
+from mev_inspect.retry import http_retry_with_backoff_request_middleware
+
 
 def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider:
     base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout})
+    base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
     return base_provider
diff --git a/mev_inspect/retry.py b/mev_inspect/retry.py
index 63d2e7a..51d6892 100644
--- a/mev_inspect/retry.py
+++ b/mev_inspect/retry.py
@@ -1,10 +1,15 @@
-import time
+import asyncio
+import logging
+import random
+import sys
 from typing import (
     Any,
     Callable,
     Collection,
     Type,
+    Coroutine,
 )
+from asyncio.exceptions import TimeoutError
 
 from requests.exceptions import (
     ConnectionError,
@@ -20,40 +25,54 @@ from web3.types import (
 )
 
 
+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
 async def exception_retry_with_backoff_middleware(
-    make_request: Callable[[RPCEndpoint, Any], RPCResponse],
+    make_request: Callable[[RPCEndpoint, Any], Any],
     web3: Web3,  # pylint: disable=unused-argument
     errors: Collection[Type[BaseException]],
     retries: int = 5,
     backoff_time_seconds: float = 0.1,
-) -> Callable[[RPCEndpoint, Any], RPCResponse]:
+) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
     """
     Creates middleware that retries failed HTTP requests. Is a default
     middleware for HTTPProvider.
     """
 
-    def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
+    async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
+
         if check_if_retry_on_failure(method):
             for i in range(retries):
                 try:
-                    return make_request(method, params)
+                    return await make_request(method, params)
                 # https://github.com/python/mypy/issues/5349
                 except errors:  # type: ignore
+                    logger.error(
+                        f"Request for method {method}, block: {int(params[0], 16)}, retrying: {i}/{retries}"
+                    )
                     if i < retries - 1:
-                        time.sleep(backoff_time_seconds)
+                        backoff_time = backoff_time_seconds * (
+                            random.uniform(5, 10) ** i
+                        )
+                        await asyncio.sleep(backoff_time)
                         continue
+
                     else:
                         raise
             return None
         else:
-            return make_request(method, params)
+            return await make_request(method, params)
 
     return middleware
 
 
 async def http_retry_with_backoff_request_middleware(
     make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3
-) -> Callable[[RPCEndpoint, Any], Any]:
+) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
     return await exception_retry_with_backoff_middleware(
-        make_request, web3, (ConnectionError, HTTPError, Timeout, TooManyRedirects)
+        make_request,
+        web3,
+        (ConnectionError, HTTPError, Timeout, TooManyRedirects, TimeoutError),
     )