code refactoring, bug fixes, sql tables

This commit is contained in:
cryptopath 2022-11-22 13:22:50 +00:00
parent fdb61a5eca
commit b2172fe0d0
12 changed files with 388 additions and 110 deletions

2
cli.py
View File

@ -73,7 +73,7 @@ async def fetch_block_command(block_number: int, rpc: str):
"--max-concurrency", "--max-concurrency",
type=int, type=int,
help="maximum number of concurrent connections", help="maximum number of concurrent connections",
default=5, default=1,
) )
@click.option( @click.option(
"--request-timeout", type=int, help="timeout for requests to nodes", default=500 "--request-timeout", type=int, help="timeout for requests to nodes", default=500

View File

@ -5,7 +5,7 @@ from mev_inspect.schemas.arbitrages import Arbitrage
from mev_inspect.schemas.swaps import Swap from mev_inspect.schemas.swaps import Swap
from mev_inspect.utils import equal_within_percent from mev_inspect.utils import equal_within_percent
MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE = 0.01 MAX_TOKEN_AMOUNT_PERCENT_DIFFERENCE = 0.00001
def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]: def get_arbitrages(swaps: List[Swap]) -> List[Arbitrage]:
@ -55,7 +55,8 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
continue continue
unused_ends = [end for end in ends if end not in used_swaps] unused_ends = [end for end in ends if end not in used_swaps]
route = _get_shortest_route(start, unused_ends, swaps) unused_swaps = [swap for swap in swaps if swap not in used_swaps]
route = _get_shortest_route(start, unused_ends, unused_swaps)
if route is not None: if route is not None:
start_amount = route[0].token_in_amount start_amount = route[0].token_in_amount
@ -81,14 +82,7 @@ def _get_arbitrages_from_swaps(swaps: List[Swap]) -> List[Arbitrage]:
all_arbitrages.append(arb) all_arbitrages.append(arb)
used_swaps.extend(route) used_swaps.extend(route)
if len(all_arbitrages) == 1:
return all_arbitrages return all_arbitrages
else:
return [
arb
for arb in all_arbitrages
if (arb.swaps[0].trace_address < arb.swaps[-1].trace_address)
]
def _get_shortest_route( def _get_shortest_route(

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,17 @@
def get_reserves(db_session):
result = db_session.execute(
"SELECT * FROM reserves"
)
return result
def set_reserves(db_session, values):
db_session.execute(
"""
INSERT INTO reserves
(pool_address, token0, token1)
VALUES
(:pool_address, :token0, :token1)
""",
params = values,
)
db_session.commit()

View File

@ -30,9 +30,10 @@ def get_inspect_database_uri():
def _get_engine(uri: str): def _get_engine(uri: str):
return create_engine( return create_engine(
uri, uri,
executemany_mode="values", executemany_mode="batch",
executemany_values_page_size=10000, executemany_values_page_size=10000,
executemany_batch_page_size=500, executemany_batch_page_size=10000,
) )

View File

@ -1,5 +1,5 @@
import logging import logging
from typing import List, Optional, Any, Dict from typing import List, Optional, Any, Dict, Tuple
from sqlalchemy import orm from sqlalchemy import orm
from web3 import Web3 from web3 import Web3
@ -8,6 +8,7 @@ from mev_inspect.arbitrages import get_arbitrages
from mev_inspect.block import create_from_block_number, get_classified_traces_from_events from mev_inspect.block import create_from_block_number, get_classified_traces_from_events
from mev_inspect.classifiers.trace import TraceClassifier from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages from mev_inspect.crud.arbitrages import delete_arbitrages_for_blocks, write_arbitrages
from mev_inspect.crud.reserves import get_reserves, set_reserves
from mev_inspect.crud.blocks import delete_blocks, write_blocks from mev_inspect.crud.blocks import delete_blocks, write_blocks
from mev_inspect.crud.liquidations import ( from mev_inspect.crud.liquidations import (
delete_liquidations_for_blocks, delete_liquidations_for_blocks,
@ -56,6 +57,10 @@ from mev_inspect.transfers import get_transfers
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
import psycopg2
import pickle
import time
async def inspect_block( async def inspect_block(
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
@ -75,6 +80,8 @@ async def inspect_block(
should_write_classified_traces, should_write_classified_traces,
) )
reserves: Dict[str, Tuple[str, str]] = dict()
async def inspect_many_blocks( async def inspect_many_blocks(
inspect_db_session: orm.Session, inspect_db_session: orm.Session,
@ -86,54 +93,28 @@ async def inspect_many_blocks(
should_write_classified_traces: bool = True, should_write_classified_traces: bool = True,
): ):
count = 0 for row in get_reserves(inspect_db_session).fetchall():
arbitrages_payload = [] reserves[row[0]] = (row[1], row[2])
liquidations_payload = []
async for swaps, liquidations in get_classified_traces_from_events(w3, after_block_number, before_block_number): all_swaps: List[Swap] = []
all_arbitrages: List[Arbitrage] = []
all_liquidations: List[Liquidation] = []
async for swaps, liquidations, new_reserves in get_classified_traces_from_events(w3, after_block_number, before_block_number, reserves):
arbitrages = get_arbitrages(swaps) arbitrages = get_arbitrages(swaps)
if len(new_reserves) > 0:
set_reserves(inspect_db_session, new_reserves)
all_swaps.extend(swaps)
all_arbitrages.extend(arbitrages)
all_liquidations.extend(liquidations)
count += len(arbitrages) start = time.time()
logger.info(f"{count} Found {len(swaps)} swaps and {len(arbitrages)} arbitrages") write_swaps(inspect_db_session, all_swaps)
if len(arbitrages) > 0: write_arbitrages(inspect_db_session, all_arbitrages)
for arb in arbitrages: write_liquidations(inspect_db_session, all_liquidations)
arb_payload: Dict[str, Any] = dict() print("sent swaps: {}, arbitrages: {}, time: {}".format(len(all_swaps), len(all_arbitrages), time.time()-start))
arb_payload['block_number'] = arb.block_number print("inspect complete...", after_block_number, before_block_number, flush=True)
arb_payload['transaction'] = arb.transaction_hash
arb_payload['account'] = arb.account_address
arb_payload['profit_amt'] = arb.profit_amount
arb_payload['token'] = arb.profit_token_address
arbitrages_payload.append(arb_payload)
count += 1
if len(liquidations) > 0:
for liq in liquidations:
liq_payload: Dict[str, Any] = dict()
liq_payload['block_number'] = liq.block_number
liq_payload['transaction'] = liq.transaction_hash
liq_payload['liquidator'] = liq.liquidator_user
liq_payload['purchase_addr'] = liq.debt_token_address
liq_payload['receive_addr'] = liq.received_token_address
liq_payload['purchase_amount'] = liq.debt_purchase_amount
liq_payload['receive_amount'] = liq.received_amount
liquidations_payload.append(liq_payload)
count+=1
if count >= 100:
print("sending to endpoint now")
# resp = requests.post("https://asia-south1-marlin-internal.cloudfunctions.net/mevPolygon/alerts", headers={'Content-type': 'application/json'}, json={"arbitrages": arbitrages_payload, "liquidations": liquidations_payload})
# print("sending to endpoint ", resp.content.decode("utf-8"), flush=True)
arbitrages_payload = []
liquidations_payload = []
count = 0
if count > 0:
print("sending to endpoint now")
# resp = requests.post("https://asia-south1-marlin-internal.cloudfunctions.net/mevPolygon/alerts", headers={'Content-type': 'application/json'}, json={"arbitrages": arbitrages_payload, "liquidations": liquidations_payload})
# print("sending to endpoint ", resp.content.decode("utf-8"), flush=True)
arbitrages_payload = []
liquidations_payload = []
count = 0
# all_blocks: List[Block] = [] # all_blocks: List[Block] = []
# all_classified_traces: List[ClassifiedTrace] = [] # all_classified_traces: List[ClassifiedTrace] = []

View File

@ -67,7 +67,7 @@ class MEVInspector:
trace_db_session: Optional[orm.Session], trace_db_session: Optional[orm.Session],
after_block: int, after_block: int,
before_block: int, before_block: int,
block_batch_size: int = 300, block_batch_size: int = 10000,
): ):
tasks = [] tasks = []
for block_number in range(after_block, before_block, block_batch_size): for block_number in range(after_block, before_block, block_batch_size):

63
sql/arbitrage_swaps.sql Normal file
View File

@ -0,0 +1,63 @@
--
-- PostgreSQL database dump
--
-- Dumped from database version 14.4
-- Dumped by pg_dump version 15.0 (Ubuntu 15.0-1.pgdg22.04+1)
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: arbitrage_swaps; Type: TABLE; Schema: public; Owner: postgres
--
CREATE TABLE public.arbitrage_swaps (
created_at timestamp without time zone DEFAULT now(),
arbitrage_id character varying(1024) NOT NULL,
swap_transaction_hash character varying(66) NOT NULL,
swap_trace_address integer[] NOT NULL
);
ALTER TABLE public.arbitrage_swaps OWNER TO postgres;
--
-- Name: arbitrage_swaps arbitrage_swaps_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--
ALTER TABLE ONLY public.arbitrage_swaps
ADD CONSTRAINT arbitrage_swaps_pkey PRIMARY KEY (arbitrage_id, swap_transaction_hash, swap_trace_address);
--
-- Name: arbitrage_swaps_swaps_idx; Type: INDEX; Schema: public; Owner: postgres
--
CREATE INDEX arbitrage_swaps_swaps_idx ON public.arbitrage_swaps USING btree (swap_transaction_hash, swap_trace_address);
--
-- Name: arbitrage_swaps arbitrage_swaps_arbitrage_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: postgres
--
ALTER TABLE ONLY public.arbitrage_swaps
ADD CONSTRAINT arbitrage_swaps_arbitrage_id_fkey FOREIGN KEY (arbitrage_id) REFERENCES public.arbitrages(id) ON DELETE CASCADE;
--
-- PostgreSQL database dump complete
--

55
sql/arbitrages.sql Normal file
View File

@ -0,0 +1,55 @@
--
-- PostgreSQL database dump
--
-- Dumped from database version 14.4
-- Dumped by pg_dump version 15.0 (Ubuntu 15.0-1.pgdg22.04+1)
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: arbitrages; Type: TABLE; Schema: public; Owner: postgres
--
CREATE TABLE public.arbitrages (
id character varying(256) NOT NULL,
created_at timestamp without time zone DEFAULT now(),
account_address character varying(256) NOT NULL,
profit_token_address character varying(256) NOT NULL,
block_number numeric NOT NULL,
transaction_hash character varying(256) NOT NULL,
start_amount numeric NOT NULL,
end_amount numeric NOT NULL,
profit_amount numeric NOT NULL,
error character varying(256),
protocols character varying(256)[] DEFAULT '{}'::character varying[]
);
ALTER TABLE public.arbitrages OWNER TO postgres;
--
-- Name: arbitrages arbitrages_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--
ALTER TABLE ONLY public.arbitrages
ADD CONSTRAINT arbitrages_pkey PRIMARY KEY (id);
--
-- PostgreSQL database dump complete
--

56
sql/liquidations.sql Normal file
View File

@ -0,0 +1,56 @@
--
-- PostgreSQL database dump
--
-- Dumped from database version 14.4
-- Dumped by pg_dump version 15.0 (Ubuntu 15.0-1.pgdg22.04+1)
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: liquidations; Type: TABLE; Schema: public; Owner: postgres
--
CREATE TABLE public.liquidations (
created_at timestamp without time zone DEFAULT now(),
liquidated_user character varying(256) NOT NULL,
liquidator_user character varying(256) NOT NULL,
debt_token_address character varying(256) NOT NULL,
debt_purchase_amount numeric NOT NULL,
received_amount numeric NOT NULL,
protocol character varying(256),
transaction_hash character varying(66) NOT NULL,
trace_address character varying(256) NOT NULL,
block_number numeric NOT NULL,
received_token_address character varying(256),
error character varying(256)
);
ALTER TABLE public.liquidations OWNER TO postgres;
--
-- Name: liquidations liquidations_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--
ALTER TABLE ONLY public.liquidations
ADD CONSTRAINT liquidations_pkey PRIMARY KEY (transaction_hash, trace_address);
--
-- PostgreSQL database dump complete
--

39
sql/reserves.sql Normal file
View File

@ -0,0 +1,39 @@
--
-- PostgreSQL database dump
--
-- Dumped from database version 14.5 (Ubuntu 14.5-0ubuntu0.22.04.1)
-- Dumped by pg_dump version 14.5 (Ubuntu 14.5-0ubuntu0.22.04.1)
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: reserves; Type: TABLE; Schema: public; Owner: postgres
--
CREATE TABLE public.reserves (
pool_address character(42),
token0 character(42),
token1 character(42)
);
ALTER TABLE public.reserves OWNER TO postgres;
--
-- PostgreSQL database dump complete
--

59
sql/swaps.sql Normal file
View File

@ -0,0 +1,59 @@
--
-- PostgreSQL database dump
--
-- Dumped from database version 14.4
-- Dumped by pg_dump version 15.0 (Ubuntu 15.0-1.pgdg22.04+1)
SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;
SET default_tablespace = '';
SET default_table_access_method = heap;
--
-- Name: swaps; Type: TABLE; Schema: public; Owner: postgres
--
CREATE TABLE public.swaps (
created_at timestamp without time zone DEFAULT now(),
abi_name character varying(1024) NOT NULL,
transaction_hash character varying(66) NOT NULL,
block_number numeric NOT NULL,
protocol character varying(256),
contract_address character varying(256) NOT NULL,
from_address character varying(256) NOT NULL,
to_address character varying(256) NOT NULL,
token_in_address character varying(256) NOT NULL,
token_in_amount numeric NOT NULL,
token_out_address character varying(256) NOT NULL,
token_out_amount numeric NOT NULL,
trace_address integer[] NOT NULL,
error character varying(256),
transaction_position numeric
);
ALTER TABLE public.swaps OWNER TO postgres;
--
-- Name: swaps swaps_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--
ALTER TABLE ONLY public.swaps
ADD CONSTRAINT swaps_pkey PRIMARY KEY (block_number, transaction_hash, trace_address);
--
-- PostgreSQL database dump complete
--