From 495bb2bd71f42f70988fff84cea368cb17b00402 Mon Sep 17 00:00:00 2001 From: kianiadee Date: Mon, 25 May 2026 00:41:32 +0300 Subject: [PATCH] Rollback CSV roster import (mig 17): re-split vehicles, drop CSV columns The CSV-based roster import (mig 15+16 and scripts/import_csv_roster.py) merged vehicle rows that differed only by _Track / _CAM suffix, dropping the active fleet count from 144 to 124. Reverting the whole thing. Mig 17 in one transaction: - Re-splits devices by parsed plate from device_name (same regex as mig 14, preserving _Track as separate vehicle) - Restores serve.fn_live_view to its v3 body (no d.driver_name/phone refs that would break once the columns are gone) - Drops the six CSV-only columns from domain.devices - Deletes schema_migrations rows for the deleted 15/16 - Logs final counts via RAISE NOTICE Apply on VPS: psql -f db/migrations/20260601000017_rollback_csv_import.sql --- .../20260601000015_devices_csv_columns.sql | 24 -- ...=> 20260601000017_rollback_csv_import.sql} | 121 +++++++- scripts/import_csv_roster.py | 289 ------------------ 3 files changed, 111 insertions(+), 323 deletions(-) delete mode 100644 db/migrations/20260601000015_devices_csv_columns.sql rename db/migrations/{20260601000016_prefer_csv_driver.sql => 20260601000017_rollback_csv_import.sql} (55%) delete mode 100644 scripts/import_csv_roster.py diff --git a/db/migrations/20260601000015_devices_csv_columns.sql b/db/migrations/20260601000015_devices_csv_columns.sql deleted file mode 100644 index c233fe1..0000000 --- a/db/migrations/20260601000015_devices_csv_columns.sql +++ /dev/null @@ -1,24 +0,0 @@ --- migrate:up --- --- Tactical columns on domain.devices to land CSV-imported roster data that --- has no permanent home yet. Superseded by PRD F3.10's full driver roster --- in P3 (domain.drivers + domain.driver_assignments) — at that point these --- columns get dropped after the data is migrated. - -ALTER TABLE domain.devices - ADD COLUMN IF NOT EXISTS driver_name text, - ADD COLUMN IF NOT EXISTS driver_phone text, - ADD COLUMN IF NOT EXISTS iccid text, - ADD COLUMN IF NOT EXISTS expiration_at timestamptz, - ADD COLUMN IF NOT EXISTS device_group text, - ADD COLUMN IF NOT EXISTS roster_synced_at timestamptz; - --- migrate:down - -ALTER TABLE domain.devices - DROP COLUMN IF EXISTS driver_name, - DROP COLUMN IF EXISTS driver_phone, - DROP COLUMN IF EXISTS iccid, - DROP COLUMN IF EXISTS expiration_at, - DROP COLUMN IF EXISTS device_group, - DROP COLUMN IF EXISTS roster_synced_at; diff --git a/db/migrations/20260601000016_prefer_csv_driver.sql b/db/migrations/20260601000017_rollback_csv_import.sql similarity index 55% rename from db/migrations/20260601000016_prefer_csv_driver.sql rename to db/migrations/20260601000017_rollback_csv_import.sql index 863f31e..5e7fd58 100644 --- a/db/migrations/20260601000016_prefer_csv_driver.sql +++ b/db/migrations/20260601000017_rollback_csv_import.sql @@ -1,9 +1,75 @@ -- migrate:up -- --- serve.fn_live_view now prefers the canonical driver_name from --- domain.devices (CSV-imported by scripts/import_csv_roster.py) over the --- heuristic serve._driver_name() parse of device_name. Also surfaces --- driver_phone so the popup can show it. +-- Full rollback of the CSV import (migrations 15 + 16 + the data mutations +-- performed by scripts/import_csv_roster.py). After this runs, the database +-- is back to the post-migration-14 shape: ~144 vehicles split per device_name +-- (preserving _Track / _CAM suffix vehicles), no CSV-only columns, and +-- serve.fn_live_view back to its v3 body (driver_name from heuristic only). +-- +-- Why a single forward migration instead of `dbmate rollback`: +-- - The CSV import mutated row state, not just schema. A schema-only down +-- would leave 124 merged vehicles in place. +-- - dbmate rollback runs the down blocks in reverse, but mig 16's down only +-- drops the function — it doesn't restore the previous body. Doing it +-- forward lets us be explicit about the target state. + +-- --------------------------------------------------------------------------- +-- 1. Re-split vehicles by parsed plate from device_name +-- --------------------------------------------------------------------------- + +DO $rollback$ +DECLARE + rec RECORD; + new_vid integer; +BEGIN + ALTER TABLE domain.vehicles DROP CONSTRAINT IF EXISTS vehicles_plate_key; + + FOR rec IN + SELECT + d.imei, + d.vehicle_id AS current_vid, + v.plate AS current_plate, + regexp_replace( + (regexp_match(lp.device_name, '^.* - (.+)$'))[1], + '_(cam|CAM)$', '' + ) AS parsed_plate + FROM domain.devices d + JOIN state.live_positions lp ON lp.imei = d.imei + JOIN domain.vehicles v ON v.vehicle_id = d.vehicle_id + WHERE lp.device_name LIKE '% - %' + LOOP + IF rec.parsed_plate IS NULL OR rec.parsed_plate = '' THEN CONTINUE; END IF; + IF rec.parsed_plate !~ '[A-Z]' OR rec.parsed_plate !~ '[0-9]' THEN CONTINUE; END IF; + IF rec.parsed_plate = rec.current_plate THEN CONTINUE; END IF; + + SELECT vehicle_id INTO new_vid + FROM domain.vehicles + WHERE plate = rec.parsed_plate; + + IF new_vid IS NULL THEN + INSERT INTO domain.vehicles (plate) + VALUES (rec.parsed_plate) + RETURNING vehicle_id INTO new_vid; + END IF; + + UPDATE domain.devices SET vehicle_id = new_vid WHERE imei = rec.imei; + UPDATE state.live_positions SET vehicle_id = new_vid WHERE imei = rec.imei; + UPDATE state.position_history SET vehicle_id = new_vid WHERE imei = rec.imei; + END LOOP; + + -- Delete vehicles that no device or state row references anymore + DELETE FROM domain.vehicles v + WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id) + AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id) + AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id); + + ALTER TABLE domain.vehicles ADD CONSTRAINT vehicles_plate_key UNIQUE (plate); +END +$rollback$; + +-- --------------------------------------------------------------------------- +-- 2. Restore serve.fn_live_view to the v3 body (driver_name heuristic only) +-- --------------------------------------------------------------------------- DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb); @@ -30,8 +96,6 @@ BEGIN lp.mc_type, lp.current_mileage_km, lp.gps_signal, lp.satellites, lp.device_name, lp.pos_type, d.device_type, d.activation_at, - d.driver_name AS driver_name_csv, - d.driver_phone AS driver_phone_csv, v.vehicle_id, v.plate, v.cost_centre, v.assigned_city FROM state.live_positions lp JOIN domain.devices d ON d.imei = lp.imei @@ -95,8 +159,7 @@ BEGIN 'vehicle_id', e.vehicle_id, 'plate', e.plate, 'plate_short', serve._label_short(e.device_name, e.plate), - 'driver_name', COALESCE(e.driver_name_csv, serve._driver_name(e.device_name)), - 'driver_phone', e.driver_phone_csv, + 'driver_name', serve._driver_name(e.device_name), 'imei', e.imei, 'device_type', e.device_type, 'device_name', e.device_name, @@ -142,6 +205,44 @@ BEGIN END; $$; --- migrate:down +-- --------------------------------------------------------------------------- +-- 3. Drop the CSV-only columns from domain.devices +-- --------------------------------------------------------------------------- -DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb); +ALTER TABLE domain.devices + DROP COLUMN IF EXISTS driver_name, + DROP COLUMN IF EXISTS driver_phone, + DROP COLUMN IF EXISTS iccid, + DROP COLUMN IF EXISTS expiration_at, + DROP COLUMN IF EXISTS device_group, + DROP COLUMN IF EXISTS roster_synced_at; + +-- --------------------------------------------------------------------------- +-- 4. Remove the dead schema_migrations rows for the deleted migrations +-- --------------------------------------------------------------------------- +-- +-- 20260601000015 (devices_csv_columns) and 20260601000016 (prefer_csv_driver) +-- have been deleted from db/migrations/ — drop their tracker rows so dbmate +-- status stays consistent with the files on disk. + +DELETE FROM public.schema_migrations + WHERE version IN ('20260601000015', '20260601000016'); + +-- --------------------------------------------------------------------------- +-- 5. Sanity check — log the resulting counts +-- --------------------------------------------------------------------------- + +DO $check$ +DECLARE + v_count integer; + d_count integer; +BEGIN + SELECT count(*) INTO v_count FROM domain.vehicles; + SELECT count(*) INTO d_count FROM domain.devices; + RAISE NOTICE 'rollback complete: % vehicles, % devices', v_count, d_count; +END +$check$; + +-- migrate:down +-- No-op: this migration is itself a rollback. To re-apply the CSV import, +-- re-run migrations 15 + 16 and scripts/import_csv_roster.py. diff --git a/scripts/import_csv_roster.py b/scripts/import_csv_roster.py deleted file mode 100644 index c7a1044..0000000 --- a/scripts/import_csv_roster.py +++ /dev/null @@ -1,289 +0,0 @@ -"""Import the FSG roster CSV into domain.accounts / domain.vehicles / domain.devices. - -Idempotent — re-running gives the same end state. Per-CSV-row UPSERT inside -one outer transaction; ROLLBACK on --dry-run, COMMIT otherwise. Orphaned -vehicle rows (placeholders left behind after remap) are deleted at the end. - -Usage: - PGPASSWORD=... DATABASE_URL=postgresql://... \\ - .venv/bin/python scripts/import_csv_roster.py [--dry-run] -""" - -from __future__ import annotations - -import argparse -import csv -import os -import sys -from collections import Counter -from typing import Any - -import psycopg - - -def _norm(v: str | None) -> str | None: - if v is None: - return None - s = v.strip() - if s in {"", "NULL", "null", "None"}: - return None - return s - - -def _device_type_for(mc_type: str | None) -> str: - return "camera" if (mc_type or "").upper() == "JC400P" else "tracker" - - -def _lifecycle_for(enabled_flag: str | None) -> str: - return "active" if enabled_flag == "1" else "suspended" - - -def _row_from_csv(raw: dict[str, str]) -> dict[str, Any]: - return { - "imei": _norm(raw.get("imei")), - "device_name": _norm(raw.get("device_name")), - "mc_type": _norm(raw.get("mc_type")), - "plate": _norm(raw.get("vehicle_name")), - "vehicle_class": _norm(raw.get("vehicle_models")), - "driver_name": _norm(raw.get("driver_name")), - "driver_phone": _norm(raw.get("driver_phone")), - "sim": _norm(raw.get("sim")), - "iccid": _norm(raw.get("iccid")), - "account": _norm(raw.get("account")), - "customer_name": _norm(raw.get("customer_name")), - "device_group": _norm(raw.get("device_group")), - "activation_time": _norm(raw.get("activation_time")), - "expiration": _norm(raw.get("expiration")), - "enabled_flag": _norm(raw.get("enabled_flag")), - "cost_centre": _norm(raw.get("cost_centre")), - "assigned_city": _norm(raw.get("assigned_city")) or _norm(raw.get("city")), - } - - -def process_one(cur: psycopg.Cursor, row: dict[str, Any]) -> dict[str, Any]: - """Apply one CSV row. Returns an action dict for summary reporting.""" - imei = row["imei"] - if not imei: - return {"action": "skipped_no_imei"} - - action = {"imei": imei, "remapped_from": None, "remapped_to": None, "created_vehicle": False} - - # 1. Ensure account - if row["account"]: - cur.execute( - """INSERT INTO domain.accounts (account_id, name, app_key) - VALUES (%s, %s, '') - ON CONFLICT (account_id) DO UPDATE SET name = EXCLUDED.name""", - (row["account"], row["customer_name"] or row["account"]), - ) - - # 2. Resolve vehicle_id - cur.execute("SELECT vehicle_id FROM domain.devices WHERE imei = %s", (imei,)) - existing = cur.fetchone() - current_vehicle_id = existing[0] if existing else None - action["pre_existed"] = existing is not None - - if row["plate"]: - cur.execute("SELECT vehicle_id FROM domain.vehicles WHERE plate = %s", (row["plate"],)) - match = cur.fetchone() - if match is not None: - target_vehicle_id = match[0] - if current_vehicle_id is not None and current_vehicle_id != target_vehicle_id: - # Remap state and device from current to target - cur.execute( - "UPDATE state.position_history SET vehicle_id = %s WHERE imei = %s", - (target_vehicle_id, imei), - ) - cur.execute( - "UPDATE state.live_positions SET vehicle_id = %s WHERE imei = %s", - (target_vehicle_id, imei), - ) - action["remapped_from"] = current_vehicle_id - action["remapped_to"] = target_vehicle_id - else: - cur.execute( - """INSERT INTO domain.vehicles (plate, cost_centre, assigned_city, vehicle_class) - VALUES (%s, %s, %s, %s) - RETURNING vehicle_id""", - (row["plate"], row["cost_centre"], row["assigned_city"], row["vehicle_class"]), - ) - target_vehicle_id = cur.fetchone()[0] - action["created_vehicle"] = True - if current_vehicle_id is not None and current_vehicle_id != target_vehicle_id: - cur.execute( - "UPDATE state.position_history SET vehicle_id = %s WHERE imei = %s", - (target_vehicle_id, imei), - ) - cur.execute( - "UPDATE state.live_positions SET vehicle_id = %s WHERE imei = %s", - (target_vehicle_id, imei), - ) - action["remapped_from"] = current_vehicle_id - action["remapped_to"] = target_vehicle_id - elif current_vehicle_id is not None: - target_vehicle_id = current_vehicle_id - else: - # No CSV plate, no existing device → placeholder - plate_holder = f"IMEI-{imei[-6:]}" - cur.execute( - """INSERT INTO domain.vehicles (plate) VALUES (%s) - ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate - RETURNING vehicle_id""", - (plate_holder,), - ) - target_vehicle_id = cur.fetchone()[0] - action["created_vehicle"] = True - - # 3. Apply vehicle attributes (only overwrite when CSV has a value) - cur.execute( - """UPDATE domain.vehicles SET - cost_centre = COALESCE(%s, cost_centre), - assigned_city = COALESCE(%s, assigned_city), - vehicle_class = COALESCE(%s, vehicle_class), - updated_at = now() - WHERE vehicle_id = %s""", - (row["cost_centre"], row["assigned_city"], row["vehicle_class"], target_vehicle_id), - ) - - # 4. UPSERT device row. - # - # API-vs-CSV ownership of `domain.devices` fields: - # - # API-managed (populated by the projector's _resolve_device): - # imei, account_id, vehicle_id, device_type, lifecycle, activation_at - # → for these we use COALESCE(existing, csv) on UPDATE — CSV only fills - # gaps; never overwrites a value the projector already set. - # - # CSV-only (no API source, added by migration 15): - # driver_name, driver_phone, iccid, expiration_at, device_group, roster_synced_at - # → CSV is the canonical source; always overwrite. - # - # Hybrid: - # model → API surface has this via state.live_positions.mc_type but the - # projector doesn't write it to domain.devices, so the CSV - # effectively fills a NULL gap. COALESCE-fill. - # - # `vehicle_id` is structural — the per-row resolve logic above already - # decided this; the UPSERT just stores it. - cur.execute( - """INSERT INTO domain.devices ( - imei, account_id, vehicle_id, device_type, model, lifecycle, activation_at, - driver_name, driver_phone, iccid, expiration_at, device_group, roster_synced_at - ) VALUES ( - %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, now() - ) - ON CONFLICT (imei) DO UPDATE SET - account_id = COALESCE(domain.devices.account_id, EXCLUDED.account_id), - vehicle_id = EXCLUDED.vehicle_id, - device_type = COALESCE(domain.devices.device_type, EXCLUDED.device_type), - model = COALESCE(domain.devices.model, EXCLUDED.model), - lifecycle = COALESCE(domain.devices.lifecycle, EXCLUDED.lifecycle), - activation_at = COALESCE(domain.devices.activation_at, EXCLUDED.activation_at), - driver_name = EXCLUDED.driver_name, - driver_phone = EXCLUDED.driver_phone, - iccid = COALESCE(domain.devices.iccid, EXCLUDED.iccid), - expiration_at = COALESCE(domain.devices.expiration_at, EXCLUDED.expiration_at), - device_group = EXCLUDED.device_group, - roster_synced_at = EXCLUDED.roster_synced_at, - updated_at = now()""", - ( - imei, - row["account"] or "Fireside Communications", - target_vehicle_id, - _device_type_for(row["mc_type"]), - row["mc_type"], - _lifecycle_for(row["enabled_flag"]), - row["activation_time"], - row["driver_name"], - row["driver_phone"], - row["iccid"], - row["expiration"], - row["device_group"], - ), - ) - - action["target_vehicle_id"] = target_vehicle_id - action["action"] = "updated" if action["pre_existed"] else "inserted" - return action - - -def delete_orphan_vehicles(cur: psycopg.Cursor) -> int: - cur.execute( - """DELETE FROM domain.vehicles v - WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id) - AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id) - AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id) - """ - ) - return cur.rowcount - - -def main() -> None: - p = argparse.ArgumentParser() - p.add_argument("csv_path") - p.add_argument("--dry-run", action="store_true", help="Roll back the transaction at the end") - args = p.parse_args() - - dsn = os.environ.get("DATABASE_URL") - if not dsn: - sys.exit("DATABASE_URL env var required") - - with open(args.csv_path) as f: - rows = [_row_from_csv(r) for r in csv.DictReader(f)] - - print(f"loaded {len(rows)} CSV rows ({'DRY-RUN' if args.dry_run else 'APPLY'})") - - actions: list[dict[str, Any]] = [] - with psycopg.connect(dsn, autocommit=False) as conn: - with conn.cursor() as cur: - for row in rows: - actions.append(process_one(cur, row)) - orphans_deleted = delete_orphan_vehicles(cur) - - # summary - inserted = sum(1 for a in actions if a.get("action") == "inserted") - updated = sum(1 for a in actions if a.get("action") == "updated") - skipped = sum(1 for a in actions if a.get("action") == "skipped_no_imei") - new_vehicles = sum(1 for a in actions if a.get("created_vehicle")) - remaps = [a for a in actions if a.get("remapped_from") is not None] - - print(f"\ndevices : inserted={inserted} updated={updated} skipped={skipped}") - print(f"vehicles: created={new_vehicles} orphans_deleted={orphans_deleted}") - print(f"remaps : {len(remaps)} devices moved to a different vehicle_id") - for a in remaps[:10]: - print(f" imei={a['imei']} {a['remapped_from']} -> {a['remapped_to']}") - if len(remaps) > 10: - print(f" ... and {len(remaps) - 10} more") - - # Final-state snapshot - cur.execute("SELECT count(*) FROM domain.vehicles WHERE cost_centre IS NOT NULL") - with_cc = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM domain.devices WHERE driver_name IS NOT NULL") - with_drv = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM domain.vehicles WHERE plate LIKE 'IMEI-%'") - placeholders = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM domain.vehicles") - v_total = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM domain.devices") - d_total = cur.fetchone()[0] - print() - print(f"final state: {d_total} devices / {v_total} vehicles " - f"({with_drv} with driver_name, {with_cc} vehicles with cost_centre, " - f"{placeholders} still IMEI- placeholders)") - cur.execute("""SELECT cost_centre, count(*) FROM domain.vehicles - GROUP BY 1 ORDER BY 2 DESC LIMIT 10""") - print("cost-centre histogram (top 10):") - for cc, n in cur.fetchall(): - print(f" {cc!r:<14} {n}") - - if args.dry_run: - print("\n(dry-run: rolling back)") - conn.rollback() - else: - conn.commit() - print("\n(committed)") - - -if __name__ == "__main__": - main()