diff --git a/.gitignore b/.gitignore index 7da9a8f6..1cc01b6d 100644 --- a/.gitignore +++ b/.gitignore @@ -43,7 +43,14 @@ htmlcov/ # Cache files cache-id.txt +cache-id-daily.txt +nonces.txt tks-trigger-cache.json selector-cache.txt *.cache .uv/ + +# SQLite state (alerts + migrated monitor state); resolved under CACHE_DIR +monitoring.db +monitoring.db-wal +monitoring.db-shm diff --git a/README.md b/README.md index c7bedebd..a84ab183 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,8 @@ uv run protocols/aave/main.py In production the scripts run on a schedule via supercronic on a VPS, defined in [`automation/jobs.yaml`](./automation/jobs.yaml). See [`deploy/`](./deploy/) — [`install.sh`](./deploy/install.sh) provisions a host and [`runbook.md`](./deploy/runbook.md) covers operations. +The optional read-only alerts API exposes persisted alert history from SQLite. See [`deploy/alerts-api.md`](./deploy/alerts-api.md) for endpoint examples and pagination. + ## Code Style Format and lint code with ruff: diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 00000000..0c241c9e --- /dev/null +++ b/api/__init__.py @@ -0,0 +1 @@ +"""Read-only monitoring alerts API.""" diff --git a/api/__main__.py b/api/__main__.py new file mode 100644 index 00000000..4dc1f8c4 --- /dev/null +++ b/api/__main__.py @@ -0,0 +1,4 @@ +from api.server import main + +if __name__ == "__main__": + main() diff --git a/api/server.py b/api/server.py new file mode 100644 index 00000000..b914f544 --- /dev/null +++ b/api/server.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +import json +import os +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any +from urllib.parse import parse_qs, urlparse + +from automation.config import JobsConfig, load_jobs_config +from utils.logging import get_logger +from utils.store import AlertEvent, get_alert, normalize_timestamp, query_alerts + +logger = get_logger("api.server") + +ALLOWED_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"} +MAX_LIMIT = 500 + + +class BadRequest(ValueError): + pass + + +@dataclass(frozen=True) +class AlertQuery: + protocol: str | None = None + severity: str | None = None + source: str | None = None + from_ts: str | None = None + to_ts: str | None = None + cursor: int | None = None + limit: int = 100 + + +def _one(params: dict[str, list[str]], key: str) -> str | None: + values = params.get(key) + if not values: + return None + return values[-1] + + +def parse_timestamp(value: str, name: str) -> str: + try: + return normalize_timestamp(value) + except ValueError as exc: + message = f"{name} timestamp must include timezone" if "timezone" in str(exc) else f"invalid {name} timestamp" + raise BadRequest(message) from exc + + +def parse_alert_query(query: str) -> AlertQuery: + params = parse_qs(query, keep_blank_values=True) + severity = _one(params, "severity") + if severity is not None and severity not in ALLOWED_SEVERITIES: + raise BadRequest("invalid severity") + + limit = 100 + limit_raw = _one(params, "limit") + if limit_raw: + try: + limit = int(limit_raw) + except ValueError as exc: + raise BadRequest("invalid limit") from exc + if limit < 1: + raise BadRequest("invalid limit") + limit = min(limit, MAX_LIMIT) + + cursor = None + cursor_raw = _one(params, "cursor") + if cursor_raw: + try: + cursor = int(cursor_raw) + except ValueError as exc: + raise BadRequest("invalid cursor") from exc + if cursor < 1: + raise BadRequest("invalid cursor") + + from_raw = _one(params, "from") or _one(params, "since") + to_raw = _one(params, "to") + from_ts = parse_timestamp(from_raw, "from") if from_raw else None + to_ts = parse_timestamp(to_raw, "to") if to_raw else None + if from_ts and to_ts and to_ts <= from_ts: + raise BadRequest("to must be after from") + + return AlertQuery( + protocol=_one(params, "protocol"), + severity=severity, + source=_one(params, "source"), + from_ts=from_ts, + to_ts=to_ts, + cursor=cursor, + limit=limit, + ) + + +def alert_to_json(row: AlertEvent) -> dict[str, object]: + return { + "id": row["id"], + "created_at": row["created_at"], + "source": row["source"], + "protocol": row["protocol"], + "channel": row["channel"], + "severity": row["severity"], + "message": row["message"], + "plain_text": row["plain_text"], + "silent": row["silent"], + "delivery_status": row["delivery_status"], + "delivered_at": row["delivered_at"], + "delivery_error": row["delivery_error"], + "metadata": row["metadata"], + } + + +def _protocol_from_script(script: str) -> str | None: + parts = script.split("/") + if len(parts) < 2 or parts[0] != "protocols" or not parts[1]: + return None + return parts[1] + + +def protocols_to_json(config: JobsConfig) -> dict[str, object]: + protocols: dict[str, list[dict[str, object]]] = {} + for profile in config.enabled_profiles: + for task in profile.enabled_tasks: + protocol = _protocol_from_script(task.script) + if protocol is not None: + protocols.setdefault(protocol, []).append( + { + "name": task.name, + "script": task.script, + "args": task.args, + "profile": profile.name, + "cron": profile.cron, + } + ) + data = [{"name": protocol, "tasks": tasks} for protocol, tasks in sorted(protocols.items())] + return {"data": data, "count": len(data)} + + +def write_json(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, object]) -> None: + body = json.dumps(payload, separators=(",", ":")).encode() + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Cache-Control", "no-store") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def write_error(handler: BaseHTTPRequestHandler, status: int, code: str, message: str) -> None: + write_json(handler, status, {"error": code, "message": message}) + + +class AlertsHandler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + parsed = urlparse(self.path) + try: + if parsed.path == "/healthz": + write_json(self, 200, {"status": "ok"}) + return + if parsed.path == "/v1/protocols": + write_json(self, 200, protocols_to_json(load_jobs_config())) + return + if parsed.path == "/v1/alerts": + alert_query = parse_alert_query(parsed.query) + rows = query_alerts( + protocol=alert_query.protocol, + severity=alert_query.severity, + source=alert_query.source, + from_ts=alert_query.from_ts, + to_ts=alert_query.to_ts, + cursor=alert_query.cursor, + limit=alert_query.limit, + ) + data = [alert_to_json(row) for row in rows] + # Rows are ordered id DESC, so the last row carries the smallest id. + next_cursor = str(rows[-1]["id"]) if len(rows) == alert_query.limit else None + write_json(self, 200, {"data": data, "next_cursor": next_cursor, "limit": alert_query.limit}) + return + if parsed.path.startswith("/v1/alerts/"): + alert_id_raw = parsed.path.removeprefix("/v1/alerts/") + try: + alert_id = int(alert_id_raw) + except ValueError: + write_error(self, 404, "not_found", "unknown path") + return + row = get_alert(alert_id) + if row is None: + write_error(self, 404, "not_found", "alert not found") + return + write_json(self, 200, alert_to_json(row)) + return + write_error(self, 404, "not_found", "unknown path") + except BadRequest as exc: + write_error(self, 400, "bad_request", str(exc)) + except Exception: + logger.exception("API request failed for path %s", parsed.path) + write_error(self, 500, "server_error", "unexpected server error") + + def do_POST(self) -> None: + write_error(self, 405, "method_not_allowed", "only GET is supported") + + do_PUT = do_POST + do_PATCH = do_POST + do_DELETE = do_POST + + def log_message(self, format: str, *args: Any) -> None: + logger.info("%s - %s", self.address_string(), format % args) + + +def run(host: str, port: int) -> None: + server = ThreadingHTTPServer((host, port), AlertsHandler) + logger.info("monitoring API listening on %s:%d", host, port) + server.serve_forever() + + +def main() -> None: + host = os.getenv("MONITORING_API_HOST", "127.0.0.1") + port = int(os.getenv("MONITORING_API_PORT", "8923")) + run(host, port) diff --git a/automation/jobs.yaml b/automation/jobs.yaml index e9fa9d5c..c30c39ef 100644 --- a/automation/jobs.yaml +++ b/automation/jobs.yaml @@ -68,6 +68,7 @@ profiles: - { name: "compound", script: protocols/compound/main.py } - { name: "yearn-check-endorsed", script: protocols/yearn/check_endorsed.py } - { name: "yearn-check-timelock-delay", script: protocols/yearn/check_timelock_delay.py } + - { name: "prune-alerts", script: utils/prune_alerts.py } multisig: cron: "0/10 * * * *" diff --git a/automation/runner.py b/automation/runner.py index 1bfba9f4..2e88fdcd 100644 --- a/automation/runner.py +++ b/automation/runner.py @@ -237,6 +237,13 @@ def _run_task(task: Task, *, profile: Profile, repo_root: Path, dry_run: bool) - def _send_failure_digest(result: ProfileResult) -> None: message = result.telegram_summary() try: - send_telegram_message(message, protocol=TELEGRAM_PROTOCOL, plain_text=False) + send_telegram_message( + message, + protocol=TELEGRAM_PROTOCOL, + plain_text=False, + source="automation_digest", + origin_protocol=TELEGRAM_PROTOCOL, + channel=TELEGRAM_PROTOCOL, + ) except TelegramError as exc: logger.error("failed to send automation digest for %s: %s", result.profile, exc) diff --git a/deploy/Caddyfile b/deploy/Caddyfile new file mode 100644 index 00000000..8605949c --- /dev/null +++ b/deploy/Caddyfile @@ -0,0 +1,96 @@ +# Caddy reverse proxy for the monitoring alerts API on a Hetzner VPS. +# +# Routes: +# GET /healthz -> public (uptime checks); returns {"status":"ok"} +# everything else -> requires Authorization: Bearer +# +# Caddy provisions and renews TLS automatically once DNS for $ALERTS_API_DOMAIN +# points at this box and ports 80 + 443 are reachable. +# +# Install (Debian/Ubuntu, official Caddy apt repo — https://caddyserver.com/docs/install): +# sudo cp deploy/Caddyfile /etc/caddy/Caddyfile +# sudo systemctl edit caddy # add the [Service] Environment= block below +# sudo systemctl restart caddy +# +# Secrets are read from the environment (never hard-code them here). Add via +# `sudo systemctl edit caddy`: +# +# [Service] +# Environment=ALERTS_API_DOMAIN=alerts.example.com +# Environment=ALERTS_API_TOKENS=| (pipe-separated; one per consumer) +# Environment=ACME_EMAIL=you@example.com +# +# Generate each token with `openssl rand -hex 32` (hex => safe inside the regex +# below, nothing to escape) and join multiple with `|`. Clients then call the +# API with: -H "Authorization: Bearer " +# +# Rate limiting (the commented `rate_limit` blocks) needs the caddy-ratelimit +# plugin, which is NOT in the stock apt build. Either build it once with +# xcaddy build --with github.com/mholt/caddy-ratelimit +# and uncomment the blocks + the `order` line, or leave them off and rely on the +# Hetzner Cloud Firewall + fail2ban (Hetzner already absorbs L3/L4 volumetric +# DDoS for free at its network edge). + +{ + email {$ACME_EMAIL} + # Uncomment once the caddy-ratelimit plugin is installed: + # order rate_limit before reverse_proxy +} + +{$ALERTS_API_DOMAIN} { + encode gzip + + # GET-only API; reject anything trying to push a body. + request_body { + max_size 4KB + } + + # --- public health check (no auth) --- + handle /healthz { + # rate_limit { + # zone health { + # key {remote_host} + # events 30 + # window 1m + # } + # } + reverse_proxy 127.0.0.1:8923 { + # The stdlib server has no timeouts of its own and uses one + # thread per connection — these timeouts shield it from slowloris. + transport http { + dial_timeout 2s + read_timeout 5s + write_timeout 5s + } + } + } + + # --- token-gated API (everything else) --- + handle { + # Accept any token in the pipe-separated ALERTS_API_TOKENS list. + @unauthorized { + not header_regexp Authorization "^Bearer ({$ALERTS_API_TOKENS})$" + } + error @unauthorized "unauthorized" 401 + + # rate_limit { + # zone api { + # key {remote_host} + # events 60 + # window 1m + # } + # } + reverse_proxy 127.0.0.1:8923 { + transport http { + dial_timeout 2s + read_timeout 5s + write_timeout 5s + } + } + } + + log { + output file /var/log/caddy/alerts.log + format json + } +} diff --git a/deploy/alerts-api.md b/deploy/alerts-api.md new file mode 100644 index 00000000..e06ea36b --- /dev/null +++ b/deploy/alerts-api.md @@ -0,0 +1,235 @@ +# Alerts API + +The alerts API exposes persisted monitoring alerts from the local SQLite +database at `$CACHE_DIR/monitoring.db`. It is read-only. + +Production runs it as `monitoring-api.service`, bound to localhost: + +```sh +sudo systemctl enable --now monitoring-api +curl http://127.0.0.1:8923/healthz +``` + +For local testing: + +```sh +CACHE_DIR=/tmp/monitoring-cache uv run python -m api +``` + +## Endpoints + +### `GET /healthz` + +Returns: + +```json +{"status":"ok"} +``` + +### `GET /v1/alerts` + +Returns alert rows ordered newest first. + +Common examples: + +```sh +curl 'http://127.0.0.1:8923/v1/alerts?limit=10' +curl 'http://127.0.0.1:8923/v1/alerts?source=protocol&limit=50' +curl 'http://127.0.0.1:8923/v1/alerts?protocol=aave&severity=HIGH' +curl 'http://127.0.0.1:8923/v1/alerts?from=2026-06-11T00:00:00Z&to=2026-06-12T00:00:00Z' +``` + +Query parameters: + +- `limit`: rows to return, default `100`, max `500`. +- `cursor`: pagination cursor from the previous response. +- `from`: inclusive timestamp with timezone. +- `to`: exclusive timestamp with timezone. +- `since`: alias for `from`. +- `protocol`: exact protocol filter, for example `aave`. +- `severity`: one of `LOW`, `MEDIUM`, `HIGH`, `CRITICAL`. +- `source`: alert source, commonly `protocol`, `ops_error`, `crash`, or `automation_digest`. + +Example response: + +```json +{ + "data": [ + { + "id": 5021, + "created_at": "2026-06-11T10:23:45.123456Z", + "source": "protocol", + "protocol": "aave", + "channel": "aave", + "severity": "LOW", + "message": "message text", + "plain_text": false, + "silent": true, + "delivery_status": "delivered", + "delivered_at": "2026-06-11T10:23:45.456789Z", + "delivery_error": null, + "metadata": {} + } + ], + "next_cursor": "5021", + "limit": 100 +} +``` + +For the next page, pass `cursor=`: + +```sh +curl 'http://127.0.0.1:8923/v1/alerts?cursor=5021&limit=100' +``` + +### `GET /v1/alerts/{id}` + +Returns one alert: + +```sh +curl http://127.0.0.1:8923/v1/alerts/5021 +``` + +Missing alerts return `404`. + +### `GET /v1/protocols` + +Returns enabled protocol objects from `automation/jobs.yaml`, with the tasks +that monitor each protocol. + +```sh +curl http://127.0.0.1:8923/v1/protocols +``` + +Example response: + +```json +{ + "data": [ + { + "name": "aave", + "tasks": [ + { + "name": "aave", + "script": "protocols/aave/main.py", + "args": {}, + "profile": "hourly", + "cron": "5 * * * *" + }, + { + "name": "aave-proposals", + "script": "protocols/aave/proposals.py", + "args": {}, + "profile": "hourly", + "cron": "5 * * * *" + } + ] + } + ], + "count": 1 +} +``` + +## Delivery Status + +An alert row means a monitor generated an alert. Telegram delivery is tracked +separately: + +- `generated`: row inserted before Telegram delivery completed. +- `delivered`: Telegram API call succeeded. +- `failed`: Telegram API call failed. +- `skipped_debug`: `LOG_LEVEL=DEBUG` skipped Telegram delivery. +- `skipped_missing_credentials`: Telegram credentials were missing. +- `not_attempted`: no Telegram attempt was made. + +## Public Access + +Do not expose the Python service directly. Keep it bound to `127.0.0.1` and put +a reverse proxy in front of it. The proxy enforces TLS, authentication, rate +limits, and request/response timeouts — the stdlib server does none of these and +uses one thread per connection, so an unshielded slowloris/connection flood +would wedge it. + +### Caddy (Hetzner VPS) + +[`Caddyfile`](./Caddyfile) is a ready-to-use config: it terminates TLS +(auto-provisioned by Caddy), exposes `GET /healthz` publicly for uptime checks, +and gates everything else behind an `Authorization: Bearer ` header. + +```sh +# 1. DNS: point an A record (e.g. alerts.example.com) at the VPS public IP. + +# 2. Install Caddy (official apt repo) and drop in the config: +sudo cp /srv/monitoring/deploy/Caddyfile /etc/caddy/Caddyfile + +# 3. Provide domain + tokens + ACME email to the caddy service: +sudo systemctl edit caddy # add, then save: +# [Service] +# Environment=ALERTS_API_DOMAIN=alerts.example.com +# Environment=ALERTS_API_TOKENS=| # pipe-separated, one per consumer +# Environment=ACME_EMAIL=you@example.com +sudo systemctl restart caddy + +# 4. Verify: +curl https://alerts.example.com/healthz # public -> {"status":"ok"} +curl https://alerts.example.com/v1/alerts?limit=5 # 401 (no token) +curl -H "Authorization: Bearer " https://alerts.example.com/v1/alerts?limit=5 +``` + +### Generating and storing API tokens + +`/v1/*` is gated by a bearer token. Any value in the pipe-separated +`ALERTS_API_TOKENS` list is accepted, so each consumer gets its own token and +you revoke one by dropping it from the list. + +```sh +openssl rand -hex 32 # generate one token per consumer (run once each) +``` + +Use hex (`openssl rand -hex`) — it contains only `[0-9a-f]`, so it needs no +escaping inside Caddy's matcher regex. ~32 bytes (256 bits) is plenty. + +**Where to store them:** + +- **Source of truth**: your team password manager / secrets vault, one entry per + consumer labelled with who holds it (the proxy can't tell tokens apart, so this + label is how you know which to revoke). Never commit a token to git or write it + into `deploy/Caddyfile`. +- **On the box**: the pipe-joined list lives only in the caddy systemd override + (`sudo systemctl edit caddy` → `Environment=ALERTS_API_TOKENS=...`), which is + root-owned. It is visible via `systemctl show caddy` to root, which is fine for + an admin-only host. For stricter isolation, put it in a `0600 root:root` + `EnvironmentFile=` instead. +- **To each consumer**: hand over only their single token, over a secure channel. + +**Add / rotate / revoke** — edit the list, then restart Caddy: + +```sh +sudo systemctl edit caddy # add or remove a token in ALERTS_API_TOKENS +sudo systemctl daemon-reload +sudo systemctl restart caddy +``` + +A `restart` (not `reload`) is required because the tokens live in Caddy's process +environment, which is only read at start — `reload` just re-reads the Caddyfile +from disk. The restart is sub-second; this is a read API and clients retry, so +the blip is harmless. Removing a token invalidates it immediately; the others +keep working. + +### Firewall (Hetzner) + +Hetzner includes baseline L3/L4 DDoS protection at its network edge for free, so +volumetric floods are largely absorbed before reaching the box. Layer the rest: + +- **Hetzner Cloud Firewall** (console): allow inbound `22` (SSH), `80` + `443` + (Caddy/ACME) only. Never open `8923` — the API must stay localhost-only so the + only path in is through Caddy's auth. +- On-box `ufw` as defense-in-depth: `ufw allow 22,80,443/tcp && ufw enable`. +- **Rate limiting**: enable the commented `rate_limit` blocks in the Caddyfile + (needs the `caddy-ratelimit` plugin — see the file header), and/or add + `fail2ban` on `/var/log/caddy/alerts.log` to ban IPs that repeatedly 401 or + flood. +- Optional: front it with Cloudflare (proxied DNS) for L7 WAF + IP hiding if you + expect hostile traffic; then restrict the firewall to Cloudflare IP ranges and + rate-limit on `{http.request.header.CF-Connecting-IP}` instead of + `{remote_host}`. diff --git a/deploy/install.sh b/deploy/install.sh index 4dd5cdba..56481d4b 100755 --- a/deploy/install.sh +++ b/deploy/install.sh @@ -104,7 +104,7 @@ log "installing apt prerequisites…" export DEBIAN_FRONTEND=noninteractive apt-get update -qq apt-get install -y -qq \ - ca-certificates curl git jq util-linux + ca-certificates curl git jq sqlite3 util-linux # ─── uv (manages Python + venvs) ────────────────────────────────────── if ! command -v uv >/dev/null; then @@ -172,6 +172,10 @@ as_user bash -c "cd '${REPO_DIR}' && uv sync --frozen --extra ai" log "ensuring ${CACHE_DIR} exists (owned by ${TARGET_USER})…" install -m 0755 -o "$TARGET_USER" -g "$TARGET_USER" -d "$CACHE_DIR" +log "initializing ${CACHE_DIR}/monitoring.db and importing legacy text caches…" +as_user env CACHE_DIR="${CACHE_DIR}" LOG_LEVEL=INFO bash -c \ + "cd '${REPO_DIR}' && '${REPO_DIR}/.venv/bin/python' -m utils.migrate_cache_to_db --checkpoint" + # ─── /etc/monitoring scaffolding ───────────────────────────────── log "ensuring ${ETC_DIR} exists with the right perms…" install -m 0750 -o root -g "$TARGET_USER" -d "$ETC_DIR" @@ -195,7 +199,7 @@ else fi # ─── systemd unit ────────────────────────────────────────────────────── -log "installing systemd unit (User=${TARGET_USER})…" +log "installing systemd units (User=${TARGET_USER})…" sed -e "s|__MONITOR_USER__|${TARGET_USER}|g" \ -e "s|__REPO_DIR__|${REPO_DIR}|g" \ -e "s|__ETC_DIR__|${ETC_DIR}|g" \ @@ -203,6 +207,13 @@ sed -e "s|__MONITOR_USER__|${TARGET_USER}|g" \ "${REPO_DIR}/deploy/systemd/monitoring.service" \ > /etc/systemd/system/monitoring.service chmod 0644 /etc/systemd/system/monitoring.service +sed -e "s|__MONITOR_USER__|${TARGET_USER}|g" \ + -e "s|__REPO_DIR__|${REPO_DIR}|g" \ + -e "s|__ETC_DIR__|${ETC_DIR}|g" \ + -e "s|__CACHE_DIR__|${CACHE_DIR}|g" \ + "${REPO_DIR}/deploy/systemd/monitoring-api.service" \ + > /etc/systemd/system/monitoring-api.service +chmod 0644 /etc/systemd/system/monitoring-api.service systemctl daemon-reload cat <&2; exit 1; } + +[[ -d "$REPO_DIR" ]] || die "repo not found: ${REPO_DIR}" +[[ -x "$PYTHON" ]] || die "python not executable: ${PYTHON}" +install -d -m 0755 "$CACHE_DIR" + +log "importing legacy cache files from ${CACHE_DIR} into ${CACHE_DIR}/monitoring.db" +if [[ "${EUID}" -eq 0 && "$TARGET_USER" != "root" ]]; then + chown "$TARGET_USER:$TARGET_USER" "$CACHE_DIR" + sudo -u "$TARGET_USER" -H bash -c ' + repo_dir="$1"; cache_dir="$2"; python="$3"; log_level="$4"; shift 4 + cd "$repo_dir" + env CACHE_DIR="$cache_dir" LOG_LEVEL="$log_level" "$python" -m utils.migrate_cache_to_db --checkpoint "$@" + ' bash "$REPO_DIR" "$CACHE_DIR" "$PYTHON" "${LOG_LEVEL:-INFO}" "$@" +else + cd "$REPO_DIR" + env CACHE_DIR="$CACHE_DIR" LOG_LEVEL="${LOG_LEVEL:-INFO}" "$PYTHON" -m utils.migrate_cache_to_db --checkpoint "$@" +fi +log "done" diff --git a/deploy/runbook.md b/deploy/runbook.md index 1554f6e3..88803f7a 100644 --- a/deploy/runbook.md +++ b/deploy/runbook.md @@ -9,10 +9,12 @@ All command examples assume you are SSH'd into the VPS as the deploy user (the same account the systemd unit runs as), with the repository checked out at `/srv/monitoring`. -No Docker, no compose, no Caddy, no reverse proxy. One systemd unit runs +No Docker or compose. One systemd unit runs [supercronic](https://github.com/aptible/supercronic), which executes `python -m automation run ` on each cron tick per `automation/jobs.yaml`. +An optional `monitoring-api` unit exposes persisted alert history on localhost; +public authentication and rate limiting belong at the reverse proxy. --- @@ -20,6 +22,7 @@ No Docker, no compose, no Caddy, no reverse proxy. One systemd unit runs ```sh systemctl status monitoring +systemctl status monitoring-api ``` See the schedule supercronic is actually running (rendered from `jobs.yaml`): @@ -39,6 +42,7 @@ record of what ran and when. ```sh journalctl -u monitoring -f --since '15 min ago' +journalctl -u monitoring-api -f ``` supercronic prints a structured line per job (`channel=stdout`, `job.position`, @@ -190,6 +194,68 @@ skipped, not queued) — the others keep ticking. --- +## Alerts API + +The alerts API is a separate service. It reads the SQLite alert database under +`/srv/cache` and binds to `127.0.0.1:8923` by default, so stopping it does not +stop scheduled monitoring. See [`alerts-api.md`](./alerts-api.md) for endpoint +examples and response shapes. + +```sh +sudo systemctl enable --now monitoring-api +systemctl status monitoring-api +journalctl -u monitoring-api -f +curl http://127.0.0.1:8923/healthz +curl 'http://127.0.0.1:8923/v1/alerts?limit=10&source=protocol' +``` + +Forward public traffic through a reverse proxy to `127.0.0.1:8923`. The proxy +must require bearer token or basic auth, apply rate limiting, and set request +and response timeouts. Do not expose `/srv/cache` or the SQLite database file +directly. + +--- + +## SQLite state and cache migration + +Alert history and migrated monitor state live in `/srv/cache/monitoring.db`. +SQLite may also create `/srv/cache/monitoring.db-wal` and +`/srv/cache/monitoring.db-shm`; keep all three files local to the VPS and owned +by the deploy user. `deploy/install.sh` installs the `sqlite3` CLI, creates the +database schema, and imports existing text cache files into the `monitor_state` +table. + +For the current live host, freeze writes briefly and run the migration once: + +```sh +cd /srv/monitoring +git pull --ff-only +uv sync --frozen --extra ai +sudo systemctl stop monitoring +sudo REPO_DIR=/srv/monitoring CACHE_DIR=/srv/cache ./deploy/migrate-file-cache-to-db.sh +sudo systemctl daemon-reload +sudo systemctl start monitoring +sudo systemctl enable --now monitoring-api +``` + +The migration imports known cache files from `automation/jobs.yaml`, including +`cache-id.txt`, `cache-id-daily.txt`, and `nonces.txt`. It preserves existing +SQLite values by default, so rerunning it is safe. Use `--overwrite` only when +the legacy text files are known to be the source of truth. + +Inspect migrated state with: + +```sh +sqlite3 /srv/cache/monitoring.db \ + 'select namespace, key, value from monitor_state order by namespace, key limit 20;' +``` + +If rollback is needed, set `CACHE_BACKEND=file` in `/etc/monitoring/.env` and +restart `monitoring`. That makes `utils.cache` use the legacy text files again. +Remove the variable and restart to return to SQLite-backed cache state. + +--- + ## Common failure modes | Symptom | First thing to check | Likely cause | @@ -210,9 +276,11 @@ skipped, not queued) — the others keep ticking. it via `ReadWritePaths` and sets `CACHE_DIR=/srv/cache`, which `utils.cache` resolves every cache file against). A profile only overrides a cache *basename* in `automation/jobs.yaml` when it needs an isolated file (e.g. daily). +- SQLite database: `/srv/cache/monitoring.db` plus WAL/shm sidecars. - Env file: `/etc/monitoring/.env` (mode 0640, root:; operator-supplied, not in git). -- systemd unit: `/etc/systemd/system/monitoring.service`. +- systemd units: `/etc/systemd/system/monitoring.service` and + `/etc/systemd/system/monitoring-api.service`. - Rendered crontab: `/tmp/crontab` (per-service `PrivateTmp`; regenerated on every start). - Code sync: no separate unit — the `multisig` profile's pre-run `git pull diff --git a/deploy/systemd/monitoring-api.service b/deploy/systemd/monitoring-api.service new file mode 100644 index 00000000..f93ce368 --- /dev/null +++ b/deploy/systemd/monitoring-api.service @@ -0,0 +1,59 @@ +# Drop in /etc/systemd/system/monitoring-api.service then: +# +# sudo systemctl daemon-reload +# sudo systemctl enable --now monitoring-api +# sudo systemctl status monitoring-api + +[Unit] +Description=yearn monitoring alerts API +Documentation=https://github.com/yearn/monitoring +After=network-online.target +Wants=network-online.target + +[Service] +Type=exec +User=__MONITOR_USER__ +Group=__MONITOR_USER__ +WorkingDirectory=__REPO_DIR__ + +Environment=REPO_ROOT=__REPO_DIR__ +Environment=PATH=__REPO_DIR__/.venv/bin:/usr/local/bin:/usr/bin:/bin +Environment=LOG_LEVEL=INFO +Environment=PYTHONUNBUFFERED=1 +Environment=CACHE_DIR=__CACHE_DIR__ +Environment=MONITORING_API_HOST=127.0.0.1 +Environment=MONITORING_API_PORT=8923 + +ExecStartPre=+/bin/bash -c '\ + install -m 750 -o root -g __MONITOR_USER__ -d __ETC_DIR__; \ + if [ ! -f __ETC_DIR__/.env ]; then \ + echo "monitoring-api: __ETC_DIR__/.env missing" >&2; \ + exit 1; \ + fi' + +EnvironmentFile=__ETC_DIR__/.env +ExecStart=__REPO_DIR__/.venv/bin/python -m api + +Restart=on-failure +RestartSec=10 +StartLimitIntervalSec=60 +StartLimitBurst=5 + +NoNewPrivileges=yes +PrivateTmp=yes +ProtectSystem=strict +ProtectHome=read-only +ReadWritePaths=__CACHE_DIR__ +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6 +RestrictSUIDSGID=yes +LockPersonality=yes + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=monitoring-api + +[Install] +WantedBy=multi-user.target diff --git a/deploy/systemd/monitoring.service b/deploy/systemd/monitoring.service index 4c2f44c1..e97ee8e5 100644 --- a/deploy/systemd/monitoring.service +++ b/deploy/systemd/monitoring.service @@ -10,10 +10,10 @@ # crontab from automation/jobs.yaml, ExecStart hands it to supercronic, which # spawns `python -m automation run ` on each tick. # -# There is no HTTP surface and nothing to expose. A crashed supercronic is -# restarted by systemd (Restart=on-failure); there's no event loop that can -# wedge while the process stays alive, so no /healthz watchdog is needed -# (unlike the liquidity-monitor daemon). +# The optional HTTP alerts API lives in monitoring-api.service. A crashed +# supercronic is restarted by systemd (Restart=on-failure); there's no event +# loop that can wedge while the process stays alive, so no /healthz watchdog is +# needed for this scheduler unit. [Unit] Description=yearn monitoring cron runner (supercronic + automation) diff --git a/protocols/compound/README.md b/protocols/compound/README.md index 8f63095f..3349b10a 100644 --- a/protocols/compound/README.md +++ b/protocols/compound/README.md @@ -8,7 +8,7 @@ Github actions run hourly and send telegram message if there is a market with ut [Internal timelock monitoring](../timelock/README.md) for queueing tx to [Timelock contract on Mainnet](https://etherscan.io/address/0x6d903f6003cca6255D85CcA4D3B5E5146dC33925#code). -This Timelock contract covers **Mainnet and all other chains**. Each protocol contract is controlled by the [Timelock contract](https://etherscan.io/address/0x6d903f6003cca6255D85CcA4D3B5E5146dC33925#code). For more info see the [governance docs](https://docs.compound.finance/governance/). Delay is [2 days](https://etherscan.io/address/0x6d903f6003cca6255D85CcA4D3B5E5146dC33925#readContract). +This Timelock contract covers **Mainnet and all other chains**. Each protocol contract is controlled by the [Timelock contract](https://etherscan.io/address/0x6d903f6003cca6255D85CcA4D3B5E5146dC33925#code). Delay is [2 days](https://etherscan.io/address/0x6d903f6003cca6255D85CcA4D3B5E5146dC33925#readContract). Additionally, Github actions bot runs every hour and fetches queued proposals using Compound API: [proposals.py](./proposals.py) diff --git a/pyproject.toml b/pyproject.toml index 7e2bde95..4a28237f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,7 +99,7 @@ build-backend = "setuptools.build_meta" # they don't need a finder entry. [tool.setuptools.packages.find] where = ["."] -include = ["protocols*", "utils*", "automation*"] +include = ["protocols*", "utils*", "automation*", "api*"] namespaces = true [tool.ruff] diff --git a/tests/conftest.py b/tests/conftest.py index 9bdb0a53..fd896cf4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,17 +17,22 @@ def _isolate_from_live_apis(monkeypatch: pytest.MonkeyPatch) -> None: """Block accidental live API/RPC calls and reset cross-test singletons. - Strips `ETHERSCAN_TOKEN` and every `PROVIDER_URL_*` so a missing mock - short-circuits cheaply via the "no token / no provider" code paths that - already exist for production use. Tests that intentionally exercise - those code paths opt back in via @patch.dict. + Strips `ETHERSCAN_TOKEN`, every `PROVIDER_URL_*`, every `TELEGRAM_*` + credential, and `PAT_DISPATCH` so a missing mock short-circuits cheaply via + the "no token / no provider / no credentials" code paths that already exist + for production use. Forces `LOG_LEVEL=INFO` so a developer's `.env` + `LOG_LEVEL=DEBUG` (which skips Telegram sends) can't change tested behavior. + Tests that intentionally exercise those code paths opt back in via + monkeypatch / @patch.dict. This keeps local runs deterministic and matching + CI, where none of these vars are set. Also clears `ChainManager._instances` so a real client object cached by one test can't leak into the next. """ for key in list(os.environ): - if key == "ETHERSCAN_TOKEN" or key.startswith("PROVIDER_URL_"): + if key in {"ETHERSCAN_TOKEN", "PAT_DISPATCH"} or key.startswith(("PROVIDER_URL_", "TELEGRAM_")): monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("LOG_LEVEL", "INFO") try: from utils.web3_wrapper import ChainManager diff --git a/tests/test_alert_capture.py b/tests/test_alert_capture.py new file mode 100644 index 00000000..4cf6d1f6 --- /dev/null +++ b/tests/test_alert_capture.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from utils import paths, store +from utils.alert import Alert, AlertSeverity, send_alert +from utils.telegram import TelegramError, send_error_message, send_telegram_message + + +def _use_cache_dir(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(paths, "CACHE_DIR", str(tmp_path)) + monkeypatch.setattr(store, "_initialized", False) + monkeypatch.setattr(store, "_initialized_path", None) + + +def test_raw_send_records_protocol(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_AAVE", "chat") + with patch("utils.telegram._post_message") as post: + send_telegram_message("hello", "aave") + post.assert_called_once() + row = store.query_alerts()[0] + assert row["source"] == "protocol" + assert row["protocol"] == "aave" + assert row["severity"] is None + assert row["delivery_status"] == "delivered" + + +def test_send_alert_records_severity_and_origin(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_ROUTED", "chat") + with patch("utils.telegram._post_message"): + send_alert(Alert(AlertSeverity.HIGH, "boom", "aave", channel="routed")) + row = store.query_alerts()[0] + assert row["protocol"] == "aave" + assert row["channel"] == "routed" + assert row["severity"] == "HIGH" + + +def test_error_message_records_ops_source(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_ERRORS", "chat") + with patch("utils.telegram._post_message"): + send_error_message("boom", "aave") + row = store.query_alerts()[0] + assert row["source"] == "ops_error" + assert row["protocol"] == "aave" + assert row["channel"] == "errors" + + +def test_capture_failures_are_swallowed(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_AAVE", "chat") + with patch("utils.telegram.store.record_alert", side_effect=RuntimeError("db down")): + with patch("utils.telegram._post_message") as post: + send_telegram_message("hello", "aave") + post.assert_called_once() + + +def test_delivery_update_failure_is_swallowed(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_AAVE", "chat") + with patch("utils.telegram.store.update_alert_delivery", side_effect=RuntimeError("db down")): + with patch("utils.telegram._post_message"): + send_telegram_message("hello", "aave") + + +def test_telegram_failure_marks_failed_and_reraises(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN_DEFAULT", "token") + monkeypatch.setenv("TELEGRAM_CHAT_ID_AAVE", "chat") + with patch("utils.telegram._post_message", side_effect=TelegramError("bad")): + with pytest.raises(TelegramError): + send_telegram_message("hello", "aave") + row = store.query_alerts()[0] + assert row["delivery_status"] == "failed" + assert row["delivery_error"] == "bad" + + +def test_missing_credentials_and_debug_are_recorded(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + send_telegram_message("missing", "aave") + assert store.query_alerts()[0]["delivery_status"] == "skipped_missing_credentials" + + monkeypatch.setenv("LOG_LEVEL", "DEBUG") + send_telegram_message("debug", "aave") + assert store.query_alerts()[0]["delivery_status"] == "skipped_debug" diff --git a/tests/test_alerts_api.py b/tests/test_alerts_api.py new file mode 100644 index 00000000..84997b5d --- /dev/null +++ b/tests/test_alerts_api.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +import json +import sqlite3 +import threading +from http.client import HTTPConnection +from http.server import ThreadingHTTPServer +from pathlib import Path + +from api.server import AlertsHandler, parse_alert_query +from automation.config import JobsConfig, Profile, Task +from utils import paths, store + + +def _use_cache_dir(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(paths, "CACHE_DIR", str(tmp_path)) + monkeypatch.setattr(store, "_initialized", False) + monkeypatch.setattr(store, "_initialized_path", None) + + +def _request(server: ThreadingHTTPServer, method: str, path: str) -> tuple[int, dict]: + host, port = server.server_address + conn = HTTPConnection(host, port) + try: + conn.request(method, path) + response = conn.getresponse() + body = response.read() + return response.status, json.loads(body.decode()) + finally: + conn.close() + + +def _server(): + server = ThreadingHTTPServer(("127.0.0.1", 0), AlertsHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server + + +def test_parse_alert_query_validates_timestamps(): + parsed = parse_alert_query("since=2026-06-11T01:00:00%2B01:00&to=2026-06-11T02:00:00Z&limit=999") + assert parsed.from_ts == "2026-06-11T00:00:00.000000Z" + assert parsed.limit == 500 + + +def test_healthz(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + server = _server() + try: + status, body = _request(server, "GET", "/healthz") + finally: + server.shutdown() + assert status == 200 + assert body == {"status": "ok"} + + +def test_protocols_route(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + config = JobsConfig( + profiles={ + "hourly": Profile( + name="hourly", + cron="5 * * * *", + tasks=[ + Task(name="aave", script="protocols/aave/main.py"), + Task(name="aave-proposals", script="protocols/aave/proposals.py"), + Task(name="lido-steth", script="protocols/lido/steth/main.py"), + Task(name="prune-alerts", script="utils/prune_alerts.py"), + Task(name="off", script="protocols/off/main.py", enabled=False), + ], + ), + "disabled": Profile( + name="disabled", + cron="0 * * * *", + tasks=[Task(name="disabled-task", script="disabled.py")], + enabled=False, + ), + }, + path=Path("jobs.yaml"), + ) + monkeypatch.setattr("api.server.load_jobs_config", lambda: config) + server = _server() + try: + status, body = _request(server, "GET", "/v1/protocols") + finally: + server.shutdown() + + assert status == 200 + assert body == { + "data": [ + { + "name": "aave", + "tasks": [ + { + "name": "aave", + "script": "protocols/aave/main.py", + "args": {}, + "profile": "hourly", + "cron": "5 * * * *", + }, + { + "name": "aave-proposals", + "script": "protocols/aave/proposals.py", + "args": {}, + "profile": "hourly", + "cron": "5 * * * *", + }, + ], + }, + { + "name": "lido", + "tasks": [ + { + "name": "lido-steth", + "script": "protocols/lido/steth/main.py", + "args": {}, + "profile": "hourly", + "cron": "5 * * * *", + } + ], + }, + ], + "count": 2, + } + + +def test_alerts_routes_filters_and_errors(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + aave_id = store.record_alert(message="aave", protocol="aave", severity="LOW", source="protocol") + morpho_id = store.record_alert(message="morpho", protocol="morpho", severity="HIGH", source="ops_error") + server = _server() + try: + status, body = _request(server, "GET", "/v1/alerts?limit=1") + assert status == 200 + assert body["data"][0]["id"] == morpho_id + assert body["next_cursor"] == str(morpho_id) + + status, body = _request(server, "GET", f"/v1/alerts?cursor={morpho_id}") + assert status == 200 + assert [row["id"] for row in body["data"]] == [aave_id] + + status, body = _request(server, "GET", "/v1/alerts?protocol=aave&severity=LOW&source=protocol") + assert status == 200 + assert [row["id"] for row in body["data"]] == [aave_id] + + status, body = _request(server, "GET", f"/v1/alerts/{aave_id}") + assert status == 200 + assert body["message"] == "aave" + + status, body = _request(server, "GET", "/v1/alerts/999") + assert status == 404 + assert body["error"] == "not_found" + + status, body = _request(server, "GET", "/v1/alerts?severity=BAD") + assert status == 400 + assert body["error"] == "bad_request" + + status, _ = _request(server, "GET", "/v1/alerts?from=2026-06-11T00:00:00") + assert status == 400 + + status, _ = _request(server, "GET", "/v1/alerts?from=2026-06-11T01:00:00Z&to=2026-06-11T00:00:00Z") + assert status == 400 + + status, _ = _request(server, "GET", "/unknown") + assert status == 404 + + status, _ = _request(server, "POST", "/v1/alerts") + assert status == 405 + finally: + server.shutdown() + + +def test_alerts_route_normalizes_second_precision_timestamp_bounds(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + alert_id = store.record_alert(message="same second", protocol="aave") + with sqlite3.connect(store.db_path()) as conn: + conn.execute( + "UPDATE alert_events SET created_at = ? WHERE id = ?", + ("2026-06-11T00:00:00.123456Z", alert_id), + ) + + server = _server() + try: + status, body = _request(server, "GET", "/v1/alerts?from=2026-06-11T00:00:00Z") + assert status == 200 + assert [row["id"] for row in body["data"]] == [alert_id] + + status, body = _request(server, "GET", "/v1/alerts?to=2026-06-11T00:00:00Z") + assert status == 200 + assert body["data"] == [] + finally: + server.shutdown() diff --git a/tests/test_cache_sqlite.py b/tests/test_cache_sqlite.py new file mode 100644 index 00000000..01dc4e67 --- /dev/null +++ b/tests/test_cache_sqlite.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from utils import cache, paths, store + + +def _use_cache_dir(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(paths, "CACHE_DIR", str(tmp_path)) + monkeypatch.setattr(store, "_initialized", False) + monkeypatch.setattr(store, "_initialized_path", None) + monkeypatch.delenv("CACHE_BACKEND", raising=False) + monkeypatch.delenv("CACHE_DUAL_WRITE_LEGACY", raising=False) + + +def test_missing_key_returns_zero(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + assert cache.get_last_value_for_key_from_file(str(tmp_path / "cache-id.txt"), "aave") == 0 + + +def test_string_values_round_trip_and_update(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + filename = str(tmp_path / "cache-id.txt") + cache.write_last_value_to_file(filename, "aave", "12.5") + assert cache.get_last_value_for_key_from_file(filename, "aave") == "12.5" + cache.write_last_value_to_file(filename, "aave", "13.5") + assert cache.get_last_value_for_key_from_file(filename, "aave") == "13.5" + + +def test_int_wrappers_still_cast(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setattr(cache, "cache_filename", str(tmp_path / "cache-id.txt")) + cache.write_last_queued_id_to_file("aave", 10) + assert cache.get_last_queued_id_from_file("aave") == 10 + + +def test_namespace_is_basename(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + hourly = tmp_path / "hourly" / "cache-id.txt" + daily = tmp_path / "daily" / "cache-id-daily.txt" + hourly.parent.mkdir() + daily.parent.mkdir() + cache.write_last_value_to_file(str(hourly), "aave", 1) + cache.write_last_value_to_file(str(daily), "aave", 2) + + assert store.state_get("cache-id.txt", "aave") == "1" + assert store.state_get("cache-id-daily.txt", "aave") == "2" + + +def test_legacy_file_read_through_imports_to_sqlite(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + filename = tmp_path / "cache-id.txt" + filename.write_text("aave:42\n") + + assert cache.get_last_value_for_key_from_file(str(filename), "aave") == "42" + filename.unlink() + assert cache.get_last_value_for_key_from_file(str(filename), "aave") == "42" + + +def test_file_backend_uses_legacy_file(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("CACHE_BACKEND", "file") + filename = tmp_path / "cache-id.txt" + + cache.write_last_value_to_file(str(filename), "aave", 7) + assert filename.read_text() == "aave:7\n" + assert cache.get_last_value_for_key_from_file(str(filename), "aave") == "7" + assert store.state_get("cache-id.txt", "aave") is None + + +def test_dual_write_legacy(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + monkeypatch.setenv("CACHE_DUAL_WRITE_LEGACY", "1") + filename = tmp_path / "cache-id.txt" + cache.write_last_value_to_file(str(filename), "aave", 7) + + assert store.state_get("cache-id.txt", "aave") == "7" + assert filename.read_text() == "aave:7\n" diff --git a/tests/test_migrate_cache_to_db.py b/tests/test_migrate_cache_to_db.py new file mode 100644 index 00000000..115d2074 --- /dev/null +++ b/tests/test_migrate_cache_to_db.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from pathlib import Path + +from utils import migrate_cache_to_db, paths, store + + +def _use_cache_dir(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(paths, "CACHE_DIR", str(tmp_path)) + monkeypatch.setattr(store, "_initialized", False) + monkeypatch.setattr(store, "_initialized_path", None) + + +def test_known_cache_files_include_profile_env(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + jobs_yaml = tmp_path / "jobs.yaml" + jobs_yaml.write_text( + """ +profiles: + hourly: + cron: "5 * * * *" + tasks: [] + daily: + cron: "19 8 * * *" + env: + CACHE_FILENAME: cache-id-daily.txt + MORPHO_FILENAME: cache-id-daily.txt + tasks: [] +""" + ) + + files = migrate_cache_to_db.known_cache_files(jobs_yaml) + assert files == [ + tmp_path / "cache-id.txt", + tmp_path / "nonces.txt", + tmp_path / "cache-id-daily.txt", + ] + + +def test_migrate_file_imports_key_values_without_overwriting(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + legacy = tmp_path / "cache-id.txt" + legacy.write_text("aave:10\nmorpho:20\ninvalid\n") + store.state_set("cache-id.txt", "aave", "9") + + result = migrate_cache_to_db.migrate_file(legacy) + + assert result.files_seen == 1 + assert result.rows_imported == 1 + assert result.rows_skipped == 1 + assert result.rows_invalid == 1 + assert store.state_get("cache-id.txt", "aave") == "9" + assert store.state_get("cache-id.txt", "morpho") == "20" + + +def test_migrate_file_overwrite(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + legacy = tmp_path / "cache-id.txt" + legacy.write_text("aave:10\n") + store.state_set("cache-id.txt", "aave", "9") + + result = migrate_cache_to_db.migrate_file(legacy, overwrite=True) + + assert result.rows_imported == 1 + assert result.rows_skipped == 0 + assert store.state_get("cache-id.txt", "aave") == "10" + + +def test_migrate_missing_file(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + result = migrate_cache_to_db.migrate_files([Path(tmp_path / "missing.txt")]) + assert result.files_missing == 1 + assert store.db_path() == str(tmp_path / "monitoring.db") + assert Path(store.db_path()).exists() diff --git a/tests/test_store.py b/tests/test_store.py new file mode 100644 index 00000000..f2bc8b4b --- /dev/null +++ b/tests/test_store.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import sqlite3 +from datetime import UTC, datetime, timedelta + +from utils import paths, store + + +def _use_cache_dir(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(paths, "CACHE_DIR", str(tmp_path)) + monkeypatch.setattr(store, "_initialized", False) + monkeypatch.setattr(store, "_initialized_path", None) + + +def test_db_path_uses_cache_dir(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + assert store.db_path() == str(tmp_path / "monitoring.db") + + +def test_record_get_update_and_metadata(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + alert_id = store.record_alert( + message="hello", + protocol="aave", + channel="alerts", + severity="LOW", + plain_text=True, + silent=True, + metadata={"tx": "0xabc"}, + ) + + store.update_alert_delivery(alert_id, status="delivered", delivered_at="2026-06-11T10:00:00Z") + row = store.get_alert(alert_id) + + assert row is not None + assert row["message"] == "hello" + assert row["protocol"] == "aave" + assert row["channel"] == "alerts" + assert row["severity"] == "LOW" + assert row["plain_text"] is True + assert row["silent"] is True + assert row["delivery_status"] == "delivered" + assert row["delivered_at"] == "2026-06-11T10:00:00.000000Z" + assert row["metadata"] == {"tx": "0xabc"} + + +def test_get_alert_missing(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + assert store.get_alert(999) is None + + +def test_query_filters_order_cursor_and_limit(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + old_id = store.record_alert(message="old", protocol="aave", severity="LOW", source="protocol") + new_id = store.record_alert(message="new", protocol="morpho", severity="HIGH", source="ops_error") + + with sqlite3.connect(store.db_path()) as conn: + conn.execute("UPDATE alert_events SET created_at = ? WHERE id = ?", ("2026-06-10T00:00:00Z", old_id)) + conn.execute("UPDATE alert_events SET created_at = ? WHERE id = ?", ("2026-06-11T00:00:00Z", new_id)) + + assert [row["id"] for row in store.query_alerts()] == [new_id, old_id] + assert [row["id"] for row in store.query_alerts(protocol="aave")] == [old_id] + assert [row["id"] for row in store.query_alerts(severity="HIGH")] == [new_id] + assert [row["id"] for row in store.query_alerts(source="ops_error")] == [new_id] + assert [row["id"] for row in store.query_alerts(from_ts="2026-06-10T12:00:00Z")] == [new_id] + assert [row["id"] for row in store.query_alerts(to_ts="2026-06-10T12:00:00Z")] == [old_id] + assert [row["id"] for row in store.query_alerts(cursor=new_id)] == [old_id] + assert len(store.query_alerts(limit=5000)) == 2 + + +def test_query_timestamp_filters_normalize_second_precision_bounds(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + alert_id = store.record_alert(message="same second", protocol="aave") + + with sqlite3.connect(store.db_path()) as conn: + conn.execute( + "UPDATE alert_events SET created_at = ? WHERE id = ?", + ("2026-06-11T00:00:00.123456Z", alert_id), + ) + + assert [row["id"] for row in store.query_alerts(from_ts="2026-06-11T00:00:00Z")] == [alert_id] + assert store.query_alerts(to_ts="2026-06-11T00:00:00Z") == [] + + +def test_prune_alerts(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + old_id = store.record_alert(message="old", protocol="aave") + keep_id = store.record_alert(message="keep", protocol="aave") + old_ts = (datetime.now(UTC) - timedelta(days=45)).isoformat().replace("+00:00", "Z") + with sqlite3.connect(store.db_path()) as conn: + conn.execute("UPDATE alert_events SET created_at = ? WHERE id = ?", (old_ts, old_id)) + + assert store.prune_alerts(30) == 1 + assert store.get_alert(old_id) is None + assert store.get_alert(keep_id) is not None + + +def test_wal_read_while_write(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + alert_id = store.record_alert(message="hello", protocol="aave") + reader = sqlite3.connect(store.db_path()) + try: + assert reader.execute("SELECT count(*) FROM alert_events").fetchone()[0] == 1 + store.update_alert_delivery(alert_id, status="delivered") + assert store.get_alert(alert_id)["delivery_status"] == "delivered" # type: ignore[index] + finally: + reader.close() + + +def test_monitor_state_round_trip(monkeypatch, tmp_path): + _use_cache_dir(monkeypatch, tmp_path) + assert store.state_get("cache-id.txt", "aave") is None + store.state_set("cache-id.txt", "aave", "42") + assert store.state_get("cache-id.txt", "aave") == "42" + store.state_set("cache-id.txt", "aave", "43") + assert store.state_get("cache-id.txt", "aave") == "43" + assert store.state_get("cache-id-daily.txt", "aave") is None diff --git a/tests/test_utils.py b/tests/test_utils.py index 3569020e..116bab57 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -400,25 +400,61 @@ def test_alert_dataclass_immutability(self): def test_emoji_prefix_low(self, mock_send): alert = Alert(severity=AlertSeverity.LOW, message="info msg", protocol="test") send_alert(alert) - mock_send.assert_called_once_with("ℹ️ info msg", "test", True, False) + mock_send.assert_called_once_with( + "ℹ️ info msg", + "test", + True, + False, + severity="LOW", + source="protocol", + origin_protocol="test", + channel="test", + ) @patch("utils.alert.send_telegram_message") def test_emoji_prefix_medium(self, mock_send): alert = Alert(severity=AlertSeverity.MEDIUM, message="warn msg", protocol="test") send_alert(alert) - mock_send.assert_called_once_with("⚠️ warn msg", "test", False, False) + mock_send.assert_called_once_with( + "⚠️ warn msg", + "test", + False, + False, + severity="MEDIUM", + source="protocol", + origin_protocol="test", + channel="test", + ) @patch("utils.alert.send_telegram_message") def test_emoji_prefix_high(self, mock_send): alert = Alert(severity=AlertSeverity.HIGH, message="high msg", protocol="test") send_alert(alert) - mock_send.assert_called_once_with("🚨 high msg", "test", False, False) + mock_send.assert_called_once_with( + "🚨 high msg", + "test", + False, + False, + severity="HIGH", + source="protocol", + origin_protocol="test", + channel="test", + ) @patch("utils.alert.send_telegram_message") def test_emoji_prefix_critical(self, mock_send): alert = Alert(severity=AlertSeverity.CRITICAL, message="crit msg", protocol="test") send_alert(alert) - mock_send.assert_called_once_with("🔴 crit msg", "test", False, False) + mock_send.assert_called_once_with( + "🔴 crit msg", + "test", + False, + False, + severity="CRITICAL", + source="protocol", + origin_protocol="test", + channel="test", + ) @patch("utils.alert.send_telegram_message") def test_silent_default_low(self, mock_send): @@ -463,14 +499,32 @@ def test_channel_routes_telegram(self, mock_send): """When channel is set, Telegram message goes to channel, not protocol.""" alert = Alert(severity=AlertSeverity.HIGH, message="peg alert", protocol="origin", channel="pegs") send_alert(alert) - mock_send.assert_called_once_with("🚨 peg alert", "pegs", False, False) + mock_send.assert_called_once_with( + "🚨 peg alert", + "pegs", + False, + False, + severity="HIGH", + source="protocol", + origin_protocol="origin", + channel="pegs", + ) @patch("utils.alert.send_telegram_message") def test_channel_fallback_to_protocol(self, mock_send): """When channel is empty, Telegram message goes to protocol.""" alert = Alert(severity=AlertSeverity.HIGH, message="reserves low", protocol="infinifi") send_alert(alert) - mock_send.assert_called_once_with("🚨 reserves low", "infinifi", False, False) + mock_send.assert_called_once_with( + "🚨 reserves low", + "infinifi", + False, + False, + severity="HIGH", + source="protocol", + origin_protocol="infinifi", + channel="infinifi", + ) @patch("utils.alert.send_telegram_message") def test_hook_invoked_for_high(self, mock_send): diff --git a/utils/alert.py b/utils/alert.py index c4f46f14..8f9a8b91 100644 --- a/utils/alert.py +++ b/utils/alert.py @@ -128,7 +128,16 @@ def send_alert( if silent is None: silent = _SEVERITY_SILENT_DEFAULT[alert.severity.value] - send_telegram_message(message, alert.channel or alert.protocol, silent, plain_text) + send_telegram_message( + message, + alert.channel or alert.protocol, + silent, + plain_text, + severity=alert.severity.value, + source="protocol", + origin_protocol=alert.protocol, + channel=alert.channel or alert.protocol, + ) # Invoke hook for HIGH and CRITICAL alerts if alert.severity in (AlertSeverity.HIGH, AlertSeverity.CRITICAL) and _alert_hook is not None: diff --git a/utils/cache.py b/utils/cache.py index e7ff27d6..8e42ae01 100644 --- a/utils/cache.py +++ b/utils/cache.py @@ -3,22 +3,12 @@ from dotenv import load_dotenv -load_dotenv() - -# CACHE_DIR is the single knob for where all on-disk dedupe/cache state lives. -# Default "" → the current working directory, so local runs drop files in the -# repo as before. On the VPS the systemd unit sets CACHE_DIR=/srv/cache (the one -# writable path under the hardened service); see deploy/systemd/yearn-monitor.service. -CACHE_DIR: str = os.getenv("CACHE_DIR", "") +from utils import paths, store +load_dotenv() -def cache_path(filename: str) -> str: - """Resolve a cache ``filename`` against ``CACHE_DIR``. - - An absolute ``filename`` is returned unchanged (``os.path.join`` semantics), so an - explicit override always wins over ``CACHE_DIR``. - """ - return os.path.join(CACHE_DIR, filename) +CACHE_DIR = paths.CACHE_DIR +cache_path = paths.cache_path # format of the data: "protocol:value" @@ -62,26 +52,50 @@ def morpho_key(vault_address: str, market_id: str, value_type: str) -> str: def get_last_value_for_key_from_file(filename: str, wanted_key: str) -> Union[str, int]: + if os.getenv("CACHE_BACKEND", "sqlite") == "file": + return _get_last_value_from_legacy_file(filename, wanted_key) + + namespace = os.path.basename(filename) + value = store.state_get(namespace, wanted_key) + if value is not None: + return value + + legacy_value = _get_last_value_from_legacy_file(filename, wanted_key) + if legacy_value != 0: + store.state_set(namespace, wanted_key, str(legacy_value)) + return legacy_value + + +def write_last_value_to_file(filename: str, write_key: str, write_value: Union[int, str, float]) -> None: + if os.getenv("CACHE_BACKEND", "sqlite") == "file": + _write_last_value_to_legacy_file(filename, write_key, write_value) + return + + store.state_set(os.path.basename(filename), write_key, str(write_value)) + if os.getenv("CACHE_DUAL_WRITE_LEGACY") == "1": + _write_last_value_to_legacy_file(filename, write_key, write_value) + + +def _get_last_value_from_legacy_file(filename: str, wanted_key: str) -> Union[str, int]: if not os.path.exists(filename): return 0 - else: - with open(filename, "r") as f: - # read line by line in format "key:value" - lines = f.readlines() - for line in lines: - key, value = line.strip().split(":") - if key == wanted_key: - return value + with open(filename, "r") as f: + # read line by line in format "key:value" + lines = f.readlines() + for line in lines: + key, value = line.strip().split(":", 1) + if key == wanted_key: + return value return 0 -def write_last_value_to_file(filename: str, write_key: str, write_value: Union[int, str, float]) -> None: +def _write_last_value_to_legacy_file(filename: str, write_key: str, write_value: Union[int, str, float]) -> None: # check if the proposal ud is already in the file, then update the id else append if os.path.exists(filename): with open(filename, "r") as f: lines = f.readlines() for i, line in enumerate(lines): - key, _ = line.strip().split(":") + key, _ = line.strip().split(":", 1) if key == write_key: lines[i] = f"{write_key}:{write_value}\n" break diff --git a/utils/migrate_cache_to_db.py b/utils/migrate_cache_to_db.py new file mode 100644 index 00000000..f7d0ff34 --- /dev/null +++ b/utils/migrate_cache_to_db.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import argparse +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable + +from automation.config import load_jobs_config +from utils import paths, store +from utils.logging import get_logger + +logger = get_logger("utils.migrate_cache_to_db") + +_CACHE_ENV_KEYS = ("CACHE_FILENAME", "NONCE_FILENAME", "MORPHO_FILENAME") +_DEFAULT_CACHE_FILES = { + "CACHE_FILENAME": "cache-id.txt", + "NONCE_FILENAME": "nonces.txt", + "MORPHO_FILENAME": "cache-id.txt", +} + + +@dataclass(frozen=True) +class MigrationResult: + files_seen: int = 0 + files_missing: int = 0 + rows_imported: int = 0 + rows_skipped: int = 0 + rows_invalid: int = 0 + + +def known_cache_files(jobs_path: Path | None = None) -> list[Path]: + """Return cache files used by the current deployment configuration.""" + candidates: list[str] = [] + for key in _CACHE_ENV_KEYS: + candidates.append(os.getenv(key, _DEFAULT_CACHE_FILES[key])) + + try: + jobs = load_jobs_config(jobs_path) + except Exception: + logger.debug("Could not load jobs config for migration file discovery", exc_info=True) + else: + for profile in jobs.profiles.values(): + for key in _CACHE_ENV_KEYS: + value = profile.env.get(key) + if value: + candidates.append(value) + + resolved: list[Path] = [] + seen: set[Path] = set() + for candidate in candidates: + path = Path(paths.cache_path(candidate)) + if path not in seen: + resolved.append(path) + seen.add(path) + return resolved + + +def migrate_files(files: Iterable[Path], *, overwrite: bool = False) -> MigrationResult: + """Import legacy key:value cache files into SQLite monitor_state.""" + store.initialize_database() + result = MigrationResult() + for file_path in files: + partial = migrate_file(file_path, overwrite=overwrite) + result = MigrationResult( + files_seen=result.files_seen + partial.files_seen, + files_missing=result.files_missing + partial.files_missing, + rows_imported=result.rows_imported + partial.rows_imported, + rows_skipped=result.rows_skipped + partial.rows_skipped, + rows_invalid=result.rows_invalid + partial.rows_invalid, + ) + return result + + +def migrate_file(file_path: Path, *, overwrite: bool = False) -> MigrationResult: + namespace = file_path.name + if not file_path.exists(): + logger.info("cache migration: missing %s", file_path) + return MigrationResult(files_missing=1) + + imported = 0 + skipped = 0 + invalid = 0 + with file_path.open("r") as f: + for line_no, raw_line in enumerate(f, start=1): + line = raw_line.strip() + if not line: + continue + if ":" not in line: + invalid += 1 + logger.warning("cache migration: invalid line %s:%d", file_path, line_no) + continue + key, value = line.split(":", 1) + if not key: + invalid += 1 + logger.warning("cache migration: empty key at %s:%d", file_path, line_no) + continue + if not overwrite and store.state_get(namespace, key) is not None: + skipped += 1 + continue + store.state_set(namespace, key, value) + imported += 1 + + logger.info( + "cache migration: %s namespace=%s imported=%d skipped=%d invalid=%d", + file_path, + namespace, + imported, + skipped, + invalid, + ) + return MigrationResult(files_seen=1, rows_imported=imported, rows_skipped=skipped, rows_invalid=invalid) + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Import legacy text cache files into monitoring.db") + parser.add_argument( + "--file", + action="append", + default=[], + help="Additional legacy cache file to import. Relative paths resolve under CACHE_DIR.", + ) + parser.add_argument("--jobs-yaml", type=Path, default=None, help="jobs.yaml path for profile env discovery") + parser.add_argument("--overwrite", action="store_true", help="Overwrite existing SQLite monitor_state values") + parser.add_argument("--checkpoint", action="store_true", help="Run a WAL checkpoint after migration") + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> None: + args = parse_args(argv) + files = known_cache_files(args.jobs_yaml) + files.extend(Path(paths.cache_path(path)) for path in args.file) + result = migrate_files(files, overwrite=args.overwrite) + if args.checkpoint: + store.checkpoint_wal() + logger.info( + "cache migration complete: files=%d missing=%d imported=%d skipped=%d invalid=%d db=%s", + result.files_seen, + result.files_missing, + result.rows_imported, + result.rows_skipped, + result.rows_invalid, + store.db_path(), + ) + + +if __name__ == "__main__": + main() diff --git a/utils/paths.py b/utils/paths.py new file mode 100644 index 00000000..a0eda11b --- /dev/null +++ b/utils/paths.py @@ -0,0 +1,20 @@ +import os + +from dotenv import load_dotenv + +load_dotenv() + +# CACHE_DIR is the single knob for where on-disk monitoring state lives. +# "" → current working directory for local runs; systemd sets /srv/cache. +CACHE_DIR: str = os.getenv("CACHE_DIR", "") + + +def cache_path(filename: str) -> str: + """Resolve a cache filename against CACHE_DIR. + + Reads the ``CACHE_DIR`` env var at call time so modules that compute paths at + their own import time (e.g. ``protocols/ustb/main.py``) still pick it up, and + falls back to the module global ``CACHE_DIR`` when it is unset — which tests + override via ``monkeypatch.setattr``. + """ + return os.path.join(os.getenv("CACHE_DIR", CACHE_DIR), filename) diff --git a/utils/prune_alerts.py b/utils/prune_alerts.py new file mode 100644 index 00000000..9cf887ae --- /dev/null +++ b/utils/prune_alerts.py @@ -0,0 +1,18 @@ +import os + +from utils.logging import get_logger +from utils.runner import run_with_alert +from utils.store import checkpoint_wal, prune_alerts + +logger = get_logger("utils.prune_alerts") + + +def main() -> None: + days = int(os.getenv("ALERTS_RETENTION_DAYS", "30")) + deleted = prune_alerts(days) + logger.info("Pruned %d alert rows older than %d days", deleted, days) + checkpoint_wal() + + +if __name__ == "__main__": + run_with_alert(main, "automation") diff --git a/utils/runner.py b/utils/runner.py index ebd0bbb2..336f43cb 100644 --- a/utils/runner.py +++ b/utils/runner.py @@ -45,6 +45,6 @@ def run_with_alert(entrypoint: Callable[[], None], protocol: str, name: str | No if run_url: lines.append(f"Run: {run_url}") try: - send_error_message("\n".join(lines), protocol) + send_error_message("\n".join(lines), protocol, source="crash") except Exception: # noqa: BLE001 - alerting must not itself crash the wrapper logger.exception("Failed to send crash alert for %s", script) diff --git a/utils/store.py b/utils/store.py new file mode 100644 index 00000000..91e3cf65 --- /dev/null +++ b/utils/store.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import json +import sqlite3 +from contextlib import closing +from datetime import UTC, datetime, timedelta +from typing import Any, NotRequired, TypedDict + +from utils.paths import cache_path + +_initialized = False +_initialized_path: str | None = None + + +class AlertEvent(TypedDict): + id: int + created_at: str + source: str + protocol: str + channel: str + severity: str | None + message: str + plain_text: bool + silent: bool + delivery_status: str + delivered_at: str | None + delivery_error: str | None + dedupe_key: NotRequired[str | None] + fingerprint: NotRequired[str | None] + metadata: dict[str, Any] + + +def format_utc_iso(value: datetime) -> str: + """Return a fixed-width UTC ISO-8601 timestamp suitable for TEXT comparisons.""" + if value.tzinfo is None: + raise ValueError("timestamp must include timezone") + return value.astimezone(UTC).isoformat(timespec="microseconds").replace("+00:00", "Z") + + +def normalize_timestamp(value: str) -> str: + """Normalize an ISO-8601 timestamp to fixed-width UTC.""" + raw = value + if raw.endswith("Z"): + raw = f"{raw[:-1]}+00:00" + parsed = datetime.fromisoformat(raw) + return format_utc_iso(parsed) + + +def utc_now_iso() -> str: + return format_utc_iso(datetime.now(UTC)) + + +def db_path() -> str: + """Return the SQLite database path under CACHE_DIR.""" + return cache_path("monitoring.db") + + +def initialize_database() -> None: + """Create the SQLite database schema if needed.""" + with closing(_connect()): + pass + + +def checkpoint_wal() -> None: + """Checkpoint and truncate SQLite WAL files.""" + with closing(_connect()) as conn: + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + + +def _connect() -> sqlite3.Connection: + conn = sqlite3.connect(db_path(), timeout=5) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA busy_timeout=5000") + _ensure_schema(conn) + return conn + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + global _initialized, _initialized_path + current_path = db_path() + if _initialized and _initialized_path == current_path: + return + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS alert_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at TEXT NOT NULL, + source TEXT NOT NULL, + protocol TEXT NOT NULL, + channel TEXT NOT NULL DEFAULT '', + severity TEXT, + message TEXT NOT NULL, + plain_text INTEGER NOT NULL DEFAULT 0, + silent INTEGER NOT NULL DEFAULT 0, + delivery_status TEXT NOT NULL DEFAULT 'generated', + delivered_at TEXT, + delivery_error TEXT, + dedupe_key TEXT, + fingerprint TEXT, + metadata_json TEXT + ); + + CREATE INDEX IF NOT EXISTS idx_alert_events_created_at + ON alert_events(created_at); + + CREATE INDEX IF NOT EXISTS idx_alert_events_protocol_created_id + ON alert_events(protocol, created_at, id); + + CREATE INDEX IF NOT EXISTS idx_alert_events_severity_created_id + ON alert_events(severity, created_at, id); + + CREATE INDEX IF NOT EXISTS idx_alert_events_source_created_id + ON alert_events(source, created_at, id); + + CREATE TABLE IF NOT EXISTS monitor_state ( + namespace TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (namespace, key) + ); + """ + ) + conn.commit() + _initialized = True + _initialized_path = current_path + + +def _row_to_alert(row: sqlite3.Row) -> AlertEvent: + metadata_raw = row["metadata_json"] + metadata = json.loads(metadata_raw) if metadata_raw else {} + return { + "id": row["id"], + "created_at": row["created_at"], + "source": row["source"], + "protocol": row["protocol"], + "channel": row["channel"], + "severity": row["severity"], + "message": row["message"], + "plain_text": bool(row["plain_text"]), + "silent": bool(row["silent"]), + "delivery_status": row["delivery_status"], + "delivered_at": row["delivered_at"], + "delivery_error": row["delivery_error"], + "dedupe_key": row["dedupe_key"], + "fingerprint": row["fingerprint"], + "metadata": metadata, + } + + +def record_alert( + *, + message: str, + protocol: str, + channel: str = "", + severity: str | None = None, + source: str = "protocol", + plain_text: bool = False, + silent: bool = False, + delivery_status: str = "generated", + metadata: dict[str, object] | None = None, + dedupe_key: str | None = None, + fingerprint: str | None = None, +) -> int: + """Insert an alert event and return its id.""" + # `closing(...)` closes the connection (sqlite3's own `with` only manages the + # transaction, never closes); the trailing `conn` commits/rolls back the write. + with closing(_connect()) as conn, conn: + cursor = conn.execute( + """ + INSERT INTO alert_events ( + created_at, source, protocol, channel, severity, message, plain_text, + silent, delivery_status, dedupe_key, fingerprint, metadata_json + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + utc_now_iso(), + source, + protocol, + channel, + severity, + message, + int(plain_text), + int(silent), + delivery_status, + dedupe_key, + fingerprint, + json.dumps(metadata or {}, sort_keys=True), + ), + ) + alert_id = cursor.lastrowid + if alert_id is None: + raise RuntimeError("SQLite did not return an alert id") + return alert_id + + +def update_alert_delivery( + alert_id: int, + *, + status: str, + delivered_at: str | None = None, + error: str | None = None, +) -> None: + """Update Telegram delivery fields for an alert event.""" + if delivered_at is None and status == "delivered": + delivered_at = utc_now_iso() + elif delivered_at is not None: + delivered_at = normalize_timestamp(delivered_at) + with closing(_connect()) as conn, conn: + conn.execute( + """ + UPDATE alert_events + SET delivery_status = ?, delivered_at = ?, delivery_error = ? + WHERE id = ? + """, + (status, delivered_at, error, alert_id), + ) + + +def get_alert(alert_id: int) -> AlertEvent | None: + """Return one alert event by id.""" + with closing(_connect()) as conn: + row = conn.execute("SELECT * FROM alert_events WHERE id = ?", (alert_id,)).fetchone() + return _row_to_alert(row) if row else None + + +def query_alerts( + *, + protocol: str | None = None, + severity: str | None = None, + source: str | None = None, + from_ts: str | None = None, + to_ts: str | None = None, + cursor: int | None = None, + limit: int = 100, +) -> list[AlertEvent]: + """Return alert events ordered by id descending.""" + limit = max(1, min(limit, 1000)) + clauses: list[str] = [] + params: list[object] = [] + for column, value in (("protocol", protocol), ("severity", severity), ("source", source)): + if value is not None: + clauses.append(f"{column} = ?") + params.append(value) + if from_ts is not None: + clauses.append("created_at >= ?") + params.append(normalize_timestamp(from_ts)) + if to_ts is not None: + clauses.append("created_at < ?") + params.append(normalize_timestamp(to_ts)) + if cursor is not None: + clauses.append("id < ?") + params.append(cursor) + + where = f"WHERE {' AND '.join(clauses)}" if clauses else "" + sql = f"SELECT * FROM alert_events {where} ORDER BY id DESC LIMIT ?" + params.append(limit) + with closing(_connect()) as conn: + rows = conn.execute(sql, params).fetchall() + return [_row_to_alert(row) for row in rows] + + +def prune_alerts(older_than_days: int) -> int: + """Delete old alert events and return the number of deleted rows.""" + cutoff = format_utc_iso(datetime.now(UTC) - timedelta(days=older_than_days)) + with closing(_connect()) as conn, conn: + cursor = conn.execute("DELETE FROM alert_events WHERE created_at < ?", (cutoff,)) + return cursor.rowcount + + +def state_get(namespace: str, key: str) -> str | None: + """Return a stored monitor state value.""" + with closing(_connect()) as conn: + row = conn.execute( + "SELECT value FROM monitor_state WHERE namespace = ? AND key = ?", + (namespace, key), + ).fetchone() + return str(row["value"]) if row else None + + +def state_set(namespace: str, key: str, value: str) -> None: + """Upsert a monitor state value.""" + with closing(_connect()) as conn, conn: + conn.execute( + """ + INSERT INTO monitor_state (namespace, key, value, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(namespace, key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at + """, + (namespace, key, value, utc_now_iso()), + ) diff --git a/utils/telegram.py b/utils/telegram.py index 2348f3b4..18d93c27 100644 --- a/utils/telegram.py +++ b/utils/telegram.py @@ -4,6 +4,7 @@ import requests from dotenv import load_dotenv +from utils import store from utils.logging import get_logger load_dotenv() @@ -96,6 +97,12 @@ def send_telegram_message( protocol: str, disable_notification: bool = False, plain_text: bool = False, + *, + severity: str | None = None, + source: str = "protocol", + origin_protocol: str | None = None, + channel: str | None = None, + metadata: dict[str, object] | None = None, ) -> None: """ Send a message to a Telegram chat using a bot. @@ -110,15 +117,30 @@ def send_telegram_message( """ logger.debug("Sending telegram message:\n%s", message) - if os.getenv("LOG_LEVEL", "INFO").upper() == "DEBUG": - logger.debug("Skipping Telegram send (LOG_LEVEL=DEBUG)") - return - # Truncate long messages; disable Markdown to avoid broken entities if len(message) > MAX_MESSAGE_LENGTH: message = message[: MAX_MESSAGE_LENGTH - 3] + "..." plain_text = True + logical_protocol = origin_protocol or protocol + delivery_channel = channel or protocol + + if os.getenv("LOG_LEVEL", "INFO").upper() == "DEBUG": + # Terminal status recorded in the single insert; no delivery update needed. + _record_alert_safe( + message=message, + protocol=logical_protocol, + channel=delivery_channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=disable_notification, + delivery_status="skipped_debug", + metadata=metadata, + ) + logger.debug("Skipping Telegram send (LOG_LEVEL=DEBUG)") + return + # Test/staging override: route every message to one chat for comparison runs. # Set TELEGRAM_TEST_CHAT_ID to a dummy group id; unset to restore prod routing. # The DEFAULT bot must be a member of that group. The per-protocol label is @@ -128,6 +150,17 @@ def send_telegram_message( if test_chat_id: bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT") if not bot_token: + _record_alert_safe( + message=message, + protocol=logical_protocol, + channel=delivery_channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=disable_notification, + delivery_status="skipped_missing_credentials", + metadata=metadata, + ) logger.warning("TELEGRAM_TEST_CHAT_ID set but TELEGRAM_BOT_TOKEN_DEFAULT missing") return # Escape the label for Markdown sends — protocol names contain `_` @@ -137,7 +170,23 @@ def send_telegram_message( label = f"[{protocol}] " if not plain_text: label = escape_markdown(label) - _post_message(bot_token, test_chat_id, f"{label}{message}", plain_text, disable_notification) + sent_message = f"{label}{message}" + alert_id = _record_alert_safe( + message=sent_message, + protocol=logical_protocol, + channel=delivery_channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=disable_notification, + metadata=metadata, + ) + try: + _post_message(bot_token, test_chat_id, sent_message, plain_text, disable_notification) + except TelegramError as exc: + _update_alert_delivery_safe(alert_id, status="failed", error=str(exc)) + raise + _update_alert_delivery_safe(alert_id, status="delivered", delivered_at=store.utc_now_iso()) return # Check if this protocol has a topic ID configured (forum-style group) @@ -155,10 +204,82 @@ def send_telegram_message( chat_id = os.getenv(f"TELEGRAM_CHAT_ID_{protocol.upper()}") if not bot_token or not chat_id: + _record_alert_safe( + message=message, + protocol=logical_protocol, + channel=delivery_channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=disable_notification, + delivery_status="skipped_missing_credentials", + metadata=metadata, + ) logger.warning("Missing Telegram credentials for %s", protocol) return - _post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id) + alert_id = _record_alert_safe( + message=message, + protocol=logical_protocol, + channel=delivery_channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=disable_notification, + metadata=metadata, + ) + try: + _post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id) + except TelegramError as exc: + _update_alert_delivery_safe(alert_id, status="failed", error=str(exc)) + raise + _update_alert_delivery_safe(alert_id, status="delivered", delivered_at=store.utc_now_iso()) + + +def _record_alert_safe( + *, + message: str, + protocol: str, + channel: str, + severity: str | None, + source: str, + plain_text: bool, + silent: bool, + delivery_status: str = "generated", + metadata: dict[str, object] | None = None, +) -> int | None: + """Best-effort alert insert; never raises.""" + try: + return store.record_alert( + message=message, + protocol=protocol, + channel=channel, + severity=severity, + source=source, + plain_text=plain_text, + silent=silent, + delivery_status=delivery_status, + metadata=metadata, + ) + except Exception: + logger.debug("Failed to record alert event", exc_info=True) + return None + + +def _update_alert_delivery_safe( + alert_id: int | None, + *, + status: str, + delivered_at: str | None = None, + error: str | None = None, +) -> None: + """Best-effort delivery update; never raises.""" + if alert_id is None: + return + try: + store.update_alert_delivery(alert_id, status=status, delivered_at=delivered_at, error=error) + except Exception: + logger.debug("Failed to update alert delivery", exc_info=True) def _error_channel_configured() -> bool: @@ -166,7 +287,13 @@ def _error_channel_configured() -> bool: return bool(os.getenv("TELEGRAM_TOPIC_ID_ERRORS") or os.getenv("TELEGRAM_CHAT_ID_ERRORS")) -def send_error_message(message: str, protocol: str, disable_notification: bool = True) -> None: +def send_error_message( + message: str, + protocol: str, + disable_notification: bool = True, + *, + source: str = "ops_error", +) -> None: """Route an operational error/diagnostic to the dedicated errors channel. Keeps transient failures (GraphQL/fetch errors, retries, crashes) out of the @@ -185,9 +312,25 @@ def send_error_message(message: str, protocol: str, disable_notification: bool = disable_notification: If True (default), send silently. """ if _error_channel_configured(): - send_telegram_message(f"[{protocol}] {message}", ERROR_CHANNEL, disable_notification, plain_text=True) + send_telegram_message( + f"[{protocol}] {message}", + ERROR_CHANNEL, + disable_notification, + plain_text=True, + source=source, + origin_protocol=protocol, + channel=ERROR_CHANNEL, + ) else: - send_telegram_message(message, protocol, disable_notification, plain_text=True) + send_telegram_message( + message, + protocol, + disable_notification, + plain_text=True, + source=source, + origin_protocol=protocol, + channel=protocol, + ) def get_github_run_url() -> str: