Compare commits

...

10 Commits
main ... s3

Author SHA1 Message Date
Gui Heise
5b9521e822 Remove resource 2022-02-04 15:52:00 -05:00
Gui Heise
088abe6591 Tiltfile 2022-02-04 14:14:05 -05:00
Gui Heise
b93da2519e Kubernetes integration 2022-02-04 13:49:02 -05:00
Luke Van Seters
f3eee00ecf Add localstack 2022-02-03 15:38:53 -05:00
Gui Heise
aac34cfcbd Fix black 2022-02-01 14:51:56 -05:00
Gui Heise
f1370f7bdc Formatting 2022-02-01 14:46:54 -05:00
Gui Heise
8496c5ab45 Add uri as parameter 2022-02-01 14:35:10 -05:00
Gui Heise
e01de19e73 Syntax 2022-02-01 12:17:54 -05:00
Gui Heise
f6d5ca1179 Add commands and functions 2022-01-31 20:42:24 -05:00
Gui Heise
c8f8fb2aa3 Add table and crud 2022-01-31 18:48:17 -05:00
8 changed files with 188 additions and 1 deletions

View File

@ -2,6 +2,11 @@ load("ext://helm_remote", "helm_remote")
load("ext://secret", "secret_from_dict")
load("ext://configmap", "configmap_from_dict")
helm_remote("localstack",
repo_name="localstack-charts",
repo_url="https://localstack.github.io/helm-charts",
)
helm_remote("postgresql",
repo_name="bitnami",
repo_url="https://charts.bitnami.com/bitnami",
@ -67,3 +72,7 @@ local_resource(
serve_cmd='kubectl port-forward --namespace default svc/postgresql 5432:5432',
resource_deps=["postgresql-postgresql"]
)
k8s_yaml(configmap_from_dict("mev-inspect-s3", inputs = {
"uri" : "https://s3.us-east-1.amazonaws.com/mybucket/"
}))

View File

@ -0,0 +1,27 @@
"""Create latest_s3_block table
Revision ID: ce116d0badc8
Revises: 5c5375de15fd
Create Date: 2022-01-31 23:36:34.971594
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "ce116d0badc8"
down_revision = "5c5375de15fd"
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
"latest_s3_block",
sa.Column("block_number", sa.Numeric, nullable=False),
sa.Column("updated_at", sa.TIMESTAMP, server_default=sa.func.now()),
)
def downgrade():
op.drop_table("latest_s3_block")

31
cli.py
View File

@ -8,10 +8,12 @@ import click
from mev_inspect.concurrency import coro
from mev_inspect.crud.prices import write_prices
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.export import s3_export, s3_export_many
from mev_inspect.inspector import MEVInspector
from mev_inspect.prices import fetch_prices, fetch_prices_range
RPC_URL_ENV = "RPC_URL"
S3_URI_ENV = "S3_URI"
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
@ -131,9 +133,38 @@ def fetch_range(after: datetime, before: datetime):
write_prices(inspect_db_session, prices)
@cli.command()
@click.argument("block_number", type=int)
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
def s3_export_command(block_number: int, uri: str):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting block {block_number}")
s3_export(inspect_db_session, block_number, uri)
return None
@cli.command()
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--uri", default=lambda: os.environ.get(S3_URI_ENV, ""))
def s3_export_many_command(after_block: int, before_block: int, uri: str):
inspect_db_session = get_inspect_session()
logger.info(f"Exporting blocks {after_block} to {before_block}")
s3_export_many(inspect_db_session, after_block, before_block, uri)
return None
def get_rpc_url() -> str:
return os.environ["RPC_URL"]
def get_s3_urii() -> str:
return os.environ["S3_URI"]
if __name__ == "__main__":
cli()

View File

@ -91,6 +91,11 @@ spec:
name: mev-inspect-listener-healthcheck
key: url
optional: true
- name: S3_URI
valueFrom:
configMapKeyRef:
name: mev-inspect-s3
key: uri
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

26
mev
View File

@ -94,12 +94,36 @@ case "$1" in
exit 1
esac
;;
export)
shift
case "$1" in
s3-export)
block_number=$2
uri=$3
echo "Exporting block"
kubectl exec -ti deploy/mev-inspect -- \
poetry run s3-export $block_number $uri
;;
s3-export-many)
after_block=$2
before_block=$3
base_uri=$4
echo "Exporting blocks"
kubectl exec -ti deploy/mev-inspect -- \
poetry run s3-export-many $after_block $before_block $base_uri
;;
*)
echo "export usage: "$1" {s3-export}"
exit 1
esac
;;
exec)
shift
kubectl exec -ti deploy/mev-inspect -- $@
;;
*)
echo "Usage: "$1" {db|backfill|inspect|test}"
echo "Usage: "$1" {db|redis|listener|backfill|inspect|inspect-many|test|fetch|prices|export}"
exit 1
esac

View File

@ -0,0 +1,25 @@
from typing import Optional
def find_latest_s3_block(db_session) -> Optional[int]:
result = db_session.execute(
"SELECT block_number FROM latest_s3_block LIMIT 1"
).one_or_none()
if result is None:
return None
else:
return int(result[0])
def update_latest_s3_block(db_session, block_number) -> None:
db_session.execute(
"""
UPDATE latest_s3_block
SET block_number = :block_number, updated_at = current_timestamp;
INSERT INTO latest_s3_block
(block_number, updated_at)
SELECT :block_number, current_timestamp
WHERE NOT EXISTS (SELECT 1 FROM latest_s3_blocks);
""",
params={"block_number": block_number},
)

64
mev_inspect/export.py Normal file
View File

@ -0,0 +1,64 @@
from mev_inspect.crud.latest_s3_block import (
find_latest_s3_block,
update_latest_s3_block,
)
def s3_export(
db_session,
block_number: int,
uri: str,
) -> None:
"""Export block to S3"""
latest_s3_block = find_latest_s3_block(db_session)
if latest_s3_block is not None:
if block_number > latest_s3_block:
db_session.execute(
"""
SELECT * FROM aws_s3.query_export_to_s3(
'SELECT *
FROM mev_summary
WHERE block_number={block_number}',
:{uri}
""",
params={"block_number": block_number, "uri": uri},
)
update_latest_s3_block(db_session, block_number)
def s3_export_many(
db_session,
after_block: int,
before_block: int,
uri: str,
) -> None:
"""Export block range to S3"""
latest_s3_block = find_latest_s3_block(db_session)
for block_number in range(after_block, before_block):
if latest_s3_block is not None:
if block_number > latest_s3_block:
uri += f"/{block_number}"
db_session.execute(
"""
SELECT * FROM aws_s3.query_export_to_s3(
'SELECT *
FROM mev_summary
WHERE block_number={block_number}
:{uri}
""",
params={
"after_block": after_block,
"before_block": before_block,
"uri": uri,
},
)
update_latest_s3_block(db_session, block_number)

View File

@ -40,6 +40,8 @@ enqueue-many-blocks = 'cli:enqueue_many_blocks_command'
fetch-block = 'cli:fetch_block_command'
fetch-all-prices = 'cli:fetch_all_prices'
fetch-range = 'cli:fetch_range'
s3-export = 'cli:s3_export_command'
s3-export-many = 'cli:s3_export_many_command'
[tool.black]
exclude = '''