"""
|
|
db_audit/run_audit.py — Fireside Communications Fleet Telemetry DB Audit
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
Runs six health checks against the production TimescaleDB.
|
|
Writes results to tracksolid.health_checks for Grafana monitoring.
|
|
Exits with code 1 if any critical finding is detected.
|
|
|
|
Usage:
|
|
DATABASE_URL=postgresql://... python db_audit/run_audit.py
|
|
|
|
Checks:
|
|
stale_devices - Enabled devices with no GPS fix in >2h
|
|
null_integrity - NULL imei/gps_time in telemetry tables
|
|
distance_outliers - Trip distances <0 or >500 km in last 7 days
|
|
duplicate_positions - Duplicate (imei, gps_time) in position_history
|
|
data_gaps - Enabled devices with zero data in last 7 days
|
|
enum_drift - Unexpected values in source/severity columns
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────

# Connection string is mandatory; fail fast (before any DB work) if absent.
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    print("ERROR: DATABASE_URL environment variable is required.", file=sys.stderr)
    sys.exit(1)

# One .sql file per check lives in checks/; the file stem is the check name.
CHECKS_DIR = Path(__file__).parent / "checks"
# DDL run at startup to ensure tracksolid.health_checks exists.
SCHEMA_FILE = Path(__file__).parent / "schema" / "health_checks_table.sql"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("db_audit")
|
|
|
|
# ── Status Logic ──────────────────────────────────────────────────────────────
|
|
|
|
# Checks that produce CRITICAL status if they return any rows
|
|
CRITICAL_CHECKS = {"null_integrity", "duplicate_positions"}
|
|
|
|
# Checks that produce WARNING status if they return any rows
|
|
WARNING_CHECKS = {"stale_devices", "distance_outliers", "data_gaps", "enum_drift"}
|
|
|
|
|
|
def _determine_status(check_name: str, rows: list[dict]) -> str:
|
|
if not rows:
|
|
return "ok"
|
|
# null_integrity returns counts — critical if any count > 0
|
|
if check_name == "null_integrity":
|
|
has_nulls = any(row.get("null_count", 0) > 0 for row in rows)
|
|
return "critical" if has_nulls else "ok"
|
|
if check_name in CRITICAL_CHECKS:
|
|
return "critical"
|
|
if check_name in WARNING_CHECKS:
|
|
return "warning"
|
|
return "ok"
|
|
|
|
|
|
# ── Core Runner ───────────────────────────────────────────────────────────────

def run_checks() -> bool:
    """Run every .sql check in CHECKS_DIR and persist results.

    Each check's rows are mapped to a status via _determine_status and
    inserted into tracksolid.health_checks (detail JSON capped at 50 rows).

    Returns:
        True if any check produced a 'critical' status.
    """
    conn = psycopg2.connect(DATABASE_URL, options="-c client_encoding=UTF8")
    conn.autocommit = False

    try:
        # Ensure the health_checks table exists before recording results.
        with conn.cursor() as cur:
            cur.execute(SCHEMA_FILE.read_text())
        conn.commit()
        log.info("health_checks table verified.")

        has_critical = False
        results: list[tuple[str, str, int]] = []

        for sql_file in sorted(CHECKS_DIR.glob("*.sql")):
            check_name = sql_file.stem
            sql = sql_file.read_text()

            log.info("Running check: %s ...", check_name)

            try:
                with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                    cur.execute(sql)
                    rows = [dict(r) for r in cur.fetchall()]
                status = _determine_status(check_name, rows)
                row_count = len(rows)
                # Serialize rows; cap at 50 to keep stored detail manageable.
                detail = _safe_json(rows[:50])
            except psycopg2.Error as exc:
                # FIX: previously a single broken check SQL aborted the whole
                # audit with a traceback and left the transaction in an aborted
                # state. Roll back so subsequent checks can still run, and
                # record the failure itself as a critical finding.
                conn.rollback()
                log.error("Check %s failed: %s", check_name, exc)
                status = "critical"
                row_count = 0
                detail = [{"error": str(exc)}]

            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO tracksolid.health_checks
                        (check_name, status, detail, row_count)
                    VALUES (%s, %s, %s, %s)
                """, (check_name, status, json.dumps(detail), row_count))
            conn.commit()

            icon = "✅" if status == "ok" else ("⚠️ " if status == "warning" else "🔴")
            log.info("  %s %s: %s (%d rows)", icon, check_name, status.upper(), row_count)
            results.append((check_name, status, row_count))

            if status == "critical":
                has_critical = True

        _print_summary(results, has_critical)
        return has_critical

    finally:
        conn.close()


def _print_summary(results: list[tuple[str, str, int]], has_critical: bool) -> None:
    """Print a human-readable summary table of all check outcomes."""
    print("\n" + "=" * 60)
    print("DB AUDIT SUMMARY")
    print("=" * 60)
    for name, status, count in results:
        indicator = "OK" if status == "ok" else ("WARN" if status == "warning" else "CRIT")
        print(f"  [{indicator:4s}] {name:<30} ({count} rows)")
    print("=" * 60)

    if has_critical:
        print("RESULT: CRITICAL findings detected. Exit code 1.")
    else:
        print("RESULT: No critical findings. Exit code 0.")
    print()
|
|
|
|
|
|
def _safe_json(rows: list[dict]) -> list[dict]:
|
|
"""Convert any non-JSON-serializable values (Decimal, datetime) to strings."""
|
|
import decimal
|
|
from datetime import datetime, date
|
|
|
|
def convert(v):
|
|
if isinstance(v, (datetime, date)):
|
|
return v.isoformat()
|
|
if isinstance(v, decimal.Decimal):
|
|
return float(v)
|
|
return v
|
|
|
|
return [{k: convert(v) for k, v in row.items()} for row in rows]
|
|
|
|
|
|
# ── Entry Point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    log.info("Starting DB audit...")
    has_critical = run_checks()
    # Non-zero exit code lets cron/CI surface critical findings.
    sys.exit(1 if has_critical else 0)
|