"""
sync_driver_audit.py: Fireside Communications · Driver & IMEI Audit Sync

One-shot script: fetches ALL devices from the Tracksolid API, compares driver
and IMEI details against the DB, reports gaps, and populates missing data.

Run inside the container:
    docker exec -it <ingest_movement_container> python sync_driver_audit.py
Or via the Coolify terminal with env vars loaded.
"""
import time
from concurrent.futures import ThreadPoolExecutor
from ts_shared_rev import (
TARGETS,
api_post,
get_conn,
get_token,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
)
log = get_logger("driver_audit")


def run_audit():
    log.info("=== Driver & IMEI Audit Sync ===")
    t0 = time.time()
    token = get_token()
    if not token:
        log.error("Could not obtain API token. Check credentials.")
        return

    # 1. Fetch all devices from the API across every configured sub-account.
    # [FIX-M19] The fleet spans multiple Tracksolid sub-accounts (e.g.
    # fireside, Fireside@HQ, Fireside_MSA). Iterate TARGETS and dedupe by
    # IMEI; when an IMEI appears under several targets, the record from the
    # last target processed wins. Without this, devices registered under a
    # non-primary sub-account never appear in the audit and never get upserted.
    devices_by_imei: dict[str, dict] = {}
    for target in TARGETS:
        resp = api_post("jimi.user.device.list", {"target": target}, token)
        if resp.get("code") != 0:
            log.warning("device.list failed for target=%s: code=%s msg=%s",
                        target, resp.get("code"), resp.get("message"))
            continue
        for d in (resp.get("result") or []):
            imei = d.get("imei")
            if imei:
                devices_by_imei[imei] = d
        log.info(" target=%s returned %d devices", target, len(resp.get("result") or []))

    api_devices = list(devices_by_imei.values())
    if not api_devices:
        log.error("No devices returned from any target. Aborting.")
        return
    log.info("API returned %d unique devices across %d target(s).",
             len(api_devices), len(TARGETS))

    # 2. Fetch current DB state
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT imei, device_name, driver_name, driver_phone, sim, status
                FROM tracksolid.devices
                ORDER BY imei
            """)
            db_rows = {row[0]: {
                "device_name": row[1],
                "driver_name": row[2],
                "driver_phone": row[3],
                "sim": row[4],
                "status": row[5],
            } for row in cur.fetchall()}
    log.info("DB has %d devices registered.", len(db_rows))

    # 3. Compare and report gaps
    api_imeis = set()
    missing_from_db = []
    driver_gaps = []
    driver_phone_gaps = []
    for d in api_devices:
        imei = d.get("imei")
        if not imei:
            continue
        api_imeis.add(imei)
        if imei not in db_rows:
            missing_from_db.append(imei)
        else:
            db = db_rows[imei]
            if not db["driver_name"] and clean(d.get("driverName")):
                driver_gaps.append((imei, clean(d.get("driverName"))))
            if not db["driver_phone"] and clean(d.get("driverPhone")):
                driver_phone_gaps.append((imei, clean(d.get("driverPhone"))))
    orphaned_in_db = set(db_rows.keys()) - api_imeis

    # 4. Print gap report
    print("\n" + "="*60)
    print("AUDIT REPORT")
    print("="*60)
    print(f" API devices : {len(api_imeis)}")
    print(f" DB devices : {len(db_rows)}")
    print(f" New (API only): {len(missing_from_db)}")
    print(f" Orphaned (DB) : {len(orphaned_in_db)}")
    print(f" Missing driver_name (API has, DB null): {len(driver_gaps)}")
    print(f" Missing driver_phone (API has, DB null): {len(driver_phone_gaps)}")
    if missing_from_db:
        print(f"\nIMEIs NOT in DB ({len(missing_from_db)}):")
        for imei in missing_from_db:
            print(f" {imei}")
    if driver_gaps:
        print(f"\nDevices missing driver_name in DB ({len(driver_gaps)}):")
        for imei, name in driver_gaps:
            print(f" {imei} -> '{name}'")
    if driver_phone_gaps:
        print(f"\nDevices missing driver_phone in DB ({len(driver_phone_gaps)}):")
        for imei, phone in driver_phone_gaps:
            print(f" {imei} -> '{phone}'")
    if orphaned_in_db:
        print(f"\nIMEIs in DB but NOT in API (orphaned/deactivated) ({len(orphaned_in_db)}):")
        for imei in sorted(orphaned_in_db):
            print(f" {imei}")
    print("="*60)

    # 5. Upsert ALL devices with full field sync (including driver info)
    log.info("Starting full upsert of %d devices...", len(api_devices))
    upserted = 0

    # Parallelize the per-device detail lookups (see ingest_movement.sync_devices).
    def _fetch_detail(imei: str) -> dict:
        detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
        # A non-zero code means the lookup failed; fall back to an empty detail.
        if detail_resp.get("code") != 0:
            return {}
        return detail_resp.get("result") or {}

    imeis_to_fetch = [d.get("imei") for d in api_devices if d.get("imei")]
    with ThreadPoolExecutor(max_workers=8) as pool:
        details = dict(zip(imeis_to_fetch, pool.map(_fetch_detail, imeis_to_fetch)))

    with get_conn() as conn:
        with conn.cursor() as cur:
            for d in api_devices:
                imei = d.get("imei")
                if not imei:
                    continue
                dtl = details.get(imei, {})
                cur.execute("""
                    INSERT INTO tracksolid.devices (
                        imei, device_name, mc_type, mc_type_use_scope,
                        vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
                        vin, engine_number, vehicle_brand, fuel_100km,
                        driver_name, driver_phone, sim, iccid, imsi,
                        account, customer_name, device_group_id, device_group,
                        activation_time, expiration, enabled_flag, status,
                        current_mileage_km, last_synced_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
                    )
                    ON CONFLICT (imei) DO UPDATE SET
                        device_name = EXCLUDED.device_name,
                        mc_type = EXCLUDED.mc_type,
                        mc_type_use_scope = EXCLUDED.mc_type_use_scope,
                        vehicle_name = EXCLUDED.vehicle_name,
                        vehicle_number = EXCLUDED.vehicle_number,
                        vehicle_models = EXCLUDED.vehicle_models,
                        vehicle_icon = EXCLUDED.vehicle_icon,
                        vin = EXCLUDED.vin,
                        engine_number = EXCLUDED.engine_number,
                        vehicle_brand = EXCLUDED.vehicle_brand,
                        fuel_100km = EXCLUDED.fuel_100km,
                        driver_name = EXCLUDED.driver_name,
                        driver_phone = EXCLUDED.driver_phone,
                        sim = EXCLUDED.sim,
                        iccid = EXCLUDED.iccid,
                        imsi = EXCLUDED.imsi,
                        account = EXCLUDED.account,
                        customer_name = EXCLUDED.customer_name,
                        device_group_id = EXCLUDED.device_group_id,
                        device_group = EXCLUDED.device_group,
                        activation_time = EXCLUDED.activation_time,
                        expiration = EXCLUDED.expiration,
                        enabled_flag = EXCLUDED.enabled_flag,
                        status = EXCLUDED.status,
                        current_mileage_km = EXCLUDED.current_mileage_km,
                        last_synced_at = NOW(),
                        updated_at = NOW()
                """, (
                    imei,
                    clean(d.get("deviceName")), clean(d.get("mcType")),
                    clean(d.get("mcTypeUseScope")), clean(d.get("vehicleName")),
                    clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")),
                    clean(d.get("vehicleIcon")),
                    clean(dtl.get("vin")), clean(dtl.get("engineNumber")),
                    clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
                    clean(d.get("driverName")), clean(d.get("driverPhone")),
                    clean(d.get("sim")), clean(dtl.get("iccid")),
                    clean(dtl.get("imsi")),
                    clean(dtl.get("account")), clean(dtl.get("customerName")),
                    clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
                    clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")),
                    clean_int(d.get("enabledFlag", 1)),
                    clean(dtl.get("status", "active")),
                    clean_num(dtl.get("currentMileage")),
                ))
                upserted += 1
        conn.commit()

    elapsed = int((time.time() - t0) * 1000)
    log.info("Done. Upserted %d devices in %dms.", upserted, elapsed)
    print(f"\nSync complete: {upserted} devices upserted in {elapsed}ms.")


if __name__ == "__main__":
    run_audit()