diff --git a/Tiltfile b/Tiltfile index 0ccc758..707e8b1 100644 --- a/Tiltfile +++ b/Tiltfile @@ -95,21 +95,25 @@ local_resource( ) # if using local S3 exports -# k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { -# "export-bucket-name" : "local-export", -# "export-bucket-region": "us-east-1", -# "export-aws-access-key-id": "foobar", -# "export-aws-secret-access-key": "foobar", -# })) -# -# helm_remote( -# "localstack", -# repo_name="localstack-charts", -# repo_url="https://localstack.github.io/helm-charts", -# ) -# -# local_resource( -# 'localstack-port-forward', -# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', -# resource_deps=["localstack"] -# ) +#k8s_yaml(secret_from_dict("mev-inspect-export", inputs = { +# "export-bucket-name" : "local-export", +# "export-bucket-region": "us-east-1", +# "export-aws-access-key-id": "foobar", +# "export-aws-secret-access-key": "foobar", +#})) + +#helm_remote( +# "localstack", +# repo_name="localstack-charts", +# repo_url="https://localstack.github.io/helm-charts", +#) + +#local_resource( +# 'localstack-port-forward', +# serve_cmd='kubectl port-forward --namespace default svc/localstack 4566:4566', +# resource_deps=["localstack"] +#) + +#k8s_yaml(configmap_from_dict("mev-inspect-export", inputs = { +# "services": "s3", +#})) diff --git a/cli.py b/cli.py index 7c7ed03..f752f44 100644 --- a/cli.py +++ b/cli.py @@ -12,8 +12,7 @@ from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.prices import fetch_prices, fetch_prices_range from mev_inspect.queue.broker import connect_broker -from mev_inspect.queue.tasks import inspect_many_blocks_task -from mev_inspect.s3_export import export_block_range +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task RPC_URL_ENV = "RPC_URL" @@ -134,11 +133,12 @@ def fetch_all_prices(): @cli.command() -@click.argument("after_block_number", type=int) -@click.argument("before_block_number", type=int) -def s3_export(after_block_number: int, before_block_number: int): - inspect_db_session = get_inspect_session() - export_block_range(inspect_db_session, after_block_number, before_block_number) +@click.argument("block_number", type=int) +def s3_export(block_number: int): + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) @cli.command() diff --git a/k8s/mev-inspect/templates/deployment.yaml b/k8s/mev-inspect/templates/deployment.yaml index b9da25e..7993652 100644 --- a/k8s/mev-inspect/templates/deployment.yaml +++ b/k8s/mev-inspect/templates/deployment.yaml @@ -118,7 +118,7 @@ spec: {{- range .Values.extraEnv }} - name: {{ .name }} value: {{ .value }} - {{- end }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/listener.py b/listener.py index 9da3a45..047083a 100644 --- a/listener.py +++ b/listener.py @@ -3,6 +3,7 @@ import logging import os import aiohttp +import dramatiq from mev_inspect.block import get_latest_block_number from mev_inspect.concurrency import coro @@ -13,6 +14,8 @@ from mev_inspect.crud.latest_block_update import ( from mev_inspect.db import get_inspect_session, get_trace_session from mev_inspect.inspector import MEVInspector from mev_inspect.provider import get_base_provider +from mev_inspect.queue.broker import connect_broker +from mev_inspect.queue.tasks import export_block_task from mev_inspect.signal_handler import GracefulKiller logging.basicConfig(filename="listener.log", filemode="a", level=logging.INFO) @@ -37,6 +40,9 @@ async def run(): inspect_db_session = get_inspect_session() trace_db_session = get_trace_session() + broker = connect_broker() + export_actor = dramatiq.actor(export_block_task, broker=broker) + inspector = MEVInspector(rpc) base_provider = get_base_provider(rpc) @@ -47,6 +53,7 @@ async def run(): trace_db_session, base_provider, healthcheck_url, + export_actor, ) logger.info("Stopping...") @@ -58,7 +65,9 @@ async def inspect_next_block( trace_db_session, base_provider, healthcheck_url, + export_actor, ): + latest_block_number = await get_latest_block_number(base_provider) last_written_block = find_latest_block_update(inspect_db_session) @@ -82,6 +91,9 @@ async def inspect_next_block( update_latest_block(inspect_db_session, block_number) + logger.info(f"Sending block {block_number} for export") + export_actor.send(block_number) + if healthcheck_url: await ping_healthcheck_url(healthcheck_url) else: diff --git a/mev_inspect/queue/tasks.py b/mev_inspect/queue/tasks.py index 78a4c3e..a55efd5 100644 --- a/mev_inspect/queue/tasks.py +++ b/mev_inspect/queue/tasks.py @@ -2,6 +2,8 @@ import asyncio import logging from contextlib import contextmanager +from mev_inspect.s3_export import export_block + from .middleware import DbMiddleware, InspectorMiddleware logger = logging.getLogger(__name__) @@ -23,6 +25,11 @@ def inspect_many_blocks_task( ) +def export_block_task(block_number: int): + with _session_scope(DbMiddleware.get_inspect_sessionmaker()) as inspect_db_session: + export_block(inspect_db_session, block_number) + + @contextmanager def _session_scope(Session=None): if Session is None: diff --git a/mev_inspect/s3_export.py b/mev_inspect/s3_export.py index 30ea439..422dfe8 100644 --- a/mev_inspect/s3_export.py +++ b/mev_inspect/s3_export.py @@ -16,25 +16,21 @@ EXPORT_AWS_SECRET_ACCESS_KEY_ENV = "EXPORT_AWS_SECRET_ACCESS_KEY" MEV_SUMMARY_EXPORT_QUERY = """ SELECT to_json(mev_summary) FROM mev_summary - WHERE - block_number >= :after_block_number AND - block_number < :before_block_number +WHERE + block_number = :block_number """ logger = logging.getLogger(__name__) -def export_block_range( - inspect_db_session, after_block_number: int, before_block_number -) -> None: +def export_block(inspect_db_session, block_number: int) -> None: export_bucket_name = get_export_bucket_name() client = get_s3_client() mev_summary_json_results = inspect_db_session.execute( statement=MEV_SUMMARY_EXPORT_QUERY, params={ - "after_block_number": after_block_number, - "before_block_number": before_block_number, + "block_number": block_number, }, ) @@ -42,7 +38,7 @@ def export_block_range( (f"{json.dumps(row)}\n".encode("utf-8") for (row,) in mev_summary_json_results) ) - key = f"mev_summary/flashbots_{after_block_number}_{before_block_number}.json" + key = f"mev_summary/flashbots_{block_number}.json" client.upload_fileobj( mev_summary_json_fileobj, diff --git a/worker.py b/worker.py index a717b47..e0fc895 100644 --- a/worker.py +++ b/worker.py @@ -10,7 +10,7 @@ from mev_inspect.queue.middleware import ( DbMiddleware, InspectorMiddleware, ) -from mev_inspect.queue.tasks import inspect_many_blocks_task +from mev_inspect.queue.tasks import export_block_task, inspect_many_blocks_task logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -21,3 +21,4 @@ broker.add_middleware(InspectorMiddleware(os.environ["RPC_URL"])) dramatiq.set_broker(broker) dramatiq.actor(inspect_many_blocks_task) +dramatiq.actor(export_block_task)