tracksolid_timescale_grafan.../import_drivers_csv.py

233 lines
8.6 KiB
Python
Raw Normal View History

"""
import_drivers_csv.py — Fireside Communications · Driver & Vehicle CSV Import

One-shot script: reads 20260414_FS__Logistics - final_fixed.csv, compares
each row against the current tracksolid.devices values, and updates the DB.
Nothing is written unless --apply is given; the default is a dry-run that
only prints the diff.
Usage:
    # Dry-run — shows diff, writes nothing
    python import_drivers_csv.py

    # Filter to a single IMEI (dry-run)
    python import_drivers_csv.py --imei 862798052707896

    # Apply all changes to DB
    python import_drivers_csv.py --apply

    # Only fill fields that are currently NULL in the DB (never overwrite)
    python import_drivers_csv.py --only-null --apply

Pre-requisite:
    Migration 06 must be applied first (adds assigned_city / cost_centre columns).
"""
import argparse
import csv
import os
import sys
import time
from datetime import date
from pathlib import Path
from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger
log = get_logger("csv_import")

# The input CSV is expected to sit in the same directory as this script.
CSV_PATH = Path(__file__).with_name("20260414_FS__Logistics - final_fixed.csv")

# Columns selected from tracksolid.devices so CSV values can be compared
# against what is currently stored.
DB_COLS = [
    "imei",
    "driver_name",
    "driver_phone",
    "vehicle_number",
    "vehicle_name",
    "vehicle_models",
    "cost_centre",
    "sim",
    "iccid",
    "imsi",
    "mc_type",
    "activation_time",
    "expiration",
    "device_name",
    "assigned_city",
]

# Lower-cased "Driver Name" values that are placeholders rather than real
# people; driver_name is never written for rows carrying one of these.
_DRIVER_SKIP = {"identification", "ug"}
def _infer_city(plate: str) -> str | None:
"""Derive assigned_city from license plate prefix."""
p = (plate or "").strip().upper()
if p.startswith("UMA") or p.startswith("UAG"):
return "KLA"
if p.startswith("K"):
return "NBO"
return None
def _clean_date(v: str) -> str | None:
"""Accept YYYY-MM-DD and return as ISO string suitable for TIMESTAMPTZ cast."""
s = (v or "").strip()
if not s:
return None
try:
date.fromisoformat(s)
return s
except ValueError:
return None
def load_csv() -> dict[str, dict]:
    """Read the CSV at ``CSV_PATH`` into a mapping of IMEI -> row dict.

    Rows whose ``IMEI`` cell is missing or blank are ignored. If the same
    IMEI appears more than once, the last occurrence wins.

    Returns:
        Dict keyed by the stripped IMEI string; values are the
        ``csv.DictReader`` rows as read.
    """
    by_imei: dict[str, dict] = {}
    # "utf-8-sig" drops a leading BOM if the file has one; newline="" is the
    # mode the csv module expects for files it parses.
    with open(CSV_PATH, encoding="utf-8-sig", newline="") as handle:
        reader = csv.DictReader(handle)
        for record in reader:
            key = (record.get("IMEI") or "").strip()
            if key:
                by_imei[key] = record
    log.info("CSV loaded: %d rows from %s", len(by_imei), CSV_PATH.name)
    return by_imei
def load_db_devices() -> dict[str, dict]:
    """Load every row of ``tracksolid.devices`` into a mapping keyed by IMEI.

    Only the columns listed in ``DB_COLS`` are selected. Column names are
    taken from the cursor description, so each record is a plain
    ``{column: value}`` dict.

    Returns:
        Dict of ``imei -> record``.
    """
    # DB_COLS is a fixed module-level list, so interpolating it is safe here.
    select_sql = f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(select_sql)
        headers = [desc[0] for desc in cur.description]
        records = [dict(zip(headers, values)) for values in cur.fetchall()]
    by_imei: dict[str, dict] = {rec["imei"]: rec for rec in records}
    log.info("DB loaded: %d devices", len(by_imei))
    return by_imei
def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]:
    """Work out which device columns a CSV row wants to set.

    Returns a dict of column -> new_value. Values are NOT compared against
    the DB here; the only DB-dependent filtering is the ``only_null`` rule.

    Args:
        csv_row: One row from the CSV, keyed by its header names.
        db_row: The current DB record for the same IMEI, or ``None`` if the
            device is not in the DB.
        only_null: When true (and ``db_row`` is given), keep only columns
            whose current DB value is ``None``.

    Returns:
        Possibly empty dict of proposed values. Always empty when the
        driver name is the "identification" placeholder. ``driver_name`` is
        left out for any placeholder listed in ``_DRIVER_SKIP``.
    """
    # NOTE(review): `clean` comes from ts_shared_rev; presumably it trims the
    # value and maps blanks to None — confirm against that module.
    def cell(header: str):
        return clean(csv_row.get(header))

    driver_raw = cell("Driver Name") or ""
    plate = cell("License Plate No.") or ""
    driver_key = driver_raw.lower()

    # "Identification" rows are skipped wholesale, not just their driver name.
    if driver_key == "identification":
        return {}

    plate_value = clean(plate)
    candidates: dict[str, object] = {
        "vehicle_number": plate_value,
        "vehicle_name": plate_value,
        "vehicle_models": cell("Vehicle Model"),
        "cost_centre": cell("Department"),
        "sim": cell("SIM"),
        "iccid": cell("ICCID"),
        "imsi": cell("IMSI"),
        "mc_type": cell("Model"),
        "activation_time": _clean_date(csv_row.get("Activated Date", "")),
        "expiration": _clean_date(csv_row.get("Subscription Expiration", "")),
        "driver_phone": cell("Telephone"),
        "assigned_city": _infer_city(plate),
    }
    if driver_key not in _DRIVER_SKIP:
        candidates["driver_name"] = driver_raw or None

    # A None would only overwrite NULL with NULL, so it is never sent.
    changes = {col: val for col, val in candidates.items() if val is not None}

    if only_null and db_row is not None:
        # Fill-gaps mode: never touch a column that already holds a value.
        changes = {col: val for col, val in changes.items() if db_row.get(col) is None}
    return changes
def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None:
"""Pretty-print what will change for one device."""
if not updates:
return
db = db_row or {}
print(f"\n IMEI {imei}:")
for col, new_val in sorted(updates.items()):
old_val = db.get(col)
if old_val != new_val:
print(f" {col:<20} {str(old_val):<30}{new_val}")
def run(apply: bool, only_null: bool, filter_imei: str | None) -> None:
    """Diff the CSV against the DB and, when ``apply`` is set, write the updates.

    Args:
        apply: Execute UPDATE statements when true; otherwise only print what
            would change (dry-run).
        only_null: Passed to ``build_update``; restricts changes to columns
            that are currently ``None`` in the DB.
        filter_imei: If given, process only this IMEI. Prints a message and
            returns early when it is not present in the CSV.
    """
    csv_rows = load_csv()
    db_rows = load_db_devices()

    if filter_imei:
        if filter_imei in csv_rows:
            csv_rows = {filter_imei: csv_rows[filter_imei]}
        else:
            csv_rows = {}
        if not csv_rows:
            print(f"IMEI {filter_imei} not found in CSV.")
            return

    updated = 0
    skipped = 0
    no_change = 0
    not_in_db = 0
    timestamp_cols = ("activation_time", "expiration")

    # The connection and cursor are opened in dry-run mode as well; they are
    # simply never used to execute anything unless `apply` is true.
    with get_conn() as conn, conn.cursor() as cur:
        for imei, csv_row in csv_rows.items():
            db_row = db_rows.get(imei)
            changes = build_update(csv_row, db_row, only_null)

            if not changes:
                # Empty result: either an "Identification" placeholder row
                # or a row with nothing to write.
                label = (csv_row.get("Driver Name") or "").strip().lower()
                if label == "identification":
                    skipped += 1
                else:
                    no_change += 1
                continue

            if db_row is None:
                not_in_db += 1
                log.warning("IMEI %s in CSV but NOT in DB — skipping.", imei)
                continue

            print_diff(imei, changes, db_row)

            if apply:
                # Column names come from build_update's fixed key set, so
                # they are interpolated; values always go through %s.
                assignments = [
                    f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})"
                    if col in timestamp_cols
                    else f"{col} = COALESCE(NULLIF(%s, ''), {col})"
                    for col in changes
                ]
                params = [
                    str(val) if val is not None else None
                    for val in changes.values()
                ]
                assignments.append("updated_at = NOW()")
                params.append(imei)
                cur.execute(
                    f"UPDATE tracksolid.devices SET {', '.join(assignments)} WHERE imei = %s",
                    params,
                )
            # Counted whether applied or merely "would update" in a dry-run.
            updated += 1

    mode = "DRY-RUN" if not apply else "APPLIED"
    print(f"\n{'='*60}")
    print(f" {mode} COMPLETE")
    print(f"{'='*60}")
    print(f" Would update / updated : {updated}")
    print(f" No change needed : {no_change}")
    print(f" Skipped (Identification): {skipped}")
    print(f" IMEI not in DB : {not_in_db}")
    if not apply:
        print("\n Run with --apply to commit changes.")
if __name__ == "__main__":
    # Command-line entry point: parse the three flags and delegate to run().
    cli = argparse.ArgumentParser(
        description="Import driver/vehicle details from CSV into tracksolid.devices",
    )
    cli.add_argument(
        "--apply",
        action="store_true",
        help="Write changes to DB (default: dry-run)",
    )
    cli.add_argument(
        "--only-null",
        action="store_true",
        help="Only update fields currently NULL in the DB",
    )
    cli.add_argument("--imei", default=None, help="Limit to a single IMEI")
    opts = cli.parse_args()
    run(apply=opts.apply, only_null=opts.only_null, filter_imei=opts.imei)