added uni-v3 and synapse

This commit is contained in:
cryptopath 2023-01-05 11:30:08 +00:00
parent b2172fe0d0
commit 3e95ba0a3b
10 changed files with 275 additions and 212 deletions

View File

@ -2,24 +2,13 @@ import asyncio
import logging
import os
import dramatiq
from aiohttp_retry import ExponentialRetry, RetryClient
from mev_inspect.block import get_latest_block_number
from mev_inspect.concurrency import coro
from mev_inspect.crud.latest_block_update import (
find_latest_block_update,
update_latest_block,
)
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.crud.latest_block_update import find_latest_block_update
from mev_inspect.db import get_inspect_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker
from mev_inspect.queue.tasks import (
HIGH_PRIORITY,
HIGH_PRIORITY_QUEUE,
realtime_export_task,
)
from mev_inspect.signal_handler import GracefulKiller
logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO)
@ -27,6 +16,8 @@ logger = logging.getLogger(__name__)
# lag to make sure the blocks we see are settled
BLOCK_NUMBER_LAG = 5
SLEEP_TIME = 24 * 60 * 60
STRIDE_SIZE = 500000
@coro
@ -35,50 +26,27 @@ async def run():
if rpc is None:
raise RuntimeError("Missing environment variable RPC_URL")
healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL")
logger.info("Starting...")
killer = GracefulKiller()
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker()
export_actor = dramatiq.actor(
realtime_export_task,
broker=broker,
queue_name=HIGH_PRIORITY_QUEUE,
priority=HIGH_PRIORITY,
)
inspector = MEVInspector(rpc)
base_provider = get_base_provider(rpc)
while not killer.kill_now:
await inspect_next_block(
inspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
await asyncio.gather(
inspect_next_many_blocks(
inspector,
),
asyncio.sleep(SLEEP_TIME),
)
logger.info("Stopping...")
async def inspect_next_block(
async def inspect_next_many_blocks(
inspector: MEVInspector,
inspect_db_session,
trace_db_session,
base_provider,
healthcheck_url,
export_actor,
):
latest_block_number = await get_latest_block_number(base_provider)
last_written_block = find_latest_block_update(inspect_db_session)
with get_inspect_session() as inspect_db_session:
latest_block_number = await get_latest_block_number(inspector.w3)
last_written_block = find_latest_block_update(inspect_db_session)
logger.info(f"Latest block: {latest_block_number}")
logger.info(f"Last written block: {last_written_block}")
@ -87,26 +55,28 @@ async def inspect_next_block(
# maintain lag if no blocks written yet
last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1
if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG):
block_number = last_written_block + 1
logger.info(f"Writing block: {block_number}")
await inspector.inspect_single_block(
inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number,
for start_block_number in range(
last_written_block + 1, latest_block_number, STRIDE_SIZE
):
end_block_number = start_block_number + STRIDE_SIZE
end_block_number = (
end_block_number
if end_block_number <= latest_block_number
else latest_block_number
)
logger.info(
f"Inpecting blocks started: {start_block_number} to {end_block_number}"
)
with get_inspect_session() as inspect_db_session:
await inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session,
trace_db_session=None,
after_block=start_block_number,
before_block=end_block_number,
)
logger.info(
f"Inpecting blocks ended: {start_block_number} to {end_block_number}"
)
update_latest_block(inspect_db_session, block_number)
logger.info(f"Sending block {block_number} for export")
export_actor.send(block_number)
if healthcheck_url:
await ping_healthcheck_url(healthcheck_url)
else:
await asyncio.sleep(5)
async def ping_healthcheck_url(url):

File diff suppressed because one or more lines are too long

View File

@ -41,7 +41,11 @@ def write_arbitrages(
end_amount=arbitrage.end_amount,
profit_amount=arbitrage.profit_amount,
error=arbitrage.error,
protocols={swap.protocol.value for swap in arbitrage.swaps if swap.protocol is not None},
protocols={
swap.protocol.value
for swap in arbitrage.swaps
if swap.protocol is not None
},
)
)
@ -66,4 +70,4 @@ def write_arbitrages(
params=swap_arbitrage_ids,
)
db_session.commit()
# db_session.commit()

View File

@ -2,13 +2,17 @@ from typing import Optional
def find_latest_block_update(db_session) -> Optional[int]:
    """Return the highest block number already written to the database.

    Checks the ``swaps`` and ``liquidations`` tables and returns the maximum
    block number found across them, or ``None`` when neither table contains
    any rows (i.e. nothing has been inspected yet).

    Args:
        db_session: an open SQLAlchemy session used to run the raw queries.

    Returns:
        The latest written block number, or ``None`` if no data exists.
    """
    result1 = db_session.execute(
        "SELECT block_number FROM swaps order by block_number desc LIMIT 1"
    ).one_or_none()
    result2 = db_session.execute(
        "SELECT block_number FROM liquidations order by block_number desc LIMIT 1"
    ).one_or_none()
    # Fix: the previous code returned None whenever EITHER table was empty,
    # which threw away progress recorded in the other table (e.g. swaps
    # written but no liquidations yet). Use whatever data actually exists.
    found = [int(row[0]) for row in (result1, result2) if row is not None]
    return max(found) if found else None
def update_latest_block(db_session, block_number) -> None:

View File

@ -31,4 +31,4 @@ def write_liquidations(
]
db_session.bulk_save_objects(models)
db_session.commit()
# db_session.commit()

View File

@ -1,9 +1,8 @@
def get_reserves(db_session):
    """Fetch all cached pool-reserve rows from the ``reserves`` table.

    Args:
        db_session: an open SQLAlchemy session.

    Returns:
        The raw query result; callers iterate rows — presumably
        (pool_address, token0, token1) per the table's insert shape —
        and must consume it before the session closes.
    """
    # The diff residue duplicated the pre- and post-change form of this
    # query, which would execute it twice; keep the single-call version.
    return db_session.execute("SELECT * FROM reserves")
def set_reserves(db_session, values):
db_session.execute(
"""
@ -12,6 +11,6 @@ def set_reserves(db_session, values):
VALUES
(:pool_address, :token0, :token1)
""",
params = values,
params=values,
)
db_session.commit()

View File

@ -28,4 +28,4 @@ def write_swaps(
models = [SwapModel(**json.loads(swap.json())) for swap in swaps]
db_session.bulk_save_objects(models)
db_session.commit()
# db_session.commit()

View File

@ -33,7 +33,6 @@ def _get_engine(uri: str):
executemany_mode="batch",
executemany_values_page_size=10000,
executemany_batch_page_size=10000,
)

View File

@ -1,66 +1,23 @@
import logging
from typing import List, Optional, Any, Dict, Tuple
import time
from typing import Dict, List, Optional, Tuple
from sqlalchemy import orm
from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages
from mev_inspect.block import create_from_block_number, get_classified_traces_from_events
from mev_inspect.block import get_classified_traces_from_events
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages
from mev_inspect.crud.arbitrages import write_arbitrages
from mev_inspect.crud.liquidations import write_liquidations
from mev_inspect.crud.reserves import get_reserves, set_reserves
from mev_inspect.crud.blocks import delete_blocks, write_blocks
from mev_inspect.crud.liquidations import (
delete_liquidations_for_blocks,
write_liquidations,
)
from mev_inspect.crud.miner_payments import (
delete_miner_payments_for_blocks,
write_miner_payments,
)
from mev_inspect.crud.nft_trades import delete_nft_trades_for_blocks, write_nft_trades
from mev_inspect.crud.punks import (
delete_punk_bid_acceptances_for_blocks,
delete_punk_bids_for_blocks,
delete_punk_snipes_for_blocks,
write_punk_bid_acceptances,
write_punk_bids,
write_punk_snipes,
)
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.summary import update_summary_for_block_range
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_blocks,
write_classified_traces,
)
from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.miner_payments import get_miner_payments
from mev_inspect.nft_trades import get_nft_trades
from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes
from mev_inspect.sandwiches import get_sandwiches
from mev_inspect.crud.swaps import write_swaps
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.miner_payments import MinerPayment
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
logger = logging.getLogger(__name__)
import psycopg2
import pickle
import time
async def inspect_block(
inspect_db_session: orm.Session,
@ -80,6 +37,7 @@ async def inspect_block(
should_write_classified_traces,
)
reserves: Dict[str, Tuple[str, str]] = dict()
@ -92,15 +50,17 @@ async def inspect_many_blocks(
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
):
_, _, _ = trace_classifier, trace_db_session, should_write_classified_traces
for row in get_reserves(inspect_db_session).fetchall():
reserves[row[0]] = (row[1], row[2])
all_swaps: List[Swap] = []
all_arbitrages: List[Arbitrage] = []
all_liquidations: List[Liquidation] = []
async for swaps, liquidations, new_reserves in get_classified_traces_from_events(w3, after_block_number, before_block_number, reserves):
async for swaps, liquidations, new_reserves in get_classified_traces_from_events(
w3, after_block_number, before_block_number, reserves
):
arbitrages = get_arbitrages(swaps)
if len(new_reserves) > 0:
set_reserves(inspect_db_session, new_reserves)
@ -108,13 +68,20 @@ async def inspect_many_blocks(
all_swaps.extend(swaps)
all_arbitrages.extend(arbitrages)
all_liquidations.extend(liquidations)
start = time.time()
write_swaps(inspect_db_session, all_swaps)
write_arbitrages(inspect_db_session, all_arbitrages)
write_liquidations(inspect_db_session, all_liquidations)
print("sent swaps: {}, arbitrages: {}, time: {}".format(len(all_swaps), len(all_arbitrages), time.time()-start))
print("inspect complete...", after_block_number, before_block_number, flush=True)
inspect_db_session.commit()
logger.info(
"sent swaps: {}, arbitrages: {}, time: {}".format(
len(all_swaps), len(all_arbitrages), time.time() - start
)
)
logger.info(
"inspect complete... {} {}".format(after_block_number, before_block_number)
)
# all_blocks: List[Block] = []
# all_classified_traces: List[ClassifiedTrace] = []

View File

@ -57,23 +57,3 @@ def test_arbitrage_real_block(trace_classifier: TraceClassifier):
== "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
)
assert arbitrage_2.profit_amount == 53560707941943273628
def test_reverting_arbitrage(trace_classifier: TraceClassifier):
    """Regression test: an arbitrage inside a reverted transaction must
    carry ``error == "Reverted"``.

    Uses recorded block 12483198 (loaded via ``load_test_block``), which
    contains one known reverted arbitrage transaction.
    """
    block = load_test_block(12483198)
    classified_traces = trace_classifier.classify(block.traces)
    swaps = get_swaps(classified_traces)
    # Fixture block is expected to yield exactly 38 swaps and 5 arbitrages;
    # a change in either count signals a classifier behavior change.
    assert len(swaps) == 38
    arbitrages = get_arbitrages(list(swaps))
    assert len(arbitrages) == 5
    # Select the single arbitrage belonging to the known reverted tx hash.
    reverting_arbitrage = [
        arb
        for arb in arbitrages
        if arb.transaction_hash
        == "0x23a4dc7044666d3d4cc2d394a8017fc9d6b87018c20390d35266cea1af783e8a"
    ][0]
    # The revert status of the transaction should propagate to the arbitrage.
    assert reverting_arbitrage.error == "Reverted"