tracksolid_timescale_grafan.../audit_device_reconciliation.py

206 lines
8.1 KiB
Python
Raw Permalink Normal View History

feat(analytics): Phase 0 — analytics-config migration and CSV importer rewrite Phase 0 of the three-stakeholder analytics redesign: - 08_analytics_config.sql: ops.cost_rates + ops.kpi_targets with seed fuel rates (KES 195/L NBO+MBA, UGX 5200/L KLA) and 6 seed KPI targets (utilisation_pct, idle_pct global+osp-patrol, fuel_kes_per_100km, mttr_hours, alarms_per_100km). Granted SELECT to grafana_ro. Wired into run_migrations.py MIGRATIONS. - import_drivers_csv.py: full rewrite for the new Mitieng CSV (20260427_FSG_Vehicles_mitieng.csv). Snake_case columns, drops _infer_city() plate-prefix logic in favour of reading assigned_city directly. Adds cost_centre, assigned_route, vehicle_category, vehicle_brand, fuel_100km, depot_address. Treats the literal "NULL" string as missing. Reuses clean(), clean_num(), clean_ts(), get_conn(), get_logger() from ts_shared_rev. Special-cases numeric and timestamptz columns in the UPDATE clause. - audit_device_reconciliation.py: read-only audit comparing the CSV against tracksolid.devices. Reports per-account row counts, IMEIs on one side only, and devices on both sides whose metadata is still NULL. - 260427_device_reconciliation.md + 260427_audit_output.txt: Phase 0.2 reconciliation record. First run: DB has 172 devices, CSV has 162, delta +10 (10 IMEIs in DB-only, mostly fireside-account auto-syncs). Importer run with --only-null --apply filled 154 rows; coverage now assigned_city 152/172, cost_centre 150/172. Applied to stage on 2026-04-27 23:35 UTC. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 20:42:37 +00:00
"""
audit_device_reconciliation.py — 162-vs-182 device delta audit
Phase 0.2 of the Business Analytics redesign.
Compares `20260427_FSG_Vehicles_mitieng.csv` (162 rows) to `tracksolid.devices`
(182 rows at last check) and reports:
1. Per-account row counts on each side.
2. IMEIs in DB but not in CSV (the unexplained delta — typically auto-synced
   API rows with no business metadata).
3. IMEIs in CSV but not in DB (should be empty after a successful import).
4. IMEIs present on both sides where DB metadata is still NULL on key fields.
Usage:
# Read-only audit, prints to stdout.
python audit_device_reconciliation.py
# Same, but write output to a file (useful for the reconciliation report).
python audit_device_reconciliation.py --out 260427_audit_output.txt
# Use a different CSV path
python audit_device_reconciliation.py --csv path/to/file.csv
This script makes no writes — safe to run on prod.
"""
import argparse
import csv
import sys
from collections import Counter
from pathlib import Path
from ts_shared_rev import get_conn, get_logger
# Module-level logger shared by all functions in this script.
log = get_logger("device_audit")

# Default CSV location: sits next to this script.
DEFAULT_CSV_PATH = Path(__file__).parent / "20260427_FSG_Vehicles_mitieng.csv"

# Fields whose NULL-ness on devices that DO appear in CSV would indicate
# a stale import.
META_FIELDS = ("assigned_city", "cost_centre", "assigned_route",
               "vehicle_category", "vehicle_brand", "fuel_100km",
               "depot_address", "driver_name", "vehicle_number")
def load_csv_index(csv_path: Path) -> dict[str, dict]:
    """Index the vehicle CSV by IMEI.

    Rows with a missing/blank ``imei`` column are skipped. When the same
    IMEI appears more than once, the last row wins (plain dict overwrite),
    matching the original behaviour.
    """
    with open(csv_path, encoding="utf-8-sig", newline="") as handle:
        return {
            imei: record
            for record in csv.DictReader(handle)
            if (imei := (record.get("imei") or "").strip())
        }
def load_db_index() -> dict[str, dict]:
    """Fetch every row of tracksolid.devices, keyed by IMEI.

    Column names come from the cursor description so the returned dicts
    always match what the SELECT actually produced.
    """
    columns = (
        "imei", "account", "assigned_city", "city", "cost_centre",
        "assigned_route", "vehicle_category", "vehicle_brand", "fuel_100km",
        "depot_address", "driver_name", "vehicle_number", "device_name",
        "last_synced_at", "created_at",
    )
    index: dict[str, dict] = {}
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(f"SELECT {', '.join(columns)} FROM tracksolid.devices")
        header = [desc[0] for desc in cur.description]
        for values in cur.fetchall():
            record = dict(zip(header, values))
            index[record["imei"]] = record
    return index
def _csv_account(row: dict) -> str:
return (row.get("account") or "").strip() or "(blank)"
def _db_account(row: dict) -> str:
return (row.get("account") or "").strip() or "(blank)"
def _is_blank(v) -> bool:
if v is None:
return True
s = str(v).strip()
return s == "" or s.upper() == "NULL"
def main() -> int:
    """Run the read-only CSV-vs-DB reconciliation and print a report.

    Loads the vehicle CSV and tracksolid.devices, then reports per-account
    counts, IMEIs present on only one side, and intersecting devices whose
    DB metadata is still NULL. Optionally mirrors the report to --out.

    Returns:
        0 on success, 1 when the CSV file does not exist.
    """
    parser = argparse.ArgumentParser(description="Reconcile vehicle CSV against tracksolid.devices")
    parser.add_argument("--csv", default=str(DEFAULT_CSV_PATH))
    parser.add_argument("--out", default=None, help="Write report to this file in addition to stdout")
    args = parser.parse_args()
    csv_path = Path(args.csv)
    if not csv_path.exists():
        log.error("CSV not found: %s", csv_path)
        return 1

    csv_idx = load_csv_index(csv_path)
    db_idx = load_db_index()
    csv_imeis = set(csv_idx)
    db_imeis = set(db_idx)
    only_db = sorted(db_imeis - csv_imeis)
    only_csv = sorted(csv_imeis - db_imeis)
    both = csv_imeis & db_imeis
    csv_accounts = Counter(_csv_account(r) for r in csv_idx.values())
    db_accounts = Counter(_db_account(r) for r in db_idx.values())

    out_lines: list[str] = []

    def w(line: str = "") -> None:
        # Accumulate for the optional --out file while echoing to stdout.
        out_lines.append(line)
        print(line)

    w("=" * 76)
    w(" Device reconciliation — CSV vs tracksolid.devices")
    w("=" * 76)
    w(f" CSV file : {csv_path.name}")
    w(f" CSV row count : {len(csv_idx)}")
    w(f" DB row count : {len(db_idx)}")
    # BUG FIX: the signed delta is the row-count difference, not the number of
    # DB-only IMEIs. The two diverge whenever any CSV IMEI is absent from the
    # DB (len(only_db) - len(only_csv) == len(db_idx) - len(csv_idx)).
    w(f" Delta (DB-CSV) : {len(db_idx) - len(csv_idx):+d}")
    w("")

    w("─ Per-account breakdown ─────────────────────────────────────────────────")
    all_accounts = sorted(set(csv_accounts) | set(db_accounts))
    w(f" {'account':<30} {'CSV':>6} {'DB':>6} {'delta':>7}")
    for acct in all_accounts:
        c, d = csv_accounts.get(acct, 0), db_accounts.get(acct, 0)
        w(f" {acct:<30} {c:>6} {d:>6} {d-c:>+7}")
    w("")

    w(f"─ IMEIs in DB but NOT in CSV ({len(only_db)}) ─────────────────────────────")
    if not only_db:
        w(" (none — DB is a strict subset of CSV)")
    else:
        w(f" {'imei':<18} {'account':<22} {'city':<10} {'last_synced_at':<28} {'device_name'}")
        for imei in only_db:
            r = db_idx[imei]
            w(f" {imei:<18} {(r.get('account') or ''):<22} "
              f"{(r.get('assigned_city') or r.get('city') or ''):<10} "
              f"{str(r.get('last_synced_at') or ''):<28} "
              f"{r.get('device_name') or ''}")
    w("")

    w(f"─ IMEIs in CSV but NOT in DB ({len(only_csv)}) ─────────────────────────────")
    if not only_csv:
        w(" (none — every CSV row has a corresponding device row)")
    else:
        w(f" {'imei':<18} {'account':<22} {'assigned_city':<14} {'cost_centre':<14} {'driver_name'}")
        for imei in only_csv:
            r = csv_idx[imei]
            w(f" {imei:<18} {(r.get('account') or ''):<22} "
              f"{(r.get('assigned_city') or ''):<14} "
              f"{(r.get('cost_centre') or ''):<14} "
              f"{r.get('driver_name') or ''}")
    w("")

    # Stale-metadata audit: in both, but DB is still NULL on key fields.
    stale: list[tuple[str, list[str]]] = []
    for imei in sorted(both):
        d = db_idx[imei]
        blanks = [f for f in META_FIELDS if _is_blank(d.get(f))]
        if blanks:
            stale.append((imei, blanks))
    w(f"─ Devices in both, but DB metadata still NULL ({len(stale)}) ──────────────")
    if not stale:
        w(" (none — import looks complete on intersecting devices)")
    else:
        w(" Likely cause: import_drivers_csv.py has not been re-run with --apply")
        w(" against the new CSV, or rows had 'Identification' placeholders.")
        w("")
        w(f" {'imei':<18} blank_fields")
        for imei, blanks in stale[:30]:  # cap output
            w(f" {imei:<18} {', '.join(blanks)}")
        if len(stale) > 30:
            w(f" ... and {len(stale) - 30} more")
    w("")

    w("─ Suggested next step ───────────────────────────────────────────────────")
    if only_db:
        w(" Inspect the IMEIs above. Decide one of:")
        w(" (a) Prune — delete from tracksolid.devices if they are stale "
          "test/decommissioned units.")
        w(" (b) Leave-as-NULL — keep them as auto-synced API rows; their "
          "metadata stays NULL until added to a future CSV.")
        w(" (c) Addendum — add them to the CSV (or a sidecar CSV) and re-run "
          "import_drivers_csv.py --apply.")
        w(" Document the choice in 260427_device_reconciliation.md.")
    else:
        w(" CSV and DB are reconciled. No further action.")

    if args.out:
        Path(args.out).write_text("\n".join(out_lines), encoding="utf-8")
        print(f"\nReport also written to {args.out}")
    return 0
if __name__ == "__main__":
    # CLI entry point: propagate main()'s return code as the exit status.
    raise SystemExit(main())