feat(tools): import_drivers_csv reads raw Tracksolid Pro export format

Add auto-detecting header-alias layer (_EXPORT_HEADER_ALIASES) so
import_drivers_csv ingests the raw Tracksolid Pro device export
(Title-Case headers: IMEI, Driver Name, LicensePlateNo., Model,
Vehicle Model, …) in addition to the original snake_case schema-mirror
CSV. Add customer_name/device_group to the write set, extend the
driver-placeholder skip list (Unassigned/UG/UG Crane), and exclude
activation_time/expiration/device_group (date-only/casing churn that
would degrade precise API-set values).

Used to apply the 260625 telematics quality-check file: 152 devices
updated (device_name, driver_name, plate, vehicle type, depot_address).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-25 14:52:11 +03:00
parent 048d1a3173
commit 458e0be101
2 changed files with 345 additions and 0 deletions

4
tools/__init__.py Normal file
View file

@ -0,0 +1,4 @@
# Manual / one-shot operator tools (NOT part of the runtime services).
# Run from the repo root as a module so `import ts_shared_rev` resolves, e.g.:
# docker exec ingest_worker python -m tools.sync_driver_audit
# See tools/README.md.

341
tools/import_drivers_csv.py Normal file
View file

@ -0,0 +1,341 @@
"""
import_drivers_csv.py Fireside Communications · Driver & Vehicle CSV Import
One-shot script: reads the snake_case Fireside Group vehicle CSV
(`20260427_FSG_Vehicles_mitieng.csv`), compares each row against the
current `tracksolid.devices` values, and updates the DB.
The CSV columns mirror the DB schema directly no inference. Cells with the
literal string "NULL" are treated as missing.
Fields imported (per Phase 0.1 of the Business Analytics redesign plan):
Identity : driver_name, driver_phone, vehicle_number, vehicle_name,
vehicle_models, mc_type, device_name
SIM : sim, iccid, imsi
Lifecycle : activation_time, expiration
Business meta : assigned_city, cost_centre, assigned_route,
vehicle_category, vehicle_brand, fuel_100km, depot_address
`depot_geom` (PostGIS Point) is intentionally NOT imported needs WKT and
isn't present as coordinates in the CSV. Set it via a follow-up migration
when geofences are loaded.
Usage:
# Dry-run — shows diff, writes nothing
python import_drivers_csv.py
# Filter to a single IMEI (dry-run)
python import_drivers_csv.py --imei 862798052707896
# Apply all changes to DB
python import_drivers_csv.py --apply
# Only fill fields that are currently NULL in the DB (never overwrite)
python import_drivers_csv.py --only-null --apply
# Use a different CSV
python import_drivers_csv.py --csv path/to/file.csv
Pre-requisite:
Migrations 02, 05, 06 must be applied (they add the metadata columns).
"""
import argparse
import csv
from pathlib import Path
from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger
log = get_logger("csv_import")
DEFAULT_CSV_PATH = Path(__file__).parent / "data" / "20260427_FSG_Vehicles_mitieng.csv"
# Columns fetched from DB for diff comparison.
DB_COLS = [
"imei",
# Identity
"driver_name", "driver_phone", "vehicle_number", "vehicle_name",
"vehicle_models", "mc_type", "device_name", "customer_name", "device_group",
# SIM
"sim", "iccid", "imsi",
# Lifecycle
"activation_time", "expiration",
# Business metadata (Phase 0.1 additions)
"assigned_city", "cost_centre", "assigned_route",
"vehicle_category", "vehicle_brand", "fuel_100km", "depot_address",
]
# Driver Name values that are placeholders — skip writing driver_name for these.
_DRIVER_SKIP = {"identification", "ug", "ug crane", "unassigned"}
# Tracksolid Pro raw-export headers (Title-Case) → canonical snake_case DB columns.
# Used when the CSV is the raw platform export rather than the snake_case schema-mirror.
# Unmapped export columns (Mileage, Account, MAC, SN, VIN, install metadata, …) are ignored:
# `Mileage`/`Account` are deliberately excluded (pipeline-owned), the rest have no DB column.
_EXPORT_HEADER_ALIASES = {
"IMEI": "imei",
"Driver Name": "driver_name",
"Telephone": "driver_phone",
"LicensePlateNo.": "vehicle_number",
"Device Name": "device_name",
"Model": "mc_type",
"Vehicle Model": "vehicle_models",
"Vehicle Brand": "vehicle_brand",
"Fuel/100km": "fuel_100km",
"SIM": "sim",
"ICCID": "iccid",
"IMSI": "imsi",
"Activated Date": "activation_time",
"Subscription Expiration": "expiration",
"Customer Name": "customer_name",
"Group": "device_group",
"Installation Address": "depot_address",
}
# Columns intentionally NOT written from the raw Tracksolid export: its date
# columns are date-only and would degrade the precise API-set timestamps, and
# device_group differs only by capitalisation ("Default group"→"Default Group").
# (User decision, 2026-06-25 quality-check import.)
_EXCLUDE_COLS = {"activation_time", "expiration", "device_group"}
# Columns that need an explicit cast in the UPDATE statement.
_TIMESTAMPTZ_COLS = {"activation_time", "expiration"}
_NUMERIC_COLS = {"fuel_100km"}
def _read(row: dict, col: str) -> str | None:
"""Read a CSV column treating literal 'NULL'/'None' (case-insensitive) as missing."""
v = clean(row.get(col))
if v is None:
return None
return None if v.upper() in ("NULL", "NONE") else v
def _read_num(row: dict, col: str) -> float | None:
v = _read(row, col)
return clean_num(v) if v is not None else None
def _read_ts(row: dict, col: str) -> str | None:
v = _read(row, col)
return clean_ts(v) if v is not None else None
def load_csv(csv_path: Path) -> dict[str, dict]:
"""Load CSV into a dict keyed by IMEI.
Accepts two header styles: the snake_case schema-mirror CSV (original
`tools/data/` workflow) and the raw Tracksolid Pro export (Title-Case
headers). The export form is auto-detected and its headers are remapped to
the canonical snake_case names via `_EXPORT_HEADER_ALIASES`; unmapped
columns are dropped.
"""
rows: dict[str, dict] = {}
with open(csv_path, encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
is_export = "IMEI" in (reader.fieldnames or [])
if is_export:
log.info("Detected Tracksolid raw-export header format; remapping columns.")
for raw in reader:
row = (
{snake: raw.get(orig) for orig, snake in _EXPORT_HEADER_ALIASES.items()}
if is_export
else raw
)
imei = (row.get("imei") or "").strip()
if not imei:
continue
rows[imei] = row
log.info("CSV loaded: %d rows from %s", len(rows), csv_path.name)
return rows
def load_db_devices() -> dict[str, dict]:
"""Fetch current device rows from DB, keyed by IMEI."""
devices: dict[str, dict] = {}
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices")
col_names = [d[0] for d in cur.description]
for row in cur.fetchall():
rec = dict(zip(col_names, row))
devices[rec["imei"]] = rec
log.info("DB loaded: %d devices", len(devices))
return devices
def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]:
"""
Return a dict of columnnew_value for fields that need updating.
When only_null=True, skip any DB column that already has a value.
The driver_name column is skipped for placeholder-labelled devices.
"""
driver_raw = (_read(csv_row, "driver_name") or "")
is_placeholder = driver_raw.lower() in _DRIVER_SKIP
if driver_raw.lower() == "identification":
return {}
proposed: dict[str, object] = {
# Identity
"driver_phone": _read(csv_row, "driver_phone"),
"vehicle_number": _read(csv_row, "vehicle_number"),
"vehicle_name": _read(csv_row, "vehicle_name"),
"vehicle_models": _read(csv_row, "vehicle_models"),
"mc_type": _read(csv_row, "mc_type"),
"device_name": _read(csv_row, "device_name"),
"customer_name": _read(csv_row, "customer_name"),
"device_group": _read(csv_row, "device_group"),
# SIM
"sim": _read(csv_row, "sim"),
"iccid": _read(csv_row, "iccid"),
"imsi": _read(csv_row, "imsi"),
# Lifecycle
"activation_time": _read_ts(csv_row, "activation_time"),
"expiration": _read_ts(csv_row, "expiration"),
# Business metadata
"assigned_city": _read(csv_row, "assigned_city"),
"cost_centre": _read(csv_row, "cost_centre"),
"assigned_route": _read(csv_row, "assigned_route"),
"vehicle_category": _read(csv_row, "vehicle_category"),
"vehicle_brand": _read(csv_row, "vehicle_brand"),
"fuel_100km": _read_num(csv_row, "fuel_100km"),
"depot_address": _read(csv_row, "depot_address"),
}
if not is_placeholder and driver_raw:
proposed["driver_name"] = driver_raw
# Drop None values — no point sending NULL to overwrite NULL — and any
# explicitly excluded columns (precision-degrading / cosmetic churn).
proposed = {
k: v for k, v in proposed.items()
if v is not None and k not in _EXCLUDE_COLS
}
if not only_null or db_row is None:
return proposed
# only_null: drop any column that already has a non-null value in the DB.
return {k: v for k, v in proposed.items() if db_row.get(k) is None}
def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None:
"""Pretty-print what will change for one device."""
if not updates:
return
db = db_row or {}
print(f"\n IMEI {imei}:")
for col, new_val in sorted(updates.items()):
old_val = db.get(col)
if str(old_val) != str(new_val):
print(f" {col:<20} {str(old_val):<30}{new_val}")
def _set_clause(col: str) -> str:
"""SQL fragment for `col = ...` honouring per-column casts."""
if col in _TIMESTAMPTZ_COLS:
return f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})"
if col in _NUMERIC_COLS:
# %s already a float; no NULLIF dance needed.
return f"{col} = COALESCE(%s::NUMERIC, {col})"
return f"{col} = COALESCE(NULLIF(%s, ''), {col})"
def _placeholder(col: str) -> str:
"""SQL fragment for a single VALUES placeholder honouring per-column casts."""
if col in _TIMESTAMPTZ_COLS:
return "%s::TIMESTAMPTZ"
if col in _NUMERIC_COLS:
return "%s::NUMERIC"
return "%s"
def run(apply: bool, only_null: bool, filter_imei: str | None, csv_path: Path) -> None:
csv_rows = load_csv(csv_path)
db_rows = load_db_devices()
if filter_imei:
csv_rows = {k: v for k, v in csv_rows.items() if k == filter_imei}
if not csv_rows:
print(f"IMEI {filter_imei} not found in CSV.")
return
updated = inserted = skipped = no_change = 0
with get_conn() as conn:
with conn.cursor() as cur:
for imei, csv_row in csv_rows.items():
db_row = db_rows.get(imei)
updates = build_update(csv_row, db_row, only_null)
if not updates:
driver_raw = (_read(csv_row, "driver_name") or "").lower()
if driver_raw == "identification":
skipped += 1
else:
no_change += 1
continue
if db_row is None:
# Device not yet synced from API — insert a stub row so
# incoming alarms / positions don't trip the FK constraint.
print(f"\n [NEW] IMEI {imei}:")
for col, new_val in sorted(updates.items()):
print(f" {col:<20}{new_val}")
if apply:
cols = ["imei"] + list(updates.keys())
vals = [imei] + list(updates.values())
placeholders = ["%s"] + [_placeholder(c) for c in updates.keys()]
cur.execute(
f"INSERT INTO tracksolid.devices ({', '.join(cols)}) "
f"VALUES ({', '.join(placeholders)}) "
"ON CONFLICT (imei) DO NOTHING",
vals,
)
inserted += 1
continue
print_diff(imei, updates, db_row)
if apply:
set_clauses = [_set_clause(c) for c in updates.keys()]
params = list(updates.values())
set_clauses.append("updated_at = NOW()")
params.append(imei)
cur.execute(
f"UPDATE tracksolid.devices SET {', '.join(set_clauses)} WHERE imei = %s",
params,
)
updated += 1
else:
updated += 1 # count as "would update" in dry-run
mode = "APPLIED" if apply else "DRY-RUN"
print(f"\n{'='*60}")
print(f" {mode} COMPLETE")
print(f"{'='*60}")
print(f" Would update / updated : {updated}")
print(f" Would insert / inserted : {inserted}")
print(f" No change needed : {no_change}")
print(f" Skipped (Identification): {skipped}")
if not apply:
print("\n Run with --apply to commit changes.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Import driver/vehicle details from CSV into tracksolid.devices"
)
parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)")
parser.add_argument("--only-null", action="store_true", help="Only update fields currently NULL in the DB")
parser.add_argument("--imei", default=None, help="Limit to a single IMEI")
parser.add_argument("--csv", default=str(DEFAULT_CSV_PATH),
help=f"Path to the CSV (default: {DEFAULT_CSV_PATH.name})")
args = parser.parse_args()
csv_path = Path(args.csv)
if not csv_path.exists():
log.error("CSV file not found: %s", csv_path)
raise SystemExit(1)
run(apply=args.apply, only_null=args.only_null, filter_imei=args.imei, csv_path=csv_path)