tracksolid_timescale_grafan.../sync_driver_audit.py
David Kiania 8867be9d3d
Some checks are pending
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch
Audit fixes across the ingestion stack:

Observability
- Move log_ingestion out of batch loops in poll_alarms and poll_parking
  (was emitting N cumulative log rows per run instead of one).
- Add missing log_ingestion + t0 to poll_trips.
- Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT
  DO NOTHING no longer inflates the metric.

Resilience
- SAVEPOINT-per-item added to poll_alarms, poll_live_positions,
  poll_trips, poll_parking so one bad row no longer aborts the batch
  (webhook handlers already had this; pollers were inconsistent).

Performance
- /pushgps and poll_track_list now use psycopg2.extras.execute_values
  with ON CONFLICT DO NOTHING — 10-50x write throughput on larger
  batches.
- sync_devices and sync_driver_audit fetch jimi.track.device.detail
  concurrently via ThreadPoolExecutor(max_workers=8), cutting the
  daily registry sync from ~24s to ~3s for an 80-device fleet.
- poll_track_list split into two phases: parallel API fetch (4 workers,
  no DB connection held) then one batched write. Previously the DB
  connection was held across every per-IMEI HTTP call, risking pool
  starvation.

Security
- _validate_token uses hmac.compare_digest for constant-time token
  comparison (closes timing side-channel).
- _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default
  5000) so a pathological push cannot blow memory.

Tests
- Fix test_null_alarm_type_skipped: its INSERT-count assertion was
  catching the ingestion_log insert written by log_ingestion. Filter
  that out so the test checks only data-table inserts.
- Full suite: 66 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 00:33:55 +03:00

214 lines
8.9 KiB
Python

"""
sync_driver_audit.py — Fireside Communications · Driver & IMEI Audit Sync
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
One-shot script: fetches ALL devices from Tracksolid API, compares driver
and IMEI details against the DB, reports gaps, and populates missing data.
Run inside the container:
docker exec -it <ingest_movement_container> python sync_driver_audit.py
Or via Coolify terminal with env vars loaded.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import time
from concurrent.futures import ThreadPoolExecutor
from ts_shared_rev import (
TARGET_ACCOUNT,
api_post,
get_conn,
get_token,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
)
log = get_logger("driver_audit")
# Upsert for tracksolid.devices (26 positional parameters + NOW()).
#
# Columns populated from the per-device detail call (vin, engine_number,
# vehicle_brand, fuel_100km, iccid, imsi, account, customer_name,
# current_mileage_km) are wrapped in COALESCE against the existing row:
# when a detail lookup fails the script binds NULL for them, and an
# unconditional "col = EXCLUDED.col" would erase values stored by an
# earlier successful sync. Columns taken from the device list are still
# overwritten unconditionally, as before.
_UPSERT_DEVICE_SQL = """
    INSERT INTO tracksolid.devices AS dev (
        imei, device_name, mc_type, mc_type_use_scope,
        vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
        vin, engine_number, vehicle_brand, fuel_100km,
        driver_name, driver_phone, sim, iccid, imsi,
        account, customer_name, device_group_id, device_group,
        activation_time, expiration, enabled_flag, status,
        current_mileage_km, last_synced_at
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
    )
    ON CONFLICT (imei) DO UPDATE SET
        device_name = EXCLUDED.device_name,
        mc_type = EXCLUDED.mc_type,
        mc_type_use_scope = EXCLUDED.mc_type_use_scope,
        vehicle_name = EXCLUDED.vehicle_name,
        vehicle_number = EXCLUDED.vehicle_number,
        vehicle_models = EXCLUDED.vehicle_models,
        vehicle_icon = EXCLUDED.vehicle_icon,
        vin = COALESCE(EXCLUDED.vin, dev.vin),
        engine_number = COALESCE(EXCLUDED.engine_number, dev.engine_number),
        vehicle_brand = COALESCE(EXCLUDED.vehicle_brand, dev.vehicle_brand),
        fuel_100km = COALESCE(EXCLUDED.fuel_100km, dev.fuel_100km),
        driver_name = EXCLUDED.driver_name,
        driver_phone = EXCLUDED.driver_phone,
        sim = EXCLUDED.sim,
        iccid = COALESCE(EXCLUDED.iccid, dev.iccid),
        imsi = COALESCE(EXCLUDED.imsi, dev.imsi),
        account = COALESCE(EXCLUDED.account, dev.account),
        customer_name = COALESCE(EXCLUDED.customer_name, dev.customer_name),
        device_group_id = EXCLUDED.device_group_id,
        device_group = EXCLUDED.device_group,
        activation_time = EXCLUDED.activation_time,
        expiration = EXCLUDED.expiration,
        enabled_flag = EXCLUDED.enabled_flag,
        status = EXCLUDED.status,
        current_mileage_km = COALESCE(EXCLUDED.current_mileage_km, dev.current_mileage_km),
        last_synced_at = NOW(),
        updated_at = NOW()
"""


def _load_db_devices() -> dict:
    """Return the devices currently in tracksolid.devices, keyed by IMEI.

    Each value is a dict with the keys ``device_name``, ``driver_name``,
    ``driver_phone``, ``sim`` and ``status``.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT imei, device_name, driver_name, driver_phone, sim, status
                FROM tracksolid.devices
                ORDER BY imei
            """)
            return {
                row[0]: {
                    "device_name": row[1],
                    "driver_name": row[2],
                    "driver_phone": row[3],
                    "sim": row[4],
                    "status": row[5],
                }
                for row in cur.fetchall()
            }


def _find_gaps(api_devices: list, db_rows: dict) -> tuple:
    """Compare the API device list with the DB snapshot.

    Returns a 5-tuple ``(api_imeis, missing_from_db, driver_gaps,
    driver_phone_gaps, orphaned_in_db)`` where:

    * ``api_imeis`` is the set of IMEIs present in the API list,
    * ``missing_from_db`` lists IMEIs the API has but the DB does not,
    * ``driver_gaps`` / ``driver_phone_gaps`` list ``(imei, value)`` pairs
      where the DB field is empty but the API supplies a value,
    * ``orphaned_in_db`` is the set of IMEIs in the DB but not in the API.

    API entries without an ``imei`` are ignored.
    """
    api_imeis = set()
    missing_from_db = []
    driver_gaps = []
    driver_phone_gaps = []

    for d in api_devices:
        imei = d.get("imei")
        if not imei:
            continue
        api_imeis.add(imei)

        db = db_rows.get(imei)
        if db is None:
            missing_from_db.append(imei)
            continue

        if not db["driver_name"]:
            api_name = clean(d.get("driverName"))
            if api_name:
                driver_gaps.append((imei, api_name))
        if not db["driver_phone"]:
            api_phone = clean(d.get("driverPhone"))
            if api_phone:
                driver_phone_gaps.append((imei, api_phone))

    orphaned_in_db = set(db_rows.keys()) - api_imeis
    return api_imeis, missing_from_db, driver_gaps, driver_phone_gaps, orphaned_in_db


def _print_gap_report(db_count: int, api_imeis: set, missing_from_db: list,
                      driver_gaps: list, driver_phone_gaps: list,
                      orphaned_in_db: set) -> None:
    """Print the human-readable audit report to stdout."""
    print("\n" + "="*60)
    print("AUDIT REPORT")
    print("="*60)
    print(f" API devices : {len(api_imeis)}")
    print(f" DB devices : {db_count}")
    print(f" New (API only): {len(missing_from_db)}")
    print(f" Orphaned (DB) : {len(orphaned_in_db)}")
    print(f" Missing driver_name (API has, DB null): {len(driver_gaps)}")
    print(f" Missing driver_phone (API has, DB null): {len(driver_phone_gaps)}")

    if missing_from_db:
        print(f"\nIMEIs NOT in DB ({len(missing_from_db)}):")
        for imei in missing_from_db:
            print(f" {imei}")
    if driver_gaps:
        print(f"\nDevices missing driver_name in DB ({len(driver_gaps)}):")
        for imei, name in driver_gaps:
            print(f" {imei}'{name}'")
    if driver_phone_gaps:
        print(f"\nDevices missing driver_phone in DB ({len(driver_phone_gaps)}):")
        for imei, phone in driver_phone_gaps:
            print(f" {imei}'{phone}'")
    if orphaned_in_db:
        print(f"\nIMEIs in DB but NOT in API (orphaned/deactivated) ({len(orphaned_in_db)}):")
        for imei in sorted(orphaned_in_db):
            print(f" {imei}")
    print("="*60)


def _fetch_details(api_devices: list, token) -> dict:
    """Fetch ``jimi.track.device.detail`` for every device, 8 at a time.

    Returns a dict mapping IMEI to the detail ``result`` dict. A device whose
    lookup fails (non-zero ``code``, empty result, or an exception raised by
    the call) maps to ``{}`` so that one bad device cannot abort the sync.
    """
    def _fetch_one(imei: str) -> dict:
        # The whole call is guarded: an exception escaping a worker would be
        # re-raised by pool.map() and abort the sync for every device.
        try:
            detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
            if detail_resp.get("code") != 0:
                return {}
            return detail_resp.get("result") or {}
        except Exception as exc:
            log.warning("Detail fetch failed for %s: %s", imei, exc)
            return {}

    # dict.fromkeys de-duplicates while keeping first-seen order, so an IMEI
    # listed twice by the API is only fetched once.
    imeis_to_fetch = list(dict.fromkeys(
        d.get("imei") for d in api_devices if d.get("imei")
    ))
    with ThreadPoolExecutor(max_workers=8) as pool:
        return dict(zip(imeis_to_fetch, pool.map(_fetch_one, imeis_to_fetch)))


def _upsert_devices(api_devices: list, details: dict, db_rows: dict) -> int:
    """Upsert every API device into tracksolid.devices and commit.

    ``details`` maps IMEI to its detail dict (``{}`` when unavailable) and
    ``db_rows`` is the pre-sync DB snapshot, used to keep the stored status
    when no detail is available. Returns the number of upsert statements
    executed.
    """
    upserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for d in api_devices:
                imei = d.get("imei")
                if not imei:
                    continue
                dtl = details.get(imei, {})

                # Without detail data the "active" default would overwrite
                # whatever status the DB already holds; keep the known one.
                known_status = (db_rows.get(imei) or {}).get("status")
                if not dtl and known_status:
                    status = known_status
                else:
                    status = clean(dtl.get("status", "active"))

                cur.execute(_UPSERT_DEVICE_SQL, (
                    imei,
                    clean(d.get("deviceName")), clean(d.get("mcType")),
                    clean(d.get("mcTypeUseScope")), clean(d.get("vehicleName")),
                    clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")),
                    clean(d.get("vehicleIcon")),
                    clean(dtl.get("vin")), clean(dtl.get("engineNumber")),
                    clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
                    clean(d.get("driverName")), clean(d.get("driverPhone")),
                    clean(d.get("sim")), clean(dtl.get("iccid")),
                    clean(dtl.get("imsi")),
                    clean(dtl.get("account")), clean(dtl.get("customerName")),
                    clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
                    clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")),
                    clean_int(d.get("enabledFlag", 1)),
                    status,
                    clean_num(dtl.get("currentMileage")),
                ))
                upserted += 1
        conn.commit()
    return upserted


def run_audit():
    """Audit the DB device registry against the Tracksolid API, then sync it.

    Steps:
      1. Fetch the device list for TARGET_ACCOUNT from the API.
      2. Load the current rows of tracksolid.devices.
      3. Print a gap report (new, orphaned, missing driver name/phone).
      4. Fetch per-device details concurrently and upsert every API device.

    Returns None. Logs an error and returns early when no API token can be
    obtained or when the device-list call reports a non-zero ``code``.
    """
    log.info("=== Driver & IMEI Audit Sync ===")
    t0 = time.time()

    token = get_token()
    if not token:
        log.error("Could not obtain API token. Check credentials.")
        return

    # 1. Fetch all devices from API
    resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
    if resp.get("code") != 0:
        log.error("API error: %s", resp)
        return
    api_devices = resp.get("result") or []
    log.info("API returned %d devices.", len(api_devices))

    # 2. Fetch current DB state
    db_rows = _load_db_devices()
    log.info("DB has %d devices registered.", len(db_rows))

    # 3. Compare and report gaps
    (api_imeis, missing_from_db, driver_gaps,
     driver_phone_gaps, orphaned_in_db) = _find_gaps(api_devices, db_rows)

    # 4. Print gap report
    _print_gap_report(len(db_rows), api_imeis, missing_from_db,
                      driver_gaps, driver_phone_gaps, orphaned_in_db)

    # 5. Upsert ALL devices with full field sync (including driver info)
    log.info("Starting full upsert of %d devices...", len(api_devices))
    details = _fetch_details(api_devices, token)
    upserted = _upsert_devices(api_devices, details, db_rows)

    elapsed = int((time.time() - t0) * 1000)
    log.info("Done. Upserted %d devices in %dms.", upserted, elapsed)
    print(f"\nSync complete: {upserted} devices upserted in {elapsed}ms.")
# Script entry point: run the audit only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_audit()