Compare commits
10 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
5b9521e822 | ||
|
088abe6591 | ||
|
b93da2519e | ||
|
f3eee00ecf | ||
|
aac34cfcbd | ||
|
f1370f7bdc | ||
|
8496c5ab45 | ||
|
e01de19e73 | ||
|
f6d5ca1179 | ||
|
c8f8fb2aa3 |
9
Tiltfile
9
Tiltfile
@ -2,6 +2,11 @@ load("ext://helm_remote", "helm_remote")
|
||||
load("ext://secret", "secret_from_dict")
|
||||
load("ext://configmap", "configmap_from_dict")
|
||||
|
||||
helm_remote("localstack",
|
||||
repo_name="localstack-charts",
|
||||
repo_url="https://localstack.github.io/helm-charts",
|
||||
)
|
||||
|
||||
helm_remote("postgresql",
|
||||
repo_name="bitnami",
|
||||
repo_url="https://charts.bitnami.com/bitnami",
|
||||
@ -67,3 +72,7 @@ local_resource(
|
||||
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
|
||||
resource_deps=["postgresql-postgresql"]
|
||||
)
|
||||
|
||||
k8s_yaml(configmap_from_dict("mev-inspect-s3", inputs = {
|
||||
"uri" : "https://s3.us-east-1.amazonaws.com/mybucket/"
|
||||
}))
|
||||
|
@ -0,0 +1,27 @@
|
||||
"""Create latest_s3_block table
|
||||
|
||||
Revision ID: ce116d0badc8
|
||||
Revises: 5c5375de15fd
|
||||
Create Date: 2022-01-31 23:36:34.971594
|
||||
|
||||
"""
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "ce116d0badc8"
|
||||
down_revision = "5c5375de15fd"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
"latest_s3_block",
|
||||
sa.Column("block_number", sa.Numeric, nullable=False),
|
||||
sa.Column("updated_at", sa.TIMESTAMP, server_default=sa.func.now()),
|
||||
)
|
||||
|
||||
|
||||
def downgrade():
|
||||
op.drop_table("latest_s3_block")
|
31
cli.py
31
cli.py
@ -8,10 +8,12 @@ import click
|
||||
from mev_inspect.concurrency import coro
|
||||
from mev_inspect.crud.prices import write_prices
|
||||
from mev_inspect.db import get_inspect_session, get_trace_session
|
||||
from mev_inspect.export import s3_export, s3_export_many
|
||||
from mev_inspect.inspector import MEVInspector
|
||||
from mev_inspect.prices import fetch_prices, fetch_prices_range
|
||||
|
||||
RPC_URL_ENV = "RPC_URL"
|
||||
S3_URI_ENV = "S3_URI"
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -131,9 +133,38 @@ def fetch_range(after: datetime, before: datetime):
|
||||
write_prices(inspect_db_session, prices)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("block_number", type=int)
|
||||
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
|
||||
def s3_export_command(block_number: int, uri: str):
|
||||
inspect_db_session = get_inspect_session()
|
||||
|
||||
logger.info(f"Exporting block {block_number}")
|
||||
s3_export(inspect_db_session, block_number, uri)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("after_block", type=int)
|
||||
@click.argument("before_block", type=int)
|
||||
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
|
||||
def s3_export_many_command(after_block: int, before_block: int, uri: str):
|
||||
inspect_db_session = get_inspect_session()
|
||||
|
||||
logger.info(f"Exporting blocks {after_block} to {before_block}")
|
||||
s3_export_many(inspect_db_session, after_block, before_block, uri)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_rpc_url() -> str:
|
||||
return os.environ["RPC_URL"]
|
||||
|
||||
|
||||
def get_s3_urii() -> str:
|
||||
return os.environ["S3_URI"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
|
@ -91,6 +91,11 @@ spec:
|
||||
name: mev-inspect-listener-healthcheck
|
||||
key: url
|
||||
optional: true
|
||||
- name: S3_URI
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: mev-inspect-s3
|
||||
key: uri
|
||||
{{- with .Values.nodeSelector }}
|
||||
nodeSelector:
|
||||
{{- toYaml . | nindent 8 }}
|
||||
|
26
mev
26
mev
@ -94,12 +94,36 @@ case "$1" in
|
||||
exit 1
|
||||
esac
|
||||
;;
|
||||
export)
|
||||
shift
|
||||
case "$1" in
|
||||
s3-export)
|
||||
block_number=$2
|
||||
uri=$3
|
||||
echo "Exporting block"
|
||||
kubectl exec -ti deploy/mev-inspect -- \
|
||||
poetry run s3-export $block_number $uri
|
||||
|
||||
;;
|
||||
s3-export-many)
|
||||
after_block=$2
|
||||
before_block=$3
|
||||
base_uri=$4
|
||||
echo "Exporting blocks"
|
||||
kubectl exec -ti deploy/mev-inspect -- \
|
||||
poetry run s3-export-many $after_block $before_block $base_uri
|
||||
;;
|
||||
*)
|
||||
echo "export usage: "$1" {s3-export}"
|
||||
exit 1
|
||||
esac
|
||||
;;
|
||||
exec)
|
||||
shift
|
||||
kubectl exec -ti deploy/mev-inspect -- $@
|
||||
;;
|
||||
*)
|
||||
echo "Usage: "$1" {db|backfill|inspect|test}"
|
||||
echo "Usage: "$1" {db|redis|listener|backfill|inspect|inspect-many|test|fetch|prices|export}"
|
||||
exit 1
|
||||
esac
|
||||
|
||||
|
25
mev_inspect/crud/latest_s3_block.py
Normal file
25
mev_inspect/crud/latest_s3_block.py
Normal file
@ -0,0 +1,25 @@
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def find_latest_s3_block(db_session) -> Optional[int]:
|
||||
result = db_session.execute(
|
||||
"SELECT block_number FROM latest_s3_block LIMIT 1"
|
||||
).one_or_none()
|
||||
if result is None:
|
||||
return None
|
||||
else:
|
||||
return int(result[0])
|
||||
|
||||
|
||||
def update_latest_s3_block(db_session, block_number) -> None:
|
||||
db_session.execute(
|
||||
"""
|
||||
UPDATE latest_s3_block
|
||||
SET block_number = :block_number, updated_at = current_timestamp;
|
||||
INSERT INTO latest_s3_block
|
||||
(block_number, updated_at)
|
||||
SELECT :block_number, current_timestamp
|
||||
WHERE NOT EXISTS (SELECT 1 FROM latest_s3_blocks);
|
||||
""",
|
||||
params={"block_number": block_number},
|
||||
)
|
64
mev_inspect/export.py
Normal file
64
mev_inspect/export.py
Normal file
@ -0,0 +1,64 @@
|
||||
from mev_inspect.crud.latest_s3_block import (
|
||||
find_latest_s3_block,
|
||||
update_latest_s3_block,
|
||||
)
|
||||
|
||||
|
||||
def s3_export(
|
||||
db_session,
|
||||
block_number: int,
|
||||
uri: str,
|
||||
) -> None:
|
||||
"""Export block to S3"""
|
||||
|
||||
latest_s3_block = find_latest_s3_block(db_session)
|
||||
|
||||
if latest_s3_block is not None:
|
||||
|
||||
if block_number > latest_s3_block:
|
||||
|
||||
db_session.execute(
|
||||
"""
|
||||
SELECT * FROM aws_s3.query_export_to_s3(
|
||||
'SELECT *
|
||||
FROM mev_summary
|
||||
WHERE block_number={block_number}',
|
||||
:{uri}
|
||||
""",
|
||||
params={"block_number": block_number, "uri": uri},
|
||||
)
|
||||
update_latest_s3_block(db_session, block_number)
|
||||
|
||||
|
||||
def s3_export_many(
|
||||
db_session,
|
||||
after_block: int,
|
||||
before_block: int,
|
||||
uri: str,
|
||||
) -> None:
|
||||
"""Export block range to S3"""
|
||||
|
||||
latest_s3_block = find_latest_s3_block(db_session)
|
||||
|
||||
for block_number in range(after_block, before_block):
|
||||
|
||||
if latest_s3_block is not None:
|
||||
|
||||
if block_number > latest_s3_block:
|
||||
|
||||
uri += f"/{block_number}"
|
||||
db_session.execute(
|
||||
"""
|
||||
SELECT * FROM aws_s3.query_export_to_s3(
|
||||
'SELECT *
|
||||
FROM mev_summary
|
||||
WHERE block_number={block_number}
|
||||
:{uri}
|
||||
""",
|
||||
params={
|
||||
"after_block": after_block,
|
||||
"before_block": before_block,
|
||||
"uri": uri,
|
||||
},
|
||||
)
|
||||
update_latest_s3_block(db_session, block_number)
|
@ -40,6 +40,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
|
||||
fetch-block = 'cli:fetch_block_command'
|
||||
fetch-all-prices = 'cli:fetch_all_prices'
|
||||
fetch-range = 'cli:fetch_range'
|
||||
s3-export = 'cli:s3_export_command'
|
||||
s3-export-many = 'cli:s3_export_many_command'
|
||||
|
||||
[tool.black]
|
||||
exclude = '''
|
||||
|
Loading…
x
Reference in New Issue
Block a user