From 458e0be1011b2304fb98299d1a07f30db0364fe1 Mon Sep 17 00:00:00 2001 From: david kiania Date: Thu, 25 Jun 2026 14:52:11 +0300 Subject: [PATCH] feat(tools): import_drivers_csv reads raw Tracksolid Pro export format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add auto-detecting header-alias layer (_EXPORT_HEADER_ALIASES) so import_drivers_csv ingests the raw Tracksolid Pro device export (Title-Case headers: IMEI, Driver Name, LicensePlateNo., Model, Vehicle Model, …) in addition to the original snake_case schema-mirror CSV. Add customer_name/device_group to the write set, extend the driver-placeholder skip list (Unassigned/UG/UG Crane), and exclude activation_time/expiration/device_group (date-only/casing churn that would degrade precise API-set values). Used to apply the 260625 telematics quality-check file: 152 devices updated (device_name, driver_name, plate, vehicle type, depot_address). Co-Authored-By: Claude Opus 4.8 --- tools/__init__.py | 4 + tools/import_drivers_csv.py | 341 ++++++++++++++++++++++++++++++++++++ 2 files changed, 345 insertions(+) create mode 100644 tools/__init__.py create mode 100644 tools/import_drivers_csv.py diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..b224a53 --- /dev/null +++ b/tools/__init__.py @@ -0,0 +1,4 @@ +# Manual / one-shot operator tools (NOT part of the runtime services). +# Run from the repo root as a module so `import ts_shared_rev` resolves, e.g.: +# docker exec ingest_worker python -m tools.sync_driver_audit +# See tools/README.md. diff --git a/tools/import_drivers_csv.py b/tools/import_drivers_csv.py new file mode 100644 index 0000000..48ad9cd --- /dev/null +++ b/tools/import_drivers_csv.py @@ -0,0 +1,341 @@ +""" +import_drivers_csv.py — Fireside Communications · Driver & Vehicle CSV Import +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +One-shot script: reads the snake_case Fireside Group vehicle CSV +(`20260427_FSG_Vehicles_mitieng.csv`), compares each row against the +current `tracksolid.devices` values, and updates the DB. + +The CSV columns mirror the DB schema directly — no inference. Cells with the +literal string "NULL" are treated as missing. + +Fields imported (per Phase 0.1 of the Business Analytics redesign plan): + Identity : driver_name, driver_phone, vehicle_number, vehicle_name, + vehicle_models, mc_type, device_name + SIM : sim, iccid, imsi + Lifecycle : activation_time, expiration + Business meta : assigned_city, cost_centre, assigned_route, + vehicle_category, vehicle_brand, fuel_100km, depot_address + +`depot_geom` (PostGIS Point) is intentionally NOT imported — needs WKT and +isn't present as coordinates in the CSV. Set it via a follow-up migration +when geofences are loaded. + +Usage: + # Dry-run — shows diff, writes nothing + python import_drivers_csv.py + + # Filter to a single IMEI (dry-run) + python import_drivers_csv.py --imei 862798052707896 + + # Apply all changes to DB + python import_drivers_csv.py --apply + + # Only fill fields that are currently NULL in the DB (never overwrite) + python import_drivers_csv.py --only-null --apply + + # Use a different CSV + python import_drivers_csv.py --csv path/to/file.csv + +Pre-requisite: + Migrations 02, 05, 06 must be applied (they add the metadata columns). +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +import argparse +import csv +from pathlib import Path + +from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger + +log = get_logger("csv_import") + +DEFAULT_CSV_PATH = Path(__file__).parent / "data" / "20260427_FSG_Vehicles_mitieng.csv" + +# Columns fetched from DB for diff comparison. +DB_COLS = [ + "imei", + # Identity + "driver_name", "driver_phone", "vehicle_number", "vehicle_name", + "vehicle_models", "mc_type", "device_name", "customer_name", "device_group", + # SIM + "sim", "iccid", "imsi", + # Lifecycle + "activation_time", "expiration", + # Business metadata (Phase 0.1 additions) + "assigned_city", "cost_centre", "assigned_route", + "vehicle_category", "vehicle_brand", "fuel_100km", "depot_address", +] + +# Driver Name values that are placeholders — skip writing driver_name for these. +_DRIVER_SKIP = {"identification", "ug", "ug crane", "unassigned"} + +# Tracksolid Pro raw-export headers (Title-Case) → canonical snake_case DB columns. +# Used when the CSV is the raw platform export rather than the snake_case schema-mirror. +# Unmapped export columns (Mileage, Account, MAC, SN, VIN, install metadata, …) are ignored: +# `Mileage`/`Account` are deliberately excluded (pipeline-owned), the rest have no DB column. +_EXPORT_HEADER_ALIASES = { + "IMEI": "imei", + "Driver Name": "driver_name", + "Telephone": "driver_phone", + "LicensePlateNo.": "vehicle_number", + "Device Name": "device_name", + "Model": "mc_type", + "Vehicle Model": "vehicle_models", + "Vehicle Brand": "vehicle_brand", + "Fuel/100km": "fuel_100km", + "SIM": "sim", + "ICCID": "iccid", + "IMSI": "imsi", + "Activated Date": "activation_time", + "Subscription Expiration": "expiration", + "Customer Name": "customer_name", + "Group": "device_group", + "Installation Address": "depot_address", +} + +# Columns intentionally NOT written from the raw Tracksolid export: its date +# columns are date-only and would degrade the precise API-set timestamps, and +# device_group differs only by capitalisation ("Default group"→"Default Group"). +# (User decision, 2026-06-25 quality-check import.) +_EXCLUDE_COLS = {"activation_time", "expiration", "device_group"} + +# Columns that need an explicit cast in the UPDATE statement. +_TIMESTAMPTZ_COLS = {"activation_time", "expiration"} +_NUMERIC_COLS = {"fuel_100km"} + + +def _read(row: dict, col: str) -> str | None: + """Read a CSV column treating literal 'NULL'/'None' (case-insensitive) as missing.""" + v = clean(row.get(col)) + if v is None: + return None + return None if v.upper() in ("NULL", "NONE") else v + + +def _read_num(row: dict, col: str) -> float | None: + v = _read(row, col) + return clean_num(v) if v is not None else None + + +def _read_ts(row: dict, col: str) -> str | None: + v = _read(row, col) + return clean_ts(v) if v is not None else None + + +def load_csv(csv_path: Path) -> dict[str, dict]: + """Load CSV into a dict keyed by IMEI. + + Accepts two header styles: the snake_case schema-mirror CSV (original + `tools/data/` workflow) and the raw Tracksolid Pro export (Title-Case + headers). The export form is auto-detected and its headers are remapped to + the canonical snake_case names via `_EXPORT_HEADER_ALIASES`; unmapped + columns are dropped. + """ + rows: dict[str, dict] = {} + with open(csv_path, encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + is_export = "IMEI" in (reader.fieldnames or []) + if is_export: + log.info("Detected Tracksolid raw-export header format; remapping columns.") + for raw in reader: + row = ( + {snake: raw.get(orig) for orig, snake in _EXPORT_HEADER_ALIASES.items()} + if is_export + else raw + ) + imei = (row.get("imei") or "").strip() + if not imei: + continue + rows[imei] = row + log.info("CSV loaded: %d rows from %s", len(rows), csv_path.name) + return rows + + +def load_db_devices() -> dict[str, dict]: + """Fetch current device rows from DB, keyed by IMEI.""" + devices: dict[str, dict] = {} + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices") + col_names = [d[0] for d in cur.description] + for row in cur.fetchall(): + rec = dict(zip(col_names, row)) + devices[rec["imei"]] = rec + log.info("DB loaded: %d devices", len(devices)) + return devices + + +def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]: + """ + Return a dict of column→new_value for fields that need updating. + When only_null=True, skip any DB column that already has a value. + The driver_name column is skipped for placeholder-labelled devices. + """ + driver_raw = (_read(csv_row, "driver_name") or "") + is_placeholder = driver_raw.lower() in _DRIVER_SKIP + if driver_raw.lower() == "identification": + return {} + + proposed: dict[str, object] = { + # Identity + "driver_phone": _read(csv_row, "driver_phone"), + "vehicle_number": _read(csv_row, "vehicle_number"), + "vehicle_name": _read(csv_row, "vehicle_name"), + "vehicle_models": _read(csv_row, "vehicle_models"), + "mc_type": _read(csv_row, "mc_type"), + "device_name": _read(csv_row, "device_name"), + "customer_name": _read(csv_row, "customer_name"), + "device_group": _read(csv_row, "device_group"), + # SIM + "sim": _read(csv_row, "sim"), + "iccid": _read(csv_row, "iccid"), + "imsi": _read(csv_row, "imsi"), + # Lifecycle + "activation_time": _read_ts(csv_row, "activation_time"), + "expiration": _read_ts(csv_row, "expiration"), + # Business metadata + "assigned_city": _read(csv_row, "assigned_city"), + "cost_centre": _read(csv_row, "cost_centre"), + "assigned_route": _read(csv_row, "assigned_route"), + "vehicle_category": _read(csv_row, "vehicle_category"), + "vehicle_brand": _read(csv_row, "vehicle_brand"), + "fuel_100km": _read_num(csv_row, "fuel_100km"), + "depot_address": _read(csv_row, "depot_address"), + } + if not is_placeholder and driver_raw: + proposed["driver_name"] = driver_raw + + # Drop None values — no point sending NULL to overwrite NULL — and any + # explicitly excluded columns (precision-degrading / cosmetic churn). + proposed = { + k: v for k, v in proposed.items() + if v is not None and k not in _EXCLUDE_COLS + } + + if not only_null or db_row is None: + return proposed + + # only_null: drop any column that already has a non-null value in the DB. + return {k: v for k, v in proposed.items() if db_row.get(k) is None} + + +def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None: + """Pretty-print what will change for one device.""" + if not updates: + return + db = db_row or {} + print(f"\n IMEI {imei}:") + for col, new_val in sorted(updates.items()): + old_val = db.get(col) + if str(old_val) != str(new_val): + print(f" {col:<20} {str(old_val):<30} → {new_val}") + + +def _set_clause(col: str) -> str: + """SQL fragment for `col = ...` honouring per-column casts.""" + if col in _TIMESTAMPTZ_COLS: + return f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})" + if col in _NUMERIC_COLS: + # %s already a float; no NULLIF dance needed. + return f"{col} = COALESCE(%s::NUMERIC, {col})" + return f"{col} = COALESCE(NULLIF(%s, ''), {col})" + + +def _placeholder(col: str) -> str: + """SQL fragment for a single VALUES placeholder honouring per-column casts.""" + if col in _TIMESTAMPTZ_COLS: + return "%s::TIMESTAMPTZ" + if col in _NUMERIC_COLS: + return "%s::NUMERIC" + return "%s" + + +def run(apply: bool, only_null: bool, filter_imei: str | None, csv_path: Path) -> None: + csv_rows = load_csv(csv_path) + db_rows = load_db_devices() + + if filter_imei: + csv_rows = {k: v for k, v in csv_rows.items() if k == filter_imei} + if not csv_rows: + print(f"IMEI {filter_imei} not found in CSV.") + return + + updated = inserted = skipped = no_change = 0 + + with get_conn() as conn: + with conn.cursor() as cur: + for imei, csv_row in csv_rows.items(): + db_row = db_rows.get(imei) + updates = build_update(csv_row, db_row, only_null) + + if not updates: + driver_raw = (_read(csv_row, "driver_name") or "").lower() + if driver_raw == "identification": + skipped += 1 + else: + no_change += 1 + continue + + if db_row is None: + # Device not yet synced from API — insert a stub row so + # incoming alarms / positions don't trip the FK constraint. + print(f"\n [NEW] IMEI {imei}:") + for col, new_val in sorted(updates.items()): + print(f" {col:<20} → {new_val}") + if apply: + cols = ["imei"] + list(updates.keys()) + vals = [imei] + list(updates.values()) + placeholders = ["%s"] + [_placeholder(c) for c in updates.keys()] + cur.execute( + f"INSERT INTO tracksolid.devices ({', '.join(cols)}) " + f"VALUES ({', '.join(placeholders)}) " + "ON CONFLICT (imei) DO NOTHING", + vals, + ) + inserted += 1 + continue + + print_diff(imei, updates, db_row) + + if apply: + set_clauses = [_set_clause(c) for c in updates.keys()] + params = list(updates.values()) + set_clauses.append("updated_at = NOW()") + params.append(imei) + cur.execute( + f"UPDATE tracksolid.devices SET {', '.join(set_clauses)} WHERE imei = %s", + params, + ) + updated += 1 + else: + updated += 1 # count as "would update" in dry-run + + mode = "APPLIED" if apply else "DRY-RUN" + print(f"\n{'='*60}") + print(f" {mode} COMPLETE") + print(f"{'='*60}") + print(f" Would update / updated : {updated}") + print(f" Would insert / inserted : {inserted}") + print(f" No change needed : {no_change}") + print(f" Skipped (Identification): {skipped}") + if not apply: + print("\n Run with --apply to commit changes.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Import driver/vehicle details from CSV into tracksolid.devices" + ) + parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)") + parser.add_argument("--only-null", action="store_true", help="Only update fields currently NULL in the DB") + parser.add_argument("--imei", default=None, help="Limit to a single IMEI") + parser.add_argument("--csv", default=str(DEFAULT_CSV_PATH), + help=f"Path to the CSV (default: {DEFAULT_CSV_PATH.name})") + args = parser.parse_args() + + csv_path = Path(args.csv) + if not csv_path.exists(): + log.error("CSV file not found: %s", csv_path) + raise SystemExit(1) + + run(apply=args.apply, only_null=args.only_null, filter_imei=args.imei, csv_path=csv_path)