From eaf2698d7e34185180f831cb19d0d8330f98d907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eru=20Il=C3=BAvatar?= Date: Thu, 22 Dec 2022 10:31:03 +0000 Subject: [PATCH] feat: add polygon logs processing - Closes #issue-8 (#11) * feat: add polygon logs processing Closes #issue-8 Signed-off-by: Arthurim * fix: remove unused variables Signed-off-by: Arthurim * fix: add inspect db Signed-off-by: Arthurim * fix: remove parenthesis Signed-off-by: Arthurim Signed-off-by: Arthurim --- cli.py | 6 +- listener.py | 6 +- mev_inspect/block.py | 140 ++++++++++++++- mev_inspect/inspect_block.py | 262 +++++++--------------------- mev_inspect/inspector.py | 10 +- mev_inspect/queue/tasks.py | 15 +- mev_inspect/schemas/liquidations.py | 2 +- mev_inspect/schemas/swaps.py | 4 +- 8 files changed, 210 insertions(+), 235 deletions(-) diff --git a/cli.py b/cli.py index b5a6674..f60998e 100644 --- a/cli.py +++ b/cli.py @@ -38,13 +38,11 @@ def cli(): @coro async def inspect_block_command(block_number: int, rpc: str): inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() inspector = MEVInspector(rpc) await inspector.inspect_single_block( inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, block=block_number, ) @@ -87,7 +85,6 @@ async def inspect_many_blocks_command( request_timeout: int, ): inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() inspector = MEVInspector( rpc, @@ -96,7 +93,6 @@ async def inspect_many_blocks_command( ) await inspector.inspect_many_blocks( inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, after_block=after_block, before_block=before_block, ) @@ -114,7 +110,7 @@ def enqueue_block_list_command(): for block_string in fileinput.input(): block = int(block_string) - logger.info(f"Sending {block} to {block+1}") + logger.info(f"Sending {block} to {block + 1}") inspect_many_blocks_actor.send(block, block + 1) diff --git a/listener.py b/listener.py index 123d2fe..a2021a8 100644 --- a/listener.py +++ b/listener.py @@ -11,7 +11,7 @@ from mev_inspect.crud.latest_block_update import ( find_latest_block_update, update_latest_block, ) -from mev_inspect.db import get_inspect_session, get_trace_session +from mev_inspect.db import get_inspect_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider from mev_inspect.queue.broker import connect_broker @@ -42,7 +42,6 @@ async def run(): killer = GracefulKiller() inspect_db_session = get_inspect_session() - trace_db_session = get_trace_session() broker = connect_broker() export_actor = dramatiq.actor( @@ -59,7 +58,6 @@ async def run(): await inspect_next_block( inspector, inspect_db_session, - trace_db_session, base_provider, healthcheck_url, export_actor, @@ -71,7 +69,6 @@ async def run(): async def inspect_next_block( inspector: MEVInspector, inspect_db_session, - trace_db_session, base_provider, healthcheck_url, export_actor, @@ -94,7 +91,6 @@ async def inspect_next_block( await inspector.inspect_single_block( inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, block=block_number, ) diff --git a/mev_inspect/block.py b/mev_inspect/block.py index cf80fa5..1b82a1b 100644 --- a/mev_inspect/block.py +++ b/mev_inspect/block.py @@ -1,25 +1,161 @@ import asyncio import logging -from typing import List, Optional +from typing import Dict, List, Optional, Tuple from sqlalchemy import orm from web3 import Web3 from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.schemas.blocks import Block +from mev_inspect.schemas.liquidations import Liquidation from mev_inspect.schemas.receipts import Receipt +from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.traces import Trace, TraceType from mev_inspect.utils import hex_to_int logger = logging.getLogger(__name__) +TOPIC_SWAP = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822" +TOPIC_LIQUIDATION = "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286" +UNI_TOKEN_0 = "0x0dfe1681" +UNI_TOKEN_1 = "0xd21220a7" + + +async def _get_logs_for_topics(base_provider, after_block, before_block, topics): + logs = await base_provider.make_request( + "eth_getLogs", + [ + { + "fromBlock": hex(after_block), + "toBlock": hex(before_block), + "topics": topics, + } + ], + ) + return logs["result"] + + +def _logs_by_tx(logs): + logs_by_tx = dict() + for log in logs: + transaction_hash = log["transactionHash"] + if transaction_hash in logs_by_tx.keys(): + logs_by_tx[transaction_hash].append(log) + else: + logs_by_tx[transaction_hash] = [log] + return logs_by_tx + + +def get_swap(data): + data = data[2:] + return ( + int(data[0:64], base=16), + int(data[64:128], base=16), + int(data[128:192], base=16), + int(data[192:256], base=16), + ) + + +def get_liquidation(data): + data = data[2:] + return ( + int(data[0:64], base=16), + int(data[64:128], base=16), + "0x" + data[128 + 24 : 168 + 24], + ) + + +async def classify_logs(logs, pool_reserves, w3): + cswaps = [] + cliquidations = [] + + for log in logs: + topic = log["topics"][0] + if topic in [TOPIC_SWAP, TOPIC_LIQUIDATION]: + block = int(log["blockNumber"], 16) + transaction_hash = log["transactionHash"] + trace_address = [int(log["logIndex"], 16)] + first_token = "0x" + log["topics"][1][26:] + second_token = "0x" + log["topics"][2][26:] + if topic == TOPIC_SWAP: + pool_address = log["address"] + if pool_address in pool_reserves: + token0, token1 = pool_reserves[pool_address] + else: + addr = Web3.toChecksumAddress(pool_address) + token0, token1 = await asyncio.gather( + w3.eth.call({"to": addr, "data": UNI_TOKEN_0}), + w3.eth.call({"to": addr, "data": UNI_TOKEN_1}), + ) + token0 = w3.toHex(token0) + token1 = w3.toHex(token1) + pool_reserves[pool_address] = (token0, token1) + + am0in, am1in, am0out, am1out = get_swap(log["data"]) + swap = Swap( + abi_name="uniswap_v2", + transaction_hash=transaction_hash, + block_number=block, + trace_address=trace_address, + contract_address=pool_address, + from_address=first_token, + to_address=second_token, + token_in_address=token0 if am0in != 0 else token1, + token_in_amount=am0in if am0in != 0 else am1in, + token_out_address=token1 if am1out != 0 else token0, + token_out_amount=am0out if am0out != 0 else am1out, + protocol=None, + error=None, + ) + cswaps.append(swap) + elif topic == TOPIC_LIQUIDATION: + block = str(block) + am_debt, am_recv, addr_usr = get_liquidation(log["data"]) + liquidation = Liquidation( + liquidated_user="0x" + log["topics"][3][26:], + liquidator_user=addr_usr, + debt_token_address=second_token, + debt_purchase_amount=am_debt, + received_amount=am_recv, + received_token_address=first_token, + protocol=None, + transaction_hash=transaction_hash, + trace_address=trace_address, + block_number=block, + error=None, + ) + cliquidations.append(liquidation) + + return cswaps, cliquidations + + +reserves: Dict[str, Tuple[str, str]] = dict() + + +async def get_classified_traces_from_events( + w3: Web3, after_block: int, before_block: int +): + base_provider = w3.provider + start = after_block + stride = 300 + while start < before_block: + begin = start + end = start + stride if (start + stride) < before_block else before_block - 1 + start += stride + print("fetching from node...", begin, end, flush=True) + all_logs = await _get_logs_for_topics( + base_provider, begin, end, [[TOPIC_SWAP, TOPIC_LIQUIDATION]] + ) + logs_by_tx = _logs_by_tx(all_logs) + for tx in logs_by_tx.keys(): + yield await classify_logs(logs_by_tx[tx], reserves, w3) + async def get_latest_block_number(base_provider) -> int: latest_block = await base_provider.make_request( "eth_getBlockByNumber", ["latest", False], ) - return hex_to_int(latest_block["result"]["number"]) diff --git a/mev_inspect/inspect_block.py b/mev_inspect/inspect_block.py index db24347..875be1c 100644 --- a/mev_inspect/inspect_block.py +++ b/mev_inspect/inspect_block.py @@ -1,236 +1,94 @@ import logging -from typing import List, Optional +from typing import Any, Dict from sqlalchemy import orm from web3 import Web3 from mev_inspect.arbitrages import get_arbitrages -from mev_inspect.block import create_from_block_number -from mev_inspect.classifiers.trace import TraceClassifier -from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages -from mev_inspect.crud.blocks import delete_blocks, write_blocks -from mev_inspect.crud.liquidations import ( - delete_liquidations_for_blocks, - write_liquidations, -) -from mev_inspect.crud.miner_payments import ( - delete_miner_payments_for_blocks, - write_miner_payments, -) -from mev_inspect.crud.nft_trades import delete_nft_trades_for_blocks, write_nft_trades -from mev_inspect.crud.punks import ( - delete_punk_bid_acceptances_for_blocks, - delete_punk_bids_for_blocks, - delete_punk_snipes_for_blocks, - write_punk_bid_acceptances, - write_punk_bids, - write_punk_snipes, -) -from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches -from mev_inspect.crud.summary import update_summary_for_block_range -from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps -from mev_inspect.crud.traces import ( - delete_classified_traces_for_blocks, - write_classified_traces, -) -from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers -from mev_inspect.liquidations import get_liquidations -from mev_inspect.miner_payments import get_miner_payments -from mev_inspect.nft_trades import get_nft_trades -from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes -from mev_inspect.sandwiches import get_sandwiches -from mev_inspect.schemas.arbitrages import Arbitrage -from mev_inspect.schemas.blocks import Block -from mev_inspect.schemas.liquidations import Liquidation -from mev_inspect.schemas.miner_payments import MinerPayment -from mev_inspect.schemas.nft_trades import NftTrade -from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance -from mev_inspect.schemas.punk_bid import PunkBid -from mev_inspect.schemas.punk_snipe import PunkSnipe -from mev_inspect.schemas.sandwiches import Sandwich -from mev_inspect.schemas.swaps import Swap -from mev_inspect.schemas.traces import ClassifiedTrace -from mev_inspect.schemas.transfers import Transfer -from mev_inspect.swaps import get_swaps -from mev_inspect.transfers import get_transfers +from mev_inspect.block import get_classified_traces_from_events logger = logging.getLogger(__name__) +TRAILING_ZEROS = "000000000000000000000000" + async def inspect_block( inspect_db_session: orm.Session, w3: Web3, - trace_classifier: TraceClassifier, block_number: int, - trace_db_session: Optional[orm.Session], - should_write_classified_traces: bool = True, ): await inspect_many_blocks( inspect_db_session, w3, - trace_classifier, block_number, block_number + 1, - trace_db_session, - should_write_classified_traces, ) async def inspect_many_blocks( inspect_db_session: orm.Session, w3: Web3, - trace_classifier: TraceClassifier, after_block_number: int, before_block_number: int, - trace_db_session: Optional[orm.Session], - should_write_classified_traces: bool = True, ): - all_blocks: List[Block] = [] - all_classified_traces: List[ClassifiedTrace] = [] - all_transfers: List[Transfer] = [] - all_swaps: List[Swap] = [] - all_arbitrages: List[Arbitrage] = [] - all_liquidations: List[Liquidation] = [] - all_sandwiches: List[Sandwich] = [] - - all_punk_bids: List[PunkBid] = [] - all_punk_bid_acceptances: List[PunkBidAcceptance] = [] - all_punk_snipes: List[PunkSnipe] = [] - - all_miner_payments: List[MinerPayment] = [] - - all_nft_trades: List[NftTrade] = [] - - for block_number in range(after_block_number, before_block_number): - block = await create_from_block_number( - w3, - block_number, - trace_db_session, - ) - - logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}") - - total_transactions = len( - set( - t.transaction_hash - for t in block.traces - if t.transaction_hash is not None - ) - ) - logger.info( - f"Block: {block_number} -- Total transactions: {total_transactions}" - ) - - classified_traces = trace_classifier.classify(block.traces) - logger.info( - f"Block: {block_number} -- Returned {len(classified_traces)} classified traces" - ) - - transfers = get_transfers(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers") - - swaps = get_swaps(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps") + count = 0 + arbitrages_payload = [] + liquidations_payload = [] + profits = [] + async for swaps, liquidations in get_classified_traces_from_events( + w3, after_block_number, before_block_number + ): arbitrages = get_arbitrages(swaps) - logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages") - liquidations = get_liquidations(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") + if len(arbitrages) > 0: + for arb in arbitrages: + arb_payload: Dict[str, Any] = dict() + arb_payload["block_number"] = arb.block_number + arb_payload["transaction"] = arb.transaction_hash + arb_payload["account"] = arb.account_address + arb_payload["profit_amt"] = arb.profit_amount + arb_payload["token"] = arb.profit_token_address + arbitrages_payload.append(arb_payload) + count += 1 + profits.append( + [ + arb.block_number, + arb.transaction_hash, + "", + 0, + str(arb.profit_token_address).replace(TRAILING_ZEROS, ""), + arb.profit_amount, + ] + ) - sandwiches = get_sandwiches(swaps) - logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches") + if len(liquidations) > 0: + for liq in liquidations: + liq_payload: Dict[str, Any] = dict() + liq_payload["block_number"] = liq.block_number + liq_payload["transaction"] = liq.transaction_hash + liq_payload["liquidator"] = liq.liquidator_user + liq_payload["purchase_addr"] = liq.debt_token_address + liq_payload["receive_addr"] = liq.received_token_address + liq_payload["purchase_amount"] = liq.debt_purchase_amount + liq_payload["receive_amount"] = liq.received_amount + liquidations_payload.append(liq_payload) + count += 1 + profits.append( + [ + liq.block_number, + liq.transaction_hash, + str(liq.debt_token_address).replace(TRAILING_ZEROS, ""), + liq.debt_purchase_amount, + str(liq.received_amount).replace(TRAILING_ZEROS, ""), + liq.received_amount, + ] + ) - punk_bids = get_punk_bids(classified_traces) - punk_bid_acceptances = get_punk_bid_acceptances(classified_traces) - punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances) - logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes") - - nft_trades = get_nft_trades(classified_traces) - logger.info(f"Block: {block_number} -- Found {len(nft_trades)} nft trades") - - miner_payments = get_miner_payments( - block.miner, block.base_fee_per_gas, classified_traces, block.receipts - ) - - all_blocks.append(block) - all_classified_traces.extend(classified_traces) - all_transfers.extend(transfers) - all_swaps.extend(swaps) - all_arbitrages.extend(arbitrages) - all_liquidations.extend(liquidations) - all_sandwiches.extend(sandwiches) - - all_punk_bids.extend(punk_bids) - all_punk_bid_acceptances.extend(punk_bid_acceptances) - all_punk_snipes.extend(punk_snipes) - - all_nft_trades.extend(nft_trades) - - all_miner_payments.extend(miner_payments) - - logger.info("Writing data") - delete_blocks(inspect_db_session, after_block_number, before_block_number) - write_blocks(inspect_db_session, all_blocks) - - if should_write_classified_traces: - delete_classified_traces_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_classified_traces(inspect_db_session, all_classified_traces) - - delete_transfers_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_transfers(inspect_db_session, all_transfers) - - delete_swaps_for_blocks(inspect_db_session, after_block_number, before_block_number) - write_swaps(inspect_db_session, all_swaps) - - delete_arbitrages_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_arbitrages(inspect_db_session, all_arbitrages) - - delete_liquidations_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_liquidations(inspect_db_session, all_liquidations) - - delete_sandwiches_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_sandwiches(inspect_db_session, all_sandwiches) - - delete_punk_bids_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_punk_bids(inspect_db_session, all_punk_bids) - - delete_punk_bid_acceptances_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_punk_bid_acceptances(inspect_db_session, all_punk_bid_acceptances) - - delete_punk_snipes_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_punk_snipes(inspect_db_session, all_punk_snipes) - - delete_nft_trades_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_nft_trades(inspect_db_session, all_nft_trades) - - delete_miner_payments_for_blocks( - inspect_db_session, after_block_number, before_block_number - ) - write_miner_payments(inspect_db_session, all_miner_payments) - - update_summary_for_block_range( - inspect_db_session, - after_block_number, - before_block_number, - ) - - logger.info("Done writing") + if count > 0: + print("writing profits of {0} mev transactions".format(count)) + # @TODO: Write profits to DB + print(inspect_db_session.info) + arbitrages_payload = [] + liquidations_payload = [] + count = 0 diff --git a/mev_inspect/inspector.py b/mev_inspect/inspector.py index 7878527..2d21c03 100644 --- a/mev_inspect/inspector.py +++ b/mev_inspect/inspector.py @@ -51,20 +51,16 @@ class MEVInspector: self, inspect_db_session: orm.Session, block: int, - trace_db_session: Optional[orm.Session], ): return await inspect_block( inspect_db_session, self.w3, - self.trace_classifier, block, - trace_db_session=trace_db_session, ) async def inspect_many_blocks( self, inspect_db_session: orm.Session, - trace_db_session: Optional[orm.Session], after_block: int, before_block: int, block_batch_size: int = 10, @@ -77,8 +73,7 @@ class MEVInspector: tasks.append( asyncio.ensure_future( self.safe_inspect_many_blocks( - inspect_db_session, - trace_db_session, + inspect_db_session=inspect_db_session, after_block_number=batch_after_block, before_block_number=batch_before_block, ) @@ -97,7 +92,6 @@ class MEVInspector: async def safe_inspect_many_blocks( self, inspect_db_session: orm.Session, - trace_db_session: Optional[orm.Session], after_block_number: int, before_block_number: int, ): @@ -105,8 +99,6 @@ class MEVInspector: return await inspect_many_blocks( inspect_db_session, self.w3, - self.trace_classifier, after_block_number, before_block_number, - trace_db_session=trace_db_session, ) diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index 9e45b4d..4e88b48 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -8,7 +8,6 @@ from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) - HIGH_PRIORITY_QUEUE = "high" LOW_PRIORITY_QUEUE = "low" @@ -21,15 +20,13 @@ def inspect_many_blocks_task( before_block: int, ): with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: - with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session: - asyncio.run( - InspectorMiddleware.get_inspector().inspect_many_blocks( - inspect_db_session=inspect_db_session, - trace_db_session=trace_db_session, - after_block=after_block, - before_block=before_block, - ) + asyncio.run( + InspectorMiddleware.get_inspector().inspect_many_blocks( + inspect_db_session=inspect_db_session, + after_block=after_block, + before_block=before_block, ) + ) def realtime_export_task(block_number: int): diff --git a/mev_inspect/schemas/liquidations.py b/mev_inspect/schemas/liquidations.py index 8ef4faa..10b667d 100644 --- a/mev_inspect/schemas/liquidations.py +++ b/mev_inspect/schemas/liquidations.py @@ -12,7 +12,7 @@ class Liquidation(BaseModel): debt_purchase_amount: int received_amount: int received_token_address: Optional[str] - protocol: Protocol + protocol: Optional[Protocol] transaction_hash: str trace_address: List[int] block_number: str diff --git a/mev_inspect/schemas/swaps.py b/mev_inspect/schemas/swaps.py index f15bc20..eb3fb25 100644 --- a/mev_inspect/schemas/swaps.py +++ b/mev_inspect/schemas/swaps.py @@ -8,7 +8,7 @@ from mev_inspect.schemas.traces import Protocol class Swap(BaseModel): abi_name: str transaction_hash: str - transaction_position: int + transaction_position: Optional[int] block_number: int trace_address: List[int] contract_address: str @@ -18,5 +18,5 @@ class Swap(BaseModel): token_in_amount: int token_out_address: str token_out_amount: int - protocol: Protocol + protocol: Optional[Protocol] error: Optional[str]