fix: BUG-01 ETL type crash, BUG-02 multi-account audit, BUG-03 diagnostic
BUG-01 (CRITICAL): dwh_gold.refresh_daily_metrics inserted t.imei (TEXT) into fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so the nightly ETL would have raised "invalid input syntax for type integer" on every run. Migration 08 backfills dim_vehicles from tracksolid.devices and rewrites the function to JOIN through dim_vehicles, returning the serial vehicle_key. The function also re-syncs dim_vehicles at the top of each call so newly registered devices appear in the warehouse without manual seeding. BUG-02 (HIGH): sync_driver_audit.py only queried TARGET_ACCOUNT, ignoring the Fireside@HQ and Fireside_MSA sub-accounts. The audit now iterates TARGETS (matching FIX-M19 in ingest_movement_rev.sync_devices), dedupes devices by IMEI, and tolerates per-target failures. BUG-03 (HIGH, diagnostic only): the webhook trip handler stores item["miles"] straight into distance_km. The field name is suspicious and FIX-M16 already proved the polling endpoint mis-documents its units. Added a SQL diagnostic that compares the distribution of stored-km / great-circle-km for push-source vs poll-source trips over 30 days — the ratio test will tell us whether the push value needs a ×1.609 (miles → km), /1000 (metres → km), or no conversion. The existing calculation is left unchanged until the data confirms the unit; the old FIX-M11 comment is replaced with a BUG-03 pointer to the diagnostic. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
dc6404a114
commit
8d386bf27a
5 changed files with 223 additions and 9 deletions
104
08_fix_etl_vehicle_key.sql
Normal file
104
08_fix_etl_vehicle_key.sql
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
-- 08_fix_etl_vehicle_key.sql
|
||||||
|
-- Fixes BUG-01: dwh_gold.refresh_daily_metrics() inserted t.imei (TEXT) into
|
||||||
|
-- fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so
|
||||||
|
-- every nightly call raised "invalid input syntax for type integer".
|
||||||
|
--
|
||||||
|
-- The fix has two parts:
|
||||||
|
-- 1. Seed dwh_gold.dim_vehicles from tracksolid.devices so every IMEI has
|
||||||
|
-- a serial vehicle_key to point at.
|
||||||
|
-- 2. Rewrite refresh_daily_metrics() to JOIN through dim_vehicles and
|
||||||
|
-- SELECT the serial key instead of the raw IMEI. The function also
|
||||||
|
-- upserts dim_vehicles at the top of each run so newly-registered
|
||||||
|
-- devices appear in the warehouse without manual intervention.
|
||||||
|
|
||||||
|
BEGIN;

-- ── 1. Backfill dim_vehicles ─────────────────────────────────────────────────
-- Seed the vehicle dimension from every row of tracksolid.devices so each IMEI
-- gets a vehicle_key that fact rows can reference.
--   * is_active      : TRUE when enabled_flag is 1 or NULL, FALSE otherwise.
--   * vehicle_number : copied so dashboards can label charts without joining
--                      back to tracksolid.devices.
-- Re-running is safe: an existing IMEI is refreshed in place via ON CONFLICT.
-- NOTE(review): relies on a unique constraint on dim_vehicles.imei and on
-- tracksolid.devices holding at most one row per IMEI — confirm in the schema.
INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
SELECT
    dev.imei,
    dev.vehicle_number,
    COALESCE(dev.enabled_flag, 1) = 1 AS is_active
FROM tracksolid.devices AS dev
ON CONFLICT (imei) DO UPDATE
    SET vehicle_number = EXCLUDED.vehicle_number,
        is_active      = EXCLUDED.is_active;
|
||||||
|
|
||||||
|
-- ── 2. Rewrite ETL function ──────────────────────────────────────────────────
|
||||||
|
-- refresh_daily_metrics(target_date)
--
-- Purpose : (Re)build one fact_daily_fleet_metrics row per vehicle for
--           target_date, interpreted as a calendar day in Africa/Nairobi.
-- Params  : target_date – the local day to aggregate.
-- Returns : void. Rows are upserted on (day, vehicle_key), so calling the
--           function again for the same date overwrites the previous figures.
-- Scope   : only trips with a non-NULL end_time whose start_time falls on
--           target_date are counted. Vehicles with alarms but no such trips
--           get no row. Trips whose IMEI is absent from tracksolid.devices
--           (and therefore from dim_vehicles) are dropped by the inner join.
CREATE OR REPLACE FUNCTION dwh_gold.refresh_daily_metrics(target_date DATE)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    -- Sync dim_vehicles from tracksolid.devices first so a newly registered
    -- device already has a vehicle_key when its trips are aggregated below.
    INSERT INTO dwh_gold.dim_vehicles (imei, vehicle_number, is_active)
    SELECT
        d.imei,
        d.vehicle_number,
        COALESCE(d.enabled_flag, 1) = 1
    FROM tracksolid.devices d
    ON CONFLICT (imei) DO UPDATE SET
        vehicle_number = EXCLUDED.vehicle_number,
        is_active      = EXCLUDED.is_active;

    -- Alarms are collapsed to ONE row per IMEI before they are joined to trips.
    -- Joining the raw alarms table produced one row per (trip × alarm), which
    -- multiplied SUM(distance_km), COUNT(*), drive/idle hours and fuel by the
    -- number of alarms, and multiplied the alarm counts by the number of trips.
    WITH alarm_agg AS (
        SELECT
            a.imei,
            COUNT(a.id)                                              AS alarm_count,
            COUNT(a.id) FILTER (WHERE a.alarm_type ILIKE '%speed%')  AS overspeed_count
        FROM tracksolid.alarms a
        WHERE DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date
        GROUP BY a.imei
    )
    INSERT INTO dwh_gold.fact_daily_fleet_metrics (
        day,
        vehicle_key,
        total_distance_km,
        total_trips,
        total_drive_hours,
        total_idle_hours,
        fuel_consumed_l,
        alarm_count,
        overspeed_count,
        day_start_time,
        day_end_time,
        avg_speed_kmh,
        peak_speed_kmh
    )
    SELECT
        target_date                                                 AS day,
        dv.vehicle_key                                              AS vehicle_key,
        ROUND(SUM(t.distance_km)::numeric, 3)                       AS total_distance_km,
        COUNT(*)                                                    AS total_trips,
        ROUND((SUM(t.driving_time_s) / 3600.0)::numeric, 2)         AS total_drive_hours,
        ROUND((SUM(t.idle_time_s) / 3600.0)::numeric, 2)            AS total_idle_hours,
        ROUND(SUM(t.fuel_consumed_l)::numeric, 3)                   AS fuel_consumed_l,
        -- alarm_agg has at most one row per IMEI, so MAX() just carries the
        -- value through the GROUP BY; COALESCE covers vehicles with no alarms.
        COALESCE(MAX(aa.alarm_count), 0)                            AS alarm_count,
        COALESCE(MAX(aa.overspeed_count), 0)                        AS overspeed_count,
        MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME       AS day_start_time,
        MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME         AS day_end_time,
        ROUND(AVG(t.avg_speed_kmh)::numeric, 2)                     AS avg_speed_kmh,
        MAX(t.max_speed_kmh)                                        AS peak_speed_kmh
    FROM tracksolid.trips t
    INNER JOIN dwh_gold.dim_vehicles dv
        ON dv.imei = t.imei
    LEFT JOIN alarm_agg aa
        ON aa.imei = t.imei
    WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date
      AND t.end_time IS NOT NULL
    GROUP BY dv.vehicle_key
    ON CONFLICT (day, vehicle_key) DO UPDATE SET
        total_distance_km = EXCLUDED.total_distance_km,
        total_trips       = EXCLUDED.total_trips,
        total_drive_hours = EXCLUDED.total_drive_hours,
        total_idle_hours  = EXCLUDED.total_idle_hours,
        fuel_consumed_l   = EXCLUDED.fuel_consumed_l,
        alarm_count       = EXCLUDED.alarm_count,
        overspeed_count   = EXCLUDED.overspeed_count,
        day_start_time    = EXCLUDED.day_start_time,
        day_end_time      = EXCLUDED.day_end_time,
        avg_speed_kmh     = EXCLUDED.avg_speed_kmh,
        peak_speed_kmh    = EXCLUDED.peak_speed_kmh;
END;
$$;

COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE)
    IS 'Populates or refreshes fact_daily_fleet_metrics for the given date. '
       'Joins tracksolid.trips through dwh_gold.dim_vehicles to map IMEI → vehicle_key. '
       'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);';

COMMIT;
|
||||||
85
db_audit/checks/bug03_webhook_distance_units.sql
Normal file
85
db_audit/checks/bug03_webhook_distance_units.sql
Normal file
|
|
@ -0,0 +1,85 @@
|
||||||
|
-- BUG-03 diagnostic: is the webhook `miles` field actually miles, or km?
|
||||||
|
--
|
||||||
|
-- Context: ingest_movement_rev.poll_trips divides the API `distance` field by
|
||||||
|
-- 1000 (FIX-M16: API returns metres despite docs saying km). The webhook
|
||||||
|
-- handler in webhook_receiver_rev.push_trip_report stores `item.get("miles")`
|
||||||
|
-- directly into distance_km with no unit conversion. The field name is
|
||||||
|
-- suspicious. If Jimi actually pushes miles, every push-sourced trip distance
|
||||||
|
-- is understated by a factor of ≈ 1.609 (1 mile ≈ 1.609 km).
|
||||||
|
--
|
||||||
|
-- We cannot diff push vs poll on the SAME (imei, start_time) row because both
|
||||||
|
-- paths ON CONFLICT DO UPDATE distance_km — the later writer overwrites the
|
||||||
|
-- earlier one. So instead, this script compares the *distribution* of the
|
||||||
|
-- distance_km / great_circle_km ratio for push-source vs poll-source trips
|
||||||
|
-- across the last 30 days.
|
||||||
|
--
|
||||||
|
-- A driven trip on real roads should have road_km > great_circle_km
|
||||||
|
-- (ratio > 1). If push and poll come from the same fleet on the same roads,
|
||||||
|
-- their median ratios should be roughly equal. If push's median ratio is
|
||||||
|
-- ≈ 1.609× lower than poll's (≈ 0.62×), the `miles` field is miles and the webhook
|
||||||
|
-- needs to multiply by 1.609; if it is ≈ 1000× higher, Jimi is sending metres
|
||||||
|
-- like the polling endpoint does and the webhook needs to divide by 1000.
|
||||||
|
--
|
||||||
|
-- Run:
|
||||||
|
-- docker exec -i $DB psql -U postgres -d tracksolid_db \
|
||||||
|
-- < db_audit/checks/bug03_webhook_distance_units.sql
|
||||||
|
|
||||||
|
-- ── 1. Per-trip ratio of stored distance to start→end great-circle distance.
-- `measured` computes the start→end geodesic length once per trip (metres,
-- via the geography cast); `trip_ratios` keeps only trips that moved more than
-- 200 m end-to-end and derives stored_km / great_circle_km for each of them.
-- The final SELECT summarises that ratio per ingestion source.
WITH measured AS (
    SELECT
        trip.source,
        trip.imei,
        trip.start_time,
        trip.distance_km,
        ST_Distance(trip.start_geom::geography, trip.end_geom::geography) AS great_circle_m
    FROM tracksolid.trips AS trip
    WHERE trip.start_time > NOW() - INTERVAL '30 days'
      AND trip.start_geom IS NOT NULL
      AND trip.end_geom IS NOT NULL
      AND trip.distance_km IS NOT NULL
      AND trip.distance_km > 0.1           -- filter out idle creep
),
trip_ratios AS (
    SELECT
        m.source,
        m.imei,
        m.start_time,
        m.distance_km,
        m.great_circle_m / 1000.0                    AS great_circle_km,
        -- Division is safe: the filter below guarantees great_circle_m > 200.
        m.distance_km / (m.great_circle_m / 1000.0)  AS ratio
    FROM measured AS m
    WHERE m.great_circle_m > 200           -- ignore round-trips / parking
)
SELECT
    source,
    COUNT(*)                                                             AS trips,
    ROUND(AVG(ratio)::numeric, 3)                                        AS avg_ratio,
    ROUND((percentile_cont(0.5) WITHIN GROUP (ORDER BY ratio))::numeric, 3)
                                                                         AS median_ratio,
    ROUND(MIN(ratio)::numeric, 3)                                        AS min_ratio,
    ROUND(MAX(ratio)::numeric, 3)                                        AS max_ratio
FROM trip_ratios
GROUP BY source
ORDER BY source;
|
||||||
|
|
||||||
|
-- ── 2. Side-by-side sample of recent push vs poll trips for spot-checking.
|
||||||
|
-- If push distance_km is consistently ≈ 0.62× (1 / 1.609) the great_circle for
|
||||||
|
-- short, near-straight trips while poll trips show ratios of ≈ 1 or more,
|
||||||
|
-- the push value is in miles and must be multiplied by 1.609.
|
||||||
|
-- Spot-check sample: up to 20 of the most recent qualifying trips PER source
-- from the last 7 days. A single global "ORDER BY start_time DESC LIMIT 40"
-- could return rows from only one source when that source dominates recent
-- traffic, which defeats a push-vs-poll comparison. imei is added as a
-- tie-breaker so the sample and its ordering are deterministic.
WITH recent AS (
    SELECT
        t.source,
        t.imei,
        t.start_time,
        t.distance_km,
        ST_Distance(t.start_geom::geography, t.end_geom::geography) / 1000.0
            AS straight_line_km,
        ROW_NUMBER() OVER (
            PARTITION BY t.source
            ORDER BY t.start_time DESC, t.imei
        ) AS rn
    FROM tracksolid.trips AS t
    WHERE t.start_time > NOW() - INTERVAL '7 days'
      AND t.start_geom IS NOT NULL
      AND t.end_geom IS NOT NULL
      AND t.distance_km > 0.5
)
SELECT
    source,
    imei,
    start_time,
    ROUND(distance_km::numeric, 2)                                   AS stored_km,
    ROUND(straight_line_km::numeric, 2)                              AS straight_line_km,
    -- NULLIF guards trips that start and end at the same point.
    ROUND((distance_km / NULLIF(straight_line_km, 0))::numeric, 2)   AS ratio
FROM recent
WHERE rn <= 20
ORDER BY source, start_time DESC, imei;
|
||||||
|
|
@ -31,6 +31,7 @@ MIGRATIONS = [
|
||||||
"05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion
|
"05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion
|
||||||
"06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city
|
"06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city
|
||||||
"07_analytics_views.sql", # Grafana-facing views in tracksolid.*
|
"07_analytics_views.sql", # Grafana-facing views in tracksolid.*
|
||||||
|
"08_fix_etl_vehicle_key.sql", # BUG-01: ETL TEXT→INTEGER mismatch on vehicle_key
|
||||||
]
|
]
|
||||||
|
|
||||||
# ── Tables that must exist before the service is allowed to start ─────────────
|
# ── Tables that must exist before the service is allowed to start ─────────────
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import time
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
from ts_shared_rev import (
|
from ts_shared_rev import (
|
||||||
TARGET_ACCOUNT,
|
TARGETS,
|
||||||
api_post,
|
api_post,
|
||||||
get_conn,
|
get_conn,
|
||||||
get_token,
|
get_token,
|
||||||
|
|
@ -37,14 +37,31 @@ def run_audit():
|
||||||
log.error("Could not obtain API token. Check credentials.")
|
log.error("Could not obtain API token. Check credentials.")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 1. Fetch all devices from API
|
# 1. Fetch all devices from API across every configured sub-account.
|
||||||
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
|
# [FIX-M19] The fleet spans multiple Tracksolid sub-accounts (e.g.
|
||||||
|
# fireside, Fireside@HQ, Fireside_MSA). Iterate TARGETS and dedupe by
|
||||||
|
# IMEI, keeping the most recently-seen record. Without this, devices
|
||||||
|
# registered under a non-primary sub-account never appear in the audit
|
||||||
|
# and never get upserted.
|
||||||
|
devices_by_imei: dict[str, dict] = {}
|
||||||
|
for target in TARGETS:
|
||||||
|
resp = api_post("jimi.user.device.list", {"target": target}, token)
|
||||||
if resp.get("code") != 0:
|
if resp.get("code") != 0:
|
||||||
log.error("API error: %s", resp)
|
log.warning("device.list failed for target=%s: code=%s msg=%s",
|
||||||
return
|
target, resp.get("code"), resp.get("message"))
|
||||||
|
continue
|
||||||
|
for d in (resp.get("result") or []):
|
||||||
|
imei = d.get("imei")
|
||||||
|
if imei:
|
||||||
|
devices_by_imei[imei] = d
|
||||||
|
log.info(" target=%s returned %d devices", target, len(resp.get("result") or []))
|
||||||
|
|
||||||
api_devices = resp.get("result") or []
|
api_devices = list(devices_by_imei.values())
|
||||||
log.info("API returned %d devices.", len(api_devices))
|
if not api_devices:
|
||||||
|
log.error("No devices returned from any target. Aborting.")
|
||||||
|
return
|
||||||
|
log.info("API returned %d unique devices across %d target(s).",
|
||||||
|
len(api_devices), len(TARGETS))
|
||||||
|
|
||||||
# 2. Fetch current DB state
|
# 2. Fetch current DB state
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
|
|
|
||||||
|
|
@ -543,7 +543,14 @@ async def push_trip_report(request: Request):
|
||||||
cur.execute("RELEASE SAVEPOINT sp")
|
cur.execute("RELEASE SAVEPOINT sp")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# [FIX-M11] API sends km. Store directly as distance_km.
|
# [BUG-03 — under investigation] The field is named `miles`
|
||||||
|
# but FIX-M11 claims it is already km. The polling endpoint
|
||||||
|
# was found to return metres despite docs (FIX-M16), so the
|
||||||
|
# webhook unit is suspect. Until the diagnostic
|
||||||
|
# db_audit/checks/bug03_webhook_distance_units.sql confirms
|
||||||
|
# the unit, store the raw value unchanged and rely on the
|
||||||
|
# diagnostic's ratio test to decide whether a ×1.609 (miles → km) or
|
||||||
|
# /1000 conversion is needed.
|
||||||
distance_km = clean_num(item.get("miles"))
|
distance_km = clean_num(item.get("miles"))
|
||||||
|
|
||||||
begin_lat = clean_num(item.get("beginLat"))
|
begin_lat = clean_num(item.get("beginLat"))
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue