fix: BUG-01 ETL (type crash + cartesian explosion), BUG-02 multi-account audit, BUG-03 diagnostic #12
5 changed files with 253 additions and 9 deletions
134
08_fix_etl_vehicle_key.sql
Normal file
134
08_fix_etl_vehicle_key.sql
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
-- 08_fix_etl_vehicle_key.sql
|
||||
-- Fixes two distinct bugs in dwh_gold.refresh_daily_metrics():
|
||||
--
|
||||
-- BUG-01a (type crash): the original function inserted t.imei (TEXT) into
|
||||
-- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles),
|
||||
-- so every nightly call raised "invalid input syntax for type integer".
|
||||
--
|
||||
-- BUG-01b (cartesian explosion): the original function joined
|
||||
-- trips × alarms in a single SELECT. For every trip row it produced one
|
||||
-- output row per matching alarm, multiplying every SUM/COUNT over trip
|
||||
-- columns by the per-IMEI alarm count. Spot-checking the broken output
|
||||
-- showed total_trips identical to alarm_count and drive_hours > 1000/day.
|
||||
--
|
||||
-- The fix has three parts:
|
||||
-- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI
|
||||
-- has a serial vehicle_key to point at.
|
||||
-- 2. Rewrite refresh_daily_metrics() so trip aggregates and alarm
|
||||
-- aggregates are computed in separate CTEs and then joined on imei.
|
||||
-- 3. Map IMEI → vehicle_key via dim_vehicles inside the same statement.
|
||||
|
||||
BEGIN;
|
||||
|
||||
-- ── 1. Backfill dim_vehicles ─────────────────────────────────────────────────
|
||||
-- One row per device. is_active mirrors enabled_flag; vehicle_number tracks
|
||||
-- the plate so dashboards can label charts without joining back to devices.
|
||||
INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
|
||||
SELECT
|
||||
d.imei,
|
||||
d.vehicle_number,
|
||||
COALESCE(d.enabled_flag, 1) = 1
|
||||
FROM tracksolid.devices d
|
||||
ON CONFLICT (imei) DO UPDATE SET
|
||||
vehicle_number = EXCLUDED.vehicle_number,
|
||||
is_active = EXCLUDED.is_active;
|
||||
|
||||
-- ── 2. Rewrite ETL function ──────────────────────────────────────────────────
|
||||
CREATE OR REPLACE FUNCTION dwh_gold.refresh_daily_metrics(target_date DATE)
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
-- Sync dim_vehicles first so any IMEI seen in trips has a vehicle_key.
|
||||
-- Without this, a brand-new device would have trip rows but no
|
||||
-- dim_vehicles entry, and the JOIN below would drop its metrics.
|
||||
INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
|
||||
SELECT
|
||||
d.imei,
|
||||
d.vehicle_number,
|
||||
COALESCE(d.enabled_flag, 1) = 1
|
||||
FROM tracksolid.devices d
|
||||
ON CONFLICT (imei) DO UPDATE SET
|
||||
vehicle_number = EXCLUDED.vehicle_number,
|
||||
is_active = EXCLUDED.is_active;
|
||||
|
||||
-- Aggregate trips and alarms in separate CTEs to avoid the cartesian
|
||||
-- multiplication that the original single-SELECT version produced.
|
||||
WITH trip_agg AS (
|
||||
SELECT
|
||||
t.imei,
|
||||
SUM(t.distance_km) AS total_distance_km,
|
||||
COUNT(*) AS total_trips,
|
||||
SUM(t.driving_time_s) AS total_drive_seconds,
|
||||
SUM(t.idle_time_s) AS total_idle_seconds,
|
||||
SUM(t.fuel_consumed_l) AS fuel_consumed_l,
|
||||
MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_start_time,
|
||||
MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME AS day_end_time,
|
||||
AVG(t.avg_speed_kmh) AS avg_speed_kmh,
|
||||
MAX(t.max_speed_kmh) AS peak_speed_kmh
|
||||
FROM tracksolid.trips t
|
||||
WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date
|
||||
AND t.end_time IS NOT NULL
|
||||
GROUP BY t.imei
|
||||
),
|
||||
alarm_agg AS (
|
||||
SELECT
|
||||
a.imei,
|
||||
COUNT(*) AS alarm_count,
|
||||
COUNT(*) FILTER (WHERE a.alarm_type ILIKE '%speed%') AS overspeed_count
|
||||
FROM tracksolid.alarms a
|
||||
WHERE DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date
|
||||
GROUP BY a.imei
|
||||
)
|
||||
INSERT INTO dwh_gold.fact_daily_fleet_metrics (
|
||||
day,
|
||||
vehicle_key,
|
||||
total_distance_km,
|
||||
total_trips,
|
||||
total_drive_hours,
|
||||
total_idle_hours,
|
||||
fuel_consumed_l,
|
||||
alarm_count,
|
||||
overspeed_count,
|
||||
day_start_time,
|
||||
day_end_time,
|
||||
avg_speed_kmh,
|
||||
peak_speed_kmh
|
||||
)
|
||||
SELECT
|
||||
target_date AS day,
|
||||
dv.vehicle_key AS vehicle_key,
|
||||
ROUND(tr.total_distance_km::numeric, 3) AS total_distance_km,
|
||||
tr.total_trips AS total_trips,
|
||||
ROUND((tr.total_drive_seconds / 3600.0)::numeric, 2) AS total_drive_hours,
|
||||
ROUND((tr.total_idle_seconds / 3600.0)::numeric, 2) AS total_idle_hours,
|
||||
ROUND(tr.fuel_consumed_l::numeric, 3) AS fuel_consumed_l,
|
||||
COALESCE(al.alarm_count, 0) AS alarm_count,
|
||||
COALESCE(al.overspeed_count, 0) AS overspeed_count,
|
||||
tr.day_start_time AS day_start_time,
|
||||
tr.day_end_time AS day_end_time,
|
||||
ROUND(tr.avg_speed_kmh::numeric, 2) AS avg_speed_kmh,
|
||||
tr.peak_speed_kmh AS peak_speed_kmh
|
||||
FROM trip_agg tr
|
||||
JOIN dwh_gold.dim_vehicles dv ON dv.imei = tr.imei
|
||||
LEFT JOIN alarm_agg al ON al.imei = tr.imei
|
||||
ON CONFLICT (day, vehicle_key) DO UPDATE SET
|
||||
total_distance_km = EXCLUDED.total_distance_km,
|
||||
total_trips = EXCLUDED.total_trips,
|
||||
total_drive_hours = EXCLUDED.total_drive_hours,
|
||||
total_idle_hours = EXCLUDED.total_idle_hours,
|
||||
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
|
||||
alarm_count = EXCLUDED.alarm_count,
|
||||
overspeed_count = EXCLUDED.overspeed_count,
|
||||
day_start_time = EXCLUDED.day_start_time,
|
||||
day_end_time = EXCLUDED.day_end_time,
|
||||
avg_speed_kmh = EXCLUDED.avg_speed_kmh,
|
||||
peak_speed_kmh = EXCLUDED.peak_speed_kmh;
|
||||
END;
|
||||
$$;
|
||||
|
||||
COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE)
|
||||
IS 'Populates or refreshes fact_daily_fleet_metrics for the given date. '
|
||||
'Trips and alarms are aggregated in separate CTEs to avoid cartesian '
|
||||
'multiplication. Maps IMEI → vehicle_key via dwh_gold.dim_vehicles. '
|
||||
'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);';
|
||||
|
||||
COMMIT;
|
||||
85
db_audit/checks/bug03_webhook_distance_units.sql
Normal file
85
db_audit/checks/bug03_webhook_distance_units.sql
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
-- BUG-03 diagnostic: is the webhook `miles` field actually miles, or km?
|
||||
--
|
||||
-- Context: ingest_movement_rev.poll_trips divides the API `distance` field by
|
||||
-- 1000 (FIX-M16: API returns metres despite docs saying km). The webhook
|
||||
-- handler in webhook_receiver_rev.push_trip_report stores `item.get("miles")`
|
||||
-- directly into distance_km with no unit conversion. The field name is
|
||||
-- suspicious. If Jimi actually pushes miles, every push-sourced trip distance
|
||||
-- is inflated by ≈ 1.609×.
|
||||
--
|
||||
-- We cannot diff push vs poll on the SAME (imei, start_time) row because both
|
||||
-- paths ON CONFLICT DO UPDATE distance_km — the later writer overwrites the
|
||||
-- earlier one. So instead, this script compares the *distribution* of the
|
||||
-- distance_km / great_circle_km ratio for push-source vs poll-source trips
|
||||
-- across the last 30 days.
|
||||
--
|
||||
-- A driven trip on real roads should have road_km > great_circle_km
|
||||
-- (ratio > 1). If push and poll come from the same fleet on the same roads,
|
||||
-- their median ratios should be roughly equal. If push's median ratio is
|
||||
-- ≈ 1.609× higher than poll's, the `miles` field is miles and the webhook
|
||||
-- needs to divide by 1.609 (or by 1000 if Jimi is actually sending metres
|
||||
-- like the polling endpoint does — the ratio test catches both).
|
||||
--
|
||||
-- Run:
|
||||
-- docker exec -i $DB psql -U postgres -d tracksolid_db \
|
||||
-- < db_audit/checks/bug03_webhook_distance_units.sql
|
||||
|
||||
-- ── 1. Per-trip ratio of stored distance to start→end great-circle distance.
|
||||
WITH trip_ratios AS (
|
||||
SELECT
|
||||
t.source,
|
||||
t.imei,
|
||||
t.start_time,
|
||||
t.distance_km,
|
||||
ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0
|
||||
AS great_circle_km,
|
||||
CASE
|
||||
WHEN ST_Distance(t.start_geom::geography, t.end_geom::geography) > 0
|
||||
THEN t.distance_km
|
||||
/ (ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0)
|
||||
ELSE NULL
|
||||
END AS ratio
|
||||
FROM tracksolid.trips t
|
||||
WHERE t.start_time > NOW() - INTERVAL '30 days'
|
||||
AND t.start_geom IS NOT NULL
|
||||
AND t.end_geom IS NOT NULL
|
||||
AND t.distance_km IS NOT NULL
|
||||
AND t.distance_km > 0.1 -- filter out idle creep
|
||||
AND ST_Distance(t.start_geom::geography,
|
||||
t.end_geom::geography) > 200 -- ignore round-trips / parking
|
||||
)
|
||||
SELECT
|
||||
source,
|
||||
COUNT(*) AS trips,
|
||||
ROUND(AVG(ratio)::numeric, 3) AS avg_ratio,
|
||||
ROUND((percentile_cont(0.5) WITHIN GROUP (ORDER BY ratio))::numeric, 3)
|
||||
AS median_ratio,
|
||||
ROUND(MIN(ratio)::numeric, 3) AS min_ratio,
|
||||
ROUND(MAX(ratio)::numeric, 3) AS max_ratio
|
||||
FROM trip_ratios
|
||||
GROUP BY source
|
||||
ORDER BY source;
|
||||
|
||||
-- ── 2. Side-by-side sample of recent push vs poll trips for spot-checking.
|
||||
-- If push distance_km is consistently ≈ 1.609× the great_circle for
|
||||
-- short, near-straight trips while poll trips show smaller ratios,
|
||||
-- the unit conversion is needed.
|
||||
SELECT
|
||||
source,
|
||||
imei,
|
||||
start_time,
|
||||
ROUND(distance_km::numeric, 2) AS stored_km,
|
||||
ROUND((
|
||||
ST_Distance(start_geom::geography, end_geom::geography) / 1000.0
|
||||
)::numeric, 2) AS straight_line_km,
|
||||
ROUND((
|
||||
distance_km
|
||||
/ NULLIF(ST_Distance(start_geom::geography, end_geom::geography) / 1000.0, 0)
|
||||
)::numeric, 2) AS ratio
|
||||
FROM tracksolid.trips
|
||||
WHERE start_time > NOW() - INTERVAL '7 days'
|
||||
AND start_geom IS NOT NULL
|
||||
AND end_geom IS NOT NULL
|
||||
AND distance_km > 0.5
|
||||
ORDER BY start_time DESC
|
||||
LIMIT 40;
|
||||
|
|
@ -31,6 +31,7 @@ MIGRATIONS = [
|
|||
"05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion
|
||||
"06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city
|
||||
"07_analytics_views.sql", # Grafana-facing views in tracksolid.*
|
||||
"08_fix_etl_vehicle_key.sql", # BUG-01: ETL TEXT→INTEGER mismatch on vehicle_key
|
||||
]
|
||||
|
||||
# ── Tables that must exist before the service is allowed to start ─────────────
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import time
|
|||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from ts_shared_rev import (
|
||||
TARGET_ACCOUNT,
|
||||
TARGETS,
|
||||
api_post,
|
||||
get_conn,
|
||||
get_token,
|
||||
|
|
@ -37,14 +37,31 @@ def run_audit():
|
|||
log.error("Could not obtain API token. Check credentials.")
|
||||
return
|
||||
|
||||
# 1. Fetch all devices from API
|
||||
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
|
||||
if resp.get("code") != 0:
|
||||
log.error("API error: %s", resp)
|
||||
return
|
||||
# 1. Fetch all devices from API across every configured sub-account.
|
||||
# [FIX-M19] The fleet spans multiple Tracksolid sub-accounts (e.g.
|
||||
# fireside, Fireside@HQ, Fireside_MSA). Iterate TARGETS and dedupe by
|
||||
# IMEI, keeping the most recently-seen record. Without this, devices
|
||||
# registered under a non-primary sub-account never appear in the audit
|
||||
# and never get upserted.
|
||||
devices_by_imei: dict[str, dict] = {}
|
||||
for target in TARGETS:
|
||||
resp = api_post("jimi.user.device.list", {"target": target}, token)
|
||||
if resp.get("code") != 0:
|
||||
log.warning("device.list failed for target=%s: code=%s msg=%s",
|
||||
target, resp.get("code"), resp.get("message"))
|
||||
continue
|
||||
for d in (resp.get("result") or []):
|
||||
imei = d.get("imei")
|
||||
if imei:
|
||||
devices_by_imei[imei] = d
|
||||
log.info(" target=%s returned %d devices", target, len(resp.get("result") or []))
|
||||
|
||||
api_devices = resp.get("result") or []
|
||||
log.info("API returned %d devices.", len(api_devices))
|
||||
api_devices = list(devices_by_imei.values())
|
||||
if not api_devices:
|
||||
log.error("No devices returned from any target. Aborting.")
|
||||
return
|
||||
log.info("API returned %d unique devices across %d target(s).",
|
||||
len(api_devices), len(TARGETS))
|
||||
|
||||
# 2. Fetch current DB state
|
||||
with get_conn() as conn:
|
||||
|
|
|
|||
|
|
@ -543,7 +543,14 @@ async def push_trip_report(request: Request):
|
|||
cur.execute("RELEASE SAVEPOINT sp")
|
||||
continue
|
||||
|
||||
# [FIX-M11] API sends km. Store directly as distance_km.
|
||||
# [BUG-03 — under investigation] The field is named `miles`
|
||||
# but FIX-M11 claims it is already km. The polling endpoint
|
||||
# was found to return metres despite docs (FIX-M16), so the
|
||||
# webhook unit is suspect. Until the diagnostic
|
||||
# db_audit/checks/bug03_webhook_distance_units.sql confirms
|
||||
# the unit, store the raw value unchanged and rely on the
|
||||
# diagnostic's ratio test to decide whether a /1.609 or
|
||||
# /1000 conversion is needed.
|
||||
distance_km = clean_num(item.get("miles"))
|
||||
|
||||
begin_lat = clean_num(item.get("beginLat"))
|
||||
|
|
|
|||
Loading…
Reference in a new issue