Merge branch 'main' into improve_dockerfile

Tomislav Mikulin 2022-01-07 09:30:35 +01:00
commit 02fb01dfb8
12 changed files with 167 additions and 76 deletions


@@ -11,6 +11,7 @@ helm_remote("postgresql",
 helm_remote("redis",
     repo_name="bitnami",
     repo_url="https://charts.bitnami.com/bitnami",
+    set=["global.redis.password=password"],
 )
 
 k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = {

cli.py

@@ -3,7 +3,6 @@ import os
 import sys
 
 import click
-from worker import inspect_many_blocks_task
 
 from mev_inspect.concurrency import coro
 from mev_inspect.crud.prices import write_prices
@@ -97,6 +96,10 @@ async def inspect_many_blocks_command(
 @click.argument("before_block", type=int)
 @click.argument("batch_size", type=int, default=10)
 def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int):
+    from worker import (  # pylint: disable=import-outside-toplevel
+        inspect_many_blocks_task,
+    )
+
     for batch_after_block in range(after_block, before_block, batch_size):
         batch_before_block = min(batch_after_block + batch_size, before_block)
         logger.info(f"Sending {batch_after_block} to {batch_before_block}")


@@ -37,7 +37,8 @@ spec:
                 - ls
                 - /
             initialDelaySeconds: 20
-            periodSeconds: 5
+            periodSeconds: 10
+            timeoutSeconds: 5
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
           env:


@@ -37,7 +37,8 @@ spec:
                 - ls
                 - /
             initialDelaySeconds: 20
-            periodSeconds: 5
+            periodSeconds: 10
+            timeoutSeconds: 5
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
           env:


@@ -92,11 +92,21 @@ def _get_shortest_route(
     start_swap: Swap,
     end_swaps: List[Swap],
     all_swaps: List[Swap],
+    max_route_length: Optional[int] = None,
 ) -> Optional[List[Swap]]:
+    if len(end_swaps) == 0:
+        return None
+
+    if max_route_length is not None and max_route_length < 2:
+        return None
+
     for end_swap in end_swaps:
         if start_swap.token_out_address == end_swap.token_in_address:
             return [start_swap, end_swap]
 
+    if max_route_length is not None and max_route_length == 2:
+        return None
+
     other_swaps = [
         swap for swap in all_swaps if (swap is not start_swap and swap not in end_swaps)
     ]
@@ -105,6 +115,9 @@ def _get_shortest_route(
         return None
 
     shortest_remaining_route = None
+    max_remaining_route_length = (
+        None if max_route_length is None else max_route_length - 1
+    )
 
     for next_swap in other_swaps:
         if start_swap.token_out_address == next_swap.token_in_address and (
@@ -116,6 +129,7 @@ def _get_shortest_route(
                 next_swap,
                 end_swaps,
                 other_swaps,
+                max_route_length=max_remaining_route_length,
             )
 
             if shortest_from_next is not None and (
@@ -123,6 +137,7 @@ def _get_shortest_route(
                 or len(shortest_from_next) < len(shortest_remaining_route)
             ):
                 shortest_remaining_route = shortest_from_next
+                max_remaining_route_length = len(shortest_from_next) - 1
 
     if shortest_remaining_route is None:
         return None
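Threading max_route_length through the recursion turns the exhaustive route search into branch-and-bound: whenever a candidate route is found, the bound for the remaining branches tightens to len(route) - 1, so they are explored only as far as they could still improve on the best route so far. A self-contained sketch of the same idea (Swap here is a hypothetical two-field stand-in for the project's schema class):

from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Swap:
    token_in_address: str
    token_out_address: str


def shortest_route(
    start: Swap,
    ends: List[Swap],
    others: List[Swap],
    max_len: Optional[int] = None,
) -> Optional[List[Swap]]:
    if not ends or (max_len is not None and max_len < 2):
        return None
    for end in ends:
        if start.token_out_address == end.token_in_address:
            return [start, end]  # length-2 route: the best possible from here
    if max_len is not None and max_len == 2:
        return None  # nothing longer can satisfy the bound
    best: Optional[List[Swap]] = None
    bound = None if max_len is None else max_len - 1
    for nxt in others:
        if start.token_out_address != nxt.token_in_address:
            continue
        rest = [s for s in others if s is not nxt]
        sub = shortest_route(nxt, ends, rest, max_len=bound)
        if sub is not None and (best is None or len(sub) < len(best)):
            best = sub
            bound = len(sub) - 1  # tighten the bound for later branches
    return None if best is None else [start] + best


a = Swap("WETH", "USDC")
b = Swap("USDC", "DAI")
c = Swap("DAI", "WETH")
print(shortest_route(a, [c], [b]))  # [a, b, c]: WETH -> USDC -> DAI -> WETH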


@@ -28,11 +28,11 @@ class ZeroExSwapClassifier(SwapClassifier):
         if len(child_transfers) < 2:
             return None
 
-        token_in_address, token_in_amount = _get_0x_token_in_data(
+        token_out_address, token_out_amount = _get_0x_token_out_data(
             trace, child_transfers
         )
 
-        token_out_address, token_out_amount = _get_0x_token_out_data(trace)
+        token_in_address, token_in_amount = _get_0x_token_in_data(trace)
 
         return Swap(
             abi_name=trace.abi_name,
@@ -222,10 +222,10 @@ ZEROX_GENERIC_SPECS = [
 
 ZEROX_CLASSIFIER_SPECS = ZEROX_CONTRACT_SPECS + ZEROX_GENERIC_SPECS
 
 
-def _get_taker_token_in_amount(
+def _get_taker_token_transfer_amount(
     trace: DecodedCallTrace,
     taker_address: str,
-    token_in_address: str,
+    token_address: str,
     child_transfers: List[Transfer],
 ) -> int:
@@ -239,7 +239,7 @@ def _get_taker_token_in_amount(
 
     if taker_address == ANY_TAKER_ADDRESS:
         for transfer in child_transfers:
-            if transfer.token_address == token_in_address:
+            if transfer.token_address == token_address:
                 return transfer.amount
     else:
         for transfer in child_transfers:
@@ -249,12 +249,11 @@ def _get_taker_token_in_amount(
 
     raise RuntimeError("Unable to find transfers matching 0x order.")
 
 
-def _get_0x_token_in_data(
+def _get_0x_token_out_data(
     trace: DecodedCallTrace, child_transfers: List[Transfer]
 ) -> Tuple[str, int]:
 
     order: List = trace.inputs["order"]
-    token_in_address = order[0]
+    token_out_address = order[0]
 
     if trace.function_signature in RFQ_SIGNATURES:
         taker_address = order[5]
@@ -267,17 +266,16 @@ def _get_0x_token_in_data(
             f"0x orderbook function {trace.function_signature} is not supported"
         )
 
-    token_in_amount = _get_taker_token_in_amount(
-        trace, taker_address, token_in_address, child_transfers
+    token_out_amount = _get_taker_token_transfer_amount(
+        trace, taker_address, token_out_address, child_transfers
     )
 
-    return token_in_address, token_in_amount
-
-
-def _get_0x_token_out_data(trace: DecodedCallTrace) -> Tuple[str, int]:
-
-    order: List = trace.inputs["order"]
-    token_out_address = order[1]
-    token_out_amount = trace.inputs["takerTokenFillAmount"]
-
     return token_out_address, token_out_amount
+
+
+def _get_0x_token_in_data(trace: DecodedCallTrace) -> Tuple[str, int]:
+    order: List = trace.inputs["order"]
+    token_in_address = order[1]
+    token_in_amount = trace.inputs["takerTokenFillAmount"]
+
+    return token_in_address, token_in_amount
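The renames fix reversed in/out semantics: a 0x v4 limit order tuple begins with makerToken, takerToken, so the token the taker receives (makerToken, order[0]) is the swap's token_out, and the token the taker pays (takerToken, order[1], with takerTokenFillAmount) is its token_in. A sketch with hypothetical values:

# Hypothetical decoded fillLimitOrder inputs, to show the corrected mapping.
order = [
    "0xC02a...WETH",  # order[0]: makerToken -> the swap's token_out
    "0xA0b8...USDC",  # order[1]: takerToken -> the swap's token_in
    # ... remaining order fields elided
]
inputs = {"order": order, "takerTokenFillAmount": 143949683150}

# token_in: what the taker pays; its amount comes straight from the inputs.
token_in_address, token_in_amount = order[1], inputs["takerTokenFillAmount"]

# token_out: what the taker receives; its amount must be recovered from the
# child transfers, which is why _get_0x_token_out_data now takes
# child_transfers in the diff above.
token_out_address = order[0]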


@@ -1,6 +1,7 @@
 from datetime import datetime
 from typing import List
 
+from mev_inspect.db import write_as_csv
 from mev_inspect.schemas.blocks import Block
@@ -28,16 +29,11 @@ def write_blocks(
     db_session,
     blocks: List[Block],
 ) -> None:
-    block_params = [
-        {
-            "block_number": block.block_number,
-            "block_timestamp": datetime.fromtimestamp(block.block_timestamp),
-        }
-        for block in blocks
-    ]
-
-    db_session.execute(
-        "INSERT INTO blocks (block_number, block_timestamp) VALUES (:block_number, :block_timestamp)",
-        params=block_params,
-    )
-    db_session.commit()
+    items_generator = (
+        (
+            block.block_number,
+            datetime.fromtimestamp(block.block_timestamp),
+        )
+        for block in blocks
+    )
+    write_as_csv(db_session, "blocks", items_generator)
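For reference, this is how one block row becomes a COPY line: _clean_csv_value (added in the mev_inspect/db.py hunk below) stringifies each field, so Postgres receives pipe-separated text it can parse directly:

from datetime import datetime

row = (13_000_000, datetime(2021, 9, 1))
line = "|".join(str(value) for value in row) + "\n"
assert line == "13000000|2021-09-01 00:00:00\n"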


@@ -1,6 +1,8 @@
 import json
+from datetime import datetime, timezone
 from typing import List
 
+from mev_inspect.db import to_postgres_list, write_as_csv
 from mev_inspect.models.traces import ClassifiedTraceModel
 from mev_inspect.schemas.traces import ClassifiedTrace
@@ -26,30 +28,35 @@ def write_classified_traces(
     db_session,
     classified_traces: List[ClassifiedTrace],
 ) -> None:
-    models = []
-    for trace in classified_traces:
-        inputs_json = (json.loads(trace.json(include={"inputs"}))["inputs"],)
-        models.append(
-            ClassifiedTraceModel(
-                transaction_hash=trace.transaction_hash,
-                transaction_position=trace.transaction_position,
-                block_number=trace.block_number,
-                classification=trace.classification.value,
-                trace_type=trace.type.value,
-                trace_address=trace.trace_address,
-                protocol=str(trace.protocol),
-                abi_name=trace.abi_name,
-                function_name=trace.function_name,
-                function_signature=trace.function_signature,
-                inputs=inputs_json,
-                from_address=trace.from_address,
-                to_address=trace.to_address,
-                gas=trace.gas,
-                value=trace.value,
-                gas_used=trace.gas_used,
-                error=trace.error,
-            )
-        )
-
-    db_session.bulk_save_objects(models)
-    db_session.commit()
+    classified_at = datetime.now(timezone.utc)
+    items = (
+        (
+            classified_at,
+            trace.transaction_hash,
+            trace.block_number,
+            trace.classification.value,
+            trace.type.value,
+            str(trace.protocol),
+            trace.abi_name,
+            trace.function_name,
+            trace.function_signature,
+            _inputs_as_json(trace),
+            trace.from_address,
+            trace.to_address,
+            trace.gas,
+            trace.value,
+            trace.gas_used,
+            trace.error,
+            to_postgres_list(trace.trace_address),
+            trace.transaction_position,
+        )
+        for trace in classified_traces
+    )
+
+    write_as_csv(db_session, "classified_traces", items)
+
+
+def _inputs_as_json(trace) -> str:
+    inputs = json.dumps(json.loads(trace.json(include={"inputs"}))["inputs"])
+    inputs_with_array = f"[{inputs}]"
+    return inputs_with_array
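_inputs_as_json wraps the serialized inputs in a JSON array literal, preserving the shape the old code produced by handing the one-tuple (inputs,) to the JSON column. With a hypothetical trace whose decoded inputs are {"amount": 1}:

import json

inputs = {"amount": 1}
inputs_with_array = f"[{json.dumps(inputs)}]"
assert inputs_with_array == '[{"amount": 1}]'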


@@ -1,9 +1,11 @@
 import os
-from typing import Optional
+from typing import Any, Iterable, List, Optional
 
 from sqlalchemy import create_engine, orm
 from sqlalchemy.orm import sessionmaker
 
+from mev_inspect.string_io import StringIteratorIO
+
 
 def get_trace_database_uri() -> Optional[str]:
     username = os.getenv("TRACE_DB_USER")
@@ -63,3 +65,29 @@ def get_trace_session() -> Optional[orm.Session]:
         return Session()
 
     return None
+
+
+def write_as_csv(
+    db_session,
+    table_name: str,
+    items: Iterable[Iterable[Any]],
+) -> None:
+    csv_iterator = StringIteratorIO(
+        ("|".join(map(_clean_csv_value, item)) + "\n" for item in items)
+    )
+
+    with db_session.connection().connection.cursor() as cursor:
+        cursor.copy_from(csv_iterator, table_name, sep="|")
+
+
+def _clean_csv_value(value: Optional[Any]) -> str:
+    if value is None:
+        return r"\N"
+    return str(value).replace("\n", "\\n")
+
+
+def to_postgres_list(values: List[Any]) -> str:
+    if len(values) == 0:
+        return "{}"
+
+    return "{" + ",".join(map(str, values)) + "}"

mev_inspect/string_io.py (new file)

@@ -0,0 +1,40 @@
+"""This is taken from https://hakibenita.com/fast-load-data-python-postgresql"""
+import io
+from typing import Iterator, Optional
+
+
+class StringIteratorIO(io.TextIOBase):
+    def __init__(self, iter: Iterator[str]):
+        self._iter = iter
+        self._buff = ""
+
+    def readable(self) -> bool:
+        return True
+
+    def _read1(self, n: Optional[int] = None) -> str:
+        while not self._buff:
+            try:
+                self._buff = next(self._iter)
+            except StopIteration:
+                break
+
+        ret = self._buff[:n]
+        self._buff = self._buff[len(ret) :]
+        return ret
+
+    def read(self, n: Optional[int] = None) -> str:
+        line = []
+        if n is None or n < 0:
+            while True:
+                m = self._read1()
+                if not m:
+                    break
+                line.append(m)
+        else:
+            while n > 0:
+                m = self._read1(n)
+                if not m:
+                    break
+                n -= len(m)
+                line.append(m)
+        return "".join(line)


@@ -20,10 +20,10 @@ def test_fillLimitOrder_swap(trace_classifier: TraceClassifier):
         contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         from_address="0x00000000000e1d0dabf7b7c7b68866fc940d0db8",
         to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
-        token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
-        token_in_amount=35000000000000000000,
-        token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
-        token_out_amount=143949683150,
+        token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
+        token_out_amount=35000000000000000000,
+        token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
+        token_in_amount=143949683150,
         protocol=Protocol.zero_ex,
         error=None,
     )
@@ -50,10 +50,10 @@ def test__fillLimitOrder_swap(trace_classifier: TraceClassifier):
         contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
-        token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
-        token_in_amount=30000000,
-        token_out_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f",
-        token_out_amount=100000001,
+        token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
+        token_out_amount=30000000,
+        token_in_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f",
+        token_in_amount=100000001,
         protocol=Protocol.zero_ex,
         error=None,
     )
@@ -80,10 +80,10 @@ def test_RfqLimitOrder_swap(trace_classifier: TraceClassifier):
         contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         from_address="0xdef171fe48cf0115b1d80b88dc8eab59176fee57",
         to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
-        token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
-        token_in_amount=288948250430,
-        token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
-        token_out_amount=70500000000000000000,
+        token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
+        token_out_amount=288948250430,
+        token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
+        token_in_amount=70500000000000000000,
         protocol=Protocol.zero_ex,
         error=None,
     )
@@ -110,10 +110,10 @@ def test__RfqLimitOrder_swap(trace_classifier: TraceClassifier):
         contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
         to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff",
-        token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
-        token_in_amount=979486121594935552,
-        token_out_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
-        token_out_amount=92404351093861841165644172,
+        token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
+        token_out_amount=979486121594935552,
+        token_in_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce",
+        token_in_amount=92404351093861841165644172,
         protocol=Protocol.zero_ex,
         error=None,
     )


@@ -38,6 +38,8 @@ class InspectorMiddleware(Middleware):
     def before_process_message(
         self, _broker, worker
     ):  # pylint: disable=unused-argument
+        rpc = os.environ["RPC_URL"]
+
         if not hasattr(thread_local, "inspector"):
             logger.info("Building inspector")
             thread_local.inspector = MEVInspector(
@@ -49,7 +51,6 @@ class InspectorMiddleware(Middleware):
             logger.info("Inspector already exists")
 
 
-rpc = os.environ["RPC_URL"]
 broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"])
 broker.add_middleware(AsyncMiddleware())
 broker.add_middleware(InspectorMiddleware())
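Moving the RPC_URL read out of module scope mirrors the cli.py change above: importing the worker module no longer requires the variable to be set; only actually processing a message does. A hypothetical minimal middleware showing the pattern:

import os

from dramatiq.middleware import Middleware


class LazyEnvMiddleware(Middleware):  # hypothetical minimal version
    def before_process_message(self, _broker, message):
        # Read per message instead of at import time, so importing this
        # module never fails in environments without RPC_URL.
        rpc = os.environ["RPC_URL"]
        print(f"processing message with RPC {rpc}")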