feat: business analytics expansion, driver CSV import, live DB state docs #1

Merged
kianiadee merged 11 commits from quality-program-2026-04-12 into main 2026-04-18 06:04:11 +00:00
10 changed files with 316 additions and 0 deletions
Showing only changes of commit 20d3ddb841

View file

@@ -0,0 +1,20 @@
name: DB Audit
on:
  schedule:
    - cron: "0 3 * * *" # 03:00 UTC = 06:00 EAT daily
  workflow_dispatch: # Also runnable manually from Forgejo UI
jobs:
  audit:
    runs-on: self-hosted
    steps:
      - uses: actions/checkout@v4
      - name: Install dependencies
        run: pip install psycopg2-binary
      - name: Run DB audit
        run: python db_audit/run_audit.py
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}

0
db_audit/__init__.py Normal file
View file

View file

@@ -0,0 +1,19 @@
-- Data gaps: enabled devices with no position_history or trips in last 7 days
SELECT
    d.imei,
    d.device_name,
    d.enabled_flag,
    MAX(ph.gps_time) AS last_position,
    MAX(t.start_time) AS last_trip
FROM tracksolid.devices d
LEFT JOIN tracksolid.position_history ph
    ON ph.imei = d.imei
    AND ph.gps_time > NOW() - INTERVAL '7 days'
LEFT JOIN tracksolid.trips t
    ON t.imei = d.imei
    AND t.start_time > NOW() - INTERVAL '7 days'
WHERE d.enabled_flag = 1
GROUP BY d.imei, d.device_name, d.enabled_flag
HAVING MAX(ph.gps_time) IS NULL
    AND MAX(t.start_time) IS NULL
ORDER BY d.imei;
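
When this check flags a device, the quickest confirmation is to look at its most recent stored positions directly. A minimal follow-up sketch, using only columns the check itself references and a placeholder IMEI:

-- Sketch: drill into one flagged device (substitute the placeholder IMEI)
SELECT gps_time, source
FROM tracksolid.position_history
WHERE imei = '860000000000000'
ORDER BY gps_time DESC
LIMIT 5;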

View file

@@ -0,0 +1,14 @@
-- Distance outliers: trips with impossible or suspicious distance in last 7 days
SELECT
    imei,
    start_time,
    end_time,
    distance_km,
    source
FROM tracksolid.trips
WHERE start_time > NOW() - INTERVAL '7 days'
    AND (
        distance_km < 0
        OR distance_km > 500
    )
ORDER BY distance_km DESC;

View file

@@ -0,0 +1,11 @@
-- Duplicate (imei, gps_time) pairs in position_history
-- Should always return 0 rows if ON CONFLICT DO NOTHING is working correctly
SELECT
    imei,
    gps_time,
    COUNT(*) AS duplicate_count
FROM tracksolid.position_history
WHERE gps_time > NOW() - INTERVAL '7 days'
GROUP BY imei, gps_time
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC;
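
The comment above relies on the ingest path de-duplicating on (imei, gps_time). That ingest code is outside this diff, so the following is only a sketch of the pattern it assumes: a unique constraint on (imei, gps_time) plus an insert of this shape, with placeholder values.

-- Sketch of the idempotent insert pattern the check assumes
-- (requires a unique index on (imei, gps_time); values are placeholders)
INSERT INTO tracksolid.position_history (imei, gps_time, source)
VALUES ('860000000000000', '2026-04-17 10:15:00+00', 'poll')
ON CONFLICT (imei, gps_time) DO NOTHING;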

View file

@@ -0,0 +1,34 @@
-- Enum drift: unexpected values in source and other constrained columns
-- position_history.source should be: poll, push, track_list
SELECT
    'position_history.source' AS check_column,
    source AS unexpected_value,
    COUNT(*) AS occurrences
FROM tracksolid.position_history
WHERE source NOT IN ('poll', 'push', 'track_list')
    AND source IS NOT NULL
GROUP BY source
UNION ALL
-- trips.source should be: poll, push
SELECT
    'trips.source',
    source,
    COUNT(*)
FROM tracksolid.trips
WHERE source NOT IN ('poll', 'push')
    AND source IS NOT NULL
GROUP BY source
UNION ALL
-- alarms.source should be: poll, push
SELECT
    'alarms.source',
    source,
    COUNT(*)
FROM tracksolid.alarms
WHERE source NOT IN ('poll', 'push')
    AND source IS NOT NULL
GROUP BY source;

View file

@@ -0,0 +1,30 @@
-- NULL integrity check across telemetry tables
SELECT
    'position_history.imei_null' AS check_field,
    COUNT(*) AS null_count
FROM tracksolid.position_history
WHERE imei IS NULL
UNION ALL
SELECT
    'position_history.gps_time_null',
    COUNT(*)
FROM tracksolid.position_history
WHERE gps_time IS NULL
UNION ALL
SELECT
    'alarms.imei_null',
    COUNT(*)
FROM tracksolid.alarms
WHERE imei IS NULL
UNION ALL
SELECT
    'alarms.alarm_type_null',
    COUNT(*)
FROM tracksolid.alarms
WHERE alarm_type IS NULL
UNION ALL
SELECT
    'obd_readings.imei_null',
    COUNT(*)
FROM tracksolid.obd_readings
WHERE imei IS NULL;

View file

@@ -0,0 +1,14 @@
-- Stale devices: enabled devices with no GPS fix in last 2 hours
SELECT
    d.imei,
    d.device_name,
    lp.gps_time AS last_gps_time,
    EXTRACT(EPOCH FROM (NOW() - lp.gps_time)) / 3600 AS hours_since_fix
FROM tracksolid.devices d
LEFT JOIN tracksolid.live_positions lp ON lp.imei = d.imei
WHERE d.enabled_flag = 1
    AND (
        lp.gps_time IS NULL
        OR lp.gps_time < NOW() - INTERVAL '2 hours'
    )
ORDER BY hours_since_fix DESC NULLS FIRST;

161
db_audit/run_audit.py Normal file
View file

@@ -0,0 +1,161 @@
"""
db_audit/run_audit.py - Fireside Communications Fleet Telemetry DB Audit

Runs six health checks against the production TimescaleDB.
Writes results to tracksolid.health_checks for Grafana monitoring.
Exits with code 1 if any critical finding is detected.

Usage:
    DATABASE_URL=postgresql://... python db_audit/run_audit.py

Checks:
    stale_devices       - Enabled devices with no GPS fix in >2h
    null_integrity      - NULL imei/gps_time in telemetry tables
    distance_outliers   - Trip distances <0 or >500 km in last 7 days
    duplicate_positions - Duplicate (imei, gps_time) in position_history
    data_gaps           - Enabled devices with zero data in last 7 days
    enum_drift          - Unexpected values in source/severity columns
"""
from __future__ import annotations

import json
import os
import sys
import logging
from pathlib import Path

import psycopg2
import psycopg2.extras

# ── Config ────────────────────────────────────────────────────────────────────
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    print("ERROR: DATABASE_URL environment variable is required.", file=sys.stderr)
    sys.exit(1)

CHECKS_DIR = Path(__file__).parent / "checks"
SCHEMA_FILE = Path(__file__).parent / "schema" / "health_checks_table.sql"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("db_audit")

# ── Status Logic ──────────────────────────────────────────────────────────────
# Checks that produce CRITICAL status if they return any rows
CRITICAL_CHECKS = {"null_integrity", "duplicate_positions"}
# Checks that produce WARNING status if they return any rows
WARNING_CHECKS = {"stale_devices", "distance_outliers", "data_gaps", "enum_drift"}


def _determine_status(check_name: str, rows: list[dict]) -> str:
    if not rows:
        return "ok"
    # null_integrity always returns one row per field — critical only if any count > 0
    if check_name == "null_integrity":
        has_nulls = any(row.get("null_count", 0) > 0 for row in rows)
        return "critical" if has_nulls else "ok"
    if check_name in CRITICAL_CHECKS:
        return "critical"
    if check_name in WARNING_CHECKS:
        return "warning"
    return "ok"


# ── Core Runner ───────────────────────────────────────────────────────────────
def run_checks() -> bool:
    """Run all checks. Returns True if any critical finding found."""
    conn = psycopg2.connect(DATABASE_URL, options="-c client_encoding=UTF8")
    conn.autocommit = False
    try:
        # Ensure health_checks table exists
        with conn.cursor() as cur:
            cur.execute(SCHEMA_FILE.read_text())
        conn.commit()
        log.info("health_checks table verified.")

        has_critical = False
        results = []
        for sql_file in sorted(CHECKS_DIR.glob("*.sql")):
            check_name = sql_file.stem
            sql = sql_file.read_text()
            log.info("Running check: %s ...", check_name)

            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(sql)
                rows = [dict(r) for r in cur.fetchall()]

            status = _determine_status(check_name, rows)
            row_count = len(rows)
            # Serialize rows (convert non-JSON-serializable types)
            detail = _safe_json(rows[:50])  # Cap at 50 rows to keep detail manageable

            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO tracksolid.health_checks
                        (check_name, status, detail, row_count)
                    VALUES (%s, %s, %s, %s)
                """, (check_name, status, json.dumps(detail), row_count))
            conn.commit()

            icon = "" if status == "ok" else ("⚠️ " if status == "warning" else "🔴")
            log.info(" %s %s: %s (%d rows)", icon, check_name, status.upper(), row_count)
            results.append((check_name, status, row_count))
            if status == "critical":
                has_critical = True

        # Summary
        print("\n" + "=" * 60)
        print("DB AUDIT SUMMARY")
        print("=" * 60)
        for name, status, count in results:
            indicator = "OK" if status == "ok" else ("WARN" if status == "warning" else "CRIT")
            print(f" [{indicator:4s}] {name:<30} ({count} rows)")
        print("=" * 60)
        if has_critical:
            print("RESULT: CRITICAL findings detected. Exit code 1.")
        else:
            print("RESULT: No critical findings. Exit code 0.")
        print()
        return has_critical
    finally:
        conn.close()


def _safe_json(rows: list[dict]) -> list[dict]:
    """Convert non-JSON-serializable values (datetime to ISO string, Decimal to float)."""
    import decimal
    from datetime import datetime, date

    def convert(v):
        if isinstance(v, (datetime, date)):
            return v.isoformat()
        if isinstance(v, decimal.Decimal):
            return float(v)
        return v

    return [{k: convert(v) for k, v in row.items()} for row in rows]


# ── Entry Point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    log.info("Starting DB audit...")
    has_critical = run_checks()
    sys.exit(1 if has_critical else 0)
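
When the script exits 1, the stored detail payload is usually the fastest way to see what tripped it. A query of roughly this shape, against the health_checks table defined below, pulls the critical findings from the last day:

-- Sketch: inspect critical findings from the last 24 hours of audits
SELECT checked_at, check_name, row_count, detail
FROM tracksolid.health_checks
WHERE status = 'critical'
  AND checked_at > NOW() - INTERVAL '1 day'
ORDER BY checked_at DESC;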

View file

@@ -0,0 +1,13 @@
-- Idempotent: safe to run on every audit start
CREATE TABLE IF NOT EXISTS tracksolid.health_checks (
    id          BIGSERIAL PRIMARY KEY,
    checked_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    check_name  TEXT NOT NULL,
    status      TEXT NOT NULL CHECK (status IN ('ok', 'warning', 'critical')),
    detail      JSONB,
    row_count   INT
);

-- Index for Grafana time-range queries
CREATE INDEX IF NOT EXISTS health_checks_checked_at_idx
    ON tracksolid.health_checks (checked_at DESC);
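
For the Grafana monitoring mentioned in the script docstring, the typical panel wants the latest row per check rather than the full history. One way to get that shape (dashboard wiring itself is out of scope here):

-- Sketch: latest status per check, e.g. for a Grafana table or stat panel
SELECT DISTINCT ON (check_name)
    check_name, status, row_count, checked_at
FROM tracksolid.health_checks
ORDER BY check_name, checked_at DESC;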