""" import_drivers_csv.py — Fireside Communications · Driver & Vehicle CSV Import ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ One-shot script: reads 20260414_FS__Logistics - final_fixed.csv, compares each row against the current tracksolid.devices values, and updates the DB. Usage: # Dry-run — shows diff, writes nothing python import_drivers_csv.py # Filter to a single IMEI (dry-run) python import_drivers_csv.py --imei 862798052707896 # Apply all changes to DB python import_drivers_csv.py --apply # Only fill fields that are currently NULL in the DB (never overwrite) python import_drivers_csv.py --only-null --apply Pre-requisite: Migration 06 must be applied first (adds assigned_city / cost_centre columns). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import argparse import csv import os import sys import time from datetime import date from pathlib import Path from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger log = get_logger("csv_import") CSV_PATH = Path(__file__).parent / "20260414_FS__Logistics - final_fixed.csv" # Columns fetched from DB for comparison DB_COLS = [ "imei", "driver_name", "driver_phone", "vehicle_number", "vehicle_name", "vehicle_models", "cost_centre", "sim", "iccid", "imsi", "mc_type", "activation_time", "expiration", "device_name", "assigned_city", ] # Driver Name values that are placeholders — skip writing driver_name for these _DRIVER_SKIP = {"identification", "ug"} def _infer_city(plate: str) -> str | None: """Derive assigned_city from license plate prefix.""" p = (plate or "").strip().upper() if p.startswith("UMA") or p.startswith("UAG"): return "KLA" if p.startswith("K"): return "NBO" return None def _clean_date(v: str) -> str | None: """Accept YYYY-MM-DD and return as ISO string suitable for TIMESTAMPTZ cast.""" s = (v or "").strip() if not s: return None try: date.fromisoformat(s) return s except ValueError: return None def load_csv() -> dict[str, dict]: """Load CSV into a dict keyed by IMEI.""" rows: dict[str, dict] = {} with open(CSV_PATH, encoding="utf-8-sig", newline="") as f: for row in csv.DictReader(f): imei = (row.get("IMEI") or "").strip() if not imei: continue rows[imei] = row log.info("CSV loaded: %d rows from %s", len(rows), CSV_PATH.name) return rows def load_db_devices() -> dict[str, dict]: """Fetch current device rows from DB, keyed by IMEI.""" devices: dict[str, dict] = {} with get_conn() as conn: with conn.cursor() as cur: cur.execute(f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices") col_names = [d[0] for d in cur.description] for row in cur.fetchall(): rec = dict(zip(col_names, row)) devices[rec["imei"]] = rec log.info("DB loaded: %d devices", len(devices)) return devices def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]: """ Return a dict of column→new_value for fields that need updating. When only_null=True, skip any DB column that already has a value. The driver_name column is skipped for placeholder-labelled devices. """ driver_raw = clean(csv_row.get("Driver Name")) or "" plate = clean(csv_row.get("License Plate No.")) or "" is_placeholder = driver_raw.lower() in _DRIVER_SKIP skip_row = driver_raw.lower() == "identification" if skip_row: return {} proposed: dict[str, object] = { "vehicle_number": clean(plate), "vehicle_name": clean(plate), "vehicle_models": clean(csv_row.get("Vehicle Model")), "cost_centre": clean(csv_row.get("Department")), "sim": clean(csv_row.get("SIM")), "iccid": clean(csv_row.get("ICCID")), "imsi": clean(csv_row.get("IMSI")), "mc_type": clean(csv_row.get("Model")), "activation_time": _clean_date(csv_row.get("Activated Date", "")), "expiration": _clean_date(csv_row.get("Subscription Expiration", "")), "driver_phone": clean(csv_row.get("Telephone")), "assigned_city": _infer_city(plate), } if not is_placeholder: proposed["driver_name"] = driver_raw or None # Drop None values — no point sending a NULL to overwrite another NULL proposed = {k: v for k, v in proposed.items() if v is not None} if not only_null or db_row is None: return proposed # only_null: drop any column that already has a non-null value in the DB return { k: v for k, v in proposed.items() if db_row.get(k) is None } def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None: """Pretty-print what will change for one device.""" if not updates: return db = db_row or {} print(f"\n IMEI {imei}:") for col, new_val in sorted(updates.items()): old_val = db.get(col) if old_val != new_val: print(f" {col:<20} {str(old_val):<30} → {new_val}") def run(apply: bool, only_null: bool, filter_imei: str | None) -> None: csv_rows = load_csv() db_rows = load_db_devices() if filter_imei: csv_rows = {k: v for k, v in csv_rows.items() if k == filter_imei} if not csv_rows: print(f"IMEI {filter_imei} not found in CSV.") return updated = skipped = no_change = not_in_db = 0 with get_conn() as conn: with conn.cursor() as cur: for imei, csv_row in csv_rows.items(): db_row = db_rows.get(imei) updates = build_update(csv_row, db_row, only_null) if not updates: # Either an "Identification" placeholder or nothing to change driver_raw = (csv_row.get("Driver Name") or "").strip().lower() if driver_raw == "identification": skipped += 1 else: no_change += 1 continue if db_row is None: not_in_db += 1 log.warning("IMEI %s in CSV but NOT in DB — skipping.", imei) continue print_diff(imei, updates, db_row) if apply: set_clauses = [] params = [] for col, val in updates.items(): if col in ("activation_time", "expiration"): set_clauses.append(f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})") else: set_clauses.append( f"{col} = COALESCE(NULLIF(%s, ''), {col})" ) params.append(str(val) if val is not None else None) set_clauses.append("updated_at = NOW()") params.append(imei) cur.execute( f"UPDATE tracksolid.devices SET {', '.join(set_clauses)} WHERE imei = %s", params, ) updated += 1 else: updated += 1 # count as "would update" in dry-run mode = "APPLIED" if apply else "DRY-RUN" print(f"\n{'='*60}") print(f" {mode} COMPLETE") print(f"{'='*60}") print(f" Would update / updated : {updated}") print(f" No change needed : {no_change}") print(f" Skipped (Identification): {skipped}") print(f" IMEI not in DB : {not_in_db}") if not apply: print("\n Run with --apply to commit changes.") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Import driver/vehicle details from CSV into tracksolid.devices") parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)") parser.add_argument("--only-null", action="store_true", help="Only update fields currently NULL in the DB") parser.add_argument("--imei", default=None, help="Limit to a single IMEI") args = parser.parse_args() run(apply=args.apply, only_null=args.only_null, filter_imei=args.imei)