""" db_audit/run_audit.py — Fireside Communications Fleet Telemetry DB Audit ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Runs six health checks against the production TimescaleDB. Writes results to tracksolid.health_checks for Grafana monitoring. Exits with code 1 if any critical finding is detected. Usage: DATABASE_URL=postgresql://... python db_audit/run_audit.py Checks: stale_devices - Enabled devices with no GPS fix in >2h null_integrity - NULL imei/gps_time in telemetry tables distance_outliers - Trip distances <0 or >500 km in last 7 days duplicate_positions - Duplicate (imei, gps_time) in position_history data_gaps - Enabled devices with zero data in last 7 days enum_drift - Unexpected values in source/severity columns ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ from __future__ import annotations import json import os import sys import logging from pathlib import Path import psycopg2 import psycopg2.extras # ── Config ──────────────────────────────────────────────────────────────────── DATABASE_URL = os.environ.get("DATABASE_URL") if not DATABASE_URL: print("ERROR: DATABASE_URL environment variable is required.", file=sys.stderr) sys.exit(1) CHECKS_DIR = Path(__file__).parent / "checks" SCHEMA_FILE = Path(__file__).parent / "schema" / "health_checks_table.sql" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger("db_audit") # ── Status Logic ────────────────────────────────────────────────────────────── # Checks that produce CRITICAL status if they return any rows CRITICAL_CHECKS = {"null_integrity", "duplicate_positions"} # Checks that produce WARNING status if they return any rows WARNING_CHECKS = {"stale_devices", "distance_outliers", "data_gaps", "enum_drift"} def _determine_status(check_name: str, rows: list[dict]) -> str: if not rows: return "ok" # null_integrity returns counts — critical if any count > 0 if check_name == "null_integrity": has_nulls = any(row.get("null_count", 0) > 0 for row in rows) return "critical" if has_nulls else "ok" if check_name in CRITICAL_CHECKS: return "critical" if check_name in WARNING_CHECKS: return "warning" return "ok" # ── Core Runner ─────────────────────────────────────────────────────────────── def run_checks() -> bool: """Run all checks. Returns True if any critical finding found.""" conn = psycopg2.connect(DATABASE_URL, options="-c client_encoding=UTF8") conn.autocommit = False try: with conn.cursor() as cur: # Ensure health_checks table exists cur.execute(SCHEMA_FILE.read_text()) conn.commit() log.info("health_checks table verified.") has_critical = False results = [] for sql_file in sorted(CHECKS_DIR.glob("*.sql")): check_name = sql_file.stem sql = sql_file.read_text() log.info("Running check: %s ...", check_name) with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql) rows = [dict(r) for r in cur.fetchall()] status = _determine_status(check_name, rows) row_count = len(rows) # Serialize rows (convert non-JSON-serializable types) detail = _safe_json(rows[:50]) # Cap at 50 rows to keep detail manageable with conn.cursor() as cur: cur.execute(""" INSERT INTO tracksolid.health_checks (check_name, status, detail, row_count) VALUES (%s, %s, %s, %s) """, (check_name, status, json.dumps(detail), row_count)) conn.commit() icon = "✅" if status == "ok" else ("⚠️ " if status == "warning" else "🔴") log.info(" %s %s: %s (%d rows)", icon, check_name, status.upper(), row_count) results.append((check_name, status, row_count)) if status == "critical": has_critical = True # Summary print("\n" + "="*60) print("DB AUDIT SUMMARY") print("="*60) for name, status, count in results: indicator = "OK" if status == "ok" else ("WARN" if status == "warning" else "CRIT") print(f" [{indicator:4s}] {name:<30} ({count} rows)") print("="*60) if has_critical: print("RESULT: CRITICAL findings detected. Exit code 1.") else: print("RESULT: No critical findings. Exit code 0.") print() return has_critical finally: conn.close() def _safe_json(rows: list[dict]) -> list[dict]: """Convert any non-JSON-serializable values (Decimal, datetime) to strings.""" import decimal from datetime import datetime, date def convert(v): if isinstance(v, (datetime, date)): return v.isoformat() if isinstance(v, decimal.Decimal): return float(v) return v return [{k: convert(v) for k, v in row.items()} for row in rows] # ── Entry Point ─────────────────────────────────────────────────────────────── if __name__ == "__main__": log.info("Starting DB audit...") has_critical = run_checks() sys.exit(1 if has_critical else 0)