Merge pull request #109 from carlomazzaferro/asyncio-backfilling
asyncio-based backfilling
This commit is contained in:
commit
c436c6480e
6
Tiltfile
6
Tiltfile
@ -36,3 +36,9 @@ docker_build_with_restart("mev-inspect-py", ".",
|
|||||||
)
|
)
|
||||||
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
|
k8s_yaml(helm('./k8s/mev-inspect', name='mev-inspect'))
|
||||||
k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"])
|
k8s_resource(workload="mev-inspect", resource_deps=["postgresql-postgresql"])
|
||||||
|
|
||||||
|
local_resource(
|
||||||
|
'pg-port-forward',
|
||||||
|
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
|
||||||
|
resource_deps=["postgresql-postgresql"]
|
||||||
|
)
|
||||||
|
130
cli.py
130
cli.py
@ -1,68 +1,56 @@
|
|||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import logging
|
import signal
|
||||||
import sys
|
from functools import wraps
|
||||||
|
|
||||||
import click
|
import click
|
||||||
from web3 import Web3
|
|
||||||
|
|
||||||
from mev_inspect.classifiers.trace import TraceClassifier
|
|
||||||
from mev_inspect.db import get_inspect_session, get_trace_session
|
|
||||||
from mev_inspect.inspect_block import inspect_block
|
|
||||||
from mev_inspect.provider import get_base_provider
|
|
||||||
from mev_inspect.block import create_from_block_number
|
|
||||||
|
|
||||||
|
from mev_inspect.inspector import MEVInspector
|
||||||
|
|
||||||
RPC_URL_ENV = "RPC_URL"
|
RPC_URL_ENV = "RPC_URL"
|
||||||
|
|
||||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
def cli():
|
def cli():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def coro(f):
|
||||||
|
@wraps(f)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
def cancel_task_callback():
|
||||||
|
for task in asyncio.all_tasks():
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
|
loop.add_signal_handler(sig, cancel_task_callback)
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(f(*args, **kwargs))
|
||||||
|
finally:
|
||||||
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@click.argument("block_number", type=int)
|
@click.argument("block_number", type=int)
|
||||||
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
||||||
@click.option("--cache/--no-cache", default=True)
|
@click.option("--cache/--no-cache", default=True)
|
||||||
def inspect_block_command(block_number: int, rpc: str, cache: bool):
|
@coro
|
||||||
inspect_db_session = get_inspect_session()
|
async def inspect_block_command(block_number: int, rpc: str, cache: bool):
|
||||||
trace_db_session = get_trace_session()
|
inspector = MEVInspector(rpc=rpc, cache=cache)
|
||||||
|
await inspector.inspect_single_block(block=block_number)
|
||||||
base_provider = get_base_provider(rpc)
|
|
||||||
w3 = Web3(base_provider)
|
|
||||||
trace_classifier = TraceClassifier()
|
|
||||||
|
|
||||||
if not cache:
|
|
||||||
logger.info("Skipping cache")
|
|
||||||
|
|
||||||
inspect_block(
|
|
||||||
inspect_db_session,
|
|
||||||
base_provider,
|
|
||||||
w3,
|
|
||||||
trace_classifier,
|
|
||||||
block_number,
|
|
||||||
trace_db_session=trace_db_session,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@click.argument("block_number", type=int)
|
@click.argument("block_number", type=int)
|
||||||
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
||||||
def fetch_block_command(block_number: int, rpc: str):
|
@coro
|
||||||
base_provider = get_base_provider(rpc)
|
async def fetch_block_command(block_number: int, rpc: str):
|
||||||
w3 = Web3(base_provider)
|
inspector = MEVInspector(rpc=rpc)
|
||||||
trace_db_session = get_trace_session()
|
block = await inspector.create_from_block(block_number=block_number)
|
||||||
|
|
||||||
block = create_from_block_number(
|
|
||||||
base_provider,
|
|
||||||
w3,
|
|
||||||
block_number,
|
|
||||||
trace_db_session=trace_db_session,
|
|
||||||
)
|
|
||||||
|
|
||||||
print(block.json())
|
print(block.json())
|
||||||
|
|
||||||
|
|
||||||
@ -71,37 +59,33 @@ def fetch_block_command(block_number: int, rpc: str):
|
|||||||
@click.argument("before_block", type=int)
|
@click.argument("before_block", type=int)
|
||||||
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
|
||||||
@click.option("--cache/--no-cache", default=True)
|
@click.option("--cache/--no-cache", default=True)
|
||||||
def inspect_many_blocks_command(
|
@click.option(
|
||||||
after_block: int, before_block: int, rpc: str, cache: bool
|
"--max-concurrency",
|
||||||
|
type=int,
|
||||||
|
help="maximum number of concurrent connections",
|
||||||
|
default=5,
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--request-timeout", type=int, help="timeout for requests to nodes", default=500
|
||||||
|
)
|
||||||
|
@coro
|
||||||
|
async def inspect_many_blocks_command(
|
||||||
|
after_block: int,
|
||||||
|
before_block: int,
|
||||||
|
rpc: str,
|
||||||
|
cache: bool,
|
||||||
|
max_concurrency: int,
|
||||||
|
request_timeout: int,
|
||||||
):
|
):
|
||||||
|
inspector = MEVInspector(
|
||||||
inspect_db_session = get_inspect_session()
|
rpc=rpc,
|
||||||
trace_db_session = get_trace_session()
|
cache=cache,
|
||||||
|
max_concurrency=max_concurrency,
|
||||||
base_provider = get_base_provider(rpc)
|
request_timeout=request_timeout,
|
||||||
w3 = Web3(base_provider)
|
)
|
||||||
trace_classifier = TraceClassifier()
|
await inspector.inspect_many_blocks(
|
||||||
|
after_block=after_block, before_block=before_block
|
||||||
if not cache:
|
)
|
||||||
logger.info("Skipping cache")
|
|
||||||
|
|
||||||
for i, block_number in enumerate(range(after_block, before_block)):
|
|
||||||
block_message = (
|
|
||||||
f"Running for {block_number} ({i+1}/{before_block - after_block})"
|
|
||||||
)
|
|
||||||
dashes = "-" * len(block_message)
|
|
||||||
logger.info(dashes)
|
|
||||||
logger.info(block_message)
|
|
||||||
logger.info(dashes)
|
|
||||||
|
|
||||||
inspect_block(
|
|
||||||
inspect_db_session,
|
|
||||||
base_provider,
|
|
||||||
w3,
|
|
||||||
trace_classifier,
|
|
||||||
block_number,
|
|
||||||
trace_db_session=trace_db_session,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_rpc_url() -> str:
|
def get_rpc_url() -> str:
|
||||||
|
@ -1,3 +1,6 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
@ -11,13 +14,15 @@ from mev_inspect.schemas.traces import Trace, TraceType
|
|||||||
|
|
||||||
|
|
||||||
cache_directory = "./cache"
|
cache_directory = "./cache"
|
||||||
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def get_latest_block_number(w3: Web3) -> int:
|
def get_latest_block_number(w3: Web3) -> int:
|
||||||
return int(w3.eth.get_block("latest")["number"])
|
return int(w3.eth.get_block("latest")["number"])
|
||||||
|
|
||||||
|
|
||||||
def create_from_block_number(
|
async def create_from_block_number(
|
||||||
base_provider,
|
base_provider,
|
||||||
w3: Web3,
|
w3: Web3,
|
||||||
block_number: int,
|
block_number: int,
|
||||||
@ -29,25 +34,34 @@ def create_from_block_number(
|
|||||||
block = _find_block(trace_db_session, block_number)
|
block = _find_block(trace_db_session, block_number)
|
||||||
|
|
||||||
if block is None:
|
if block is None:
|
||||||
return _fetch_block(w3, base_provider, block_number)
|
block = await _fetch_block(w3, base_provider, block_number)
|
||||||
|
return block
|
||||||
else:
|
else:
|
||||||
return block
|
return block
|
||||||
|
|
||||||
|
|
||||||
def _fetch_block(
|
async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
|
||||||
w3,
|
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
|
||||||
base_provider,
|
w3.eth.get_block(block_number),
|
||||||
block_number: int,
|
base_provider.make_request("eth_getBlockReceipts", [block_number]),
|
||||||
) -> Block:
|
base_provider.make_request("trace_block", [block_number]),
|
||||||
block_json = w3.eth.get_block(block_number)
|
fetch_base_fee_per_gas(w3, block_number),
|
||||||
receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number])
|
)
|
||||||
traces_json = w3.parity.trace_block(block_number)
|
|
||||||
|
|
||||||
receipts: List[Receipt] = [
|
try:
|
||||||
Receipt(**receipt) for receipt in receipts_json["result"]
|
receipts: List[Receipt] = [
|
||||||
]
|
Receipt(**receipt) for receipt in receipts_json["result"]
|
||||||
traces = [Trace(**trace_json) for trace_json in traces_json]
|
]
|
||||||
base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number)
|
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
|
||||||
|
except KeyError as e:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
|
||||||
|
)
|
||||||
|
if retries < 3:
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
return await _fetch_block(w3, base_provider, block_number, retries)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
return Block(
|
return Block(
|
||||||
block_number=block_number,
|
block_number=block_number,
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
from web3 import Web3
|
from web3 import Web3
|
||||||
|
|
||||||
|
|
||||||
def fetch_base_fee_per_gas(w3: Web3, block_number: int) -> int:
|
async def fetch_base_fee_per_gas(w3: Web3, block_number: int) -> int:
|
||||||
base_fees = w3.eth.fee_history(1, block_number)["baseFeePerGas"]
|
base_fees = await w3.eth.fee_history(1, block_number)
|
||||||
if len(base_fees) == 0:
|
base_fees_per_gas = base_fees["baseFeePerGas"]
|
||||||
|
if len(base_fees_per_gas) == 0:
|
||||||
raise RuntimeError("Unexpected error - no fees returned")
|
raise RuntimeError("Unexpected error - no fees returned")
|
||||||
|
|
||||||
return base_fees[0]
|
return base_fees_per_gas[0]
|
||||||
|
@ -35,7 +35,7 @@ from mev_inspect.liquidations import get_liquidations
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def inspect_block(
|
async def inspect_block(
|
||||||
inspect_db_session: orm.Session,
|
inspect_db_session: orm.Session,
|
||||||
base_provider,
|
base_provider,
|
||||||
w3: Web3,
|
w3: Web3,
|
||||||
@ -44,47 +44,49 @@ def inspect_block(
|
|||||||
trace_db_session: Optional[orm.Session],
|
trace_db_session: Optional[orm.Session],
|
||||||
should_write_classified_traces: bool = True,
|
should_write_classified_traces: bool = True,
|
||||||
):
|
):
|
||||||
block = create_from_block_number(
|
block = await create_from_block_number(
|
||||||
base_provider,
|
base_provider,
|
||||||
w3,
|
w3,
|
||||||
block_number,
|
block_number,
|
||||||
trace_db_session,
|
trace_db_session,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(f"Total traces: {len(block.traces)}")
|
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
|
||||||
|
|
||||||
total_transactions = len(
|
total_transactions = len(
|
||||||
set(t.transaction_hash for t in block.traces if t.transaction_hash is not None)
|
set(t.transaction_hash for t in block.traces if t.transaction_hash is not None)
|
||||||
)
|
)
|
||||||
logger.info(f"Total transactions: {total_transactions}")
|
logger.info(f"Block: {block_number} -- Total transactions: {total_transactions}")
|
||||||
|
|
||||||
classified_traces = trace_clasifier.classify(block.traces)
|
classified_traces = trace_clasifier.classify(block.traces)
|
||||||
logger.info(f"Returned {len(classified_traces)} classified traces")
|
logger.info(
|
||||||
|
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
|
||||||
|
)
|
||||||
|
|
||||||
if should_write_classified_traces:
|
if should_write_classified_traces:
|
||||||
delete_classified_traces_for_block(inspect_db_session, block_number)
|
delete_classified_traces_for_block(inspect_db_session, block_number)
|
||||||
write_classified_traces(inspect_db_session, classified_traces)
|
write_classified_traces(inspect_db_session, classified_traces)
|
||||||
|
|
||||||
transfers = get_transfers(classified_traces)
|
transfers = get_transfers(classified_traces)
|
||||||
logger.info(f"Found {len(transfers)} transfers")
|
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
|
||||||
|
|
||||||
delete_transfers_for_block(inspect_db_session, block_number)
|
delete_transfers_for_block(inspect_db_session, block_number)
|
||||||
write_transfers(inspect_db_session, transfers)
|
write_transfers(inspect_db_session, transfers)
|
||||||
|
|
||||||
swaps = get_swaps(classified_traces)
|
swaps = get_swaps(classified_traces)
|
||||||
logger.info(f"Found {len(swaps)} swaps")
|
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
|
||||||
|
|
||||||
delete_swaps_for_block(inspect_db_session, block_number)
|
delete_swaps_for_block(inspect_db_session, block_number)
|
||||||
write_swaps(inspect_db_session, swaps)
|
write_swaps(inspect_db_session, swaps)
|
||||||
|
|
||||||
arbitrages = get_arbitrages(swaps)
|
arbitrages = get_arbitrages(swaps)
|
||||||
logger.info(f"Found {len(arbitrages)} arbitrages")
|
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
|
||||||
|
|
||||||
delete_arbitrages_for_block(inspect_db_session, block_number)
|
delete_arbitrages_for_block(inspect_db_session, block_number)
|
||||||
write_arbitrages(inspect_db_session, arbitrages)
|
write_arbitrages(inspect_db_session, arbitrages)
|
||||||
|
|
||||||
liquidations = get_liquidations(classified_traces)
|
liquidations = get_liquidations(classified_traces)
|
||||||
logger.info(f"Found {len(liquidations)} liquidations")
|
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations")
|
||||||
|
|
||||||
delete_liquidations_for_block(inspect_db_session, block_number)
|
delete_liquidations_for_block(inspect_db_session, block_number)
|
||||||
write_liquidations(inspect_db_session, liquidations)
|
write_liquidations(inspect_db_session, liquidations)
|
||||||
|
82
mev_inspect/inspector.py
Normal file
82
mev_inspect/inspector.py
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
from asyncio import CancelledError
|
||||||
|
|
||||||
|
from web3 import Web3
|
||||||
|
from web3.eth import AsyncEth
|
||||||
|
|
||||||
|
from mev_inspect.block import create_from_block_number
|
||||||
|
from mev_inspect.classifiers.trace import TraceClassifier
|
||||||
|
from mev_inspect.db import get_inspect_session, get_trace_session
|
||||||
|
from mev_inspect.inspect_block import inspect_block
|
||||||
|
from mev_inspect.provider import get_base_provider
|
||||||
|
|
||||||
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MEVInspector:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
rpc: str,
|
||||||
|
cache: bool = False,
|
||||||
|
max_concurrency: int = 1,
|
||||||
|
request_timeout: int = 300,
|
||||||
|
):
|
||||||
|
if not cache:
|
||||||
|
logger.info("Skipping cache")
|
||||||
|
|
||||||
|
self.inspect_db_session = get_inspect_session()
|
||||||
|
self.trace_db_session = get_trace_session()
|
||||||
|
self.base_provider = get_base_provider(rpc, request_timeout=request_timeout)
|
||||||
|
self.w3 = Web3(self.base_provider, modules={"eth": (AsyncEth,)}, middlewares=[])
|
||||||
|
self.trace_classifier = TraceClassifier()
|
||||||
|
self.max_concurrency = asyncio.Semaphore(max_concurrency)
|
||||||
|
|
||||||
|
async def create_from_block(self, block_number: int):
|
||||||
|
return await create_from_block_number(
|
||||||
|
base_provider=self.base_provider,
|
||||||
|
w3=self.w3,
|
||||||
|
block_number=block_number,
|
||||||
|
trace_db_session=self.trace_db_session,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def inspect_single_block(self, block: int):
|
||||||
|
return await inspect_block(
|
||||||
|
self.inspect_db_session,
|
||||||
|
self.base_provider,
|
||||||
|
self.w3,
|
||||||
|
self.trace_classifier,
|
||||||
|
block,
|
||||||
|
trace_db_session=self.trace_db_session,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def inspect_many_blocks(self, after_block: int, before_block: int):
|
||||||
|
tasks = []
|
||||||
|
for block_number in range(after_block, before_block):
|
||||||
|
tasks.append(
|
||||||
|
asyncio.ensure_future(
|
||||||
|
self.safe_inspect_block(block_number=block_number)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
logger.info(f"Gathered {len(tasks)} blocks to inspect")
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
except CancelledError:
|
||||||
|
logger.info("Requested to exit, cleaning up...")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Existed due to {type(e)}")
|
||||||
|
traceback.print_exc()
|
||||||
|
|
||||||
|
async def safe_inspect_block(self, block_number: int):
|
||||||
|
async with self.max_concurrency:
|
||||||
|
return await inspect_block(
|
||||||
|
self.inspect_db_session,
|
||||||
|
self.base_provider,
|
||||||
|
self.w3,
|
||||||
|
self.trace_classifier,
|
||||||
|
block_number,
|
||||||
|
trace_db_session=self.trace_db_session,
|
||||||
|
)
|
@ -1,14 +1,9 @@
|
|||||||
from web3 import Web3
|
from web3 import Web3, AsyncHTTPProvider
|
||||||
|
|
||||||
from mev_inspect.retry import http_retry_with_backoff_request_middleware
|
from mev_inspect.retry import http_retry_with_backoff_request_middleware
|
||||||
|
|
||||||
|
|
||||||
def get_base_provider(rpc: str) -> Web3.HTTPProvider:
|
def get_base_provider(rpc: str, request_timeout: int = 500) -> Web3.AsyncHTTPProvider:
|
||||||
base_provider = Web3.HTTPProvider(rpc)
|
base_provider = AsyncHTTPProvider(rpc, request_kwargs={"timeout": request_timeout})
|
||||||
base_provider.middlewares.remove("http_retry_request")
|
base_provider.middlewares += (http_retry_with_backoff_request_middleware,)
|
||||||
base_provider.middlewares.add(
|
|
||||||
http_retry_with_backoff_request_middleware,
|
|
||||||
"http_retry_with_backoff",
|
|
||||||
)
|
|
||||||
|
|
||||||
return base_provider
|
return base_provider
|
||||||
|
@ -1,11 +1,22 @@
|
|||||||
import time
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import sys
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
Collection,
|
Collection,
|
||||||
Type,
|
Type,
|
||||||
|
Coroutine,
|
||||||
)
|
)
|
||||||
|
from asyncio.exceptions import TimeoutError
|
||||||
|
|
||||||
|
from aiohttp.client_exceptions import (
|
||||||
|
ClientOSError,
|
||||||
|
ServerDisconnectedError,
|
||||||
|
ServerTimeoutError,
|
||||||
|
ClientResponseError,
|
||||||
|
)
|
||||||
from requests.exceptions import (
|
from requests.exceptions import (
|
||||||
ConnectionError,
|
ConnectionError,
|
||||||
HTTPError,
|
HTTPError,
|
||||||
@ -20,40 +31,62 @@ from web3.types import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def exception_retry_with_backoff_middleware(
|
request_exceptions = (ConnectionError, HTTPError, Timeout, TooManyRedirects)
|
||||||
make_request: Callable[[RPCEndpoint, Any], RPCResponse],
|
aiohttp_exceptions = (
|
||||||
|
ClientOSError,
|
||||||
|
ServerDisconnectedError,
|
||||||
|
ServerTimeoutError,
|
||||||
|
ClientResponseError,
|
||||||
|
)
|
||||||
|
|
||||||
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def exception_retry_with_backoff_middleware(
|
||||||
|
make_request: Callable[[RPCEndpoint, Any], Any],
|
||||||
web3: Web3, # pylint: disable=unused-argument
|
web3: Web3, # pylint: disable=unused-argument
|
||||||
errors: Collection[Type[BaseException]],
|
errors: Collection[Type[BaseException]],
|
||||||
retries: int = 5,
|
retries: int = 5,
|
||||||
backoff_time_seconds: float = 0.1,
|
backoff_time_seconds: float = 0.1,
|
||||||
) -> Callable[[RPCEndpoint, Any], RPCResponse]:
|
) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
|
||||||
"""
|
"""
|
||||||
Creates middleware that retries failed HTTP requests. Is a default
|
Creates middleware that retries failed HTTP requests. Is a default
|
||||||
middleware for HTTPProvider.
|
middleware for HTTPProvider.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
|
async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
|
||||||
|
|
||||||
if check_if_retry_on_failure(method):
|
if check_if_retry_on_failure(method):
|
||||||
for i in range(retries):
|
for i in range(retries):
|
||||||
try:
|
try:
|
||||||
return make_request(method, params)
|
return await make_request(method, params)
|
||||||
# https://github.com/python/mypy/issues/5349
|
# https://github.com/python/mypy/issues/5349
|
||||||
except errors: # type: ignore
|
except errors: # type: ignore
|
||||||
|
logger.error(
|
||||||
|
f"Request for method {method}, block: {int(params[0], 16)}, retrying: {i}/{retries}"
|
||||||
|
)
|
||||||
if i < retries - 1:
|
if i < retries - 1:
|
||||||
time.sleep(backoff_time_seconds)
|
backoff_time = backoff_time_seconds * (
|
||||||
|
random.uniform(5, 10) ** i
|
||||||
|
)
|
||||||
|
await asyncio.sleep(backoff_time)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
return make_request(method, params)
|
return await make_request(method, params)
|
||||||
|
|
||||||
return middleware
|
return middleware
|
||||||
|
|
||||||
|
|
||||||
def http_retry_with_backoff_request_middleware(
|
async def http_retry_with_backoff_request_middleware(
|
||||||
make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3
|
make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3
|
||||||
) -> Callable[[RPCEndpoint, Any], Any]:
|
) -> Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]:
|
||||||
return exception_retry_with_backoff_middleware(
|
return await exception_retry_with_backoff_middleware(
|
||||||
make_request, web3, (ConnectionError, HTTPError, Timeout, TooManyRedirects)
|
make_request,
|
||||||
|
web3,
|
||||||
|
(request_exceptions + aiohttp_exceptions + (TimeoutError,)),
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user