tracksolid_timescale_grafan.../import_drivers_csv.py
David Kiania 257643cae2
Some checks failed
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
Static Analysis / static (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled
fix: auto-register devices on push + allow CSV import to insert new rows
Three changes that together close the FK-violation loop on /pushalarm:

1. import_drivers_csv.py: when an IMEI is in the CSV but not in
   tracksolid.devices, INSERT a new row instead of skipping. Unblocks
   the 140 X3/JC400P devices listed as a HIGH open item in CLAUDE.md §10.

2. webhook_receiver_rev.py: new _ensure_device() helper upserts a stub
   devices row (status='unknown') before inserting an alarm. Handles the
   third class of devices — not in API sync, not in CSV (e.g. the
   X3-63282 Kampala device flagged in CLAUDE.md §10).

3. CSV refreshed from Downloads (Apr 21 version, 140 active rows).

Also fixes alarm error log previously showing "None" (read deviceImei
instead of the integration push's imei field).

CSV import already applied live on the instance (63 → 201 devices).
Webhook patch requires a Coolify redeploy to pick up _ensure_device().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 12:29:32 +03:00

251 lines
9.6 KiB
Python

"""
import_drivers_csv.py — Fireside Communications · Driver & Vehicle CSV Import
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
One-shot script: reads 20260414_FS__Logistics - final_fixed.csv, compares
each row against the current tracksolid.devices values, and updates the DB.
Usage:
# Dry-run — shows diff, writes nothing
python import_drivers_csv.py
# Filter to a single IMEI (dry-run)
python import_drivers_csv.py --imei 862798052707896
# Apply all changes to DB
python import_drivers_csv.py --apply
# Only fill fields that are currently NULL in the DB (never overwrite)
python import_drivers_csv.py --only-null --apply
Pre-requisite:
Migration 06 must be applied first (adds assigned_city / cost_centre columns).
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import argparse
import csv
import os
import sys
import time
from datetime import date
from pathlib import Path
from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger
log = get_logger("csv_import")
CSV_PATH = Path(__file__).parent / "20260414_FS__Logistics - final_fixed.csv"
# Columns fetched from DB for comparison
DB_COLS = [
"imei", "driver_name", "driver_phone", "vehicle_number", "vehicle_name",
"vehicle_models", "cost_centre", "sim", "iccid", "imsi", "mc_type",
"activation_time", "expiration", "device_name", "assigned_city",
]
# Driver Name values that are placeholders — skip writing driver_name for these
_DRIVER_SKIP = {"identification", "ug"}
def _infer_city(plate: str) -> str | None:
"""Derive assigned_city from license plate prefix."""
p = (plate or "").strip().upper()
if p.startswith("UMA") or p.startswith("UAG"):
return "KLA"
if p.startswith("K"):
return "NBO"
return None
def _clean_date(v: str) -> str | None:
"""Accept YYYY-MM-DD and return as ISO string suitable for TIMESTAMPTZ cast."""
s = (v or "").strip()
if not s:
return None
try:
date.fromisoformat(s)
return s
except ValueError:
return None
def load_csv() -> dict[str, dict]:
"""Load CSV into a dict keyed by IMEI."""
rows: dict[str, dict] = {}
with open(CSV_PATH, encoding="utf-8-sig", newline="") as f:
for row in csv.DictReader(f):
imei = (row.get("IMEI") or "").strip()
if not imei:
continue
rows[imei] = row
log.info("CSV loaded: %d rows from %s", len(rows), CSV_PATH.name)
return rows
def load_db_devices() -> dict[str, dict]:
"""Fetch current device rows from DB, keyed by IMEI."""
devices: dict[str, dict] = {}
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices")
col_names = [d[0] for d in cur.description]
for row in cur.fetchall():
rec = dict(zip(col_names, row))
devices[rec["imei"]] = rec
log.info("DB loaded: %d devices", len(devices))
return devices
def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]:
"""
Return a dict of column→new_value for fields that need updating.
When only_null=True, skip any DB column that already has a value.
The driver_name column is skipped for placeholder-labelled devices.
"""
driver_raw = clean(csv_row.get("Driver Name")) or ""
plate = clean(csv_row.get("License Plate No.")) or ""
is_placeholder = driver_raw.lower() in _DRIVER_SKIP
skip_row = driver_raw.lower() == "identification"
if skip_row:
return {}
proposed: dict[str, object] = {
"vehicle_number": clean(plate),
"vehicle_name": clean(plate),
"vehicle_models": clean(csv_row.get("Vehicle Model")),
"cost_centre": clean(csv_row.get("Department")),
"sim": clean(csv_row.get("SIM")),
"iccid": clean(csv_row.get("ICCID")),
"imsi": clean(csv_row.get("IMSI")),
"mc_type": clean(csv_row.get("Model")),
"activation_time": _clean_date(csv_row.get("Activated Date", "")),
"expiration": _clean_date(csv_row.get("Subscription Expiration", "")),
"driver_phone": clean(csv_row.get("Telephone")),
"assigned_city": _infer_city(plate),
}
if not is_placeholder:
proposed["driver_name"] = driver_raw or None
# Drop None values — no point sending a NULL to overwrite another NULL
proposed = {k: v for k, v in proposed.items() if v is not None}
if not only_null or db_row is None:
return proposed
# only_null: drop any column that already has a non-null value in the DB
return {
k: v for k, v in proposed.items()
if db_row.get(k) is None
}
def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None:
"""Pretty-print what will change for one device."""
if not updates:
return
db = db_row or {}
print(f"\n IMEI {imei}:")
for col, new_val in sorted(updates.items()):
old_val = db.get(col)
if old_val != new_val:
print(f" {col:<20} {str(old_val):<30}{new_val}")
def run(apply: bool, only_null: bool, filter_imei: str | None) -> None:
csv_rows = load_csv()
db_rows = load_db_devices()
if filter_imei:
csv_rows = {k: v for k, v in csv_rows.items() if k == filter_imei}
if not csv_rows:
print(f"IMEI {filter_imei} not found in CSV.")
return
updated = inserted = skipped = no_change = 0
with get_conn() as conn:
with conn.cursor() as cur:
for imei, csv_row in csv_rows.items():
db_row = db_rows.get(imei)
updates = build_update(csv_row, db_row, only_null)
if not updates:
# Either an "Identification" placeholder or nothing to change
driver_raw = (csv_row.get("Driver Name") or "").strip().lower()
if driver_raw == "identification":
skipped += 1
else:
no_change += 1
continue
if db_row is None:
# Device not yet synced from API — insert a stub row now so
# incoming alarms / positions don't trip the FK constraint.
print(f"\n [NEW] IMEI {imei}:")
for col, new_val in sorted(updates.items()):
print(f" {col:<20}{new_val}")
if apply:
cols = ["imei"] + list(updates.keys())
vals = [imei] + [str(v) if v is not None else None for v in updates.values()]
placeholders = []
for col in cols:
if col in ("activation_time", "expiration"):
placeholders.append("%s::TIMESTAMPTZ")
else:
placeholders.append("%s")
cur.execute(
f"INSERT INTO tracksolid.devices ({', '.join(cols)}) "
f"VALUES ({', '.join(placeholders)}) "
"ON CONFLICT (imei) DO NOTHING",
vals,
)
inserted += 1
continue
print_diff(imei, updates, db_row)
if apply:
set_clauses = []
params = []
for col, val in updates.items():
if col in ("activation_time", "expiration"):
set_clauses.append(f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})")
else:
set_clauses.append(
f"{col} = COALESCE(NULLIF(%s, ''), {col})"
)
params.append(str(val) if val is not None else None)
set_clauses.append("updated_at = NOW()")
params.append(imei)
cur.execute(
f"UPDATE tracksolid.devices SET {', '.join(set_clauses)} WHERE imei = %s",
params,
)
updated += 1
else:
updated += 1 # count as "would update" in dry-run
mode = "APPLIED" if apply else "DRY-RUN"
print(f"\n{'='*60}")
print(f" {mode} COMPLETE")
print(f"{'='*60}")
print(f" Would update / updated : {updated}")
print(f" Would insert / inserted: {inserted}")
print(f" No change needed : {no_change}")
print(f" Skipped (Identification): {skipped}")
if not apply:
print("\n Run with --apply to commit changes.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Import driver/vehicle details from CSV into tracksolid.devices")
parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)")
parser.add_argument("--only-null", action="store_true", help="Only update fields currently NULL in the DB")
parser.add_argument("--imei", default=None, help="Limit to a single IMEI")
args = parser.parse_args()
run(apply=args.apply, only_null=args.only_null, filter_imei=args.imei)