"""Import the FSG roster CSV into domain.accounts / domain.vehicles / domain.devices. Idempotent — re-running gives the same end state. Per-CSV-row UPSERT inside one outer transaction; ROLLBACK on --dry-run, COMMIT otherwise. Orphaned vehicle rows (placeholders left behind after remap) are deleted at the end. Usage: PGPASSWORD=... DATABASE_URL=postgresql://... \\ .venv/bin/python scripts/import_csv_roster.py [--dry-run] """ from __future__ import annotations import argparse import csv import os import sys from collections import Counter from typing import Any import psycopg def _norm(v: str | None) -> str | None: if v is None: return None s = v.strip() if s in {"", "NULL", "null", "None"}: return None return s def _device_type_for(mc_type: str | None) -> str: return "camera" if (mc_type or "").upper() == "JC400P" else "tracker" def _lifecycle_for(enabled_flag: str | None) -> str: return "active" if enabled_flag == "1" else "suspended" def _row_from_csv(raw: dict[str, str]) -> dict[str, Any]: return { "imei": _norm(raw.get("imei")), "device_name": _norm(raw.get("device_name")), "mc_type": _norm(raw.get("mc_type")), "plate": _norm(raw.get("vehicle_name")), "vehicle_class": _norm(raw.get("vehicle_models")), "driver_name": _norm(raw.get("driver_name")), "driver_phone": _norm(raw.get("driver_phone")), "sim": _norm(raw.get("sim")), "iccid": _norm(raw.get("iccid")), "account": _norm(raw.get("account")), "customer_name": _norm(raw.get("customer_name")), "device_group": _norm(raw.get("device_group")), "activation_time": _norm(raw.get("activation_time")), "expiration": _norm(raw.get("expiration")), "enabled_flag": _norm(raw.get("enabled_flag")), "cost_centre": _norm(raw.get("cost_centre")), "assigned_city": _norm(raw.get("assigned_city")) or _norm(raw.get("city")), } def process_one(cur: psycopg.Cursor, row: dict[str, Any]) -> dict[str, Any]: """Apply one CSV row. Returns an action dict for summary reporting.""" imei = row["imei"] if not imei: return {"action": "skipped_no_imei"} action = {"imei": imei, "remapped_from": None, "remapped_to": None, "created_vehicle": False} # 1. Ensure account if row["account"]: cur.execute( """INSERT INTO domain.accounts (account_id, name, app_key) VALUES (%s, %s, '') ON CONFLICT (account_id) DO UPDATE SET name = EXCLUDED.name""", (row["account"], row["customer_name"] or row["account"]), ) # 2. Resolve vehicle_id cur.execute("SELECT vehicle_id FROM domain.devices WHERE imei = %s", (imei,)) existing = cur.fetchone() current_vehicle_id = existing[0] if existing else None action["pre_existed"] = existing is not None if row["plate"]: cur.execute("SELECT vehicle_id FROM domain.vehicles WHERE plate = %s", (row["plate"],)) match = cur.fetchone() if match is not None: target_vehicle_id = match[0] if current_vehicle_id is not None and current_vehicle_id != target_vehicle_id: # Remap state and device from current to target cur.execute( "UPDATE state.position_history SET vehicle_id = %s WHERE imei = %s", (target_vehicle_id, imei), ) cur.execute( "UPDATE state.live_positions SET vehicle_id = %s WHERE imei = %s", (target_vehicle_id, imei), ) action["remapped_from"] = current_vehicle_id action["remapped_to"] = target_vehicle_id else: cur.execute( """INSERT INTO domain.vehicles (plate, cost_centre, assigned_city, vehicle_class) VALUES (%s, %s, %s, %s) RETURNING vehicle_id""", (row["plate"], row["cost_centre"], row["assigned_city"], row["vehicle_class"]), ) target_vehicle_id = cur.fetchone()[0] action["created_vehicle"] = True if current_vehicle_id is not None and current_vehicle_id != target_vehicle_id: cur.execute( "UPDATE state.position_history SET vehicle_id = %s WHERE imei = %s", (target_vehicle_id, imei), ) cur.execute( "UPDATE state.live_positions SET vehicle_id = %s WHERE imei = %s", (target_vehicle_id, imei), ) action["remapped_from"] = current_vehicle_id action["remapped_to"] = target_vehicle_id elif current_vehicle_id is not None: target_vehicle_id = current_vehicle_id else: # No CSV plate, no existing device → placeholder plate_holder = f"IMEI-{imei[-6:]}" cur.execute( """INSERT INTO domain.vehicles (plate) VALUES (%s) ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate RETURNING vehicle_id""", (plate_holder,), ) target_vehicle_id = cur.fetchone()[0] action["created_vehicle"] = True # 3. Apply vehicle attributes (only overwrite when CSV has a value) cur.execute( """UPDATE domain.vehicles SET cost_centre = COALESCE(%s, cost_centre), assigned_city = COALESCE(%s, assigned_city), vehicle_class = COALESCE(%s, vehicle_class), updated_at = now() WHERE vehicle_id = %s""", (row["cost_centre"], row["assigned_city"], row["vehicle_class"], target_vehicle_id), ) # 4. UPSERT device row. # # API-vs-CSV ownership of `domain.devices` fields: # # API-managed (populated by the projector's _resolve_device): # imei, account_id, vehicle_id, device_type, lifecycle, activation_at # → for these we use COALESCE(existing, csv) on UPDATE — CSV only fills # gaps; never overwrites a value the projector already set. # # CSV-only (no API source, added by migration 15): # driver_name, driver_phone, iccid, expiration_at, device_group, roster_synced_at # → CSV is the canonical source; always overwrite. # # Hybrid: # model → API surface has this via state.live_positions.mc_type but the # projector doesn't write it to domain.devices, so the CSV # effectively fills a NULL gap. COALESCE-fill. # # `vehicle_id` is structural — the per-row resolve logic above already # decided this; the UPSERT just stores it. cur.execute( """INSERT INTO domain.devices ( imei, account_id, vehicle_id, device_type, model, lifecycle, activation_at, driver_name, driver_phone, iccid, expiration_at, device_group, roster_synced_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now() ) ON CONFLICT (imei) DO UPDATE SET account_id = COALESCE(domain.devices.account_id, EXCLUDED.account_id), vehicle_id = EXCLUDED.vehicle_id, device_type = COALESCE(domain.devices.device_type, EXCLUDED.device_type), model = COALESCE(domain.devices.model, EXCLUDED.model), lifecycle = COALESCE(domain.devices.lifecycle, EXCLUDED.lifecycle), activation_at = COALESCE(domain.devices.activation_at, EXCLUDED.activation_at), driver_name = EXCLUDED.driver_name, driver_phone = EXCLUDED.driver_phone, iccid = COALESCE(domain.devices.iccid, EXCLUDED.iccid), expiration_at = COALESCE(domain.devices.expiration_at, EXCLUDED.expiration_at), device_group = EXCLUDED.device_group, roster_synced_at = EXCLUDED.roster_synced_at, updated_at = now()""", ( imei, row["account"] or "Fireside Communications", target_vehicle_id, _device_type_for(row["mc_type"]), row["mc_type"], _lifecycle_for(row["enabled_flag"]), row["activation_time"], row["driver_name"], row["driver_phone"], row["iccid"], row["expiration"], row["device_group"], ), ) action["target_vehicle_id"] = target_vehicle_id action["action"] = "updated" if action["pre_existed"] else "inserted" return action def delete_orphan_vehicles(cur: psycopg.Cursor) -> int: cur.execute( """DELETE FROM domain.vehicles v WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id) AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id) AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id) """ ) return cur.rowcount def main() -> None: p = argparse.ArgumentParser() p.add_argument("csv_path") p.add_argument("--dry-run", action="store_true", help="Roll back the transaction at the end") args = p.parse_args() dsn = os.environ.get("DATABASE_URL") if not dsn: sys.exit("DATABASE_URL env var required") with open(args.csv_path) as f: rows = [_row_from_csv(r) for r in csv.DictReader(f)] print(f"loaded {len(rows)} CSV rows ({'DRY-RUN' if args.dry_run else 'APPLY'})") actions: list[dict[str, Any]] = [] with psycopg.connect(dsn, autocommit=False) as conn: with conn.cursor() as cur: for row in rows: actions.append(process_one(cur, row)) orphans_deleted = delete_orphan_vehicles(cur) # summary inserted = sum(1 for a in actions if a.get("action") == "inserted") updated = sum(1 for a in actions if a.get("action") == "updated") skipped = sum(1 for a in actions if a.get("action") == "skipped_no_imei") new_vehicles = sum(1 for a in actions if a.get("created_vehicle")) remaps = [a for a in actions if a.get("remapped_from") is not None] print(f"\ndevices : inserted={inserted} updated={updated} skipped={skipped}") print(f"vehicles: created={new_vehicles} orphans_deleted={orphans_deleted}") print(f"remaps : {len(remaps)} devices moved to a different vehicle_id") for a in remaps[:10]: print(f" imei={a['imei']} {a['remapped_from']} -> {a['remapped_to']}") if len(remaps) > 10: print(f" ... and {len(remaps) - 10} more") # Final-state snapshot cur.execute("SELECT count(*) FROM domain.vehicles WHERE cost_centre IS NOT NULL") with_cc = cur.fetchone()[0] cur.execute("SELECT count(*) FROM domain.devices WHERE driver_name IS NOT NULL") with_drv = cur.fetchone()[0] cur.execute("SELECT count(*) FROM domain.vehicles WHERE plate LIKE 'IMEI-%'") placeholders = cur.fetchone()[0] cur.execute("SELECT count(*) FROM domain.vehicles") v_total = cur.fetchone()[0] cur.execute("SELECT count(*) FROM domain.devices") d_total = cur.fetchone()[0] print() print(f"final state: {d_total} devices / {v_total} vehicles " f"({with_drv} with driver_name, {with_cc} vehicles with cost_centre, " f"{placeholders} still IMEI- placeholders)") cur.execute("""SELECT cost_centre, count(*) FROM domain.vehicles GROUP BY 1 ORDER BY 2 DESC LIMIT 10""") print("cost-centre histogram (top 10):") for cc, n in cur.fetchall(): print(f" {cc!r:<14} {n}") if args.dry_run: print("\n(dry-run: rolling back)") conn.rollback() else: conn.commit() print("\n(committed)") if __name__ == "__main__": main()