Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
5b9521e822 | ||
|
088abe6591 | ||
|
b93da2519e | ||
|
f3eee00ecf | ||
|
aac34cfcbd | ||
|
f1370f7bdc | ||
|
8496c5ab45 | ||
|
e01de19e73 | ||
|
f6d5ca1179 | ||
|
c8f8fb2aa3 |
9
Tiltfile
9
Tiltfile
@ -2,6 +2,11 @@ load("ext://helm_remote", "helm_remote")
|
|||||||
load("ext://secret", "secret_from_dict")
|
load("ext://secret", "secret_from_dict")
|
||||||
load("ext://configmap", "configmap_from_dict")
|
load("ext://configmap", "configmap_from_dict")
|
||||||
|
|
||||||
|
helm_remote("localstack",
|
||||||
|
repo_name="localstack-charts",
|
||||||
|
repo_url="https://localstack.github.io/helm-charts",
|
||||||
|
)
|
||||||
|
|
||||||
helm_remote("postgresql",
|
helm_remote("postgresql",
|
||||||
repo_name="bitnami",
|
repo_name="bitnami",
|
||||||
repo_url="https://charts.bitnami.com/bitnami",
|
repo_url="https://charts.bitnami.com/bitnami",
|
||||||
@ -67,3 +72,7 @@ local_resource(
|
|||||||
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
|
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
|
||||||
resource_deps=["postgresql-postgresql"]
|
resource_deps=["postgresql-postgresql"]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
k8s_yaml(configmap_from_dict("mev-inspect-s3", inputs = {
|
||||||
|
"uri" : "https://s3.us-east-1.amazonaws.com/mybucket/"
|
||||||
|
}))
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
"""Create latest_s3_block table
|
||||||
|
|
||||||
|
Revision ID: ce116d0badc8
|
||||||
|
Revises: 5c5375de15fd
|
||||||
|
Create Date: 2022-01-31 23:36:34.971594
|
||||||
|
|
||||||
|
"""
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = "ce116d0badc8"
|
||||||
|
down_revision = "5c5375de15fd"
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
op.create_table(
|
||||||
|
"latest_s3_block",
|
||||||
|
sa.Column("block_number", sa.Numeric, nullable=False),
|
||||||
|
sa.Column("updated_at", sa.TIMESTAMP, server_default=sa.func.now()),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
op.drop_table("latest_s3_block")
|
31
cli.py
31
cli.py
@ -8,10 +8,12 @@ import click
|
|||||||
from mev_inspect.concurrency import coro
|
from mev_inspect.concurrency import coro
|
||||||
from mev_inspect.crud.prices import write_prices
|
from mev_inspect.crud.prices import write_prices
|
||||||
from mev_inspect.db import get_inspect_session, get_trace_session
|
from mev_inspect.db import get_inspect_session, get_trace_session
|
||||||
|
from mev_inspect.export import s3_export, s3_export_many
|
||||||
from mev_inspect.inspector import MEVInspector
|
from mev_inspect.inspector import MEVInspector
|
||||||
from mev_inspect.prices import fetch_prices, fetch_prices_range
|
from mev_inspect.prices import fetch_prices, fetch_prices_range
|
||||||
|
|
||||||
RPC_URL_ENV = "RPC_URL"
|
RPC_URL_ENV = "RPC_URL"
|
||||||
|
S3_URI_ENV = "S3_URI"
|
||||||
|
|
||||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -131,9 +133,38 @@ def fetch_range(after: datetime, before: datetime):
|
|||||||
write_prices(inspect_db_session, prices)
|
write_prices(inspect_db_session, prices)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("block_number", type=int)
|
||||||
|
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
|
||||||
|
def s3_export_command(block_number: int, uri: str):
|
||||||
|
inspect_db_session = get_inspect_session()
|
||||||
|
|
||||||
|
logger.info(f"Exporting block {block_number}")
|
||||||
|
s3_export(inspect_db_session, block_number, uri)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("after_block", type=int)
|
||||||
|
@click.argument("before_block", type=int)
|
||||||
|
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
|
||||||
|
def s3_export_many_command(after_block: int, before_block: int, uri: str):
|
||||||
|
inspect_db_session = get_inspect_session()
|
||||||
|
|
||||||
|
logger.info(f"Exporting blocks {after_block} to {before_block}")
|
||||||
|
s3_export_many(inspect_db_session, after_block, before_block, uri)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_rpc_url() -> str:
|
def get_rpc_url() -> str:
|
||||||
return os.environ["RPC_URL"]
|
return os.environ["RPC_URL"]
|
||||||
|
|
||||||
|
|
||||||
|
def get_s3_urii() -> str:
|
||||||
|
return os.environ["S3_URI"]
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
cli()
|
cli()
|
||||||
|
@ -91,6 +91,11 @@ spec:
|
|||||||
name: mev-inspect-listener-healthcheck
|
name: mev-inspect-listener-healthcheck
|
||||||
key: url
|
key: url
|
||||||
optional: true
|
optional: true
|
||||||
|
- name: S3_URI
|
||||||
|
valueFrom:
|
||||||
|
configMapKeyRef:
|
||||||
|
name: mev-inspect-s3
|
||||||
|
key: uri
|
||||||
{{- with .Values.nodeSelector }}
|
{{- with .Values.nodeSelector }}
|
||||||
nodeSelector:
|
nodeSelector:
|
||||||
{{- toYaml . | nindent 8 }}
|
{{- toYaml . | nindent 8 }}
|
||||||
|
26
mev
26
mev
@ -94,12 +94,36 @@ case "$1" in
|
|||||||
exit 1
|
exit 1
|
||||||
esac
|
esac
|
||||||
;;
|
;;
|
||||||
|
export)
|
||||||
|
shift
|
||||||
|
case "$1" in
|
||||||
|
s3-export)
|
||||||
|
block_number=$2
|
||||||
|
uri=$3
|
||||||
|
echo "Exporting block"
|
||||||
|
kubectl exec -ti deploy/mev-inspect -- \
|
||||||
|
poetry run s3-export $block_number $uri
|
||||||
|
|
||||||
|
;;
|
||||||
|
s3-export-many)
|
||||||
|
after_block=$2
|
||||||
|
before_block=$3
|
||||||
|
base_uri=$4
|
||||||
|
echo "Exporting blocks"
|
||||||
|
kubectl exec -ti deploy/mev-inspect -- \
|
||||||
|
poetry run s3-export-many $after_block $before_block $base_uri
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
echo "export usage: "$1" {s3-export}"
|
||||||
|
exit 1
|
||||||
|
esac
|
||||||
|
;;
|
||||||
exec)
|
exec)
|
||||||
shift
|
shift
|
||||||
kubectl exec -ti deploy/mev-inspect -- $@
|
kubectl exec -ti deploy/mev-inspect -- $@
|
||||||
;;
|
;;
|
||||||
*)
|
*)
|
||||||
echo "Usage: "$1" {db|backfill|inspect|test}"
|
echo "Usage: "$1" {db|redis|listener|backfill|inspect|inspect-many|test|fetch|prices|export}"
|
||||||
exit 1
|
exit 1
|
||||||
esac
|
esac
|
||||||
|
|
||||||
|
25
mev_inspect/crud/latest_s3_block.py
Normal file
25
mev_inspect/crud/latest_s3_block.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
def find_latest_s3_block(db_session) -> Optional[int]:
|
||||||
|
result = db_session.execute(
|
||||||
|
"SELECT block_number FROM latest_s3_block LIMIT 1"
|
||||||
|
).one_or_none()
|
||||||
|
if result is None:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return int(result[0])
|
||||||
|
|
||||||
|
|
||||||
|
def update_latest_s3_block(db_session, block_number) -> None:
|
||||||
|
db_session.execute(
|
||||||
|
"""
|
||||||
|
UPDATE latest_s3_block
|
||||||
|
SET block_number = :block_number, updated_at = current_timestamp;
|
||||||
|
INSERT INTO latest_s3_block
|
||||||
|
(block_number, updated_at)
|
||||||
|
SELECT :block_number, current_timestamp
|
||||||
|
WHERE NOT EXISTS (SELECT 1 FROM latest_s3_blocks);
|
||||||
|
""",
|
||||||
|
params={"block_number": block_number},
|
||||||
|
)
|
64
mev_inspect/export.py
Normal file
64
mev_inspect/export.py
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
from mev_inspect.crud.latest_s3_block import (
|
||||||
|
find_latest_s3_block,
|
||||||
|
update_latest_s3_block,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def s3_export(
|
||||||
|
db_session,
|
||||||
|
block_number: int,
|
||||||
|
uri: str,
|
||||||
|
) -> None:
|
||||||
|
"""Export block to S3"""
|
||||||
|
|
||||||
|
latest_s3_block = find_latest_s3_block(db_session)
|
||||||
|
|
||||||
|
if latest_s3_block is not None:
|
||||||
|
|
||||||
|
if block_number > latest_s3_block:
|
||||||
|
|
||||||
|
db_session.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM aws_s3.query_export_to_s3(
|
||||||
|
'SELECT *
|
||||||
|
FROM mev_summary
|
||||||
|
WHERE block_number={block_number}',
|
||||||
|
:{uri}
|
||||||
|
""",
|
||||||
|
params={"block_number": block_number, "uri": uri},
|
||||||
|
)
|
||||||
|
update_latest_s3_block(db_session, block_number)
|
||||||
|
|
||||||
|
|
||||||
|
def s3_export_many(
|
||||||
|
db_session,
|
||||||
|
after_block: int,
|
||||||
|
before_block: int,
|
||||||
|
uri: str,
|
||||||
|
) -> None:
|
||||||
|
"""Export block range to S3"""
|
||||||
|
|
||||||
|
latest_s3_block = find_latest_s3_block(db_session)
|
||||||
|
|
||||||
|
for block_number in range(after_block, before_block):
|
||||||
|
|
||||||
|
if latest_s3_block is not None:
|
||||||
|
|
||||||
|
if block_number > latest_s3_block:
|
||||||
|
|
||||||
|
uri += f"/{block_number}"
|
||||||
|
db_session.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM aws_s3.query_export_to_s3(
|
||||||
|
'SELECT *
|
||||||
|
FROM mev_summary
|
||||||
|
WHERE block_number={block_number}
|
||||||
|
:{uri}
|
||||||
|
""",
|
||||||
|
params={
|
||||||
|
"after_block": after_block,
|
||||||
|
"before_block": before_block,
|
||||||
|
"uri": uri,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
update_latest_s3_block(db_session, block_number)
|
@ -40,6 +40,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
|
|||||||
fetch-block = 'cli:fetch_block_command'
|
fetch-block = 'cli:fetch_block_command'
|
||||||
fetch-all-prices = 'cli:fetch_all_prices'
|
fetch-all-prices = 'cli:fetch_all_prices'
|
||||||
fetch-range = 'cli:fetch_range'
|
fetch-range = 'cli:fetch_range'
|
||||||
|
s3-export = 'cli:s3_export_command'
|
||||||
|
s3-export-many = 'cli:s3_export_many_command'
|
||||||
|
|
||||||
[tool.black]
|
[tool.black]
|
||||||
exclude = '''
|
exclude = '''
|
||||||
|
Loading…
x
Reference in New Issue
Block a user