Rollback CSV roster import (mig 17): re-split vehicles, drop CSV columns
The CSV-based roster import (mig 15+16 and scripts/import_csv_roster.py)
merged vehicle rows that differed only by _Track / _CAM suffix, dropping
the active fleet count from 144 to 124. Reverting the whole thing.
Mig 17 in one transaction:
- Re-splits devices by parsed plate from device_name (same regex as
mig 14, preserving _Track as separate vehicle)
- Restores serve.fn_live_view to its v3 body (no d.driver_name/phone
refs that would break once the columns are gone)
- Drops the six CSV-only columns from domain.devices
- Deletes schema_migrations rows for the deleted 15/16
- Logs final counts via RAISE NOTICE
Apply on VPS (single transaction): psql --single-transaction -f db/migrations/20260601000017_rollback_csv_import.sql
This commit is contained in:
parent
4cc0ef0535
commit
495bb2bd71
3 changed files with 111 additions and 323 deletions
|
|
@ -1,24 +0,0 @@
|
||||||
-- migrate:up
--
-- Stop-gap columns on domain.devices that hold roster data imported from CSV
-- until it has a permanent home. PRD F3.10's full driver roster in P3
-- (domain.drivers + domain.driver_assignments) supersedes them; once the data
-- has been migrated there, these columns are dropped.

ALTER TABLE domain.devices
    ADD COLUMN IF NOT EXISTS driver_name      text,
    ADD COLUMN IF NOT EXISTS driver_phone     text,
    ADD COLUMN IF NOT EXISTS iccid            text,
    ADD COLUMN IF NOT EXISTS expiration_at    timestamptz,
    ADD COLUMN IF NOT EXISTS device_group     text,
    ADD COLUMN IF NOT EXISTS roster_synced_at timestamptz;

-- migrate:down

-- Mirror of the up block: remove the same six columns if they are present.
ALTER TABLE domain.devices
    DROP COLUMN IF EXISTS driver_name,
    DROP COLUMN IF EXISTS driver_phone,
    DROP COLUMN IF EXISTS iccid,
    DROP COLUMN IF EXISTS expiration_at,
    DROP COLUMN IF EXISTS device_group,
    DROP COLUMN IF EXISTS roster_synced_at;
|
|
||||||
|
|
@ -1,9 +1,75 @@
|
||||||
-- migrate:up
|
-- migrate:up
|
||||||
--
|
--
|
||||||
-- serve.fn_live_view now prefers the canonical driver_name from
|
-- Full rollback of the CSV import (migrations 15 + 16 + the data mutations
|
||||||
-- domain.devices (CSV-imported by scripts/import_csv_roster.py) over the
|
-- performed by scripts/import_csv_roster.py). After this runs, the database
|
||||||
-- heuristic serve._driver_name() parse of device_name. Also surfaces
|
-- is back to the post-migration-14 shape: ~144 vehicles split per device_name
|
||||||
-- driver_phone so the popup can show it.
|
-- (preserving _Track suffix vehicles; a trailing _CAM/_cam is stripped from the parsed plate), no CSV-only columns, and
|
||||||
|
-- serve.fn_live_view back to its v3 body (driver_name from heuristic only).
|
||||||
|
--
|
||||||
|
-- Why a single forward migration instead of `dbmate rollback`:
|
||||||
|
-- - The CSV import mutated row state, not just schema. A schema-only down
|
||||||
|
-- would leave 124 merged vehicles in place.
|
||||||
|
-- - dbmate rollback runs the down blocks in reverse, but mig 16's down only
|
||||||
|
-- drops the function — it doesn't restore the previous body. Doing it
|
||||||
|
-- forward lets us be explicit about the target state.
|
||||||
|
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
-- 1. Re-split vehicles by parsed plate from device_name
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
DO $rollback$
DECLARE
    dev        RECORD;   -- one device joined to its live position and current vehicle
    target_vid integer;  -- vehicle the device should point at after the re-split
BEGIN
    -- The plate uniqueness constraint is lifted for the duration of the
    -- re-split and re-created at the bottom of this block.
    ALTER TABLE domain.vehicles DROP CONSTRAINT IF EXISTS vehicles_plate_key;

    FOR dev IN
        SELECT
            src.imei,
            src.current_vid,
            src.current_plate,
            -- Drop a trailing _cam / _CAM so a camera shares its vehicle's plate.
            regexp_replace(src.raw_plate, '_(cam|CAM)$', '') AS parsed_plate
        FROM (
            SELECT
                d.imei,
                d.vehicle_id AS current_vid,
                v.plate AS current_plate,
                -- Text after the last ' - ' in device_name (the leading .* is greedy).
                (regexp_match(lp.device_name, '^.* - (.+)$'))[1] AS raw_plate
            FROM domain.devices d
            JOIN state.live_positions lp ON lp.imei = d.imei
            JOIN domain.vehicles v ON v.vehicle_id = d.vehicle_id
            WHERE lp.device_name LIKE '% - %'
        ) AS src
    LOOP
        -- Nothing usable was parsed.
        CONTINUE WHEN dev.parsed_plate IS NULL OR dev.parsed_plate = '';
        -- Only accept values containing both an uppercase letter and a digit.
        CONTINUE WHEN NOT (dev.parsed_plate ~ '[A-Z]' AND dev.parsed_plate ~ '[0-9]');
        -- Device already sits on the vehicle carrying this plate.
        CONTINUE WHEN dev.parsed_plate = dev.current_plate;

        -- Reuse the vehicle that already carries the parsed plate, if any.
        -- SELECT ... INTO leaves target_vid NULL when no row matches.
        SELECT v.vehicle_id
          INTO target_vid
          FROM domain.vehicles v
         WHERE v.plate = dev.parsed_plate;

        IF target_vid IS NULL THEN
            INSERT INTO domain.vehicles (plate)
            VALUES (dev.parsed_plate)
            RETURNING vehicle_id INTO target_vid;
        END IF;

        -- Re-point the device and its state rows at the resolved vehicle.
        UPDATE domain.devices         SET vehicle_id = target_vid WHERE imei = dev.imei;
        UPDATE state.live_positions   SET vehicle_id = target_vid WHERE imei = dev.imei;
        UPDATE state.position_history SET vehicle_id = target_vid WHERE imei = dev.imei;
    END LOOP;

    -- Remove vehicles that no device, live position, or history row references.
    DELETE FROM domain.vehicles v
    WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id)
      AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id)
      AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id);

    ALTER TABLE domain.vehicles ADD CONSTRAINT vehicles_plate_key UNIQUE (plate);
END
$rollback$;
|
||||||
|
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
-- 2. Restore serve.fn_live_view to the v3 body (driver_name heuristic only)
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
|
||||||
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);
|
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);
|
||||||
|
|
||||||
|
|
@ -30,8 +96,6 @@ BEGIN
|
||||||
lp.mc_type, lp.current_mileage_km, lp.gps_signal, lp.satellites,
|
lp.mc_type, lp.current_mileage_km, lp.gps_signal, lp.satellites,
|
||||||
lp.device_name, lp.pos_type,
|
lp.device_name, lp.pos_type,
|
||||||
d.device_type, d.activation_at,
|
d.device_type, d.activation_at,
|
||||||
d.driver_name AS driver_name_csv,
|
|
||||||
d.driver_phone AS driver_phone_csv,
|
|
||||||
v.vehicle_id, v.plate, v.cost_centre, v.assigned_city
|
v.vehicle_id, v.plate, v.cost_centre, v.assigned_city
|
||||||
FROM state.live_positions lp
|
FROM state.live_positions lp
|
||||||
JOIN domain.devices d ON d.imei = lp.imei
|
JOIN domain.devices d ON d.imei = lp.imei
|
||||||
|
|
@ -95,8 +159,7 @@ BEGIN
|
||||||
'vehicle_id', e.vehicle_id,
|
'vehicle_id', e.vehicle_id,
|
||||||
'plate', e.plate,
|
'plate', e.plate,
|
||||||
'plate_short', serve._label_short(e.device_name, e.plate),
|
'plate_short', serve._label_short(e.device_name, e.plate),
|
||||||
'driver_name', COALESCE(e.driver_name_csv, serve._driver_name(e.device_name)),
|
'driver_name', serve._driver_name(e.device_name),
|
||||||
'driver_phone', e.driver_phone_csv,
|
|
||||||
'imei', e.imei,
|
'imei', e.imei,
|
||||||
'device_type', e.device_type,
|
'device_type', e.device_type,
|
||||||
'device_name', e.device_name,
|
'device_name', e.device_name,
|
||||||
|
|
@ -142,6 +205,44 @@ BEGIN
|
||||||
END;
|
END;
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
-- migrate:down
|
-- ---------------------------------------------------------------------------
|
||||||
|
-- 3. Drop the CSV-only columns from domain.devices
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
|
||||||
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);
|
-- Remove the six CSV-import columns from domain.devices; IF EXISTS makes the
-- statement safe to run when some of them are already gone.
ALTER TABLE domain.devices
    DROP COLUMN IF EXISTS driver_name,
    DROP COLUMN IF EXISTS driver_phone,
    DROP COLUMN IF EXISTS iccid,
    DROP COLUMN IF EXISTS expiration_at,
    DROP COLUMN IF EXISTS device_group,
    DROP COLUMN IF EXISTS roster_synced_at;
|
||||||
|
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
-- 4. Remove the dead schema_migrations rows for the deleted migrations
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
--
|
||||||
|
-- 20260601000015 (devices_csv_columns) and 20260601000016 (prefer_csv_driver)
|
||||||
|
-- have been deleted from db/migrations/ — drop their tracker rows so dbmate
|
||||||
|
-- status stays consistent with the files on disk.
|
||||||
|
|
||||||
|
-- Forget the two migration versions whose files were removed.
DELETE FROM public.schema_migrations
WHERE version = '20260601000015'
   OR version = '20260601000016';
|
||||||
|
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
-- 5. Sanity check — log the resulting counts
|
||||||
|
-- ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
DO $check$
DECLARE
    n_vehicles integer;
    n_devices  integer;
BEGIN
    -- Row counts after the rollback, reported to the client as a NOTICE.
    n_vehicles := (SELECT count(*) FROM domain.vehicles);
    n_devices  := (SELECT count(*) FROM domain.devices);
    RAISE NOTICE 'rollback complete: % vehicles, % devices', n_vehicles, n_devices;
END
$check$;
|
||||||
|
|
||||||
|
-- migrate:down
|
||||||
|
-- No-op: this migration is itself a rollback. To re-apply the CSV import,
|
||||||
|
-- re-run migrations 15 + 16 and scripts/import_csv_roster.py.
|
||||||
|
|
@ -1,289 +0,0 @@
|
||||||
"""Import the FSG roster CSV into domain.accounts / domain.vehicles / domain.devices.
|
|
||||||
|
|
||||||
Idempotent — re-running gives the same end state. Per-CSV-row UPSERT inside
|
|
||||||
one outer transaction; ROLLBACK on --dry-run, COMMIT otherwise. Orphaned
|
|
||||||
vehicle rows (placeholders left behind after remap) are deleted at the end.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
PGPASSWORD=... DATABASE_URL=postgresql://... \\
|
|
||||||
.venv/bin/python scripts/import_csv_roster.py <path-to-csv> [--dry-run]
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import csv
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from collections import Counter
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import psycopg
|
|
||||||
|
|
||||||
|
|
||||||
def _norm(v: str | None) -> str | None:
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
s = v.strip()
|
|
||||||
if s in {"", "NULL", "null", "None"}:
|
|
||||||
return None
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def _device_type_for(mc_type: str | None) -> str:
|
|
||||||
return "camera" if (mc_type or "").upper() == "JC400P" else "tracker"
|
|
||||||
|
|
||||||
|
|
||||||
def _lifecycle_for(enabled_flag: str | None) -> str:
|
|
||||||
return "active" if enabled_flag == "1" else "suspended"
|
|
||||||
|
|
||||||
|
|
||||||
def _row_from_csv(raw: dict[str, str]) -> dict[str, Any]:
    """Build the normalised row dict used by the importer from one CSV record.

    Each output key is read from the CSV column listed beside it and passed
    through ``_norm``. ``assigned_city`` falls back to the ``city`` column
    when its own column normalises to ``None``.
    """
    # (output key, CSV column) pairs, in the order the keys appear in the result.
    column_for = (
        ("imei", "imei"),
        ("device_name", "device_name"),
        ("mc_type", "mc_type"),
        ("plate", "vehicle_name"),
        ("vehicle_class", "vehicle_models"),
        ("driver_name", "driver_name"),
        ("driver_phone", "driver_phone"),
        ("sim", "sim"),
        ("iccid", "iccid"),
        ("account", "account"),
        ("customer_name", "customer_name"),
        ("device_group", "device_group"),
        ("activation_time", "activation_time"),
        ("expiration", "expiration"),
        ("enabled_flag", "enabled_flag"),
        ("cost_centre", "cost_centre"),
        ("assigned_city", "assigned_city"),
    )
    row: dict[str, Any] = {key: _norm(raw.get(column)) for key, column in column_for}
    if not row["assigned_city"]:
        row["assigned_city"] = _norm(raw.get("city"))
    return row
|
|
||||||
|
|
||||||
|
|
||||||
def _remap_state_rows(cur: psycopg.Cursor, imei: str, vehicle_id: Any) -> None:
    """Point the position-history and live-position rows for ``imei`` at ``vehicle_id``."""
    cur.execute(
        "UPDATE state.position_history SET vehicle_id = %s WHERE imei = %s",
        (vehicle_id, imei),
    )
    cur.execute(
        "UPDATE state.live_positions SET vehicle_id = %s WHERE imei = %s",
        (vehicle_id, imei),
    )


def process_one(cur: psycopg.Cursor, row: dict[str, Any]) -> dict[str, Any]:
    """Apply one CSV row. Returns an action dict for summary reporting.

    Args:
        cur: open cursor; every statement for this row is executed on it.
        row: normalised row as produced by ``_row_from_csv``.

    Returns:
        ``{"action": "skipped_no_imei"}`` when the row has no IMEI. Otherwise
        a dict with ``imei``, ``remapped_from``, ``remapped_to``,
        ``created_vehicle``, ``pre_existed``, ``target_vehicle_id`` and
        ``action`` (``"updated"`` if the device row already existed, else
        ``"inserted"``).
    """
    imei = row["imei"]
    if not imei:
        return {"action": "skipped_no_imei"}

    action = {"imei": imei, "remapped_from": None, "remapped_to": None, "created_vehicle": False}

    # 1. Ensure account
    if row["account"]:
        cur.execute(
            """INSERT INTO domain.accounts (account_id, name, app_key)
               VALUES (%s, %s, '')
               ON CONFLICT (account_id) DO UPDATE SET name = EXCLUDED.name""",
            (row["account"], row["customer_name"] or row["account"]),
        )

    # 2. Resolve vehicle_id
    cur.execute("SELECT vehicle_id FROM domain.devices WHERE imei = %s", (imei,))
    existing = cur.fetchone()
    current_vehicle_id = existing[0] if existing else None
    action["pre_existed"] = existing is not None

    if row["plate"]:
        cur.execute("SELECT vehicle_id FROM domain.vehicles WHERE plate = %s", (row["plate"],))
        match = cur.fetchone()
        if match is not None:
            target_vehicle_id = match[0]
        else:
            cur.execute(
                """INSERT INTO domain.vehicles (plate, cost_centre, assigned_city, vehicle_class)
                   VALUES (%s, %s, %s, %s)
                   RETURNING vehicle_id""",
                (row["plate"], row["cost_centre"], row["assigned_city"], row["vehicle_class"]),
            )
            target_vehicle_id = cur.fetchone()[0]
            action["created_vehicle"] = True

        # The device already lived on a different vehicle: move its state rows
        # across. Same step whether the target vehicle was found or just created.
        if current_vehicle_id is not None and current_vehicle_id != target_vehicle_id:
            _remap_state_rows(cur, imei, target_vehicle_id)
            action["remapped_from"] = current_vehicle_id
            action["remapped_to"] = target_vehicle_id
    elif current_vehicle_id is not None:
        target_vehicle_id = current_vehicle_id
    else:
        # No CSV plate, no existing device → placeholder
        plate_holder = f"IMEI-{imei[-6:]}"
        cur.execute(
            """INSERT INTO domain.vehicles (plate) VALUES (%s)
               ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate
               RETURNING vehicle_id""",
            (plate_holder,),
        )
        target_vehicle_id = cur.fetchone()[0]
        action["created_vehicle"] = True

    # 3. Apply vehicle attributes (only overwrite when CSV has a value)
    cur.execute(
        """UPDATE domain.vehicles SET
               cost_centre = COALESCE(%s, cost_centre),
               assigned_city = COALESCE(%s, assigned_city),
               vehicle_class = COALESCE(%s, vehicle_class),
               updated_at = now()
           WHERE vehicle_id = %s""",
        (row["cost_centre"], row["assigned_city"], row["vehicle_class"], target_vehicle_id),
    )

    # 4. UPSERT device row.
    #
    # How each `domain.devices` field is treated on conflict (this mirrors the
    # SQL below exactly):
    #
    #   Fill-only — COALESCE(existing, csv); the CSV only fills NULL gaps and
    #   never replaces a value that is already stored:
    #       account_id, device_type, model, lifecycle, activation_at,
    #       iccid, expiration_at
    #
    #   Always overwritten from the CSV:
    #       driver_name, driver_phone, device_group, roster_synced_at
    #
    #   `vehicle_id` is structural — the per-row resolve logic above already
    #   decided this; the UPSERT just stores it.
    #
    # NOTE(review): iccid and expiration_at are CSV-only columns (added by
    # migration 15) yet are fill-only here, unlike the other CSV-only columns —
    # confirm that asymmetry is intended.
    cur.execute(
        """INSERT INTO domain.devices (
               imei, account_id, vehicle_id, device_type, model, lifecycle, activation_at,
               driver_name, driver_phone, iccid, expiration_at, device_group, roster_synced_at
           ) VALUES (
               %s, %s, %s, %s, %s, %s, %s,
               %s, %s, %s, %s, %s, now()
           )
           ON CONFLICT (imei) DO UPDATE SET
               account_id = COALESCE(domain.devices.account_id, EXCLUDED.account_id),
               vehicle_id = EXCLUDED.vehicle_id,
               device_type = COALESCE(domain.devices.device_type, EXCLUDED.device_type),
               model = COALESCE(domain.devices.model, EXCLUDED.model),
               lifecycle = COALESCE(domain.devices.lifecycle, EXCLUDED.lifecycle),
               activation_at = COALESCE(domain.devices.activation_at, EXCLUDED.activation_at),
               driver_name = EXCLUDED.driver_name,
               driver_phone = EXCLUDED.driver_phone,
               iccid = COALESCE(domain.devices.iccid, EXCLUDED.iccid),
               expiration_at = COALESCE(domain.devices.expiration_at, EXCLUDED.expiration_at),
               device_group = EXCLUDED.device_group,
               roster_synced_at = EXCLUDED.roster_synced_at,
               updated_at = now()""",
        (
            imei,
            row["account"] or "Fireside Communications",
            target_vehicle_id,
            _device_type_for(row["mc_type"]),
            row["mc_type"],
            _lifecycle_for(row["enabled_flag"]),
            row["activation_time"],
            row["driver_name"],
            row["driver_phone"],
            row["iccid"],
            row["expiration"],
            row["device_group"],
        ),
    )

    action["target_vehicle_id"] = target_vehicle_id
    action["action"] = "updated" if action["pre_existed"] else "inserted"
    return action
|
|
||||||
|
|
||||||
|
|
||||||
def delete_orphan_vehicles(cur: psycopg.Cursor) -> int:
    """Delete vehicles referenced by no device, live position, or history row.

    Returns the cursor's ``rowcount`` after the DELETE.
    """
    orphan_sweep = """DELETE FROM domain.vehicles v
           WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id)
             AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id)
             AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id)
        """
    cur.execute(orphan_sweep)
    deleted = cur.rowcount
    return deleted
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
    """CLI entry point: load the CSV, apply every row, print a summary, then commit or roll back.

    Reads the CSV path and the optional ``--dry-run`` flag from the command
    line and the connection string from the ``DATABASE_URL`` environment
    variable (exits with a message when it is unset). All rows are applied on
    one connection opened with ``autocommit=False``; the work is rolled back
    under ``--dry-run`` and committed otherwise.
    """
    p = argparse.ArgumentParser()
    p.add_argument("csv_path")
    p.add_argument("--dry-run", action="store_true", help="Roll back the transaction at the end")
    args = p.parse_args()

    dsn = os.environ.get("DATABASE_URL")
    if not dsn:
        sys.exit("DATABASE_URL env var required")

    # newline="" lets the csv module do its own newline handling, so quoted
    # fields containing line breaks are parsed correctly.
    with open(args.csv_path, newline="") as f:
        rows = [_row_from_csv(r) for r in csv.DictReader(f)]

    print(f"loaded {len(rows)} CSV rows ({'DRY-RUN' if args.dry_run else 'APPLY'})")

    actions: list[dict[str, Any]] = []
    with psycopg.connect(dsn, autocommit=False) as conn:
        with conn.cursor() as cur:
            for row in rows:
                actions.append(process_one(cur, row))
            orphans_deleted = delete_orphan_vehicles(cur)

            # summary
            inserted = sum(1 for a in actions if a.get("action") == "inserted")
            updated = sum(1 for a in actions if a.get("action") == "updated")
            skipped = sum(1 for a in actions if a.get("action") == "skipped_no_imei")
            new_vehicles = sum(1 for a in actions if a.get("created_vehicle"))
            remaps = [a for a in actions if a.get("remapped_from") is not None]

            print(f"\ndevices : inserted={inserted} updated={updated} skipped={skipped}")
            print(f"vehicles: created={new_vehicles} orphans_deleted={orphans_deleted}")
            print(f"remaps : {len(remaps)} devices moved to a different vehicle_id")
            # Only the first ten remaps are listed; the rest are counted.
            for a in remaps[:10]:
                print(f" imei={a['imei']} {a['remapped_from']} -> {a['remapped_to']}")
            if len(remaps) > 10:
                print(f" ... and {len(remaps) - 10} more")

            # Final-state snapshot
            cur.execute("SELECT count(*) FROM domain.vehicles WHERE cost_centre IS NOT NULL")
            with_cc = cur.fetchone()[0]
            cur.execute("SELECT count(*) FROM domain.devices WHERE driver_name IS NOT NULL")
            with_drv = cur.fetchone()[0]
            cur.execute("SELECT count(*) FROM domain.vehicles WHERE plate LIKE 'IMEI-%'")
            placeholders = cur.fetchone()[0]
            cur.execute("SELECT count(*) FROM domain.vehicles")
            v_total = cur.fetchone()[0]
            cur.execute("SELECT count(*) FROM domain.devices")
            d_total = cur.fetchone()[0]
            print()
            print(f"final state: {d_total} devices / {v_total} vehicles "
                  f"({with_drv} with driver_name, {with_cc} vehicles with cost_centre, "
                  f"{placeholders} still IMEI- placeholders)")
            cur.execute("""SELECT cost_centre, count(*) FROM domain.vehicles
                           GROUP BY 1 ORDER BY 2 DESC LIMIT 10""")
            print("cost-centre histogram (top 10):")
            for cc, n in cur.fetchall():
                print(f" {cc!r:<14} {n}")

        if args.dry_run:
            print("\n(dry-run: rolling back)")
            conn.rollback()
        else:
            conn.commit()
            print("\n(committed)")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
Loading…
Reference in a new issue