Support async for listener

This commit is contained in:
Luke Van Seters 2021-11-09 11:51:43 -05:00
parent e0d6919039
commit 7b60488f76
4 changed files with 47 additions and 46 deletions

23
cli.py
View File

@ -1,10 +1,8 @@
import asyncio
import os
import signal
from functools import wraps
import click
from mev_inspect.concurrency import coro
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
@ -16,25 +14,6 @@ def cli():
pass
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
def cancel_task_callback():
for task in asyncio.all_tasks():
task.cancel()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, cancel_task_callback)
try:
loop.run_until_complete(f(*args, **kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
return wrapper
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))

View File

@ -1,17 +1,15 @@
import asyncio
import logging
import os
import time
from web3 import Web3
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
from mev_inspect.crud.latest_block_update import (
find_latest_block_update,
update_latest_block,
)
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspect_block import inspect_block
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.signal_handler import GracefulKiller
@ -23,7 +21,8 @@ logger = logging.getLogger(__name__)
BLOCK_NUMBER_LAG = 5
def run():
@coro
async def run():
rpc = os.getenv("RPC_URL")
if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL")
@ -34,21 +33,23 @@ def run():
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
trace_classifier = TraceClassifier()
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
latest_block_number = get_latest_block_number(w3)
latest_block_number = await get_latest_block_number(base_provider)
while not killer.kill_now:
last_written_block = find_latest_block_update(inspect_db_session)
logger.info(f"Latest block: {latest_block_number}")
logger.info(f"Last written block: {last_written_block}")
if (last_written_block is None) or (
last_written_block < (latest_block_number - BLOCK_NUMBER_LAG)
):
if last_written_block is None:
# maintain lag if no blocks written yet
last_written_block = latest_block_number - 1
if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):
block_number = (
latest_block_number
if last_written_block is None
@ -57,18 +58,11 @@ def run():
logger.info(f"Writing block: {block_number}")
inspect_block(
inspect_db_session,
base_provider,
w3,
trace_classifier,
block_number,
trace_db_session=trace_db_session,
)
await inspector.inspect_single_block(block=block_number)
update_latest_block(inspect_db_session, block_number)
else:
time.sleep(5)
latest_block_number = get_latest_block_number(w3)
await asyncio.sleep(5)
latest_block_number = await get_latest_block_number(base_provider)
logger.info("Stopping...")

View File

@ -11,6 +11,7 @@ from mev_inspect.fees import fetch_base_fee_per_gas
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
cache_directory = "./cache"
@ -18,8 +19,13 @@ logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
def get_latest_block_number(w3: Web3) -> int:
return int(w3.eth.get_block("latest")["number"])
async def get_latest_block_number(base_provider) -> int:
latest_block = await base_provider.make_request(
"eth_getBlockByNumber",
["latest", False],
)
return hex_to_int(latest_block["result"]["number"])
async def create_from_block_number(

View File

@ -0,0 +1,22 @@
import asyncio
import signal
from functools import wraps
def coro(f):
@wraps(f)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
def cancel_task_callback():
for task in asyncio.all_tasks():
task.cancel()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, cancel_task_callback)
try:
loop.run_until_complete(f(*args, **kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
return wrapper