From 20d3ddb8410001e783dba2baae6ae1ab77f583ce Mon Sep 17 00:00:00 2001 From: David Kiania Date: Sun, 12 Apr 2026 21:40:29 +0300 Subject: [PATCH] feat: add db_audit health checks, runner, and scheduled Forgejo workflow Co-Authored-By: Claude Sonnet 4.6 --- .forgejo/workflows/scheduled-audit.yml | 20 +++ db_audit/__init__.py | 0 db_audit/checks/data_gaps.sql | 19 +++ db_audit/checks/distance_outliers.sql | 14 +++ db_audit/checks/duplicate_positions.sql | 11 ++ db_audit/checks/enum_drift.sql | 34 +++++ db_audit/checks/null_integrity.sql | 30 +++++ db_audit/checks/stale_devices.sql | 14 +++ db_audit/run_audit.py | 161 ++++++++++++++++++++++++ db_audit/schema/health_checks_table.sql | 13 ++ 10 files changed, 316 insertions(+) create mode 100644 .forgejo/workflows/scheduled-audit.yml create mode 100644 db_audit/__init__.py create mode 100644 db_audit/checks/data_gaps.sql create mode 100644 db_audit/checks/distance_outliers.sql create mode 100644 db_audit/checks/duplicate_positions.sql create mode 100644 db_audit/checks/enum_drift.sql create mode 100644 db_audit/checks/null_integrity.sql create mode 100644 db_audit/checks/stale_devices.sql create mode 100644 db_audit/run_audit.py create mode 100644 db_audit/schema/health_checks_table.sql diff --git a/.forgejo/workflows/scheduled-audit.yml b/.forgejo/workflows/scheduled-audit.yml new file mode 100644 index 0000000..01551cd --- /dev/null +++ b/.forgejo/workflows/scheduled-audit.yml @@ -0,0 +1,20 @@ +name: DB Audit + +on: + schedule: + - cron: "0 3 * * *" # 03:00 UTC = 06:00 EAT daily + workflow_dispatch: # Also runnable manually from Forgejo UI + +jobs: + audit: + runs-on: self-hosted + steps: + - uses: actions/checkout@v4 + + - name: Install dependencies + run: pip install psycopg2-binary + + - name: Run DB audit + run: python db_audit/run_audit.py + env: + DATABASE_URL: ${{ secrets.DATABASE_URL }} diff --git a/db_audit/__init__.py b/db_audit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/db_audit/checks/data_gaps.sql b/db_audit/checks/data_gaps.sql new file mode 100644 index 0000000..c1698f5 --- /dev/null +++ b/db_audit/checks/data_gaps.sql @@ -0,0 +1,19 @@ +-- Data gaps: enabled devices with no position_history or trips in last 7 days +SELECT + d.imei, + d.device_name, + d.enabled_flag, + MAX(ph.gps_time) AS last_position, + MAX(t.start_time) AS last_trip +FROM tracksolid.devices d +LEFT JOIN tracksolid.position_history ph + ON ph.imei = d.imei + AND ph.gps_time > NOW() - INTERVAL '7 days' +LEFT JOIN tracksolid.trips t + ON t.imei = d.imei + AND t.start_time > NOW() - INTERVAL '7 days' +WHERE d.enabled_flag = 1 +GROUP BY d.imei, d.device_name, d.enabled_flag +HAVING MAX(ph.gps_time) IS NULL + AND MAX(t.start_time) IS NULL +ORDER BY d.imei; diff --git a/db_audit/checks/distance_outliers.sql b/db_audit/checks/distance_outliers.sql new file mode 100644 index 0000000..c3c7500 --- /dev/null +++ b/db_audit/checks/distance_outliers.sql @@ -0,0 +1,14 @@ +-- Distance outliers: trips with impossible or suspicious distance in last 7 days +SELECT + imei, + start_time, + end_time, + distance_km, + source +FROM tracksolid.trips +WHERE start_time > NOW() - INTERVAL '7 days' + AND ( + distance_km < 0 + OR distance_km > 500 + ) +ORDER BY distance_km DESC; diff --git a/db_audit/checks/duplicate_positions.sql b/db_audit/checks/duplicate_positions.sql new file mode 100644 index 0000000..75d1209 --- /dev/null +++ b/db_audit/checks/duplicate_positions.sql @@ -0,0 +1,11 @@ +-- Duplicate (imei, gps_time) pairs in position_history +-- Should always return 0 rows if ON CONFLICT DO NOTHING is working correctly +SELECT + imei, + gps_time, + COUNT(*) AS duplicate_count +FROM tracksolid.position_history +WHERE gps_time > NOW() - INTERVAL '7 days' +GROUP BY imei, gps_time +HAVING COUNT(*) > 1 +ORDER BY duplicate_count DESC; diff --git a/db_audit/checks/enum_drift.sql b/db_audit/checks/enum_drift.sql new file mode 100644 index 0000000..b09ac2d --- /dev/null +++ 
b/db_audit/checks/enum_drift.sql @@ -0,0 +1,34 @@ +-- Enum drift: unexpected values in source and other constrained columns +-- position_history.source should be: poll, push, track_list +SELECT + 'position_history.source' AS check_column, + source AS unexpected_value, + COUNT(*) AS occurrences +FROM tracksolid.position_history +WHERE source NOT IN ('poll', 'push', 'track_list') + AND source IS NOT NULL +GROUP BY source + +UNION ALL + +-- trips.source should be: poll, push +SELECT + 'trips.source', + source, + COUNT(*) +FROM tracksolid.trips +WHERE source NOT IN ('poll', 'push') + AND source IS NOT NULL +GROUP BY source + +UNION ALL + +-- alarms.source should be: poll, push +SELECT + 'alarms.source', + source, + COUNT(*) +FROM tracksolid.alarms +WHERE source NOT IN ('poll', 'push') + AND source IS NOT NULL +GROUP BY source; diff --git a/db_audit/checks/null_integrity.sql b/db_audit/checks/null_integrity.sql new file mode 100644 index 0000000..719e961 --- /dev/null +++ b/db_audit/checks/null_integrity.sql @@ -0,0 +1,30 @@ +-- NULL integrity check across telemetry tables +SELECT + 'position_history.imei_null' AS check_field, + COUNT(*) AS null_count +FROM tracksolid.position_history +WHERE imei IS NULL +UNION ALL +SELECT + 'position_history.gps_time_null', + COUNT(*) +FROM tracksolid.position_history +WHERE gps_time IS NULL +UNION ALL +SELECT + 'alarms.imei_null', + COUNT(*) +FROM tracksolid.alarms +WHERE imei IS NULL +UNION ALL +SELECT + 'alarms.alarm_type_null', + COUNT(*) +FROM tracksolid.alarms +WHERE alarm_type IS NULL +UNION ALL +SELECT + 'obd_readings.imei_null', + COUNT(*) +FROM tracksolid.obd_readings +WHERE imei IS NULL; diff --git a/db_audit/checks/stale_devices.sql b/db_audit/checks/stale_devices.sql new file mode 100644 index 0000000..98072ee --- /dev/null +++ b/db_audit/checks/stale_devices.sql @@ -0,0 +1,14 @@ +-- Stale devices: enabled devices with no GPS fix in last 2 hours +SELECT + d.imei, + d.device_name, + lp.gps_time AS last_gps_time, + 
EXTRACT(EPOCH FROM (NOW() - lp.gps_time)) / 3600 AS hours_since_fix +FROM tracksolid.devices d +LEFT JOIN tracksolid.live_positions lp ON lp.imei = d.imei +WHERE d.enabled_flag = 1 + AND ( + lp.gps_time IS NULL + OR lp.gps_time < NOW() - INTERVAL '2 hours' + ) +ORDER BY hours_since_fix DESC NULLS FIRST; diff --git a/db_audit/run_audit.py b/db_audit/run_audit.py new file mode 100644 index 0000000..3be25a7 --- /dev/null +++ b/db_audit/run_audit.py @@ -0,0 +1,161 @@ +""" +db_audit/run_audit.py — Fireside Communications Fleet Telemetry DB Audit +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Runs six health checks against the production TimescaleDB. +Writes results to tracksolid.health_checks for Grafana monitoring. +Exits with code 1 if any critical finding is detected. + +Usage: + DATABASE_URL=postgresql://... python db_audit/run_audit.py + +Checks: + stale_devices - Enabled devices with no GPS fix in >2h + null_integrity - NULL imei/gps_time in telemetry tables + distance_outliers - Trip distances <0 or >500 km in last 7 days + duplicate_positions - Duplicate (imei, gps_time) in position_history + data_gaps - Enabled devices with zero data in last 7 days + enum_drift - Unexpected values in source columns +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import json +import os +import sys +import logging +from pathlib import Path + +import psycopg2 +import psycopg2.extras + +# ── Config ──────────────────────────────────────────────────────────────────── + +DATABASE_URL = os.environ.get("DATABASE_URL") +if not DATABASE_URL: + print("ERROR: DATABASE_URL environment variable is required.", file=sys.stderr) + sys.exit(1) + +CHECKS_DIR = Path(__file__).parent / "checks" +SCHEMA_FILE = Path(__file__).parent / "schema" / "health_checks_table.sql" + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d
%H:%M:%S", +) +log = logging.getLogger("db_audit") + +# ── Status Logic ────────────────────────────────────────────────────────────── + +# Checks that produce CRITICAL status if they return any rows +CRITICAL_CHECKS = {"null_integrity", "duplicate_positions"} + +# Checks that produce WARNING status if they return any rows +WARNING_CHECKS = {"stale_devices", "distance_outliers", "data_gaps", "enum_drift"} + + +def _determine_status(check_name: str, rows: list[dict]) -> str: + if not rows: + return "ok" + # null_integrity returns counts — critical if any count > 0 + if check_name == "null_integrity": + has_nulls = any(row.get("null_count", 0) > 0 for row in rows) + return "critical" if has_nulls else "ok" + if check_name in CRITICAL_CHECKS: + return "critical" + if check_name in WARNING_CHECKS: + return "warning" + return "ok" + + +# ── Core Runner ─────────────────────────────────────────────────────────────── + +def run_checks() -> bool: + """Run all checks. Returns True if any critical finding found.""" + conn = psycopg2.connect(DATABASE_URL, options="-c client_encoding=UTF8") + conn.autocommit = False + + try: + with conn.cursor() as cur: + # Ensure health_checks table exists + cur.execute(SCHEMA_FILE.read_text()) + conn.commit() + log.info("health_checks table verified.") + + has_critical = False + results = [] + + for sql_file in sorted(CHECKS_DIR.glob("*.sql")): + check_name = sql_file.stem + sql = sql_file.read_text() + + log.info("Running check: %s ...", check_name) + + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(sql) + rows = [dict(r) for r in cur.fetchall()] + + status = _determine_status(check_name, rows) + row_count = len(rows) + + # Serialize rows (convert non-JSON-serializable types) + detail = _safe_json(rows[:50]) # Cap at 50 rows to keep detail manageable + + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO tracksolid.health_checks + (check_name, status, detail, row_count) + VALUES (%s, %s, %s, 
%s) + """, (check_name, status, json.dumps(detail), row_count)) + conn.commit() + + icon = "✅" if status == "ok" else ("⚠️ " if status == "warning" else "🔴") + log.info(" %s %s: %s (%d rows)", icon, check_name, status.upper(), row_count) + results.append((check_name, status, row_count)) + + if status == "critical": + has_critical = True + + # Summary + print("\n" + "="*60) + print("DB AUDIT SUMMARY") + print("="*60) + for name, status, count in results: + indicator = "OK" if status == "ok" else ("WARN" if status == "warning" else "CRIT") + print(f" [{indicator:4s}] {name:<30} ({count} rows)") + print("="*60) + + if has_critical: + print("RESULT: CRITICAL findings detected. Exit code 1.") + else: + print("RESULT: No critical findings. Exit code 0.") + print() + + return has_critical + + finally: + conn.close() + + +def _safe_json(rows: list[dict]) -> list[dict]: + """Convert any non-JSON-serializable values (Decimal, datetime) to strings.""" + import decimal + from datetime import datetime, date + + def convert(v): + if isinstance(v, (datetime, date)): + return v.isoformat() + if isinstance(v, decimal.Decimal): + return float(v) + return v + + return [{k: convert(v) for k, v in row.items()} for row in rows] + + +# ── Entry Point ─────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + log.info("Starting DB audit...") + has_critical = run_checks() + sys.exit(1 if has_critical else 0) diff --git a/db_audit/schema/health_checks_table.sql b/db_audit/schema/health_checks_table.sql new file mode 100644 index 0000000..ce1c568 --- /dev/null +++ b/db_audit/schema/health_checks_table.sql @@ -0,0 +1,13 @@ +-- Idempotent: safe to run on every audit start +CREATE TABLE IF NOT EXISTS tracksolid.health_checks ( + id BIGSERIAL PRIMARY KEY, + checked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + check_name TEXT NOT NULL, + status TEXT NOT NULL CHECK (status IN ('ok', 'warning', 'critical')), + detail JSONB, + row_count INT +); + +-- Index for 
Grafana time-range queries +CREATE INDEX IF NOT EXISTS health_checks_checked_at_idx + ON tracksolid.health_checks (checked_at DESC);