fleet-platform/db/migrations/20260601000007_serve_fn_live_view.sql

139 lines
5 KiB
MySQL
Raw Permalink Normal View History

-- migrate:up
-- serve.fn_live_view — single source of truth for the live dashboard.
--
-- Returns one jsonb document with three keys:
--   summary    : counts by operational state, a count of vehicles whose last
--                fix is older than 90 seconds, and an 'as_of' UTC timestamp.
--   geojson    : a FeatureCollection with one Feature per vehicle.
--   slo_status : one entry per row of slo.v_current_status, keyed by metric.
--
-- Implements PRD F1.11 lifecycle filter (only devices with lifecycle = 'active')
-- and PRD F1.6 dedup: exactly one position per vehicle, chosen by
--   1. fresh tracker     (device_type = 'tracker', fix inside fresh_window)
--   2. fresh non-tracker (camera fallback when the tracker is stale or absent)
--   3. stale tracker
--   4. stale non-tracker
--   then most-recent fix, then latest activation_at, then imei so that the
--   winner is deterministic on full ties.
--
-- Recognised keys in `filters` (all optional; `filters` itself may be NULL):
--   fresh_window    interval text, default '24 hours'
--   offline_after   interval text, default '5 minutes'
--   move_speed_kmh  numeric, default 5
--   cost_centre     exact match on vehicles.cost_centre
--   assigned_city   exact match on vehicles.assigned_city
--   vehicle_numbers JSON array of plates; JSON null or a missing key means
--                   "no plate filter"; an empty array matches no vehicle.
CREATE OR REPLACE FUNCTION serve.fn_live_view(filters jsonb)
RETURNS jsonb
LANGUAGE plpgsql STABLE
AS $$
DECLARE
    fresh_window   interval := COALESCE((filters->>'fresh_window')::interval, interval '24 hours');
    offline_after  interval := COALESCE((filters->>'offline_after')::interval, interval '5 minutes');
    move_speed_kmh numeric  := COALESCE((filters->>'move_speed_kmh')::numeric, 5);
    p_cost_centre   text := filters->>'cost_centre';
    p_assigned_city text := filters->>'assigned_city';
    -- An explicit JSON null is treated like an absent key, matching how the
    -- ->> based filters above behave. Any other non-array value still raises
    -- inside jsonb_array_elements_text, as it did before.
    p_vehicle_numbers text[] := CASE
        WHEN filters ? 'vehicle_numbers'
             AND jsonb_typeof(filters->'vehicle_numbers') <> 'null'
            THEN ARRAY(SELECT jsonb_array_elements_text(filters->'vehicle_numbers'))
        ELSE NULL
    END;
    -- Cutoffs are computed once so every CTE compares against the same instants.
    fresh_cutoff   timestamptz := now() - fresh_window;
    offline_cutoff timestamptz := now() - offline_after;
    result jsonb;
BEGIN
    WITH candidates AS (
        -- Every live position of an active device, narrowed by optional filters.
        SELECT
            lp.imei,
            lp.occurred_at,
            lp.geom,
            lp.speed_kmh,
            lp.direction_deg,
            d.device_type,
            d.activation_at,
            v.vehicle_id,
            v.plate,
            v.cost_centre,
            v.assigned_city
        FROM state.live_positions AS lp
        INNER JOIN domain.devices AS d
            ON d.imei = lp.imei
        INNER JOIN domain.vehicles AS v
            ON v.vehicle_id = d.vehicle_id
        WHERE d.lifecycle = 'active'
          AND (p_cost_centre IS NULL OR v.cost_centre = p_cost_centre)
          AND (p_assigned_city IS NULL OR v.assigned_city = p_assigned_city)
          AND (p_vehicle_numbers IS NULL OR v.plate = ANY (p_vehicle_numbers))
    ),
    ranked AS (
        -- Rank the candidate devices of each vehicle; rn = 1 is the winner.
        SELECT
            c.*,
            ROW_NUMBER() OVER (
                PARTITION BY c.vehicle_id
                ORDER BY
                    -- Device type and freshness are combined into one priority.
                    -- Sorting by type first and freshness second would make the
                    -- freshness gate a no-op and let a long-silent tracker hide
                    -- a camera that is reporting right now.
                    CASE
                        WHEN c.device_type = 'tracker' AND c.occurred_at > fresh_cutoff THEN 0
                        WHEN c.occurred_at > fresh_cutoff THEN 1
                        WHEN c.device_type = 'tracker' THEN 2
                        ELSE 3
                    END,
                    -- most-recent fix; a missing timestamp must never win
                    c.occurred_at DESC NULLS LAST,
                    -- tiebreak by activation time
                    c.activation_at DESC NULLS LAST,
                    -- last resort so the chosen row is deterministic
                    c.imei
            ) AS rn
        FROM candidates AS c
    ),
    deduped AS (
        -- Exactly one row per vehicle, without the ranking helper column.
        SELECT
            r.imei,
            r.occurred_at,
            r.geom,
            r.speed_kmh,
            r.direction_deg,
            r.device_type,
            r.activation_at,
            r.vehicle_id,
            r.plate,
            r.cost_centre,
            r.assigned_city
        FROM ranked AS r
        WHERE r.rn = 1
    ),
    enriched AS (
        -- Classify each vehicle: offline takes precedence over moving/parked.
        SELECT
            d.*,
            CASE
                WHEN d.occurred_at <= offline_cutoff THEN 'offline'
                WHEN d.speed_kmh IS NOT NULL AND d.speed_kmh > move_speed_kmh THEN 'moving'
                ELSE 'parked'
            END AS operational_state
        FROM deduped AS d
    ),
    summary AS (
        SELECT jsonb_build_object(
            'total_active', count(*),
            'moving',  count(*) FILTER (WHERE e.operational_state = 'moving'),
            'parked',  count(*) FILTER (WHERE e.operational_state = 'parked'),
            'offline', count(*) FILTER (WHERE e.operational_state = 'offline'),
            -- vehicles whose last fix is at least 90 seconds old
            'below_freshness_slo', count(*) FILTER (
                WHERE e.occurred_at <= now() - interval '90 seconds'
            ),
            'as_of', to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')
        ) AS s
        FROM enriched AS e
    ),
    features AS (
        -- One GeoJSON Feature per vehicle; ordered so repeated calls over the
        -- same data produce the same array. Empty input yields '[]'.
        SELECT COALESCE(jsonb_agg(
            jsonb_build_object(
                'type', 'Feature',
                'geometry', ST_AsGeoJSON(e.geom)::jsonb,
                'properties', jsonb_build_object(
                    'vehicle_id', e.vehicle_id,
                    'plate', e.plate,
                    'imei', e.imei,
                    'device_type', e.device_type,
                    'cost_centre', e.cost_centre,
                    'assigned_city', e.assigned_city,
                    'occurred_at', to_char(e.occurred_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
                    'speed_kmh', e.speed_kmh,
                    'direction_deg', e.direction_deg,
                    'operational_state', e.operational_state,
                    'style_class', 'vehicle-' || e.operational_state
                )
            )
            ORDER BY e.vehicle_id
        ), '[]'::jsonb) AS feats
        FROM enriched AS e
    ),
    slo_block AS (
        -- Current SLO readings keyed by metric name; '{}' when the view is empty.
        SELECT COALESCE(jsonb_object_agg(
            st.metric,
            jsonb_build_object(
                'threshold', st.threshold,
                'current', st.current_value,
                'status', st.status
            )
        ), '{}'::jsonb) AS ss
        FROM slo.v_current_status AS st
    )
    SELECT jsonb_build_object(
        'summary', (SELECT s FROM summary),
        'geojson', jsonb_build_object(
            'type', 'FeatureCollection',
            'features', (SELECT feats FROM features)
        ),
        'slo_status', (SELECT ss FROM slo_block)
    )
    INTO result;

    RETURN result;
END;
$$;
-- migrate:down
-- Rollback: remove the function created by the up step. IF EXISTS keeps the
-- rollback from failing when the function is already gone.
-- NOTE(review): the up step uses CREATE OR REPLACE, so if an earlier
-- definition of serve.fn_live_view(jsonb) existed it is not restored here — confirm.
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);