diff --git a/Tiltfile b/Tiltfile index 093e77e..1d4bc22 100644 --- a/Tiltfile +++ b/Tiltfile @@ -11,6 +11,7 @@ helm_remote("postgresql", helm_remote("redis", repo_name="bitnami", repo_url="https://charts.bitnami.com/bitnami", + set=["global.redis.password=password"], ) k8s_yaml(configmap_from_dict("mev-inspect-rpc", inputs = { diff --git a/cli.py b/cli.py index d116d99..c9c815f 100644 --- a/cli.py +++ b/cli.py @@ -3,7 +3,6 @@ import os import sys import click -from worker import inspect_many_blocks_task from mev_inspect.concurrency import coro from mev_inspect.crud.prices import write_prices @@ -97,6 +96,10 @@ async def inspect_many_blocks_command( @click.argument("before_block", type=int) @click.argument("batch_size", type=int, default=10) def enqueue_many_blocks_command(after_block: int, before_block: int, batch_size: int): + from worker import ( # pylint: disable=import-outside-toplevel + inspect_many_blocks_task, + ) + for batch_after_block in range(after_block, before_block, batch_size): batch_before_block = min(batch_after_block + batch_size, before_block) logger.info(f"Sending {batch_after_block} to {batch_before_block}") diff --git a/k8s/mev-inspect-workers/templates/deployment.yaml b/k8s/mev-inspect-workers/templates/deployment.yaml index 92e89b0..8b979aa 100644 --- a/k8s/mev-inspect-workers/templates/deployment.yaml +++ b/k8s/mev-inspect-workers/templates/deployment.yaml @@ -37,7 +37,8 @@ spec: - ls - / initialDelaySeconds: 20 - periodSeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 resources: {{- toYaml .Values.resources | nindent 12 }} env: diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index 21af65a..fa2123f 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -37,7 +37,8 @@ spec: - ls - / initialDelaySeconds: 20 - periodSeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 resources: {{- toYaml .Values.resources | nindent 12 }} env: diff --git 
a/mev_inspect/arbitrages.py b/mev_inspect/arbitrages.py index 893c159..fe3491b 100644 --- a/mev_inspect/arbitrages.py +++ b/mev_inspect/arbitrages.py @@ -92,11 +92,21 @@ def _get_shortest_route( start_swap: Swap, end_swaps: List[Swap], all_swaps: List[Swap], + max_route_length: Optional[int] = None, ) -> Optional[List[Swap]]: + if len(end_swaps) == 0: + return None + + if max_route_length is not None and max_route_length < 2: + return None + for end_swap in end_swaps: if start_swap.token_out_address == end_swap.token_in_address: return [start_swap, end_swap] + if max_route_length is not None and max_route_length == 2: + return None + other_swaps = [ swap for swap in all_swaps if (swap is not start_swap and swap not in end_swaps) ] @@ -105,6 +115,9 @@ def _get_shortest_route( return None shortest_remaining_route = None + max_remaining_route_length = ( + None if max_route_length is None else max_route_length - 1 + ) for next_swap in other_swaps: if start_swap.token_out_address == next_swap.token_in_address and ( @@ -116,6 +129,7 @@ def _get_shortest_route( next_swap, end_swaps, other_swaps, + max_route_length=max_remaining_route_length, ) if shortest_from_next is not None and ( @@ -123,6 +137,7 @@ def _get_shortest_route( or len(shortest_from_next) < len(shortest_remaining_route) ): shortest_remaining_route = shortest_from_next + max_remaining_route_length = len(shortest_from_next) - 1 if shortest_remaining_route is None: return None diff --git a/mev_inspect/classifiers/specs/zero_ex.py b/mev_inspect/classifiers/specs/zero_ex.py index 2766299..a6c5fa8 100644 --- a/mev_inspect/classifiers/specs/zero_ex.py +++ b/mev_inspect/classifiers/specs/zero_ex.py @@ -28,11 +28,11 @@ class ZeroExSwapClassifier(SwapClassifier): if len(child_transfers) < 2: return None - token_in_address, token_in_amount = _get_0x_token_in_data( + token_out_address, token_out_amount = _get_0x_token_out_data( trace, child_transfers ) - token_out_address, token_out_amount = 
_get_0x_token_out_data(trace) +    token_in_address, token_in_amount = _get_0x_token_in_data(trace) return Swap( abi_name=trace.abi_name, @@ -222,10 +222,10 @@ ZEROX_GENERIC_SPECS = [ ZEROX_CLASSIFIER_SPECS = ZEROX_CONTRACT_SPECS + ZEROX_GENERIC_SPECS -def _get_taker_token_in_amount( +def _get_taker_token_transfer_amount( trace: DecodedCallTrace, taker_address: str, - token_in_address: str, + token_address: str, child_transfers: List[Transfer], ) -> int: @@ -239,7 +239,7 @@ if taker_address == ANY_TAKER_ADDRESS: for transfer in child_transfers: - if transfer.token_address == token_in_address: + if transfer.token_address == token_address: return transfer.amount else: for transfer in child_transfers: @@ -249,12 +249,11 @@ raise RuntimeError("Unable to find transfers matching 0x order.") -def _get_0x_token_in_data( +def _get_0x_token_out_data( trace: DecodedCallTrace, child_transfers: List[Transfer] ) -> Tuple[str, int]: - order: List = trace.inputs["order"] - token_in_address = order[0] + token_out_address = order[0] if trace.function_signature in RFQ_SIGNATURES: taker_address = order[5] @@ -267,17 +266,16 @@ f"0x orderbook function {trace.function_signature} is not supported" ) - token_in_amount = _get_taker_token_in_amount( - trace, taker_address, token_in_address, child_transfers + token_out_amount = _get_taker_token_transfer_amount( + trace, taker_address, token_out_address, child_transfers ) - return token_in_address, token_in_amount - - -def _get_0x_token_out_data(trace: DecodedCallTrace) -> Tuple[str, int]: - - order: List = trace.inputs["order"] - token_out_address = order[1] - token_out_amount = trace.inputs["takerTokenFillAmount"] - return token_out_address, token_out_amount + return token_out_address, token_out_amount + + +def _get_0x_token_in_data(trace: DecodedCallTrace) -> Tuple[str, int]: + order: List = trace.inputs["order"] + token_in_address = order[1] + token_in_amount = 
trace.inputs["takerTokenFillAmount"] + + return token_in_address, token_in_amount diff --git a/mev_inspect/crud/blocks.py b/mev_inspect/crud/blocks.py index fce9d2e..41199a5 100644 --- a/mev_inspect/crud/blocks.py +++ b/mev_inspect/crud/blocks.py @@ -1,6 +1,7 @@ from datetime import datetime from typing import List +from mev_inspect.db import write_as_csv from mev_inspect.schemas.blocks import Block @@ -28,16 +29,11 @@ def write_blocks( db_session, blocks: List[Block], ) -> None: - block_params = [ - { - "block_number": block.block_number, - "block_timestamp": datetime.fromtimestamp(block.block_timestamp), - } + items_generator = ( + ( + block.block_number, + datetime.fromtimestamp(block.block_timestamp), + ) for block in blocks - ] - - db_session.execute( - "INSERT INTO blocks (block_number, block_timestamp) VALUES (:block_number, :block_timestamp)", - params=block_params, ) - db_session.commit() + write_as_csv(db_session, "blocks", items_generator) diff --git a/mev_inspect/crud/traces.py b/mev_inspect/crud/traces.py index 0f099f6..903026e 100644 --- a/mev_inspect/crud/traces.py +++ b/mev_inspect/crud/traces.py @@ -1,6 +1,8 @@ import json +from datetime import datetime, timezone from typing import List +from mev_inspect.db import to_postgres_list, write_as_csv from mev_inspect.models.traces import ClassifiedTraceModel from mev_inspect.schemas.traces import ClassifiedTrace @@ -26,30 +28,35 @@ def write_classified_traces( db_session, classified_traces: List[ClassifiedTrace], ) -> None: - models = [] - for trace in classified_traces: - inputs_json = (json.loads(trace.json(include={"inputs"}))["inputs"],) - models.append( - ClassifiedTraceModel( - transaction_hash=trace.transaction_hash, - transaction_position=trace.transaction_position, - block_number=trace.block_number, - classification=trace.classification.value, - trace_type=trace.type.value, - trace_address=trace.trace_address, - protocol=str(trace.protocol), - abi_name=trace.abi_name, - 
function_name=trace.function_name, - function_signature=trace.function_signature, - inputs=inputs_json, - from_address=trace.from_address, - to_address=trace.to_address, - gas=trace.gas, - value=trace.value, - gas_used=trace.gas_used, - error=trace.error, - ) + classified_at = datetime.now(timezone.utc) + items = ( + ( + classified_at, + trace.transaction_hash, + trace.block_number, + trace.classification.value, + trace.type.value, + str(trace.protocol), + trace.abi_name, + trace.function_name, + trace.function_signature, + _inputs_as_json(trace), + trace.from_address, + trace.to_address, + trace.gas, + trace.value, + trace.gas_used, + trace.error, + to_postgres_list(trace.trace_address), + trace.transaction_position, ) + for trace in classified_traces + ) - db_session.bulk_save_objects(models) - db_session.commit() + write_as_csv(db_session, "classified_traces", items) + + +def _inputs_as_json(trace) -> str: + inputs = json.dumps(json.loads(trace.json(include={"inputs"}))["inputs"]) + inputs_with_array = f"[{inputs}]" + return inputs_with_array diff --git a/mev_inspect/db.py b/mev_inspect/db.py index 15ccdc3..dd7c66a 100644 --- a/mev_inspect/db.py +++ b/mev_inspect/db.py @@ -1,9 +1,11 @@ import os -from typing import Optional +from typing import Any, Iterable, List, Optional from sqlalchemy import create_engine, orm from sqlalchemy.orm import sessionmaker +from mev_inspect.string_io import StringIteratorIO + def get_trace_database_uri() -> Optional[str]: username = os.getenv("TRACE_DB_USER") @@ -63,3 +65,29 @@ def get_trace_session() -> Optional[orm.Session]: return Session() return None + + +def write_as_csv( + db_session, + table_name: str, + items: Iterable[Iterable[Any]], +) -> None: + csv_iterator = StringIteratorIO( + ("|".join(map(_clean_csv_value, item)) + "\n" for item in items) + ) + + with db_session.connection().connection.cursor() as cursor: + cursor.copy_from(csv_iterator, table_name, sep="|") + + +def _clean_csv_value(value: Optional[Any]) -> str: + 
if value is None: + return r"\N" + return str(value).replace("\n", "\\n") + + +def to_postgres_list(values: List[Any]) -> str: + if len(values) == 0: + return "{}" + + return "{" + ",".join(map(str, values)) + "}" diff --git a/mev_inspect/string_io.py b/mev_inspect/string_io.py new file mode 100644 index 0000000..37efb5f --- /dev/null +++ b/mev_inspect/string_io.py @@ -0,0 +1,40 @@ +"""This is taken from https://hakibenita.com/fast-load-data-python-postgresql""" + +import io +from typing import Iterator, Optional + + +class StringIteratorIO(io.TextIOBase): + def __init__(self, iter: Iterator[str]): + self._iter = iter + self._buff = "" + + def readable(self) -> bool: + return True + + def _read1(self, n: Optional[int] = None) -> str: + while not self._buff: + try: + self._buff = next(self._iter) + except StopIteration: + break + ret = self._buff[:n] + self._buff = self._buff[len(ret) :] + return ret + + def read(self, n: Optional[int] = None) -> str: + line = [] + if n is None or n < 0: + while True: + m = self._read1() + if not m: + break + line.append(m) + else: + while n > 0: + m = self._read1(n) + if not m: + break + n -= len(m) + line.append(m) + return "".join(line) diff --git a/tests/test_0x.py b/tests/test_0x.py index 194d280..9943dcf 100644 --- a/tests/test_0x.py +++ b/tests/test_0x.py @@ -20,10 +20,10 @@ def test_fillLimitOrder_swap(trace_classifier: TraceClassifier): contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", from_address="0x00000000000e1d0dabf7b7c7b68866fc940d0db8", to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", - token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", - token_in_amount=35000000000000000000, - token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", - token_out_amount=143949683150, + token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + token_out_amount=35000000000000000000, + token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", + token_in_amount=143949683150, 
protocol=Protocol.zero_ex, error=None, ) @@ -50,10 +50,10 @@ def test__fillLimitOrder_swap(trace_classifier: TraceClassifier): contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", - token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", - token_in_amount=30000000, - token_out_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f", - token_out_amount=100000001, + token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", + token_out_amount=30000000, + token_in_address="0x9ff79c75ae2bcbe0ec63c0375a3ec90ff75bbe0f", + token_in_amount=100000001, protocol=Protocol.zero_ex, error=None, ) @@ -80,10 +80,10 @@ def test_RfqLimitOrder_swap(trace_classifier: TraceClassifier): contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", from_address="0xdef171fe48cf0115b1d80b88dc8eab59176fee57", to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", - token_in_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", - token_in_amount=288948250430, - token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", - token_out_amount=70500000000000000000, + token_out_address="0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", + token_out_amount=288948250430, + token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + token_in_amount=70500000000000000000, protocol=Protocol.zero_ex, error=None, ) @@ -110,10 +110,10 @@ def test__RfqLimitOrder_swap(trace_classifier: TraceClassifier): contract_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", from_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", to_address="0xdef1c0ded9bec7f1a1670819833240f027b25eff", - token_in_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", - token_in_amount=979486121594935552, - token_out_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce", - token_out_amount=92404351093861841165644172, + token_out_address="0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + 
token_out_amount=979486121594935552, + token_in_address="0x95ad61b0a150d79219dcf64e1e6cc01f0b64c4ce", + token_in_amount=92404351093861841165644172, protocol=Protocol.zero_ex, error=None, ) diff --git a/worker.py b/worker.py index 380ba86..f798d77 100644 --- a/worker.py +++ b/worker.py @@ -38,6 +38,8 @@ class InspectorMiddleware(Middleware): def before_process_message( self, _broker, worker ): # pylint: disable=unused-argument + rpc = os.environ["RPC_URL"] + if not hasattr(thread_local, "inspector"): logger.info("Building inspector") thread_local.inspector = MEVInspector( @@ -49,7 +51,6 @@ class InspectorMiddleware(Middleware): logger.info("Inspector already exists") -rpc = os.environ["RPC_URL"] broker = RedisBroker(host="redis-master", password=os.environ["REDIS_PASSWORD"]) broker.add_middleware(AsyncMiddleware()) broker.add_middleware(InspectorMiddleware())