feat: add polygon logs processing - Closes #issue-8 (#11)

* feat: add polygon logs processing Closes #issue-8

Signed-off-by: Arthurim <arthurbdauphine@gmail.com>

* fix: remove unused variables

Signed-off-by: Arthurim <arthurbdauphine@gmail.com>

* fix: add inspect db

Signed-off-by: Arthurim <arthurbdauphine@gmail.com>

* fix: remove parenthesis

Signed-off-by: Arthurim <arthurbdauphine@gmail.com>

Signed-off-by: Arthurim <arthurbdauphine@gmail.com>
This commit is contained in:
Eru Ilúvatar 2022-12-22 10:31:03 +00:00 committed by GitHub
parent b078bc69d0
commit eaf2698d7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 210 additions and 235 deletions

6
cli.py
View File

@ -38,13 +38,11 @@ def cli():
@coro @coro
async def inspect_block_command(block_number: int, rpc: str): async def inspect_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(rpc) inspector = MEVInspector(rpc)
await inspector.inspect_single_block( await inspector.inspect_single_block(
inspect_db_session=inspect_db_session, inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number, block=block_number,
) )
@ -87,7 +85,6 @@ async def inspect_many_blocks_command(
request_timeout: int, request_timeout: int,
): ):
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector( inspector = MEVInspector(
rpc, rpc,
@ -96,7 +93,6 @@ async def inspect_many_blocks_command(
) )
await inspector.inspect_many_blocks( await inspector.inspect_many_blocks(
inspect_db_session=inspect_db_session, inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
after_block=after_block, after_block=after_block,
before_block=before_block, before_block=before_block,
) )
@ -114,7 +110,7 @@ def enqueue_block_list_command():
for block_string in fileinput.input(): for block_string in fileinput.input():
block = int(block_string) block = int(block_string)
logger.info(f"Sending {block} to {block+1}") logger.info(f"Sending {block} to {block + 1}")
inspect_many_blocks_actor.send(block, block + 1) inspect_many_blocks_actor.send(block, block + 1)

View File

@ -11,7 +11,7 @@ from mev_inspect.crud.latest_block_update import (
find_latest_block_update, find_latest_block_update,
update_latest_block, update_latest_block,
) )
from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.db import get_inspect_session
from mev_inspect.inspector import MEVInspector from mev_inspect.inspector import MEVInspector
from mev_inspect.provider import get_base_provider from mev_inspect.provider import get_base_provider
from mev_inspect.queue.broker import connect_broker from mev_inspect.queue.broker import connect_broker
@ -42,7 +42,6 @@ async def run():
killer = GracefulKiller() killer = GracefulKiller()
inspect_db_session = get_inspect_session() inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
broker = connect_broker() broker = connect_broker()
export_actor = dramatiq.actor( export_actor = dramatiq.actor(
@ -59,7 +58,6 @@ async def run():
await inspect_next_block( await inspect_next_block(
inspector, inspector,
inspect_db_session, inspect_db_session,
trace_db_session,
base_provider, base_provider,
healthcheck_url, healthcheck_url,
export_actor, export_actor,
@ -71,7 +69,6 @@ async def run():
async def inspect_next_block( async def inspect_next_block(
inspector: MEVInspector, inspector: MEVInspector,
inspect_db_session, inspect_db_session,
trace_db_session,
base_provider, base_provider,
healthcheck_url, healthcheck_url,
export_actor, export_actor,
@ -94,7 +91,6 @@ async def inspect_next_block(
await inspector.inspect_single_block( await inspector.inspect_single_block(
inspect_db_session=inspect_db_session, inspect_db_session=inspect_db_session,
trace_db_session=trace_db_session,
block=block_number, block=block_number,
) )

View File

@ -1,25 +1,161 @@
import asyncio import asyncio
import logging import logging
from typing import List, Optional from typing import Dict, List, Optional, Tuple
from sqlalchemy import orm from sqlalchemy import orm
from web3 import Web3 from web3 import Web3
from mev_inspect.fees import fetch_base_fee_per_gas from mev_inspect.fees import fetch_base_fee_per_gas
from mev_inspect.schemas.blocks import Block from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.receipts import Receipt from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import Trace, TraceType from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int from mev_inspect.utils import hex_to_int
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TOPIC_SWAP = "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822"
TOPIC_LIQUIDATION = "0xe413a321e8681d831f4dbccbca790d2952b56f977908e45be37335533e005286"
UNI_TOKEN_0 = "0x0dfe1681"
UNI_TOKEN_1 = "0xd21220a7"
async def _get_logs_for_topics(base_provider, after_block, before_block, topics):
logs = await base_provider.make_request(
"eth_getLogs",
[
{
"fromBlock": hex(after_block),
"toBlock": hex(before_block),
"topics": topics,
}
],
)
return logs["result"]
def _logs_by_tx(logs):
logs_by_tx = dict()
for log in logs:
transaction_hash = log["transactionHash"]
if transaction_hash in logs_by_tx.keys():
logs_by_tx[transaction_hash].append(log)
else:
logs_by_tx[transaction_hash] = [log]
return logs_by_tx
def get_swap(data):
data = data[2:]
return (
int(data[0:64], base=16),
int(data[64:128], base=16),
int(data[128:192], base=16),
int(data[192:256], base=16),
)
def get_liquidation(data):
data = data[2:]
return (
int(data[0:64], base=16),
int(data[64:128], base=16),
"0x" + data[128 + 24 : 168 + 24],
)
async def classify_logs(logs, pool_reserves, w3):
cswaps = []
cliquidations = []
for log in logs:
topic = log["topics"][0]
if topic in [TOPIC_SWAP, TOPIC_LIQUIDATION]:
block = int(log["blockNumber"], 16)
transaction_hash = log["transactionHash"]
trace_address = [int(log["logIndex"], 16)]
first_token = "0x" + log["topics"][1][26:]
second_token = "0x" + log["topics"][2][26:]
if topic == TOPIC_SWAP:
pool_address = log["address"]
if pool_address in pool_reserves:
token0, token1 = pool_reserves[pool_address]
else:
addr = Web3.toChecksumAddress(pool_address)
token0, token1 = await asyncio.gather(
w3.eth.call({"to": addr, "data": UNI_TOKEN_0}),
w3.eth.call({"to": addr, "data": UNI_TOKEN_1}),
)
token0 = w3.toHex(token0)
token1 = w3.toHex(token1)
pool_reserves[pool_address] = (token0, token1)
am0in, am1in, am0out, am1out = get_swap(log["data"])
swap = Swap(
abi_name="uniswap_v2",
transaction_hash=transaction_hash,
block_number=block,
trace_address=trace_address,
contract_address=pool_address,
from_address=first_token,
to_address=second_token,
token_in_address=token0 if am0in != 0 else token1,
token_in_amount=am0in if am0in != 0 else am1in,
token_out_address=token1 if am1out != 0 else token0,
token_out_amount=am0out if am0out != 0 else am1out,
protocol=None,
error=None,
)
cswaps.append(swap)
elif topic == TOPIC_LIQUIDATION:
block = str(block)
am_debt, am_recv, addr_usr = get_liquidation(log["data"])
liquidation = Liquidation(
liquidated_user="0x" + log["topics"][3][26:],
liquidator_user=addr_usr,
debt_token_address=second_token,
debt_purchase_amount=am_debt,
received_amount=am_recv,
received_token_address=first_token,
protocol=None,
transaction_hash=transaction_hash,
trace_address=trace_address,
block_number=block,
error=None,
)
cliquidations.append(liquidation)
return cswaps, cliquidations
reserves: Dict[str, Tuple[str, str]] = dict()
async def get_classified_traces_from_events(
w3: Web3, after_block: int, before_block: int
):
base_provider = w3.provider
start = after_block
stride = 300
while start < before_block:
begin = start
end = start + stride if (start + stride) < before_block else before_block - 1
start += stride
print("fetching from node...", begin, end, flush=True)
all_logs = await _get_logs_for_topics(
base_provider, begin, end, [[TOPIC_SWAP, TOPIC_LIQUIDATION]]
)
logs_by_tx = _logs_by_tx(all_logs)
for tx in logs_by_tx.keys():
yield await classify_logs(logs_by_tx[tx], reserves, w3)
async def get_latest_block_number(base_provider) -> int: async def get_latest_block_number(base_provider) -> int:
latest_block = await base_provider.make_request( latest_block = await base_provider.make_request(
"eth_getBlockByNumber", "eth_getBlockByNumber",
["latest", False], ["latest", False],
) )
return hex_to_int(latest_block["result"]["number"]) return hex_to_int(latest_block["result"]["number"])

View File

@ -1,236 +1,94 @@
import logging import logging
from typing import List, Optional from typing import Any, Dict
from sqlalchemy import orm from sqlalchemy import orm
from web3 import Web3 from web3 import Web3
from mev_inspect.arbitrages import get_arbitrages from mev_inspect.arbitrages import get_arbitrages
from mev_inspect.block import create_from_block_number from mev_inspect.block import get_classified_traces_from_events
from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages
from mev_inspect.crud.blocks import delete_blocks, write_blocks
from mev_inspect.crud.liquidations import (
delete_liquidations_for_blocks,
write_liquidations,
)
from mev_inspect.crud.miner_payments import (
delete_miner_payments_for_blocks,
write_miner_payments,
)
from mev_inspect.crud.nft_trades import delete_nft_trades_for_blocks, write_nft_trades
from mev_inspect.crud.punks import (
delete_punk_bid_acceptances_for_blocks,
delete_punk_bids_for_blocks,
delete_punk_snipes_for_blocks,
write_punk_bid_acceptances,
write_punk_bids,
write_punk_snipes,
)
from mev_inspect.crud.sandwiches import delete_sandwiches_for_blocks, write_sandwiches
from mev_inspect.crud.summary import update_summary_for_block_range
from mev_inspect.crud.swaps import delete_swaps_for_blocks, write_swaps
from mev_inspect.crud.traces import (
delete_classified_traces_for_blocks,
write_classified_traces,
)
from mev_inspect.crud.transfers import delete_transfers_for_blocks, write_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.miner_payments import get_miner_payments
from mev_inspect.nft_trades import get_nft_trades
from mev_inspect.punks import get_punk_bid_acceptances, get_punk_bids, get_punk_snipes
from mev_inspect.sandwiches import get_sandwiches
from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.liquidations import Liquidation
from mev_inspect.schemas.miner_payments import MinerPayment
from mev_inspect.schemas.nft_trades import NftTrade
from mev_inspect.schemas.punk_accept_bid import PunkBidAcceptance
from mev_inspect.schemas.punk_bid import PunkBid
from mev_inspect.schemas.punk_snipe import PunkSnipe
from mev_inspect.schemas.sandwiches import Sandwich
from mev_inspect.schemas.swaps import Swap
from mev_inspect.schemas.traces import ClassifiedTrace
from mev_inspect.schemas.transfers import Transfer
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TRAILING_ZEROS = "000000000000000000000000"
async def inspect_block( async def inspect_block(
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
w3: Web3, w3: Web3,
trace_classifier: TraceClassifier,
block_number: int, block_number: int,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
): ):
await inspect_many_blocks( await inspect_many_blocks(
inspect_db_session, inspect_db_session,
w3, w3,
trace_classifier,
block_number, block_number,
block_number + 1, block_number + 1,
trace_db_session,
should_write_classified_traces,
) )
async def inspect_many_blocks( async def inspect_many_blocks(
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
w3: Web3, w3: Web3,
trace_classifier: TraceClassifier,
after_block_number: int, after_block_number: int,
before_block_number: int, before_block_number: int,
trace_db_session: Optional[orm.Session],
should_write_classified_traces: bool = True,
): ):
all_blocks: List[Block] = [] count = 0
all_classified_traces: List[ClassifiedTrace] = [] arbitrages_payload = []
all_transfers: List[Transfer] = [] liquidations_payload = []
all_swaps: List[Swap] = []
all_arbitrages: List[Arbitrage] = []
all_liquidations: List[Liquidation] = []
all_sandwiches: List[Sandwich] = []
all_punk_bids: List[PunkBid] = []
all_punk_bid_acceptances: List[PunkBidAcceptance] = []
all_punk_snipes: List[PunkSnipe] = []
all_miner_payments: List[MinerPayment] = []
all_nft_trades: List[NftTrade] = []
for block_number in range(after_block_number, before_block_number):
block = await create_from_block_number(
w3,
block_number,
trace_db_session,
)
logger.info(f"Block: {block_number} -- Total traces: {len(block.traces)}")
total_transactions = len(
set(
t.transaction_hash
for t in block.traces
if t.transaction_hash is not None
)
)
logger.info(
f"Block: {block_number} -- Total transactions: {total_transactions}"
)
classified_traces = trace_classifier.classify(block.traces)
logger.info(
f"Block: {block_number} -- Returned {len(classified_traces)} classified traces"
)
transfers = get_transfers(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(transfers)} transfers")
swaps = get_swaps(classified_traces)
logger.info(f"Block: {block_number} -- Found {len(swaps)} swaps")
profits = []
async for swaps, liquidations in get_classified_traces_from_events(
w3, after_block_number, before_block_number
):
arbitrages = get_arbitrages(swaps) arbitrages = get_arbitrages(swaps)
logger.info(f"Block: {block_number} -- Found {len(arbitrages)} arbitrages")
liquidations = get_liquidations(classified_traces) if len(arbitrages) > 0:
logger.info(f"Block: {block_number} -- Found {len(liquidations)} liquidations") for arb in arbitrages:
arb_payload: Dict[str, Any] = dict()
arb_payload["block_number"] = arb.block_number
arb_payload["transaction"] = arb.transaction_hash
arb_payload["account"] = arb.account_address
arb_payload["profit_amt"] = arb.profit_amount
arb_payload["token"] = arb.profit_token_address
arbitrages_payload.append(arb_payload)
count += 1
profits.append(
[
arb.block_number,
arb.transaction_hash,
"",
0,
str(arb.profit_token_address).replace(TRAILING_ZEROS, ""),
arb.profit_amount,
]
)
sandwiches = get_sandwiches(swaps) if len(liquidations) > 0:
logger.info(f"Block: {block_number} -- Found {len(sandwiches)} sandwiches") for liq in liquidations:
liq_payload: Dict[str, Any] = dict()
liq_payload["block_number"] = liq.block_number
liq_payload["transaction"] = liq.transaction_hash
liq_payload["liquidator"] = liq.liquidator_user
liq_payload["purchase_addr"] = liq.debt_token_address
liq_payload["receive_addr"] = liq.received_token_address
liq_payload["purchase_amount"] = liq.debt_purchase_amount
liq_payload["receive_amount"] = liq.received_amount
liquidations_payload.append(liq_payload)
count += 1
profits.append(
[
liq.block_number,
liq.transaction_hash,
str(liq.debt_token_address).replace(TRAILING_ZEROS, ""),
liq.debt_purchase_amount,
str(liq.received_amount).replace(TRAILING_ZEROS, ""),
liq.received_amount,
]
)
punk_bids = get_punk_bids(classified_traces) if count > 0:
punk_bid_acceptances = get_punk_bid_acceptances(classified_traces) print("writing profits of {0} mev transactions".format(count))
punk_snipes = get_punk_snipes(punk_bids, punk_bid_acceptances) # @TODO: Write profits to DB
logger.info(f"Block: {block_number} -- Found {len(punk_snipes)} punk snipes") print(inspect_db_session.info)
arbitrages_payload = []
nft_trades = get_nft_trades(classified_traces) liquidations_payload = []
logger.info(f"Block: {block_number} -- Found {len(nft_trades)} nft trades") count = 0
miner_payments = get_miner_payments(
block.miner, block.base_fee_per_gas, classified_traces, block.receipts
)
all_blocks.append(block)
all_classified_traces.extend(classified_traces)
all_transfers.extend(transfers)
all_swaps.extend(swaps)
all_arbitrages.extend(arbitrages)
all_liquidations.extend(liquidations)
all_sandwiches.extend(sandwiches)
all_punk_bids.extend(punk_bids)
all_punk_bid_acceptances.extend(punk_bid_acceptances)
all_punk_snipes.extend(punk_snipes)
all_nft_trades.extend(nft_trades)
all_miner_payments.extend(miner_payments)
logger.info("Writing data")
delete_blocks(inspect_db_session, after_block_number, before_block_number)
write_blocks(inspect_db_session, all_blocks)
if should_write_classified_traces:
delete_classified_traces_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_classified_traces(inspect_db_session, all_classified_traces)
delete_transfers_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_transfers(inspect_db_session, all_transfers)
delete_swaps_for_blocks(inspect_db_session, after_block_number, before_block_number)
write_swaps(inspect_db_session, all_swaps)
delete_arbitrages_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_arbitrages(inspect_db_session, all_arbitrages)
delete_liquidations_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_liquidations(inspect_db_session, all_liquidations)
delete_sandwiches_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_sandwiches(inspect_db_session, all_sandwiches)
delete_punk_bids_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bids(inspect_db_session, all_punk_bids)
delete_punk_bid_acceptances_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_bid_acceptances(inspect_db_session, all_punk_bid_acceptances)
delete_punk_snipes_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_punk_snipes(inspect_db_session, all_punk_snipes)
delete_nft_trades_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_nft_trades(inspect_db_session, all_nft_trades)
delete_miner_payments_for_blocks(
inspect_db_session, after_block_number, before_block_number
)
write_miner_payments(inspect_db_session, all_miner_payments)
update_summary_for_block_range(
inspect_db_session,
after_block_number,
before_block_number,
)
logger.info("Done writing")

View File

@ -51,20 +51,16 @@ class MEVInspector:
self, self,
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
block: int, block: int,
trace_db_session: Optional[orm.Session],
): ):
return await inspect_block( return await inspect_block(
inspect_db_session, inspect_db_session,
self.w3, self.w3,
self.trace_classifier,
block, block,
trace_db_session=trace_db_session,
) )
async def inspect_many_blocks( async def inspect_many_blocks(
self, self,
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
trace_db_session: Optional[orm.Session],
after_block: int, after_block: int,
before_block: int, before_block: int,
block_batch_size: int = 10, block_batch_size: int = 10,
@ -77,8 +73,7 @@ class MEVInspector:
tasks.append( tasks.append(
asyncio.ensure_future( asyncio.ensure_future(
self.safe_inspect_many_blocks( self.safe_inspect_many_blocks(
inspect_db_session, inspect_db_session=inspect_db_session,
trace_db_session,
after_block_number=batch_after_block, after_block_number=batch_after_block,
before_block_number=batch_before_block, before_block_number=batch_before_block,
) )
@ -97,7 +92,6 @@ class MEVInspector:
async def safe_inspect_many_blocks( async def safe_inspect_many_blocks(
self, self,
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
trace_db_session: Optional[orm.Session],
after_block_number: int, after_block_number: int,
before_block_number: int, before_block_number: int,
): ):
@ -105,8 +99,6 @@ class MEVInspector:
return await inspect_many_blocks( return await inspect_many_blocks(
inspect_db_session, inspect_db_session,
self.w3, self.w3,
self.trace_classifier,
after_block_number, after_block_number,
before_block_number, before_block_number,
trace_db_session=trace_db_session,
) )

View File

@ -8,7 +8,6 @@ from .middleware import DbMiddleware, InspectorMiddleware
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
HIGH_PRIORITY_QUEUE = "high" HIGH_PRIORITY_QUEUE = "high"
LOW_PRIORITY_QUEUE = "low" LOW_PRIORITY_QUEUE = "low"
@ -21,15 +20,13 @@ def inspect_many_blocks_task(
before_block: int, before_block: int,
): ):
with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session:
with _session_scope(DbMiddleware.get_trace_sessionmaker()) as trace_db_session: asyncio.run(
asyncio.run( InspectorMiddleware.get_inspector().inspect_many_blocks(
InspectorMiddleware.get_inspector().inspect_many_blocks( inspect_db_session=inspect_db_session,
inspect_db_session=inspect_db_session, after_block=after_block,
trace_db_session=trace_db_session, before_block=before_block,
after_block=after_block,
before_block=before_block,
)
) )
)
def realtime_export_task(block_number: int): def realtime_export_task(block_number: int):

View File

@ -12,7 +12,7 @@ class Liquidation(BaseModel):
debt_purchase_amount: int debt_purchase_amount: int
received_amount: int received_amount: int
received_token_address: Optional[str] received_token_address: Optional[str]
protocol: Protocol protocol: Optional[Protocol]
transaction_hash: str transaction_hash: str
trace_address: List[int] trace_address: List[int]
block_number: str block_number: str

View File

@ -8,7 +8,7 @@ from mev_inspect.schemas.traces import Protocol
class Swap(BaseModel): class Swap(BaseModel):
abi_name: str abi_name: str
transaction_hash: str transaction_hash: str
transaction_position: int transaction_position: Optional[int]
block_number: int block_number: int
trace_address: List[int] trace_address: List[int]
contract_address: str contract_address: str
@ -18,5 +18,5 @@ class Swap(BaseModel):
token_in_amount: int token_in_amount: int
token_out_address: str token_out_address: str
token_out_amount: int token_out_amount: int
protocol: Protocol protocol: Optional[Protocol]
error: Optional[str] error: Optional[str]