Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ htmlcov/

# Cache files
cache-id.txt
cache-id-daily.txt
nonces.txt
tks-trigger-cache.json
selector-cache.txt
*.cache
.uv/

# SQLite state (alerts + migrated monitor state); resolved under CACHE_DIR
monitoring.db
monitoring.db-wal
monitoring.db-shm
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ uv run protocols/aave/main.py

In production the scripts run on a schedule via supercronic on a VPS, defined in [`automation/jobs.yaml`](./automation/jobs.yaml). See [`deploy/`](./deploy/) — [`install.sh`](./deploy/install.sh) provisions a host and [`runbook.md`](./deploy/runbook.md) covers operations.

The optional read-only alerts API exposes persisted alert history from SQLite. See [`deploy/alerts-api.md`](./deploy/alerts-api.md) for endpoint examples and pagination.

## Code Style

Format and lint code with ruff:
Expand Down
1 change: 1 addition & 0 deletions api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Read-only monitoring alerts API."""
4 changes: 4 additions & 0 deletions api/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from api.server import main

if __name__ == "__main__":
main()
219 changes: 219 additions & 0 deletions api/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
from __future__ import annotations

import json
import os
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse

from automation.config import JobsConfig, load_jobs_config
from utils.logging import get_logger
from utils.store import AlertEvent, get_alert, normalize_timestamp, query_alerts

logger = get_logger("api.server")

ALLOWED_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
MAX_LIMIT = 500


class BadRequest(ValueError):
pass


@dataclass(frozen=True)
class AlertQuery:
protocol: str | None = None
severity: str | None = None
source: str | None = None
from_ts: str | None = None
to_ts: str | None = None
cursor: int | None = None
limit: int = 100


def _one(params: dict[str, list[str]], key: str) -> str | None:
values = params.get(key)
if not values:
return None
return values[-1]


def parse_timestamp(value: str, name: str) -> str:
try:
return normalize_timestamp(value)
except ValueError as exc:
message = f"{name} timestamp must include timezone" if "timezone" in str(exc) else f"invalid {name} timestamp"
raise BadRequest(message) from exc


def parse_alert_query(query: str) -> AlertQuery:
params = parse_qs(query, keep_blank_values=True)
severity = _one(params, "severity")
if severity is not None and severity not in ALLOWED_SEVERITIES:
raise BadRequest("invalid severity")

limit = 100
limit_raw = _one(params, "limit")
if limit_raw:
try:
limit = int(limit_raw)
except ValueError as exc:
raise BadRequest("invalid limit") from exc
if limit < 1:
raise BadRequest("invalid limit")
limit = min(limit, MAX_LIMIT)

cursor = None
cursor_raw = _one(params, "cursor")
if cursor_raw:
try:
cursor = int(cursor_raw)
except ValueError as exc:
raise BadRequest("invalid cursor") from exc
if cursor < 1:
raise BadRequest("invalid cursor")

from_raw = _one(params, "from") or _one(params, "since")
to_raw = _one(params, "to")
from_ts = parse_timestamp(from_raw, "from") if from_raw else None
to_ts = parse_timestamp(to_raw, "to") if to_raw else None
if from_ts and to_ts and to_ts <= from_ts:
raise BadRequest("to must be after from")

return AlertQuery(
protocol=_one(params, "protocol"),
severity=severity,
source=_one(params, "source"),
from_ts=from_ts,
to_ts=to_ts,
cursor=cursor,
limit=limit,
)


def alert_to_json(row: AlertEvent) -> dict[str, object]:
return {
"id": row["id"],
"created_at": row["created_at"],
"source": row["source"],
"protocol": row["protocol"],
"channel": row["channel"],
"severity": row["severity"],
"message": row["message"],
"plain_text": row["plain_text"],
"silent": row["silent"],
"delivery_status": row["delivery_status"],
"delivered_at": row["delivered_at"],
"delivery_error": row["delivery_error"],
"metadata": row["metadata"],
}


def _protocol_from_script(script: str) -> str | None:
parts = script.split("/")
if len(parts) < 2 or parts[0] != "protocols" or not parts[1]:
return None
return parts[1]


def protocols_to_json(config: JobsConfig) -> dict[str, object]:
protocols: dict[str, list[dict[str, object]]] = {}
for profile in config.enabled_profiles:
for task in profile.enabled_tasks:
protocol = _protocol_from_script(task.script)
if protocol is not None:
protocols.setdefault(protocol, []).append(
{
"name": task.name,
"script": task.script,
"args": task.args,
"profile": profile.name,
"cron": profile.cron,
}
)
data = [{"name": protocol, "tasks": tasks} for protocol, tasks in sorted(protocols.items())]
return {"data": data, "count": len(data)}


def write_json(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, object]) -> None:
body = json.dumps(payload, separators=(",", ":")).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Cache-Control", "no-store")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)


def write_error(handler: BaseHTTPRequestHandler, status: int, code: str, message: str) -> None:
write_json(handler, status, {"error": code, "message": message})


class AlertsHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
parsed = urlparse(self.path)
try:
if parsed.path == "/healthz":
write_json(self, 200, {"status": "ok"})
return
if parsed.path == "/v1/protocols":
write_json(self, 200, protocols_to_json(load_jobs_config()))
return
if parsed.path == "/v1/alerts":
alert_query = parse_alert_query(parsed.query)
rows = query_alerts(
protocol=alert_query.protocol,
severity=alert_query.severity,
source=alert_query.source,
from_ts=alert_query.from_ts,
to_ts=alert_query.to_ts,
cursor=alert_query.cursor,
limit=alert_query.limit,
)
data = [alert_to_json(row) for row in rows]
# Rows are ordered id DESC, so the last row carries the smallest id.
next_cursor = str(rows[-1]["id"]) if len(rows) == alert_query.limit else None
write_json(self, 200, {"data": data, "next_cursor": next_cursor, "limit": alert_query.limit})
return
if parsed.path.startswith("/v1/alerts/"):
alert_id_raw = parsed.path.removeprefix("/v1/alerts/")
try:
alert_id = int(alert_id_raw)
except ValueError:
write_error(self, 404, "not_found", "unknown path")
return
row = get_alert(alert_id)
if row is None:
write_error(self, 404, "not_found", "alert not found")
return
write_json(self, 200, alert_to_json(row))
return
write_error(self, 404, "not_found", "unknown path")
except BadRequest as exc:
write_error(self, 400, "bad_request", str(exc))
except Exception:
logger.exception("API request failed for path %s", parsed.path)
write_error(self, 500, "server_error", "unexpected server error")

def do_POST(self) -> None:
write_error(self, 405, "method_not_allowed", "only GET is supported")

do_PUT = do_POST
do_PATCH = do_POST
do_DELETE = do_POST

def log_message(self, format: str, *args: Any) -> None:
logger.info("%s - %s", self.address_string(), format % args)


def run(host: str, port: int) -> None:
server = ThreadingHTTPServer((host, port), AlertsHandler)
logger.info("monitoring API listening on %s:%d", host, port)
server.serve_forever()


def main() -> None:
host = os.getenv("MONITORING_API_HOST", "127.0.0.1")
port = int(os.getenv("MONITORING_API_PORT", "8923"))
run(host, port)
1 change: 1 addition & 0 deletions automation/jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ profiles:
- { name: "compound", script: protocols/compound/main.py }
- { name: "yearn-check-endorsed", script: protocols/yearn/check_endorsed.py }
- { name: "yearn-check-timelock-delay", script: protocols/yearn/check_timelock_delay.py }
- { name: "prune-alerts", script: utils/prune_alerts.py }

multisig:
cron: "0/10 * * * *"
Expand Down
9 changes: 8 additions & 1 deletion automation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ def _run_task(task: Task, *, profile: Profile, repo_root: Path, dry_run: bool) -
def _send_failure_digest(result: ProfileResult) -> None:
message = result.telegram_summary()
try:
send_telegram_message(message, protocol=TELEGRAM_PROTOCOL, plain_text=False)
send_telegram_message(
message,
protocol=TELEGRAM_PROTOCOL,
plain_text=False,
source="automation_digest",
origin_protocol=TELEGRAM_PROTOCOL,
channel=TELEGRAM_PROTOCOL,
)
except TelegramError as exc:
logger.error("failed to send automation digest for %s: %s", result.profile, exc)
96 changes: 96 additions & 0 deletions deploy/Caddyfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Caddy reverse proxy for the monitoring alerts API on a Hetzner VPS.
#
# Routes:
# GET /healthz -> public (uptime checks); returns {"status":"ok"}
# everything else -> requires Authorization: Bearer <one of ALERTS_API_TOKENS>
#
# Caddy provisions and renews TLS automatically once DNS for $ALERTS_API_DOMAIN
# points at this box and ports 80 + 443 are reachable.
#
# Install (Debian/Ubuntu, official Caddy apt repo — https://caddyserver.com/docs/install):
# sudo cp deploy/Caddyfile /etc/caddy/Caddyfile
# sudo systemctl edit caddy # add the [Service] Environment= block below
# sudo systemctl restart caddy
#
# Secrets are read from the environment (never hard-code them here). Add via
# `sudo systemctl edit caddy`:
#
# [Service]
# Environment=ALERTS_API_DOMAIN=alerts.example.com
# Environment=ALERTS_API_TOKENS=<token1>|<token2> (pipe-separated; one per consumer)
# Environment=ACME_EMAIL=you@example.com
#
# Generate each token with `openssl rand -hex 32` (hex => safe inside the regex
# below, nothing to escape) and join multiple with `|`. Clients then call the
# API with: -H "Authorization: Bearer <their-token>"
#
# Rate limiting (the commented `rate_limit` blocks) needs the caddy-ratelimit
# plugin, which is NOT in the stock apt build. Either build it once with
# xcaddy build --with github.com/mholt/caddy-ratelimit
# and uncomment the blocks + the `order` line, or leave them off and rely on the
# Hetzner Cloud Firewall + fail2ban (Hetzner already absorbs L3/L4 volumetric
# DDoS for free at its network edge).

{
email {$ACME_EMAIL}
# Uncomment once the caddy-ratelimit plugin is installed:
# order rate_limit before reverse_proxy
}

{$ALERTS_API_DOMAIN} {
encode gzip

# GET-only API; reject anything trying to push a body.
request_body {
max_size 4KB
}

# --- public health check (no auth) ---
handle /healthz {
# rate_limit {
# zone health {
# key {remote_host}
# events 30
# window 1m
# }
# }
reverse_proxy 127.0.0.1:8923 {
# The stdlib server has no timeouts of its own and uses one
# thread per connection — these timeouts shield it from slowloris.
transport http {
dial_timeout 2s
read_timeout 5s
write_timeout 5s
}
}
}

# --- token-gated API (everything else) ---
handle {
# Accept any token in the pipe-separated ALERTS_API_TOKENS list.
@unauthorized {
not header_regexp Authorization "^Bearer ({$ALERTS_API_TOKENS})$"
}
error @unauthorized "unauthorized" 401

# rate_limit {
# zone api {
# key {remote_host}
# events 60
# window 1m
# }
# }
reverse_proxy 127.0.0.1:8923 {
transport http {
dial_timeout 2s
read_timeout 5s
write_timeout 5s
}
}
}

log {
output file /var/log/caddy/alerts.log
format json
}
}
Loading