Three changes that together close the FK-violation loop on /pushalarm: 1. import_drivers_csv.py: when an IMEI is in the CSV but not in tracksolid.devices, INSERT a new row instead of skipping. Unblocks the 140 X3/JC400P devices listed as a HIGH open item in CLAUDE.md §10. 2. webhook_receiver_rev.py: new _ensure_device() helper upserts a stub devices row (status='unknown') before inserting an alarm. Handles the third class of devices — not in API sync, not in CSV (e.g. the X3-63282 Kampala device flagged in CLAUDE.md §10). 3. CSV refreshed from Downloads (Apr 21 version, 140 active rows). Also fixes alarm error log previously showing "None" (read deviceImei instead of the integration push's imei field). CSV import already applied live on the instance (63 → 201 devices). Webhook patch requires a Coolify redeploy to pick up _ensure_device(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
251 lines
9.6 KiB
Python
251 lines
9.6 KiB
Python
"""
|
|
import_drivers_csv.py — Fireside Communications · Driver & Vehicle CSV Import
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
One-shot script: reads 20260414_FS__Logistics - final_fixed.csv, compares
|
|
each row against the current tracksolid.devices values, and updates the DB (a row is inserted for any IMEI not yet present in the table).
|
|
|
|
Usage:
|
|
# Dry-run — shows diff, writes nothing
|
|
python import_drivers_csv.py
|
|
|
|
# Filter to a single IMEI (dry-run)
|
|
python import_drivers_csv.py --imei 862798052707896
|
|
|
|
# Apply all changes to DB
|
|
python import_drivers_csv.py --apply
|
|
|
|
# Only fill fields that are currently NULL in the DB (never overwrite)
|
|
python import_drivers_csv.py --only-null --apply
|
|
|
|
Pre-requisite:
|
|
Migration 06 must be applied first (adds assigned_city / cost_centre columns).
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
from ts_shared_rev import clean, clean_num, clean_ts, get_conn, get_logger
|
|
|
|
# Module-level logger, obtained from the shared helper under the "csv_import" name.
log = get_logger("csv_import")

# The CSV is looked up in the same directory as this script.
CSV_PATH = Path(__file__).with_name("20260414_FS__Logistics - final_fixed.csv")

# Columns read from tracksolid.devices so CSV values can be compared against them.
DB_COLS = [
    "imei",
    "driver_name",
    "driver_phone",
    "vehicle_number",
    "vehicle_name",
    "vehicle_models",
    "cost_centre",
    "sim",
    "iccid",
    "imsi",
    "mc_type",
    "activation_time",
    "expiration",
    "device_name",
    "assigned_city",
]

# Lower-cased "Driver Name" values that are placeholders rather than real
# names; driver_name is never written for these.
_DRIVER_SKIP = {"ug", "identification"}
|
|
|
|
|
|
def _infer_city(plate: str) -> str | None:
|
|
"""Derive assigned_city from license plate prefix."""
|
|
p = (plate or "").strip().upper()
|
|
if p.startswith("UMA") or p.startswith("UAG"):
|
|
return "KLA"
|
|
if p.startswith("K"):
|
|
return "NBO"
|
|
return None
|
|
|
|
|
|
def _clean_date(v: str) -> str | None:
|
|
"""Accept YYYY-MM-DD and return as ISO string suitable for TIMESTAMPTZ cast."""
|
|
s = (v or "").strip()
|
|
if not s:
|
|
return None
|
|
try:
|
|
date.fromisoformat(s)
|
|
return s
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def load_csv() -> dict[str, dict]:
    """Read the CSV at CSV_PATH and index its rows by IMEI.

    The file is opened as ``utf-8-sig`` so a leading BOM is tolerated.  Rows
    whose ``IMEI`` cell is missing or blank are ignored; when an IMEI occurs
    more than once, the last row replaces the earlier one.
    """
    with open(CSV_PATH, encoding="utf-8-sig", newline="") as handle:
        keyed = (
            ((record.get("IMEI") or "").strip(), record)
            for record in csv.DictReader(handle)
        )
        # Built inside the ``with`` so the reader is consumed while the file is open.
        by_imei = {imei: record for imei, record in keyed if imei}
    log.info("CSV loaded: %d rows from %s", len(by_imei), CSV_PATH.name)
    return by_imei
|
|
|
|
|
|
def load_db_devices() -> dict[str, dict]:
    """Read the DB_COLS columns of every tracksolid.devices row, keyed by IMEI.

    Each value is a dict mapping the column names reported by the cursor to
    that row's values.
    """
    query = f"SELECT {', '.join(DB_COLS)} FROM tracksolid.devices"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        headers = [column[0] for column in cur.description]
        records = (dict(zip(headers, values)) for values in cur.fetchall())
        by_imei = {record["imei"]: record for record in records}
    log.info("DB loaded: %d devices", len(by_imei))
    return by_imei
|
|
|
|
|
|
def build_update(csv_row: dict, db_row: dict | None, only_null: bool) -> dict[str, object]:
    """
    Return a dict of column→new_value for fields that need updating.

    Args:
        csv_row:   One CSV record (header → cell text).
        db_row:    The matching tracksolid.devices row, or None when the IMEI
                   is not in the table yet.
        only_null: When True, skip any DB column that already has a value.

    Returns:
        The columns to write.  Empty when the row is an "Identification"
        placeholder or when nothing differs from the DB.

    Rules:
      * Rows whose Driver Name is "identification" produce no update at all.
      * driver_name is left out for every placeholder in _DRIVER_SKIP.
      * CSV cells that clean to None are never proposed.
      * With no db_row, every proposed column is returned (used for inserts).
      * With a db_row, columns whose stored value already matches the CSV
        value are dropped, so unchanged devices are not reported or rewritten.
    """
    driver_raw = clean(csv_row.get("Driver Name")) or ""
    plate = clean(csv_row.get("License Plate No.")) or ""
    driver_key = driver_raw.lower()

    if driver_key == "identification":
        return {}

    proposed: dict[str, object] = {
        "vehicle_number": clean(plate),
        "vehicle_name": clean(plate),
        "vehicle_models": clean(csv_row.get("Vehicle Model")),
        "cost_centre": clean(csv_row.get("Department")),
        "sim": clean(csv_row.get("SIM")),
        "iccid": clean(csv_row.get("ICCID")),
        "imsi": clean(csv_row.get("IMSI")),
        "mc_type": clean(csv_row.get("Model")),
        "activation_time": _clean_date(csv_row.get("Activated Date", "")),
        "expiration": _clean_date(csv_row.get("Subscription Expiration", "")),
        "driver_phone": clean(csv_row.get("Telephone")),
        "assigned_city": _infer_city(plate),
    }
    if driver_key not in _DRIVER_SKIP:
        proposed["driver_name"] = driver_raw or None

    # Drop None values — no point sending a NULL to overwrite another NULL
    proposed = {k: v for k, v in proposed.items() if v is not None}

    if db_row is None:
        return proposed

    if only_null:
        # only_null: keep just the columns that are currently NULL in the DB
        return {k: v for k, v in proposed.items() if db_row.get(k) is None}

    # Keep only real changes.  Values are compared as text because that is
    # how they are sent to the DB on apply.
    # NOTE(review): timestamp columns presumably come back as datetime
    # objects, whose text form differs from the CSV's date-only string, so
    # they are still reported as changes — confirm against the driver.
    return {
        k: v for k, v in proposed.items()
        if db_row.get(k) is None or str(db_row.get(k)) != str(v)
    }
|
|
|
|
|
|
def print_diff(imei: str, updates: dict[str, object], db_row: dict | None) -> None:
|
|
"""Pretty-print what will change for one device."""
|
|
if not updates:
|
|
return
|
|
db = db_row or {}
|
|
print(f"\n IMEI {imei}:")
|
|
for col, new_val in sorted(updates.items()):
|
|
old_val = db.get(col)
|
|
if old_val != new_val:
|
|
print(f" {col:<20} {str(old_val):<30} → {new_val}")
|
|
|
|
|
|
def run(apply: bool, only_null: bool, filter_imei: str | None) -> None:
    """Diff the CSV against tracksolid.devices and optionally write the result.

    Args:
        apply:       When True, execute the INSERT/UPDATE statements;
                     otherwise only print what would happen (dry-run).
        only_null:   Passed to build_update; restricts updates of existing
                     rows to columns that are currently NULL.
        filter_imei: If given, process only that IMEI from the CSV.

    For each CSV row: an empty update counts as "skipped" (Identification
    placeholder) or "no change"; an IMEI missing from the DB is inserted;
    any other row is updated.  Counters are incremented in dry-run too, and
    a summary is printed at the end.
    """
    csv_rows = load_csv()
    db_rows = load_db_devices()

    if filter_imei:
        csv_rows = (
            {filter_imei: csv_rows[filter_imei]} if filter_imei in csv_rows else {}
        )
        if not csv_rows:
            print(f"IMEI {filter_imei} not found in CSV.")
            return

    # Columns whose bound parameter needs an explicit TIMESTAMPTZ cast.
    timestamp_cols = ("activation_time", "expiration")
    updated = inserted = skipped = no_change = 0

    with get_conn() as conn, conn.cursor() as cur:
        for imei, csv_row in csv_rows.items():
            db_row = db_rows.get(imei)
            updates = build_update(csv_row, db_row, only_null)

            if not updates:
                # Either an "Identification" placeholder or nothing to change
                label = (csv_row.get("Driver Name") or "").strip().lower()
                if label == "identification":
                    skipped += 1
                else:
                    no_change += 1
                continue

            if db_row is None:
                # Device not yet synced from API — insert a stub row now so
                # incoming alarms / positions don't trip the FK constraint.
                print(f"\n [NEW] IMEI {imei}:")
                for col, new_val in sorted(updates.items()):
                    print(f" {col:<20} → {new_val}")
                if apply:
                    cols = ["imei", *updates]
                    vals = [
                        imei,
                        *(None if v is None else str(v) for v in updates.values()),
                    ]
                    placeholders = [
                        "%s::TIMESTAMPTZ" if col in timestamp_cols else "%s"
                        for col in cols
                    ]
                    cur.execute(
                        f"INSERT INTO tracksolid.devices ({', '.join(cols)}) "
                        f"VALUES ({', '.join(placeholders)}) "
                        "ON CONFLICT (imei) DO NOTHING",
                        vals,
                    )
                inserted += 1
                continue

            print_diff(imei, updates, db_row)

            if apply:
                # COALESCE keeps the stored value if the parameter turns out
                # NULL (or, for text columns, empty).
                assignments = [
                    f"{col} = COALESCE(%s::TIMESTAMPTZ, {col})"
                    if col in timestamp_cols
                    else f"{col} = COALESCE(NULLIF(%s, ''), {col})"
                    for col in updates
                ]
                assignments.append("updated_at = NOW()")
                params = [None if v is None else str(v) for v in updates.values()]
                params.append(imei)
                cur.execute(
                    f"UPDATE tracksolid.devices SET {', '.join(assignments)} WHERE imei = %s",
                    params,
                )
            # Counted in dry-run as well ("would update").
            updated += 1

    mode = "APPLIED" if apply else "DRY-RUN"
    print(f"\n{'='*60}")
    print(f" {mode} COMPLETE")
    print(f"{'='*60}")
    print(f" Would update / updated : {updated}")
    print(f" Would insert / inserted: {inserted}")
    print(f" No change needed : {no_change}")
    print(f" Skipped (Identification): {skipped}")
    if not apply:
        print("\n Run with --apply to commit changes.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: a dry-run unless --apply is passed.
    cli = argparse.ArgumentParser(
        description="Import driver/vehicle details from CSV into tracksolid.devices"
    )
    cli.add_argument(
        "--apply",
        action="store_true",
        help="Write changes to DB (default: dry-run)",
    )
    cli.add_argument(
        "--only-null",
        action="store_true",
        help="Only update fields currently NULL in the DB",
    )
    cli.add_argument("--imei", default=None, help="Limit to a single IMEI")
    options = cli.parse_args()

    run(
        apply=options.apply,
        only_null=options.only_null,
        filter_imei=options.imei,
    )
|