async-based middleware, better logging and async requests

Author: carlomazzaferro
Date:   2021-10-22 13:58:00 +01:00
Parent: 4f20c540e6
Commit: e15eef49c1
GPG signature: no known key found for this signature in database (Key ID: 0CED3103EF7B2187)

4 changed files with 56 additions and 22 deletions

cli.py

@@ -1,7 +1,10 @@
 import asyncio
 import logging
 import os
+import signal
 import sys
+import traceback
+from asyncio import CancelledError
 from functools import wraps

 import click
@@ -12,7 +15,6 @@ from mev_inspect.classifiers.trace import TraceClassifier
 from mev_inspect.db import get_inspect_session, get_trace_session
 from mev_inspect.inspect_block import inspect_block
 from mev_inspect.provider import get_base_provider
-from mev_inspect.retry import http_retry_with_backoff_request_middleware

 RPC_URL_ENV = "RPC_URL"
@@ -31,11 +33,17 @@ def coro(f):
     @wraps(f)
     def wrapper(*args, **kwargs):
         loop = asyncio.get_event_loop()
+
+        def cancel_task_callback():
+            for task in asyncio.all_tasks():
+                task.cancel()
+
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, cancel_task_callback)
         try:
             loop.run_until_complete(f(*args, **kwargs))
         finally:
             loop.run_until_complete(loop.shutdown_asyncgens())
+            loop.close()

     return wrapper
@@ -95,11 +103,7 @@ async def inspect_many_blocks_command(
     trace_db_session = get_trace_session()

     base_provider = get_base_provider(rpc, request_timeout=request_timeout)
-    w3 = Web3(
-        base_provider,
-        modules={"eth": (AsyncEth,)},
-        middlewares=[http_retry_with_backoff_request_middleware],
-    )
+    w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])

     trace_classifier = TraceClassifier()
@@ -122,7 +126,13 @@ async def inspect_many_blocks_command(
             )
         )

     logger.info(f"Gathered {len(tasks)} blocks to inspect")
-    await asyncio.gather(*tasks)
+    try:
+        await asyncio.gather(*tasks)
+    except CancelledError:
+        logger.info("Requested to exit, cleaning up...")
+    except Exception as e:
+        logger.error(f"Exited due to {type(e)}")
+        traceback.print_exc()

 async def safe_inspect_block(
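Taken together, the cli.py changes add a graceful-shutdown path: SIGINT and SIGTERM cancel every pending asyncio task, and the gather inside inspect_many_blocks_command absorbs the resulting CancelledError. A minimal standalone sketch of the same pattern; the worker coroutine and delay below are illustrative stand-ins, not code from this commit:

import asyncio
import signal


async def worker(i: int) -> None:
    # Stand-in for the per-block inspection tasks gathered by the CLI.
    await asyncio.sleep(60)
    print(f"worker {i} finished")


async def main() -> None:
    try:
        await asyncio.gather(*(worker(i) for i in range(3)))
    except asyncio.CancelledError:
        print("Requested to exit, cleaning up...")


def run() -> None:
    loop = asyncio.get_event_loop()

    def cancel_task_callback() -> None:
        # Runs inside the loop, so all_tasks() sees the gathered workers.
        for task in asyncio.all_tasks():
            task.cancel()

    # add_signal_handler is Unix-only; Ctrl-C or SIGTERM cancels every task.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, cancel_task_callback)
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


if __name__ == "__main__":
    run()

Pressing Ctrl-C while the workers sleep prints the cleanup message and exits with the loop closed, instead of dying with a KeyboardInterrupt traceback.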

mev_inspect/block.py

@@ -1,3 +1,4 @@
+import asyncio
 from pathlib import Path
 from typing import List, Optional
@@ -39,16 +40,17 @@
     base_provider,
     block_number: int,
 ) -> Block:
-    block_json = await w3.eth.get_block(block_number)
-    receipts_json = await base_provider.make_request(
-        "eth_getBlockReceipts", [block_number]
+    block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
+        w3.eth.get_block(block_number),
+        base_provider.make_request("eth_getBlockReceipts", [block_number]),
+        base_provider.make_request("trace_block", [block_number]),
+        fetch_base_fee_per_gas(w3, block_number),
     )
-    traces_json = await base_provider.make_request("trace_block", [block_number])

     receipts: List[Receipt] = [
         Receipt(**receipt) for receipt in receipts_json["result"]
     ]
     traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
-    base_fee_per_gas = await fetch_base_fee_per_gas(w3, block_number)

     return Block(
         block_number=block_number,
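The block fetch now issues its four RPC calls through a single asyncio.gather instead of awaiting them one after another, so total latency is bounded by the slowest call rather than the sum. A minimal sketch of that difference, with stubbed fetchers standing in for the real get_block / make_request calls (names and delays are illustrative):

import asyncio
import time


async def fake_rpc(name: str, delay: float) -> str:
    # Stand-in for w3.eth.get_block, eth_getBlockReceipts, trace_block, base fee.
    await asyncio.sleep(delay)
    return name


async def fetch_block_data() -> None:
    start = time.monotonic()
    block, receipts, traces, base_fee = await asyncio.gather(
        fake_rpc("block", 0.3),
        fake_rpc("receipts", 0.3),
        fake_rpc("traces", 0.3),
        fake_rpc("base_fee", 0.3),
    )
    # Roughly 0.3s total; four sequential awaits would take about 1.2s.
    print(block, receipts, traces, base_fee, f"{time.monotonic() - start:.2f}s")


asyncio.run(fetch_block_data())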

mev_inspect/provider.py

@@ -1,6 +1,9 @@
 from web3 import Web3, AsyncHTTPProvider

+from mev_inspect.retry import http_retry_with_backoff_request_middleware
+

 def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider:
     base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout})
+    base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
     return base_provider
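Because the retry middleware is now attached inside get_base_provider, callers build Web3 with an empty middleware list (as cli.py now does) and every request sent through the provider still passes through the backoff wrapper. A hypothetical usage sketch; the RPC URL and timeout are placeholders, and the endpoint is assumed to serve the async eth module:

import asyncio

from web3 import Web3
from web3.eth import AsyncEth

from mev_inspect.provider import get_base_provider


async def main() -> None:
    # Placeholder endpoint; any JSON-RPC node reachable over HTTP works here.
    base_provider = get_base_provider("http://localhost:8545", request_timeout=300)
    w3 = Web3(base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])

    # This call is retried with backoff by the provider-level middleware
    # if the HTTP request fails transiently or times out.
    latest = await w3.eth.get_block("latest")
    print(latest["number"])


asyncio.run(main())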

mev_inspect/retry.py

@@ -1,10 +1,15 @@
-import time
+import asyncio
+import logging
+import random
+import sys
 from typing import (
     Any,
     Callable,
     Collection,
     Type,
+    Coroutine,
 )
+from asyncio.exceptions import TimeoutError

 from requests.exceptions import (
     ConnectionError,
@@ -20,40 +25,54 @@ from web3.types import (
 )

+logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+logger = logging.getLogger(__name__)
+

 async def exception_retry_with_backoff_middleware(
-    make_request: Callable[[RPCEndpoint, Any], RPCResponse],
+    make_request: Callable[[RPCEndpoint, Any], Any],
     web3: Web3,  # pylint: disable=unused-argument
     errors: Collection[Type[BaseException]],
     retries: int = 5,
     backoff_time_seconds: float = 0.1,
-) -> Callable[[RPCEndpoint, Any], RPCResponse]:
+) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
     """
     Creates middleware that retries failed HTTP requests. Is a default
     middleware for HTTPProvider.
     """

-    def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
+    async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
         if check_if_retry_on_failure(method):
             for i in range(retries):
                 try:
-                    return make_request(method, params)
+                    return await make_request(method, params)
                 # https://github.com/python/mypy/issues/5349
                 except errors:  # type: ignore
+                    logger.error(
+                        f"Request for method {method}, block: {int(params[0], 16)}, retrying: {i}/{retries}"
+                    )
                     if i < retries - 1:
-                        time.sleep(backoff_time_seconds)
+                        backoff_time = backoff_time_seconds * (
+                            random.uniform(5, 10) ** i
+                        )
+                        await asyncio.sleep(backoff_time)
                         continue
                     else:
                         raise
             return None
         else:
-            return make_request(method, params)

+            return await make_request(method, params)

     return middleware


 async def http_retry_with_backoff_request_middleware(
     make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3
-) -> Callable[[RPCEndpoint, Any], Any]:
+) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
     return await exception_retry_with_backoff_middleware(
-        make_request, web3, (ConnectionError, HTTPError, Timeout, TooManyRedirects)
+        make_request,
+        web3,
+        (ConnectionError, HTTPError, Timeout, TooManyRedirects, TimeoutError),
     )
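The retry loop now backs off exponentially with jitter: attempt i sleeps backoff_time_seconds * random.uniform(5, 10) ** i, so with the defaults (0.1s base, 5 retries) the waits are roughly 0.1s, then 0.5 to 1s, 2.5 to 10s, and 12.5 to 100s, and the fifth attempt re-raises instead of sleeping. A quick way to eyeball that schedule (constants mirror the defaults above):

import random

BACKOFF_BASE_SECONDS = 0.1  # matches the backoff_time_seconds default
RETRIES = 5                 # matches the retries default

for i in range(RETRIES - 1):  # the final attempt raises instead of sleeping
    low = BACKOFF_BASE_SECONDS * 5 ** i
    high = BACKOFF_BASE_SECONDS * 10 ** i
    sampled = BACKOFF_BASE_SECONDS * random.uniform(5, 10) ** i
    print(f"retry {i}: {low:.1f}s to {high:.1f}s (sampled {sampled:.1f}s)")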