fix: BUG-01 ETL (type crash + cartesian explosion), BUG-02 multi-account audit, BUG-03 diagnostic #12

Open
kianiadee wants to merge 2 commits from fix/bugs-01-02-03 into main
5 changed files with 253 additions and 9 deletions

134
08_fix_etl_vehicle_key.sql Normal file
View file

@ -0,0 +1,134 @@
-- 08_fix_etl_vehicle_key.sql
-- Fixes two distinct bugs in dwh_gold.refresh_daily_metrics():
--
-- BUG-01a (type crash): the original function inserted t.imei (TEXT) into
-- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles),
-- so every nightly call raised "invalid input syntax for type integer".
--
-- BUG-01b (cartesian explosion): the original function joined
-- trips × alarms in a single SELECT. For every trip row it produced one
-- output row per matching alarm, multiplying every SUM/COUNT over trip
-- columns by the per-IMEI alarm count. Spot-checking the broken output
-- showed total_trips identical to alarm_count and drive_hours > 1000/day.
--
-- The fix has three parts:
-- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI
-- has a serial vehicle_key to point at.
-- 2. Rewrite refresh_daily_metrics() so trip aggregates and alarm
-- aggregates are computed in separate CTEs and then joined on imei.
-- 3. Map IMEI → vehicle_key via dim_vehicles inside the same statement.
BEGIN;
-- ── 1. Backfill dim_vehicles ─────────────────────────────────────────────────
-- One row per device. is_active mirrors enabled_flag; vehicle_number tracks
-- the plate so dashboards can label charts without joining back to devices.
INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
SELECT
d.imei,
d.vehicle_number,
COALESCE(d.enabled_flag, 1) = 1
FROM tracksolid.devices d
ON CONFLICT (imei) DO UPDATE SET
vehicle_number = EXCLUDED.vehicle_number,
is_active = EXCLUDED.is_active;
-- ── 2. Rewrite ETL function ──────────────────────────────────────────────────
CREATE OR REPLACE FUNCTION dwh_gold.refresh_daily_metrics(target_date DATE)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
-- Sync dim_vehicles first so any IMEI seen in trips has a vehicle_key.
-- Without this, a brand-new device would have trip rows but no
-- dim_vehicles entry, and the JOIN below would drop its metrics.
INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
SELECT
d.imei,
d.vehicle_number,
COALESCE(d.enabled_flag, 1) = 1
FROM tracksolid.devices d
ON CONFLICT (imei) DO UPDATE SET
vehicle_number = EXCLUDED.vehicle_number,
is_active = EXCLUDED.is_active;
-- Aggregate trips and alarms in separate CTEs to avoid the cartesian
-- multiplication that the original single-SELECT version produced.
WITH trip_agg AS (
SELECT
t.imei,
SUM(t.distance_km) AS total_distance_km,
COUNT(*) AS total_trips,
SUM(t.driving_time_s) AS total_drive_seconds,
SUM(t.idle_time_s) AS total_idle_seconds,
SUM(t.fuel_consumed_l) AS fuel_consumed_l,
MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_start_time,
MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_end_time,
AVG(t.avg_speed_kmh) AS avg_speed_kmh,
MAX(t.max_speed_kmh) AS peak_speed_kmh
FROM tracksolid.trips t
WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date
AND t.end_time IS NOT NULL
GROUP BY t.imei
),
alarm_agg AS (
SELECT
a.imei,
COUNT(*) AS alarm_count,
COUNT(*) FILTER (WHERE a.alarm_type ILIKE '%speed%') AS overspeed_count
FROM tracksolid.alarms a
WHERE DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date
GROUP BY a.imei
)
INSERT INTO dwh_gold.fact_daily_fleet_metrics (
day,
vehicle_key,
total_distance_km,
total_trips,
total_drive_hours,
total_idle_hours,
fuel_consumed_l,
alarm_count,
overspeed_count,
day_start_time,
day_end_time,
avg_speed_kmh,
peak_speed_kmh
)
SELECT
target_date AS day,
dv.vehicle_key AS vehicle_key,
ROUND(tr.total_distance_km::numeric, 3) AS total_distance_km,
tr.total_trips AS total_trips,
ROUND((tr.total_drive_seconds / 3600.0)::numeric, 2) AS total_drive_hours,
ROUND((tr.total_idle_seconds / 3600.0)::numeric, 2) AS total_idle_hours,
ROUND(tr.fuel_consumed_l::numeric, 3) AS fuel_consumed_l,
COALESCE(al.alarm_count, 0) AS alarm_count,
COALESCE(al.overspeed_count, 0) AS overspeed_count,
tr.day_start_time AS day_start_time,
tr.day_end_time AS day_end_time,
ROUND(tr.avg_speed_kmh::numeric, 2) AS avg_speed_kmh,
tr.peak_speed_kmh AS peak_speed_kmh
FROM trip_agg tr
JOIN dwh_gold.dim_vehicles dv ON dv.imei = tr.imei
LEFT JOIN alarm_agg al ON al.imei = tr.imei
ON CONFLICT (day, vehicle_key) DO UPDATE SET
total_distance_km = EXCLUDED.total_distance_km,
total_trips = EXCLUDED.total_trips,
total_drive_hours = EXCLUDED.total_drive_hours,
total_idle_hours = EXCLUDED.total_idle_hours,
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
alarm_count = EXCLUDED.alarm_count,
overspeed_count = EXCLUDED.overspeed_count,
day_start_time = EXCLUDED.day_start_time,
day_end_time = EXCLUDED.day_end_time,
avg_speed_kmh = EXCLUDED.avg_speed_kmh,
peak_speed_kmh = EXCLUDED.peak_speed_kmh;
END;
$$;
COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE)
IS 'Populates or refreshes fact_daily_fleet_metrics for the given date. '
'Trips and alarms are aggregated in separate CTEs to avoid cartesian '
'multiplication. Maps IMEI → vehicle_key via dwh_gold.dim_vehicles. '
'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);';
COMMIT;

View file

@ -0,0 +1,85 @@
-- BUG-03 diagnostic: is the webhook `miles` field actually miles, or km?
--
-- Context: ingest_movement_rev.poll_trips divides the API `distance` field by
-- 1000 (FIX-M16: API returns metres despite docs saying km). The webhook
-- handler in webhook_receiver_rev.push_trip_report stores `item.get("miles")`
-- directly into distance_km with no unit conversion. The field name is
-- suspicious. If Jimi actually pushes miles, every push-sourced trip distance
-- is inflated by ≈ 1.609×.
--
-- We cannot diff push vs poll on the SAME (imei, start_time) row because both
-- paths ON CONFLICT DO UPDATE distance_km — the later writer overwrites the
-- earlier one. So instead, this script compares the *distribution* of the
-- distance_km / great_circle_km ratio for push-source vs poll-source trips
-- across the last 30 days.
--
-- A driven trip on real roads should have road_km > great_circle_km
-- (ratio > 1). If push and poll come from the same fleet on the same roads,
-- their median ratios should be roughly equal. If push's median ratio is
-- ≈ 1.609× higher than poll's, the `miles` field is miles and the webhook
-- needs to divide by 1.609 (or by 1000 if Jimi is actually sending metres
-- like the polling endpoint does — the ratio test catches both).
--
-- Run:
-- docker exec -i $DB psql -U postgres -d tracksolid_db \
-- < db_audit/checks/bug03_webhook_distance_units.sql
-- ── 1. Per-trip ratio of stored distance to start→end great-circle distance.
WITH trip_ratios AS (
SELECT
t.source,
t.imei,
t.start_time,
t.distance_km,
ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0
AS great_circle_km,
CASE
WHEN ST_Distance(t.start_geom::geography, t.end_geom::geography) > 0
THEN t.distance_km
/ (ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0)
ELSE NULL
END AS ratio
FROM tracksolid.trips t
WHERE t.start_time > NOW() - INTERVAL '30 days'
AND t.start_geom IS NOT NULL
AND t.end_geom IS NOT NULL
AND t.distance_km IS NOT NULL
AND t.distance_km > 0.1 -- filter out idle creep
AND ST_Distance(t.start_geom::geography,
t.end_geom::geography) > 200 -- ignore round-trips / parking
)
SELECT
source,
COUNT(*) AS trips,
ROUND(AVG(ratio)::numeric, 3) AS avg_ratio,
ROUND((percentile_cont(0.5) WITHIN GROUP (ORDER BY ratio))::numeric, 3)
AS median_ratio,
ROUND(MIN(ratio)::numeric, 3) AS min_ratio,
ROUND(MAX(ratio)::numeric, 3) AS max_ratio
FROM trip_ratios
GROUP BY source
ORDER BY source;
-- ── 2. Side-by-side sample of recent push vs poll trips for spot-checking.
-- If push distance_km is consistently ≈ 1.609× the great_circle for
-- short, near-straight trips while poll trips show smaller ratios,
-- the unit conversion is needed.
SELECT
source,
imei,
start_time,
ROUND(distance_km::numeric, 2) AS stored_km,
ROUND((
ST_Distance(start_geom::geography, end_geom::geography) / 1000.0
)::numeric, 2) AS straight_line_km,
ROUND((
distance_km
/ NULLIF(ST_Distance(start_geom::geography, end_geom::geography) / 1000.0, 0)
)::numeric, 2) AS ratio
FROM tracksolid.trips
WHERE start_time > NOW() - INTERVAL '7 days'
AND start_geom IS NOT NULL
AND end_geom IS NOT NULL
AND distance_km > 0.5
ORDER BY start_time DESC
LIMIT 40;

View file

@ -31,6 +31,7 @@ MIGRATIONS = [
"05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion
"06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city
"07_analytics_views.sql", # Grafana-facing views in tracksolid.*
"08_fix_etl_vehicle_key.sql", # BUG-01: ETL TEXT→INTEGER mismatch on vehicle_key
]
# ── Tables that must exist before the service is allowed to start ─────────────

View file

@ -15,7 +15,7 @@ import time
from concurrent.futures import ThreadPoolExecutor
from ts_shared_rev import (
TARGET_ACCOUNT,
TARGETS,
api_post,
get_conn,
get_token,
@ -37,14 +37,31 @@ def run_audit():
log.error("Could not obtain API token. Check credentials.")
return
# 1. Fetch all devices from API
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
if resp.get("code") != 0:
log.error("API error: %s", resp)
return
# 1. Fetch all devices from API across every configured sub-account.
# [FIX-M19] The fleet spans multiple Tracksolid sub-accounts (e.g.
# fireside, Fireside@HQ, Fireside_MSA). Iterate TARGETS and dedupe by
# IMEI, keeping the most recently-seen record. Without this, devices
# registered under a non-primary sub-account never appear in the audit
# and never get upserted.
devices_by_imei: dict[str, dict] = {}
for target in TARGETS:
resp = api_post("jimi.user.device.list", {"target": target}, token)
if resp.get("code") != 0:
log.warning("device.list failed for target=%s: code=%s msg=%s",
target, resp.get("code"), resp.get("message"))
continue
for d in (resp.get("result") or []):
imei = d.get("imei")
if imei:
devices_by_imei[imei] = d
log.info(" target=%s returned %d devices", target, len(resp.get("result") or []))
api_devices = resp.get("result") or []
log.info("API returned %d devices.", len(api_devices))
api_devices = list(devices_by_imei.values())
if not api_devices:
log.error("No devices returned from any target. Aborting.")
return
log.info("API returned %d unique devices across %d target(s).",
len(api_devices), len(TARGETS))
# 2. Fetch current DB state
with get_conn() as conn:

View file

@ -543,7 +543,14 @@ async def push_trip_report(request: Request):
cur.execute("RELEASE SAVEPOINT sp")
continue
# [FIX-M11] API sends km. Store directly as distance_km.
# [BUG-03 — under investigation] The field is named `miles`
# but FIX-M11 claims it is already km. The polling endpoint
# was found to return metres despite docs (FIX-M16), so the
# webhook unit is suspect. Until the diagnostic
# db_audit/checks/bug03_webhook_distance_units.sql confirms
# the unit, store the raw value unchanged and rely on the
# diagnostic's ratio test to decide whether a /1.609 or
# /1000 conversion is needed.
distance_km = clean_num(item.get("miles"))
begin_lat = clean_num(item.get("beginLat"))