From 8d386bf27a604c9839bba47384dd86f6aceaf17c Mon Sep 17 00:00:00 2001 From: david kiania Date: Fri, 15 May 2026 15:34:43 +0300 Subject: [PATCH 1/2] fix: BUG-01 ETL type crash, BUG-02 multi-account audit, BUG-03 diagnostic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BUG-01 (CRITICAL): dwh_gold.refresh_daily_metrics inserted t.imei (TEXT) into fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so the nightly ETL would have raised "invalid input syntax for type integer" on every run. Migration 08 backfills dim_vehicles from tracksolid.devices and rewrites the function to JOIN through dim_vehicles, returning the serial vehicle_key. The function also re-syncs dim_vehicles at the top of each call so newly registered devices appear in the warehouse without manual seeding. BUG-02 (HIGH): sync_driver_audit.py only queried TARGET_ACCOUNT, ignoring the Fireside@HQ and Fireside_MSA sub-accounts. The audit now iterates TARGETS (matching FIX-M19 in ingest_movement_rev.sync_devices), dedupes devices by IMEI, and tolerates per-target failures. BUG-03 (HIGH, diagnostic only): the webhook trip handler stores item["miles"] straight into distance_km. The field name is suspicious and FIX-M16 already proved the polling endpoint mis-documents its units. Added a SQL diagnostic that compares the distribution of stored-km / great-circle-km for push-source vs poll-source trips over 30 days — the ratio test will tell us whether the push value needs a /1.609 (miles), /1000 (metres), or no conversion. The existing calculation is left unchanged until the data confirms the unit; the old FIX-M11 comment is replaced with a BUG-03 pointer to the diagnostic. Co-Authored-By: Claude Opus 4.7 --- 08_fix_etl_vehicle_key.sql | 104 ++++++++++++++++++ .../checks/bug03_webhook_distance_units.sql | 85 ++++++++++++++ run_migrations.py | 1 + sync_driver_audit.py | 33 ++++-- webhook_receiver_rev.py | 9 +- 5 files changed, 223 insertions(+), 9 deletions(-) create mode 100644 08_fix_etl_vehicle_key.sql create mode 100644 db_audit/checks/bug03_webhook_distance_units.sql diff --git a/08_fix_etl_vehicle_key.sql b/08_fix_etl_vehicle_key.sql new file mode 100644 index 0000000..c49053b --- /dev/null +++ b/08_fix_etl_vehicle_key.sql @@ -0,0 +1,104 @@ +-- 08_fix_etl_vehicle_key.sql +-- Fixes BUG-01: dwh_gold.refresh_daily_metrics() inserted t.imei (TEXT) into +-- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so +-- every nightly call raised "invalid input syntax for type integer". +-- +-- The fix has two parts: +-- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI has +-- a serial vehicle_key to point at. +-- 2. Rewrite refresh_daily_metrics() to JOIN through dim_vehicles and +-- SELECT the serial key instead of the raw IMEI. The function also +-- upserts dim_vehicles at the top of each run so newly-registered +-- devices appear in the warehouse without manual intervention. + +BEGIN; + +-- ── 1. Backfill dim_vehicles ───────────────────────────────────────────────── +-- One row per device. is_active mirrors enabled_flag; vehicle_number tracks +-- the plate so dashboards can label charts without joining back to devices. +INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active) +SELECT + d.imei, + d.vehicle_number, + COALESCE(d.enabled_flag, 1) = 1 +FROM tracksolid.devices d +ON CONFLICT (imei) DO UPDATE SET + vehicle_number = EXCLUDED.vehicle_number, + is_active = EXCLUDED.is_active; + +-- ── 2. Rewrite ETL function ────────────────────────────────────────────────── +CREATE OR REPLACE FUNCTION dwh_gold.refresh_daily_metrics(target_date DATE) +RETURNS void LANGUAGE plpgsql AS $$ +BEGIN + -- Sync dim_vehicles first so any IMEI seen in trips has a vehicle_key. + -- Without this, a brand-new device would have trip rows but no + -- dim_vehicles entry, and the JOIN below would drop its metrics. + INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active) + SELECT + d.imei, + d.vehicle_number, + COALESCE(d.enabled_flag, 1) = 1 + FROM tracksolid.devices d + ON CONFLICT (imei) DO UPDATE SET + vehicle_number = EXCLUDED.vehicle_number, + is_active = EXCLUDED.is_active; + + INSERT INTO dwh_gold.fact_daily_fleet_metrics ( + day, + vehicle_key, + total_distance_km, + total_trips, + total_drive_hours, + total_idle_hours, + fuel_consumed_l, + alarm_count, + overspeed_count, + day_start_time, + day_end_time, + avg_speed_kmh, + peak_speed_kmh + ) + SELECT + target_date AS day, + dv.vehicle_key AS vehicle_key, + ROUND(SUM(t.distance_km)::numeric, 3) AS total_distance_km, + COUNT(*) AS total_trips, + ROUND((SUM(t.driving_time_s) / 3600.0)::numeric, 2) AS total_drive_hours, + ROUND((SUM(t.idle_time_s) / 3600.0)::numeric, 2) AS total_idle_hours, + ROUND(SUM(t.fuel_consumed_l)::numeric, 3) AS fuel_consumed_l, + COUNT(a.id) AS alarm_count, + COUNT(a.id) FILTER (WHERE a.alarm_type ILIKE '%speed%') AS overspeed_count, + MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_start_time, + MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_end_time, + ROUND(AVG(t.avg_speed_kmh)::numeric, 2) AS avg_speed_kmh, + MAX(t.max_speed_kmh) AS peak_speed_kmh + FROM tracksolid.trips t + JOIN dwh_gold.dim_vehicles dv + ON dv.imei = t.imei + LEFT JOIN tracksolid.alarms a + ON a.imei = t.imei + AND DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date + WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date + AND t.end_time IS NOT NULL + GROUP BY dv.vehicle_key + ON CONFLICT (day, vehicle_key) DO UPDATE SET + total_distance_km = EXCLUDED.total_distance_km, + total_trips = EXCLUDED.total_trips, + total_drive_hours = EXCLUDED.total_drive_hours, + total_idle_hours = EXCLUDED.total_idle_hours, + fuel_consumed_l = EXCLUDED.fuel_consumed_l, + alarm_count = EXCLUDED.alarm_count, + overspeed_count = EXCLUDED.overspeed_count, + day_start_time = EXCLUDED.day_start_time, + day_end_time = EXCLUDED.day_end_time, + avg_speed_kmh = EXCLUDED.avg_speed_kmh, + peak_speed_kmh = EXCLUDED.peak_speed_kmh; +END; +$$; + +COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE) + IS 'Populates or refreshes fact_daily_fleet_metrics for the given date. ' + 'Joins tracksolid.trips through dwh_gold.dim_vehicles to map IMEI → vehicle_key. ' + 'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);'; + +COMMIT; diff --git a/db_audit/checks/bug03_webhook_distance_units.sql b/db_audit/checks/bug03_webhook_distance_units.sql new file mode 100644 index 0000000..2ba2833 --- /dev/null +++ b/db_audit/checks/bug03_webhook_distance_units.sql @@ -0,0 +1,85 @@ +-- BUG-03 diagnostic: is the webhook `miles` field actually miles, or km? +-- +-- Context: ingest_movement_rev.poll_trips divides the API `distance` field by +-- 1000 (FIX-M16: API returns metres despite docs saying km). The webhook +-- handler in webhook_receiver_rev.push_trip_report stores `item.get("miles")` +-- directly into distance_km with no unit conversion. The field name is +-- suspicious. If Jimi actually pushes miles, every push-sourced trip distance +-- is inflated by ≈ 1.609×. +-- +-- We cannot diff push vs poll on the SAME (imei, start_time) row because both +-- paths ON CONFLICT DO UPDATE distance_km — the later writer overwrites the +-- earlier one. So instead, this script compares the *distribution* of the +-- distance_km / great_circle_km ratio for push-source vs poll-source trips +-- across the last 30 days. +-- +-- A driven trip on real roads should have road_km > great_circle_km +-- (ratio > 1). If push and poll come from the same fleet on the same roads, +-- their median ratios should be roughly equal. If push's median ratio is +-- ≈ 1.609× higher than poll's, the `miles` field is miles and the webhook +-- needs to divide by 1.609 (or by 1000 if Jimi is actually sending metres +-- like the polling endpoint does — the ratio test catches both). +-- +-- Run: +-- docker exec -i $DB psql -U postgres -d tracksolid_db \ +-- < db_audit/checks/bug03_webhook_distance_units.sql + +-- ── 1. Per-trip ratio of stored distance to start→end great-circle distance. +WITH trip_ratios AS ( + SELECT + t.source, + t.imei, + t.start_time, + t.distance_km, + ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0 + AS great_circle_km, + CASE + WHEN ST_Distance(t.start_geom::geography, t.end_geom::geography) > 0 + THEN t.distance_km + / (ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0) + ELSE NULL + END AS ratio + FROM tracksolid.trips t + WHERE t.start_time > NOW() - INTERVAL '30 days' + AND t.start_geom IS NOT NULL + AND t.end_geom IS NOT NULL + AND t.distance_km IS NOT NULL + AND t.distance_km > 0.1 -- filter out idle creep + AND ST_Distance(t.start_geom::geography, + t.end_geom::geography) > 200 -- ignore round-trips / parking +) +SELECT + source, + COUNT(*) AS trips, + ROUND(AVG(ratio)::numeric, 3) AS avg_ratio, + ROUND((percentile_cont(0.5) WITHIN GROUP (ORDER BY ratio))::numeric, 3) + AS median_ratio, + ROUND(MIN(ratio)::numeric, 3) AS min_ratio, + ROUND(MAX(ratio)::numeric, 3) AS max_ratio +FROM trip_ratios +GROUP BY source +ORDER BY source; + +-- ── 2. Side-by-side sample of recent push vs poll trips for spot-checking. +-- If push distance_km is consistently ≈ 1.609× the great_circle for +-- short, near-straight trips while poll trips show smaller ratios, +-- the unit conversion is needed. +SELECT + source, + imei, + start_time, + ROUND(distance_km::numeric, 2) AS stored_km, + ROUND(( + ST_Distance(start_geom::geography, end_geom::geography) / 1000.0 + )::numeric, 2) AS straight_line_km, + ROUND(( + distance_km + / NULLIF(ST_Distance(start_geom::geography, end_geom::geography) / 1000.0, 0) + )::numeric, 2) AS ratio +FROM tracksolid.trips +WHERE start_time > NOW() - INTERVAL '7 days' + AND start_geom IS NOT NULL + AND end_geom IS NOT NULL + AND distance_km > 0.5 +ORDER BY start_time DESC +LIMIT 40; diff --git a/run_migrations.py b/run_migrations.py index 3a98169..961191f 100644 --- a/run_migrations.py +++ b/run_migrations.py @@ -31,6 +31,7 @@ MIGRATIONS = [ "05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion "06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city "07_analytics_views.sql", # Grafana-facing views in tracksolid.* + "08_fix_etl_vehicle_key.sql", # BUG-01: ETL TEXT→INTEGER mismatch on vehicle_key ] # ── Tables that must exist before the service is allowed to start ───────────── diff --git a/sync_driver_audit.py b/sync_driver_audit.py index 2248346..548a5a8 100644 --- a/sync_driver_audit.py +++ b/sync_driver_audit.py @@ -15,7 +15,7 @@ import time from concurrent.futures import ThreadPoolExecutor from ts_shared_rev import ( - TARGET_ACCOUNT, + TARGETS, api_post, get_conn, get_token, @@ -37,14 +37,31 @@ def run_audit(): log.error("Could not obtain API token. Check credentials.") return - # 1. Fetch all devices from API - resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token) - if resp.get("code") != 0: - log.error("API error: %s", resp) - return + # 1. Fetch all devices from API across every configured sub-account. + # [FIX-M19] The fleet spans multiple Tracksolid sub-accounts (e.g. + # fireside, Fireside@HQ, Fireside_MSA). Iterate TARGETS and dedupe by + # IMEI, keeping the most recently-seen record. Without this, devices + # registered under a non-primary sub-account never appear in the audit + # and never get upserted. + devices_by_imei: dict[str, dict] = {} + for target in TARGETS: + resp = api_post("jimi.user.device.list", {"target": target}, token) + if resp.get("code") != 0: + log.warning("device.list failed for target=%s: code=%s msg=%s", + target, resp.get("code"), resp.get("message")) + continue + for d in (resp.get("result") or []): + imei = d.get("imei") + if imei: + devices_by_imei[imei] = d + log.info(" target=%s returned %d devices", target, len(resp.get("result") or [])) - api_devices = resp.get("result") or [] - log.info("API returned %d devices.", len(api_devices)) + api_devices = list(devices_by_imei.values()) + if not api_devices: + log.error("No devices returned from any target. Aborting.") + return + log.info("API returned %d unique devices across %d target(s).", + len(api_devices), len(TARGETS)) # 2. Fetch current DB state with get_conn() as conn: diff --git a/webhook_receiver_rev.py b/webhook_receiver_rev.py index b0d7108..4fe2051 100644 --- a/webhook_receiver_rev.py +++ b/webhook_receiver_rev.py @@ -543,7 +543,14 @@ async def push_trip_report(request: Request): cur.execute("RELEASE SAVEPOINT sp") continue - # [FIX-M11] API sends km. Store directly as distance_km. + # [BUG-03 — under investigation] The field is named `miles` + # but FIX-M11 claims it is already km. The polling endpoint + # was found to return metres despite docs (FIX-M16), so the + # webhook unit is suspect. Until the diagnostic + # db_audit/checks/bug03_webhook_distance_units.sql confirms + # the unit, store the raw value unchanged and rely on the + # diagnostic's ratio test to decide whether a /1.609 or + # /1000 conversion is needed. distance_km = clean_num(item.get("miles")) begin_lat = clean_num(item.get("beginLat")) -- 2.45.2 From 7bc0a2ce8730396fdc309b1c68caaf1b1b96c5e6 Mon Sep 17 00:00:00 2001 From: david kiania Date: Fri, 15 May 2026 16:44:40 +0300 Subject: [PATCH 2/2] fix(BUG-01b): aggregate trips and alarms in separate CTEs to avoid cartesian explosion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The original refresh_daily_metrics() joined trips × alarms in one SELECT, producing one row per (trip, alarm) pair. Every SUM/COUNT over trip columns was multiplied by the per-IMEI alarm count, so spot-checks showed total_trips identical to alarm_count, drive_hours > 1000/day, and distance_km in the tens of thousands per vehicle per day. Migration 08 carried that flawed join forward when fixing the TEXT→INTEGER vehicle_key crash. Rewriting the function so trip_agg and alarm_agg are computed in separate CTEs and then joined on imei restores correct per-vehicle aggregates: total_trips reflects real trip count, drive_hours ≤ 24, alarms are counted once. This bug is being fixed in the same migration file (08) before PR #12 merges; no deploy has applied the prior version, so no second migration is needed. Co-Authored-By: Claude Opus 4.7 --- 08_fix_etl_vehicle_key.sql | 96 +++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 33 deletions(-) diff --git a/08_fix_etl_vehicle_key.sql b/08_fix_etl_vehicle_key.sql index c49053b..9a1621a 100644 --- a/08_fix_etl_vehicle_key.sql +++ b/08_fix_etl_vehicle_key.sql @@ -1,15 +1,22 @@ -- 08_fix_etl_vehicle_key.sql --- Fixes BUG-01: dwh_gold.refresh_daily_metrics() inserted t.imei (TEXT) into --- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so --- every nightly call raised "invalid input syntax for type integer". +-- Fixes two distinct bugs in dwh_gold.refresh_daily_metrics(): -- --- The fix has two parts: --- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI has --- a serial vehicle_key to point at. --- 2. Rewrite refresh_daily_metrics() to JOIN through dim_vehicles and --- SELECT the serial key instead of the raw IMEI. The function also --- upserts dim_vehicles at the top of each run so newly-registered --- devices appear in the warehouse without manual intervention. +-- BUG-01a (type crash): the original function inserted t.imei (TEXT) into +-- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), +-- so every nightly call raised "invalid input syntax for type integer". +-- +-- BUG-01b (cartesian explosion): the original function joined +-- trips × alarms in a single SELECT. For every trip row it produced one +-- output row per matching alarm, multiplying every SUM/COUNT over trip +-- columns by the per-IMEI alarm count. Spot-checking the broken output +-- showed total_trips identical to alarm_count and drive_hours > 1000/day. +-- +-- The fix has three parts: +-- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI +-- has a serial vehicle_key to point at. +-- 2. Rewrite refresh_daily_metrics() so trip aggregates and alarm +-- aggregates are computed in separate CTEs and then joined on imei. +-- 3. Map IMEI → vehicle_key via dim_vehicles inside the same statement. BEGIN; @@ -43,6 +50,34 @@ BEGIN vehicle_number = EXCLUDED.vehicle_number, is_active = EXCLUDED.is_active; + -- Aggregate trips and alarms in separate CTEs to avoid the cartesian + -- multiplication that the original single-SELECT version produced. + WITH trip_agg AS ( + SELECT + t.imei, + SUM(t.distance_km) AS total_distance_km, + COUNT(*) AS total_trips, + SUM(t.driving_time_s) AS total_drive_seconds, + SUM(t.idle_time_s) AS total_idle_seconds, + SUM(t.fuel_consumed_l) AS fuel_consumed_l, + MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_start_time, + MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_end_time, + AVG(t.avg_speed_kmh) AS avg_speed_kmh, + MAX(t.max_speed_kmh) AS peak_speed_kmh + FROM tracksolid.trips t + WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date + AND t.end_time IS NOT NULL + GROUP BY t.imei + ), + alarm_agg AS ( + SELECT + a.imei, + COUNT(*) AS alarm_count, + COUNT(*) FILTER (WHERE a.alarm_type ILIKE '%speed%') AS overspeed_count + FROM tracksolid.alarms a + WHERE DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date + GROUP BY a.imei + ) INSERT INTO dwh_gold.fact_daily_fleet_metrics ( day, vehicle_key, @@ -59,28 +94,22 @@ BEGIN peak_speed_kmh ) SELECT - target_date AS day, - dv.vehicle_key AS vehicle_key, - ROUND(SUM(t.distance_km)::numeric, 3) AS total_distance_km, - COUNT(*) AS total_trips, - ROUND((SUM(t.driving_time_s) / 3600.0)::numeric, 2) AS total_drive_hours, - ROUND((SUM(t.idle_time_s) / 3600.0)::numeric, 2) AS total_idle_hours, - ROUND(SUM(t.fuel_consumed_l)::numeric, 3) AS fuel_consumed_l, - COUNT(a.id) AS alarm_count, - COUNT(a.id) FILTER (WHERE a.alarm_type ILIKE '%speed%') AS overspeed_count, - MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_start_time, - MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_end_time, - ROUND(AVG(t.avg_speed_kmh)::numeric, 2) AS avg_speed_kmh, - MAX(t.max_speed_kmh) AS peak_speed_kmh - FROM tracksolid.trips t - JOIN dwh_gold.dim_vehicles dv - ON dv.imei = t.imei - LEFT JOIN tracksolid.alarms a - ON a.imei = t.imei - AND DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date - WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date - AND t.end_time IS NOT NULL - GROUP BY dv.vehicle_key + target_date AS day, + dv.vehicle_key AS vehicle_key, + ROUND(tr.total_distance_km::numeric, 3) AS total_distance_km, + tr.total_trips AS total_trips, + ROUND((tr.total_drive_seconds / 3600.0)::numeric, 2) AS total_drive_hours, + ROUND((tr.total_idle_seconds / 3600.0)::numeric, 2) AS total_idle_hours, + ROUND(tr.fuel_consumed_l::numeric, 3) AS fuel_consumed_l, + COALESCE(al.alarm_count, 0) AS alarm_count, + COALESCE(al.overspeed_count, 0) AS overspeed_count, + tr.day_start_time AS day_start_time, + tr.day_end_time AS day_end_time, + ROUND(tr.avg_speed_kmh::numeric, 2) AS avg_speed_kmh, + tr.peak_speed_kmh AS peak_speed_kmh + FROM trip_agg tr + JOIN dwh_gold.dim_vehicles dv ON dv.imei = tr.imei + LEFT JOIN alarm_agg al ON al.imei = tr.imei ON CONFLICT (day, vehicle_key) DO UPDATE SET total_distance_km = EXCLUDED.total_distance_km, total_trips = EXCLUDED.total_trips, @@ -98,7 +127,8 @@ $$; COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE) IS 'Populates or refreshes fact_daily_fleet_metrics for the given date. ' - 'Joins tracksolid.trips through dwh_gold.dim_vehicles to map IMEI → vehicle_key. ' + 'Trips and alarms are aggregated in separate CTEs to avoid cartesian ' + 'multiplication. Maps IMEI → vehicle_key via dwh_gold.dim_vehicles. ' 'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);'; COMMIT; -- 2.45.2