From 3e95ba0a3bc0ce93b290c71e86f82ff7b62bce0d Mon Sep 17 00:00:00 2001 From: cryptopath Date: Thu, 5 Jan 2023 11:30:08 +0000 Subject: [PATCH] added uni-v3 and synapse --- listener.py | 98 ++++----- mev_inspect/block.py | 260 ++++++++++++++++++------ mev_inspect/crud/arbitrages.py | 8 +- mev_inspect/crud/latest_block_update.py | 12 +- mev_inspect/crud/liquidations.py | 2 +- mev_inspect/crud/reserves.py | 7 +- mev_inspect/crud/swaps.py | 2 +- mev_inspect/db.py | 1 - mev_inspect/inspect_block.py | 77 ++----- tests/test_arbitrage_integration.py | 20 -- 10 files changed, 275 insertions(+), 212 deletions(-) diff --git a/listener.py b/listener.py index 123d2fe..b7c7694 100644 --- a/listener.py +++ b/listener.py @@ -2,24 +2,13 @@ import asyncio import logging import os -import dramatiq from aiohttp_retry import ExponentialRetry, RetryClient from mev_inspect.block import get_latest_block_number from mev_inspect.concurrency import coro -from mev_inspect.crud.latest_block_update import ( - find_latest_block_update, - update_latest_block, -) -from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.crud.latest_block_update import find_latest_block_update +from mev_inspect.db import get_inspect_session from mev_inspect.inspector import MEVInspector -from mev_inspect.provider import get_base_provider -from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import ( - HIGH_PRIORITY, - HIGH_PRIORITY_QUEUE, - realtime_export_task, -) from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -27,6 +16,8 @@ logger = logging.getLogger(__name__) # lag to make sure the blocks we see are settled BLOCK_NUMBER_LAG = 5 +SLEEP_TIME = 24 * 60 * 60 +STRIDE_SIZE = 500000 @coro @@ -35,50 +26,27 @@ async def run(): if rpc is None: raise RuntimeError("Missing environment variable RPC_URL") - healthcheck_url = os.getenv("LISTENER_HEALTHCHECK_URL") - logger.info("Starting...") killer = GracefulKiller() - inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() - - broker = connect_broker() - export_actor = dramatiq.actor( - realtime_export_task, - broker=broker, - queue_name=HIGH_PRIORITY_QUEUE, - priority=HIGH_PRIORITY, - ) - inspector = MEVInspector(rpc) - base_provider = get_base_provider(rpc) - while not killer.kill_now: - await inspect_next_block( - inspector, - inspect_db_session, - trace_db_session, - base_provider, - healthcheck_url, - export_actor, + await asyncio.gather( + inspect_next_many_blocks( + inspector, + ), + asyncio.sleep(SLEEP_TIME), ) - logger.info("Stopping...") -async def inspect_next_block( +async def inspect_next_many_blocks( inspector: MEVInspector, - inspect_db_session, - trace_db_session, - base_provider, - healthcheck_url, - export_actor, ): - - latest_block_number = await get_latest_block_number(base_provider) - last_written_block = find_latest_block_update(inspect_db_session) + with get_inspect_session() as inspect_db_session: + latest_block_number = await get_latest_block_number(inspector.w3) + last_written_block = find_latest_block_update(inspect_db_session) logger.info(f"Latest block: {latest_block_number}") logger.info(f"Last written block: {last_written_block}") @@ -87,26 +55,28 @@ async def inspect_next_block( # maintain lag if no blocks written yet last_written_block = latest_block_number - BLOCK_NUMBER_LAG - 1 - if last_written_block < (latest_block_number - BLOCK_NUMBER_LAG): - block_number = last_written_block + 1 - - logger.info(f"Writing block: {block_number}") - - await inspector.inspect_single_block( - inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, - block=block_number, + for start_block_number in range( + last_written_block + 1, latest_block_number, STRIDE_SIZE + ): + end_block_number = start_block_number + STRIDE_SIZE + end_block_number = ( + end_block_number + if end_block_number <= latest_block_number + else latest_block_number + ) + logger.info( + f"Inpecting blocks started: {start_block_number} to {end_block_number}" + ) + with get_inspect_session() as inspect_db_session: + await inspector.inspect_many_blocks( + inspect_db_session=inspect_db_session, + trace_db_session=None, + after_block=start_block_number, + before_block=end_block_number, + ) + logger.info( + f"Inpecting blocks ended: {start_block_number} to {end_block_number}" ) - - update_latest_block(inspect_db_session, block_number) - - logger.info(f"Sending block {block_number} for export") - export_actor.send(block_number) - - if healthcheck_url: - await ping_healthcheck_url(healthcheck_url) - else: - await asyncio.sleep(5) async def ping_healthcheck_url(url): diff --git a/mev_inspect/block.py b/mev_inspect/block.py index 9275abb..92b9873 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,6 +1,5 @@ import asyncio import logging -import time from typing import List, Optional from sqlalchemy import orm @@ -8,23 +7,17 @@ from web3 import Web3 from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.schemas.blocks import Block -from mev_inspect.schemas.receipts import Receipt -from mev_inspect.schemas.traces import Trace, TraceType -from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.liquidations import Liquidation - -from mev_inspect.utils import hex_to_int +from mev_inspect.schemas.receipts import Receipt +from mev_inspect.schemas.swaps import Swap +from mev_inspect.schemas.traces import Trace, TraceType logger = logging.getLogger(__name__) -async def get_latest_block_number(base_provider) -> int: - latest_block = await base_provider.make_request( - "eth_getBlockByNumber", - ["latest", False], - ) +async def get_latest_block_number(w3) -> int: + return await w3.eth.get_block_number() - return hex_to_int(latest_block["result"]["number"]) async def _get_logs_for_topics(w3, after_block, before_block, topics): return await w3.eth.get_logs( @@ -32,52 +25,110 @@ async def _get_logs_for_topics(w3, after_block, before_block, topics): "fromBlock": hex(after_block), "toBlock": hex(before_block), "topics": topics, - }) - + } + ) -def get_swap(data): + +def twos_complement(hexstr, bits): + value = int(hexstr, 16) + if value & (1 << (bits - 1)): + value -= 1 << bits + return value + + +def get_swap_v2(data): data = data[2:] # print(data) - return int(data[0:64], base=16), int(data[64:128], base=16), int(data[128:192], base=16), int(data[192:256], base=16) + return ( + int(data[0:64], base=16), + int(data[64:128], base=16), + int(data[128:192], base=16), + int(data[192:256], base=16), + ) + + +def get_swap_v3(data): + data = data[2:] + # print(data) + return ( + twos_complement(data[0:64], 256), + twos_complement(data[64:128], 256), + int(data[152:192], base=16), + int(data[224:256], base=16), + twos_complement(data[314:320], 24), + ) + + +def get_swap_synapse(data): + data = data[2:] + # print(data) + return ( + int(data[0:64], base=16), + int(data[64:128], base=16), + data[128:192], + data[192:256], + ) + def get_liquidation(data): data = data[2:] # print(data) - return int(data[0:64], base=16), int(data[64:128], base=16), "0x"+data[128+24:168+24] + return ( + int(data[0:64], base=16), + int(data[64:128], base=16), + "0x" + data[128 + 24 : 168 + 24], + ) -univ2abi = ''' + +univ2abi = """ [{"inputs":[],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"owner","type":"address"},{"indexed":true,"internalType":"address","name":"spender","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"sender","type":"address"},{"indexed":false,"internalType":"uint256","name":"amount0","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1","type":"uint256"},{"indexed":true,"internalType":"address","name":"to","type":"address"}],"name":"Burn","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"sender","type":"address"},{"indexed":false,"internalType":"uint256","name":"amount0","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1","type":"uint256"}],"name":"Mint","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"sender","type":"address"},{"indexed":false,"internalType":"uint256","name":"amount0In","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1In","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount0Out","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"amount1Out","type":"uint256"},{"indexed":true,"internalType":"address","name":"to","type":"address"}],"name":"Swap","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"uint112","name":"reserve0","type":"uint112"},{"indexed":false,"internalType":"uint112","name":"reserve1","type":"uint112"}],"name":"Sync","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"from","type":"address"},{"indexed":true,"internalType":"address","name":"to","type":"address"},{"indexed":false,"internalType":"uint256","name":"value","type":"uint256"}],"name":"Transfer","type":"event"},{"inputs":[],"name":"DOMAIN_SEPARATOR","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"MINIMUM_LIQUIDITY","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"PERMIT_TYPEHASH","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"","type":"address"},{"internalType":"address","name":"","type":"address"}],"name":"allowance","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"value","type":"uint256"}],"name":"approve","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"to","type":"address"}],"name":"burn","outputs":[{"internalType":"uint256","name":"amount0","type":"uint256"},{"internalType":"uint256","name":"amount1","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"factory","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"getReserves","outputs":[{"internalType":"uint112","name":"_reserve0","type":"uint112"},{"internalType":"uint112","name":"_reserve1","type":"uint112"},{"internalType":"uint32","name":"_blockTimestampLast","type":"uint32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"_token0","type":"address"},{"internalType":"address","name":"_token1","type":"address"}],"name":"initialize","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"kLast","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"to","type":"address"}],"name":"mint","outputs":[{"internalType":"uint256","name":"liquidity","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"name","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"","type":"address"}],"name":"nonces","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"owner","type":"address"},{"internalType":"address","name":"spender","type":"address"},{"internalType":"uint256","name":"value","type":"uint256"},{"internalType":"uint256","name":"deadline","type":"uint256"},{"internalType":"uint8","name":"v","type":"uint8"},{"internalType":"bytes32","name":"r","type":"bytes32"},{"internalType":"bytes32","name":"s","type":"bytes32"}],"name":"permit","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"price0CumulativeLast","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"price1CumulativeLast","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"to","type":"address"}],"name":"skim","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"amount0Out","type":"uint256"},{"internalType":"uint256","name":"amount1Out","type":"uint256"},{"internalType":"address","name":"to","type":"address"},{"internalType":"bytes","name":"data","type":"bytes"}],"name":"swap","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"symbol","outputs":[{"internalType":"string","name":"","type":"string"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"sync","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[],"name":"token0","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"token1","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"totalSupply","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"to","type":"address"},{"internalType":"uint256","name":"value","type":"uint256"}],"name":"transfer","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"from","type":"address"},{"internalType":"address","name":"to","type":"address"},{"internalType":"uint256","name":"value","type":"uint256"}],"name":"transferFrom","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"nonpayable","type":"function"}] -''' +""" + def parse_topic(log, index): - return log['topics'][index].hex() + return log["topics"][index].hex() + def parse_blockNumber(log): - return log['blockNumber'] + return log["blockNumber"] + def parse_transactionHash(log): - return log['transactionHash'].hex() + return log["transactionHash"].hex() + def parse_address(log): - return log['address'].lower() + return log["address"].lower() + def parse_logIndex(log): - return log['logIndex'] + return log["logIndex"] + def parse_data(log): - return log['data'] + return log["data"] + def parse_token(w3, token): - return "0x"+w3.toHex(token)[26:] + return "0x" + w3.toHex(token)[26:] + async def classify_logs(logs, reserves, w3): cswaps = [] cliquidations = [] new_reserves = [] - topic_swap = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822" - topic_liquidation = "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286" + + topic_swap_v2 = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822" + topic_swap_v3 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67" + topic_swap_synapse = ( + "0xc6c1e0630dbe9130cc068028486c0d118ddcea348550819defd5cb8c257f8a38" + ) + topic_liquidation = ( + "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286" + ) + for log in logs: - if parse_topic(log, 0) == topic_swap: + if parse_topic(log, 0) == topic_swap_v2: block = parse_blockNumber(log) transaction_hash = parse_transactionHash(log) pool_address = parse_address(log) @@ -86,70 +137,159 @@ async def classify_logs(logs, reserves, w3): else: addr = Web3.toChecksumAddress(pool_address) token0, token1 = await asyncio.gather( - w3.eth.call({'to': addr, 'data': '0x0dfe1681'}), - w3.eth.call({'to': addr, 'data': '0xd21220a7'}) + w3.eth.call({"to": addr, "data": "0x0dfe1681"}), + w3.eth.call({"to": addr, "data": "0xd21220a7"}), ) token0 = parse_token(w3, token0) token1 = parse_token(w3, token1) reserves[pool_address] = (token0, token1) new_reserves.append( - { - "pool_address": pool_address, - "token0": token0, - "token1": token1 - } + {"pool_address": pool_address, "token0": token0, "token1": token1} ) - am0in, am1in, am0out, am1out = get_swap(parse_data(log)) + am0in, am1in, am0out, am1out = get_swap_v2(parse_data(log)) swap = Swap( abi_name="uniswap_v2", transaction_hash=transaction_hash, block_number=block, trace_address=[parse_logIndex(log)], contract_address=pool_address, - from_address="0x"+parse_topic(log, 1)[26:], - to_address="0x"+parse_topic(log, 2)[26:], - token_in_address=token0 if am0in != 0 else token1, # TODO - token_in_amount= am0in if am0in != 0 else am1in, - token_out_address=token1 if am1out != 0 else token0, # TODO - token_out_amount= am0out if am0out != 0 else am1out, + from_address="0x" + parse_topic(log, 1)[26:], + to_address="0x" + parse_topic(log, 2)[26:], + token_in_address=token0 if am0in != 0 else token1, # TODO + token_in_amount=am0in if am0in != 0 else am1in, + token_out_address=token1 if am1out != 0 else token0, # TODO + token_out_amount=am0out if am0out != 0 else am1out, protocol=None, - error=None + error=None, + ) + cswaps.append(swap) + elif parse_topic(log, 0) == topic_swap_v3: + block = parse_blockNumber(log) + transaction_hash = parse_transactionHash(log) + pool_address = parse_address(log) + if pool_address in reserves: + token0, token1 = reserves[pool_address] + else: + addr = Web3.toChecksumAddress(pool_address) + token0, token1 = await asyncio.gather( + w3.eth.call({"to": addr, "data": "0x0dfe1681"}), + w3.eth.call({"to": addr, "data": "0xd21220a7"}), + ) + token0 = parse_token(w3, token0) + token1 = parse_token(w3, token1) + reserves[pool_address] = (token0, token1) + new_reserves.append( + {"pool_address": pool_address, "token0": token0, "token1": token1} + ) + + am0, am1, _, _, _ = get_swap_v3(parse_data(log)) + swap = Swap( + abi_name="uniswap_v3", + transaction_hash=transaction_hash, + block_number=block, + trace_address=[parse_logIndex(log)], + contract_address=pool_address, + from_address="0x" + parse_topic(log, 1)[26:], + to_address="0x" + parse_topic(log, 2)[26:], + token_in_address=token0 if am0 > 0 else token1, # TODO + token_in_amount=am0 if am0 > 0 else am1, + token_out_address=token1 if am1 < 0 else token0, # TODO + token_out_amount=-am1 if am1 < 0 else -am0, + protocol=None, + error=None, + ) + cswaps.append(swap) + elif parse_topic(log, 0) == topic_swap_synapse: + block = parse_blockNumber(log) + transaction_hash = parse_transactionHash(log) + pool_address = parse_address(log) + + sold, bought, soldId, boughtId = get_swap_synapse(parse_data(log)) + + if pool_address in reserves: + pass # TODO + else: + addr = Web3.toChecksumAddress(pool_address) + soldToken, boughtToken = await asyncio.gather( + w3.eth.call({"to": addr, "data": "0x82b86600" + soldId}), + w3.eth.call({"to": addr, "data": "0x82b86600" + boughtId}), + ) + soldToken = parse_token(w3, soldToken) + boughtToken = parse_token(w3, boughtToken) + # reserves[pool_address] = (token0, token1) + # new_reserves.append( + # { + # "pool_address": pool_address, + # "token0": token0, + # "token1": token1 + # } + # ) + + swap = Swap( + abi_name="synapse", + transaction_hash=transaction_hash, + block_number=block, + trace_address=[parse_logIndex(log)], + contract_address=pool_address, + from_address="0x" + parse_topic(log, 1)[26:], + to_address="0x" + parse_topic(log, 1)[26:], + token_in_address=soldToken, + token_in_amount=sold, + token_out_address=boughtToken, + token_out_amount=bought, + protocol=None, + error=None, ) cswaps.append(swap) elif parse_topic(log, 0) == topic_liquidation: block = str(parse_blockNumber(log)) am_debt, am_recv, addr_usr = get_liquidation(parse_data(log)) liquidation = Liquidation( - liquidated_user = "0x"+parse_topic(log, 3)[26:], - liquidator_user = addr_usr, - debt_token_address = "0x"+parse_topic(log, 2)[26:], - debt_purchase_amount = am_debt, - received_amount = am_recv, - received_token_address = "0x"+parse_topic(log, 1)[26:], - protocol = None, - transaction_hash = parse_transactionHash(log), - trace_address = [parse_logIndex(log)], - block_number = block, - error = None + liquidated_user="0x" + parse_topic(log, 3)[26:], + liquidator_user=addr_usr, + debt_token_address="0x" + parse_topic(log, 2)[26:], + debt_purchase_amount=am_debt, + received_amount=am_recv, + received_token_address="0x" + parse_topic(log, 1)[26:], + protocol=None, + transaction_hash=parse_transactionHash(log), + trace_address=[parse_logIndex(log)], + block_number=block, + error=None, ) cliquidations.append(liquidation) return cswaps, cliquidations, new_reserves + async def get_classified_traces_from_events(w3, after_block, before_block, reserves): start = after_block stride = 2000 - topic_swap = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822" - topic_liquidation = "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286" + + topic_swap_v2 = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822" + topic_swap_v3 = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67" + topic_swap_synapse = ( + "0xc6c1e0630dbe9130cc068028486c0d118ddcea348550819defd5cb8c257f8a38" + ) + topic_liquidation = ( + "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286" + ) + while start < before_block: begin = start end = start + stride if (start + stride) < before_block else before_block end -= 1 start += stride - print("fetching from node...", begin, end, flush=True) - all_logs = await _get_logs_for_topics(w3, begin, end, [[topic_swap, topic_liquidation]]) - yield await classify_logs(all_logs, reserves, w3) + logger.info("fetching from node... {} {}".format(begin, end)) + all_logs = await _get_logs_for_topics( + w3, + begin, + end, + [[topic_swap_v2, topic_swap_v3, topic_swap_synapse, topic_liquidation]], + ) + yield await classify_logs(all_logs, reserves, w3) + async def create_from_block_number( w3: Web3, diff --git a/mev_inspect/crud/arbitrages.py b/mev_inspect/crud/arbitrages.py index e846585..1828685 100644 --- a/mev_inspect/crud/arbitrages.py +++ b/mev_inspect/crud/arbitrages.py @@ -41,7 +41,11 @@ def write_arbitrages( end_amount=arbitrage.end_amount, profit_amount=arbitrage.profit_amount, error=arbitrage.error, - protocols={swap.protocol.value for swap in arbitrage.swaps if swap.protocol is not None}, + protocols={ + swap.protocol.value + for swap in arbitrage.swaps + if swap.protocol is not None + }, ) ) @@ -66,4 +70,4 @@ def write_arbitrages( params=swap_arbitrage_ids, ) - db_session.commit() + # db_session.commit() diff --git a/mev_inspect/crud/latest_block_update.py b/mev_inspect/crud/latest_block_update.py index a918d93..f8f4a43 100644 --- a/mev_inspect/crud/latest_block_update.py +++ b/mev_inspect/crud/latest_block_update.py @@ -2,13 +2,17 @@ from typing import Optional def find_latest_block_update(db_session) -> Optional[int]: - result = db_session.execute( - "SELECT block_number FROM latest_block_update LIMIT 1" + result1 = db_session.execute( + "SELECT block_number FROM swaps order by block_number desc LIMIT 1" ).one_or_none() - if result is None: + result2 = db_session.execute( + "SELECT block_number FROM liquidations order by block_number desc LIMIT 1" + ).one_or_none() + + if result1 is None or result2 is None: return None else: - return int(result[0]) + return max(int(result1[0]), int(result2[0])) def update_latest_block(db_session, block_number) -> None: diff --git a/mev_inspect/crud/liquidations.py b/mev_inspect/crud/liquidations.py index c51c1b0..5e471c1 100644 --- a/mev_inspect/crud/liquidations.py +++ b/mev_inspect/crud/liquidations.py @@ -31,4 +31,4 @@ def write_liquidations( ] db_session.bulk_save_objects(models) - db_session.commit() + # db_session.commit() diff --git a/mev_inspect/crud/reserves.py b/mev_inspect/crud/reserves.py index 59e7828..57e7882 100644 --- a/mev_inspect/crud/reserves.py +++ b/mev_inspect/crud/reserves.py @@ -1,9 +1,8 @@ def get_reserves(db_session): - result = db_session.execute( - "SELECT * FROM reserves" - ) + result = db_session.execute("SELECT * FROM reserves") return result + def set_reserves(db_session, values): db_session.execute( """ @@ -12,6 +11,6 @@ def set_reserves(db_session, values): VALUES (:pool_address, :token0, :token1) """, - params = values, + params=values, ) db_session.commit() diff --git a/mev_inspect/crud/swaps.py b/mev_inspect/crud/swaps.py index c12e08d..f359f54 100644 --- a/mev_inspect/crud/swaps.py +++ b/mev_inspect/crud/swaps.py @@ -28,4 +28,4 @@ def write_swaps( models = [SwapModel(**json.loads(swap.json())) for swap in swaps] db_session.bulk_save_objects(models) - db_session.commit() + # db_session.commit() diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 47b689c..2a0b97e 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -33,7 +33,6 @@ def _get_engine(uri: str): executemany_mode="batch", executemany_values_page_size=10000, executemany_batch_page_size=10000, - ) diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index 57f5319..0fd7838 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -1,66 +1,23 @@ import logging -from typing import List, Optional, Any, Dict, Tuple +import time +from typing import Dict, List, Optional, Tuple from sqlalchemy import orm from web3 import Web3 from mev_inspect.arbitrages import get_arbitrages -from mev_inspect.block import create_from_block_number, get_classified_traces_from_events +from mev_inspect.block import get_classified_traces_from_events from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages +from mev_inspect.crud.arbitrages import write_arbitrages +from mev_inspect.crud.liquidations import write_liquidations from mev_inspect.crud.reserves import get_reserves, set_reserves -from mev_inspect.crud.blocks import delete_blocks, write_blocks -from mev_inspect.crud.liquidations import ( - delete_liquidations_for_blocks, - write_liquidations, -) -from mev_inspect.crud.miner_payments import ( - delete_miner_payments_for_blocks, - write_miner_payments, -) -from mev_inspect.crud.nft_trades import delete_nft_trades_for_blocks, write_nft_trades -from mev_inspect.crud.punks import ( - delete_punk_bid_acceptances_for_blocks, - delete_punk_bids_for_blocks, - delete_punk_snipes_for_blocks, - write_punk_bid_acceptances, - write_punk_bids, - write_punk_snipes, -) -from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches -from mev_inspect.crud.summary import update_summary_for_block_range -from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps -from mev_inspect.crud.traces import ( - delete_classified_traces_for_blocks, - write_classified_traces, -) -from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers -from mev_inspect.liquidations import get_liquidations -from mev_inspect.miner_payments import get_miner_payments -from mev_inspect.nft_trades import get_nft_trades -from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes -from mev_inspect.sandwiches import get_sandwiches +from mev_inspect.crud.swaps import write_swaps from mev_inspect.schemas.arbitrages import Arbitrage -from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.liquidations import Liquidation -from mev_inspect.schemas.miner_payments import MinerPayment -from mev_inspect.schemas.nft_trades import NftTrade -from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance -from mev_inspect.schemas.punk_bid import PunkBid -from mev_inspect.schemas.punk_snipe import PunkSnipe -from mev_inspect.schemas.sandwiches import Sandwich from mev_inspect.schemas.swaps import Swap -from mev_inspect.schemas.traces import ClassifiedTrace -from mev_inspect.schemas.transfers import Transfer -from mev_inspect.swaps import get_swaps -from mev_inspect.transfers import get_transfers logger = logging.getLogger(__name__) -import psycopg2 -import pickle -import time - async def inspect_block( inspect_db_session: orm.Session, @@ -80,6 +37,7 @@ async def inspect_block( should_write_classified_traces, ) + reserves: Dict[str, Tuple[str, str]] = dict() @@ -92,15 +50,17 @@ async def inspect_many_blocks( trace_db_session: Optional[orm.Session], should_write_classified_traces: bool = True, ): - + _, _, _ = trace_classifier, trace_db_session, should_write_classified_traces for row in get_reserves(inspect_db_session).fetchall(): reserves[row[0]] = (row[1], row[2]) all_swaps: List[Swap] = [] all_arbitrages: List[Arbitrage] = [] all_liquidations: List[Liquidation] = [] - - async for swaps, liquidations, new_reserves in get_classified_traces_from_events(w3, after_block_number, before_block_number, reserves): + + async for swaps, liquidations, new_reserves in get_classified_traces_from_events( + w3, after_block_number, before_block_number, reserves + ): arbitrages = get_arbitrages(swaps) if len(new_reserves) > 0: set_reserves(inspect_db_session, new_reserves) @@ -108,13 +68,20 @@ async def inspect_many_blocks( all_swaps.extend(swaps) all_arbitrages.extend(arbitrages) all_liquidations.extend(liquidations) - + start = time.time() write_swaps(inspect_db_session, all_swaps) write_arbitrages(inspect_db_session, all_arbitrages) write_liquidations(inspect_db_session, all_liquidations) - print("sent swaps: {}, arbitrages: {}, time: {}".format(len(all_swaps), len(all_arbitrages), time.time()-start)) - print("inspect complete...", after_block_number, before_block_number, flush=True) + inspect_db_session.commit() + logger.info( + "sent swaps: {}, arbitrages: {}, time: {}".format( + len(all_swaps), len(all_arbitrages), time.time() - start + ) + ) + logger.info( + "inspect complete... {} {}".format(after_block_number, before_block_number) + ) # all_blocks: List[Block] = [] # all_classified_traces: List[ClassifiedTrace] = [] diff --git a/tests/test_arbitrage_integration.py b/tests/test_arbitrage_integration.py index 5f28ebc..a86bdc2 100644 --- a/tests/test_arbitrage_integration.py +++ b/tests/test_arbitrage_integration.py @@ -57,23 +57,3 @@ def test_arbitrage_real_block(trace_classifier: TraceClassifier): == "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" ) assert arbitrage_2.profit_amount == 53560707941943273628 - - -def test_reverting_arbitrage(trace_classifier: TraceClassifier): - block = load_test_block(12483198) - classified_traces = trace_classifier.classify(block.traces) - - swaps = get_swaps(classified_traces) - assert len(swaps) == 38 - - arbitrages = get_arbitrages(list(swaps)) - assert len(arbitrages) == 5 - - reverting_arbitrage = [ - arb - for arb in arbitrages - if arb.transaction_hash - == "0x23a4dc7044666d3d4cc2d394a8017fc9d6b87018c20390d35266cea1af783e8a" - ][0] - - assert reverting_arbitrage.error == "Reverted"