-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-- Migration 05 — Schema Enhancements for Expanded Ingestion
-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-- Adds columns and tables to support:
--   • Normalized OBD scalar fields (from /pushobd JSONB payload)
--   • Alarm enrichment (severity, geofence context, acknowledgement)
--   • Vehicle enrichment (category, cost centre, assigned route, depot location)
--   • New webhook endpoints: /pushevent, /pushoil, /pushtem, /pushlbs
--   • Geofence definition storage
--   • dwh_gold fact table expansion for full daily KPI reporting
--
-- Run after migration 04. Safe to re-run (uses IF NOT EXISTS / CREATE OR REPLACE).
-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

BEGIN;

-- ── 1. Normalize OBD scalar fields ───────────────────────────────────────────
-- These are extracted from the obd_data JSONB column during /pushobd ingestion.
-- Raw JSONB is retained for full fidelity. Common OBD PID values only.
ALTER TABLE tracksolid.obd_readings
    ADD COLUMN IF NOT EXISTS engine_rpm       INTEGER,
    ADD COLUMN IF NOT EXISTS coolant_temp_c   NUMERIC(6,2),
    ADD COLUMN IF NOT EXISTS fuel_level_pct   NUMERIC(5,2),
    ADD COLUMN IF NOT EXISTS battery_voltage  NUMERIC(5,2),
    ADD COLUMN IF NOT EXISTS intake_pressure  NUMERIC(6,2),
    ADD COLUMN IF NOT EXISTS throttle_pct     NUMERIC(5,2),
    ADD COLUMN IF NOT EXISTS vehicle_speed    NUMERIC(7,2),
    ADD COLUMN IF NOT EXISTS engine_load_pct  NUMERIC(5,2);

COMMENT ON COLUMN tracksolid.obd_readings.engine_rpm      IS 'Engine RPM from OBD PID 0x0C';
COMMENT ON COLUMN tracksolid.obd_readings.coolant_temp_c  IS 'Coolant temperature °C from OBD PID 0x05';
COMMENT ON COLUMN tracksolid.obd_readings.fuel_level_pct  IS 'Fuel tank level % from OBD PID 0x2F';
COMMENT ON COLUMN tracksolid.obd_readings.battery_voltage IS 'Battery voltage (V) from OBD PID 0x42';
COMMENT ON COLUMN tracksolid.obd_readings.intake_pressure IS 'Intake manifold pressure kPa from OBD PID 0x0B';
COMMENT ON COLUMN tracksolid.obd_readings.throttle_pct    IS 'Throttle position % from OBD PID 0x11';
COMMENT ON COLUMN tracksolid.obd_readings.vehicle_speed   IS 'Vehicle speed km/h from OBD PID 0x0D';
COMMENT ON COLUMN tracksolid.obd_readings.engine_load_pct IS 'Calculated engine load % from OBD PID 0x04';

-- ── 2. Alarm enrichment ───────────────────────────────────────────────────────
ALTER TABLE tracksolid.alarms
    ADD COLUMN IF NOT EXISTS severity         TEXT,
    ADD COLUMN IF NOT EXISTS geofence_id      TEXT,
    ADD COLUMN IF NOT EXISTS geofence_name    TEXT,
    ADD COLUMN IF NOT EXISTS acknowledged_at  TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS acknowledged_by  TEXT;

COMMENT ON COLUMN tracksolid.alarms.severity        IS 'Alarm severity level: critical | warning | info';
COMMENT ON COLUMN tracksolid.alarms.geofence_id     IS 'Tracksolid geofence ID if this is a geofence alarm';
COMMENT ON COLUMN tracksolid.alarms.geofence_name   IS 'Human-readable geofence name';
COMMENT ON COLUMN tracksolid.alarms.acknowledged_at IS 'Timestamp when alarm was acknowledged by an operator';
COMMENT ON COLUMN tracksolid.alarms.acknowledged_by IS 'Username or ID of operator who acknowledged the alarm';

-- ── 3. Vehicle enrichment ─────────────────────────────────────────────────────
ALTER TABLE tracksolid.devices
    ADD COLUMN IF NOT EXISTS vehicle_category  TEXT,
    ADD COLUMN IF NOT EXISTS cost_centre       TEXT,
    ADD COLUMN IF NOT EXISTS assigned_route    TEXT,
    ADD COLUMN IF NOT EXISTS depot_geom        geometry(Point,4326),
    ADD COLUMN IF NOT EXISTS depot_address     TEXT;

COMMENT ON COLUMN tracksolid.devices.vehicle_category IS 'Vehicle type: truck | van | motorcycle | car | other';
COMMENT ON COLUMN tracksolid.devices.cost_centre      IS 'Business unit or department this vehicle belongs to';
COMMENT ON COLUMN tracksolid.devices.assigned_route   IS 'Regular route name or ID for route-based reporting';
COMMENT ON COLUMN tracksolid.devices.depot_geom       IS 'Home base/depot coordinates (WGS84)';
COMMENT ON COLUMN tracksolid.devices.depot_address    IS 'Human-readable depot address';

-- ── 4. Device login/logout events (webhook /pushevent) ────────────────────────
CREATE TABLE IF NOT EXISTS tracksolid.device_events (
    id          BIGSERIAL    PRIMARY KEY,
    imei        TEXT         NOT NULL REFERENCES tracksolid.devices(imei),
    event_type  TEXT         NOT NULL,  -- 'LOGIN' | 'LOGOUT'
    event_time  TIMESTAMPTZ  NOT NULL,
    timezone    TEXT,
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT now(),
    UNIQUE (imei, event_type, event_time)
);

CREATE INDEX IF NOT EXISTS idx_device_events_imei_time
    ON tracksolid.device_events (imei, event_time DESC);

COMMENT ON TABLE tracksolid.device_events IS
    'Device network connection and disconnection events from /pushevent webhook.';
COMMENT ON COLUMN tracksolid.device_events.event_type IS
    'LOGIN = device connected to network; LOGOUT = device disconnected';

-- ── 5. Fuel sensor readings (webhook /pushoil) — hypertable ──────────────────
CREATE TABLE IF NOT EXISTS tracksolid.fuel_readings (
    imei          TEXT              NOT NULL REFERENCES tracksolid.devices(imei),
    reading_time  TIMESTAMPTZ       NOT NULL,
    sensor_path   TEXT,
    value         NUMERIC(10,3),
    unit          TEXT,
    lat           DOUBLE PRECISION,
    lng           DOUBLE PRECISION,
    geom          geometry(Point,4326),
    created_at    TIMESTAMPTZ       NOT NULL DEFAULT now(),
    PRIMARY KEY (imei, reading_time)
);

COMMENT ON TABLE tracksolid.fuel_readings IS
    'Fuel/oil sensor readings from /pushoil webhook. Unit varies per sensor: cm | % | V | L.';
COMMENT ON COLUMN tracksolid.fuel_readings.sensor_path IS
    'Sensor channel identifier from the device (path field in API payload)';
COMMENT ON COLUMN tracksolid.fuel_readings.unit IS
    'Measurement unit: cm (tank depth), % (percentage), V (voltage), L (litres)';

SELECT create_hypertable(
    'tracksolid.fuel_readings',
    'reading_time',
    chunk_time_interval => INTERVAL '7 days',
    if_not_exists       => TRUE
);

-- ── 6. Temperature & humidity readings (webhook /pushtem) — hypertable ────────
CREATE TABLE IF NOT EXISTS tracksolid.temperature_readings (
    imei          TEXT          NOT NULL REFERENCES tracksolid.devices(imei),
    reading_time  TIMESTAMPTZ   NOT NULL,
    temperature   NUMERIC(6,2),
    humidity_pct  NUMERIC(5,2),
    created_at    TIMESTAMPTZ   NOT NULL DEFAULT now(),
    PRIMARY KEY (imei, reading_time)
);

COMMENT ON TABLE tracksolid.temperature_readings IS
    'Temperature and humidity sensor readings from /pushtem webhook. For cold-chain / refrigerated cargo monitoring.';

SELECT create_hypertable(
    'tracksolid.temperature_readings',
    'reading_time',
    chunk_time_interval => INTERVAL '7 days',
    if_not_exists       => TRUE
);

-- ── 7. LBS / cell-tower fallback positions (webhook /pushlbs) ────────────────
CREATE TABLE IF NOT EXISTS tracksolid.lbs_readings (
    id          BIGSERIAL    PRIMARY KEY,
    imei        TEXT         NOT NULL REFERENCES tracksolid.devices(imei),
    gate_time   TIMESTAMPTZ  NOT NULL,
    post_type   TEXT,
    lbs_data    JSONB,
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT now(),
    UNIQUE (imei, gate_time)
);

CREATE INDEX IF NOT EXISTS idx_lbs_readings_imei_time
    ON tracksolid.lbs_readings (imei, gate_time DESC);

COMMENT ON TABLE tracksolid.lbs_readings IS
    'Cell tower / WiFi positioning fallback data from /pushlbs webhook. Used when GPS signal is unavailable.';
COMMENT ON COLUMN tracksolid.lbs_readings.post_type IS
    'Positioning technology: WIFI | LBS (cell tower)';
COMMENT ON COLUMN tracksolid.lbs_readings.lbs_data IS
    'Raw JSON payload containing MCC, MNC, and cell tower list for approximate geocoding.';

-- ── 8. Geofence definitions ───────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS tracksolid.geofences (
    id           BIGSERIAL    PRIMARY KEY,
    fence_id     TEXT         UNIQUE,
    fence_name   TEXT         NOT NULL,
    fence_type   TEXT,
    geom         geometry(Geometry,4326),
    radius_m     NUMERIC(10,2),
    description  TEXT,
    created_at   TIMESTAMPTZ  NOT NULL DEFAULT now(),
    updated_at   TIMESTAMPTZ  NOT NULL DEFAULT now()
);

COMMENT ON TABLE tracksolid.geofences IS
    'Geofence boundary definitions synced from the Tracksolid platform.';
COMMENT ON COLUMN tracksolid.geofences.fence_type IS 'circle | polygon';
COMMENT ON COLUMN tracksolid.geofences.radius_m IS
    'Radius in metres — only applicable for circle type geofences';

-- ── 9. Expand dwh_gold.fact_daily_fleet_metrics ───────────────────────────────
ALTER TABLE dwh_gold.fact_daily_fleet_metrics
    ADD COLUMN IF NOT EXISTS total_distance_km  NUMERIC(12,3),
    ADD COLUMN IF NOT EXISTS total_trips        INTEGER,
    ADD COLUMN IF NOT EXISTS total_drive_hours  NUMERIC(8,2),
    ADD COLUMN IF NOT EXISTS total_idle_hours   NUMERIC(8,2),
    ADD COLUMN IF NOT EXISTS fuel_consumed_l    NUMERIC(10,3),
    ADD COLUMN IF NOT EXISTS alarm_count        INTEGER,
    ADD COLUMN IF NOT EXISTS overspeed_count    INTEGER,
    ADD COLUMN IF NOT EXISTS day_start_time     TIME,
    ADD COLUMN IF NOT EXISTS day_end_time       TIME,
    ADD COLUMN IF NOT EXISTS avg_speed_kmh      NUMERIC(7,2),
    ADD COLUMN IF NOT EXISTS peak_speed_kmh     NUMERIC(7,2);

COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.total_distance_km IS 'Total km driven that day across all trips';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.total_trips       IS 'Number of completed trips';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.total_drive_hours IS 'Total hours of active driving (engine on + moving)';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.total_idle_hours  IS 'Total hours engine on but stationary';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.fuel_consumed_l   IS 'Total fuel consumed in litres (from webhook trip reports)';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.alarm_count       IS 'Total alarm events triggered that day';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.overspeed_count   IS 'Number of overspeed alarm events';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.day_start_time    IS 'Time of first trip start (Africa/Nairobi)';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.day_end_time      IS 'Time of last trip end (Africa/Nairobi)';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.avg_speed_kmh     IS 'Unweighted mean of per-trip average speeds that day';
COMMENT ON COLUMN dwh_gold.fact_daily_fleet_metrics.peak_speed_kmh    IS 'Highest max_speed_kmh recorded across all trips';

-- ── 10. ETL function — refresh daily metrics ──────────────────────────────────
-- Populates dwh_gold.fact_daily_fleet_metrics for a given date.
-- Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);
CREATE OR REPLACE FUNCTION dwh_gold.refresh_daily_metrics(target_date DATE)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Trips and alarms are aggregated separately and then joined per device.
    -- Joining the raw tables directly would repeat each trip once per alarm
    -- (and each alarm once per trip), inflating every SUM and COUNT.
    WITH trip_agg AS (
        SELECT
            t.imei,
            ROUND(SUM(t.distance_km)::numeric, 3)                   AS total_distance_km,
            COUNT(*)                                                AS total_trips,
            ROUND((SUM(t.driving_time_s) / 3600.0)::numeric, 2)     AS total_drive_hours,
            ROUND((SUM(t.idle_time_s) / 3600.0)::numeric, 2)        AS total_idle_hours,
            ROUND(SUM(t.fuel_consumed_l)::numeric, 3)               AS fuel_consumed_l,
            MIN(t.start_time AT TIME ZONE 'Africa/Nairobi')::TIME   AS day_start_time,
            MAX(t.end_time AT TIME ZONE 'Africa/Nairobi')::TIME     AS day_end_time,
            ROUND(AVG(t.avg_speed_kmh)::numeric, 2)                 AS avg_speed_kmh,
            MAX(t.max_speed_kmh)                                    AS peak_speed_kmh
        FROM tracksolid.trips t
        WHERE DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = target_date
          AND t.end_time IS NOT NULL
        GROUP BY t.imei
    ),
    alarm_agg AS (
        SELECT
            a.imei,
            COUNT(*)                                                AS alarm_count,
            COUNT(*) FILTER (WHERE a.alarm_type ILIKE '%speed%')    AS overspeed_count
        FROM tracksolid.alarms a
        WHERE DATE(a.alarm_time AT TIME ZONE 'Africa/Nairobi') = target_date
        GROUP BY a.imei
    )
    INSERT INTO dwh_gold.fact_daily_fleet_metrics (
        day,
        vehicle_key,
        total_distance_km,
        total_trips,
        total_drive_hours,
        total_idle_hours,
        fuel_consumed_l,
        alarm_count,
        overspeed_count,
        day_start_time,
        day_end_time,
        avg_speed_kmh,
        peak_speed_kmh
    )
    SELECT
        target_date,
        ta.imei,
        ta.total_distance_km,
        ta.total_trips,
        ta.total_drive_hours,
        ta.total_idle_hours,
        ta.fuel_consumed_l,
        COALESCE(aa.alarm_count, 0),      -- devices with trips but no alarms get 0
        COALESCE(aa.overspeed_count, 0),
        ta.day_start_time,
        ta.day_end_time,
        ta.avg_speed_kmh,
        ta.peak_speed_kmh
    FROM trip_agg ta
    LEFT JOIN alarm_agg aa USING (imei)
    ON CONFLICT (day, vehicle_key) DO UPDATE SET
        total_distance_km = EXCLUDED.total_distance_km,
        total_trips       = EXCLUDED.total_trips,
        total_drive_hours = EXCLUDED.total_drive_hours,
        total_idle_hours  = EXCLUDED.total_idle_hours,
        fuel_consumed_l   = EXCLUDED.fuel_consumed_l,
        alarm_count       = EXCLUDED.alarm_count,
        overspeed_count   = EXCLUDED.overspeed_count,
        day_start_time    = EXCLUDED.day_start_time,
        day_end_time      = EXCLUDED.day_end_time,
        avg_speed_kmh     = EXCLUDED.avg_speed_kmh,
        peak_speed_kmh    = EXCLUDED.peak_speed_kmh;
END;
$$;

COMMENT ON FUNCTION dwh_gold.refresh_daily_metrics(DATE) IS
    'Populates or refreshes fact_daily_fleet_metrics for the given date. '
    'Call nightly: SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1);';

COMMIT;
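
-- ── Usage sketch (not executed by this migration) ─────────────────────────────
-- A minimal sketch, assuming the nightly job may occasionally miss a day: the
-- refresh function can be called once per date to backfill a range. The 7-day
-- window is an arbitrary choice for illustration, not a project requirement.
--
--   SELECT dwh_gold.refresh_daily_metrics(d::date)
--   FROM generate_series(
--            (CURRENT_DATE - 7)::timestamp,
--            (CURRENT_DATE - 1)::timestamp,
--            INTERVAL '1 day'
--        ) AS d;
--
-- To spot-check a refreshed day, total_trips can be compared with a direct
-- count from tracksolid.trips using the same date filter as the function.
-- This assumes vehicle_key holds the device IMEI, as the function writes it.
-- Any row returned indicates a mismatch worth investigating:
--
--   SELECT f.vehicle_key, f.total_trips, COUNT(t.imei) AS trips_in_source
--   FROM dwh_gold.fact_daily_fleet_metrics f
--   LEFT JOIN tracksolid.trips t
--          ON t.imei = f.vehicle_key
--         AND DATE(t.start_time AT TIME ZONE 'Africa/Nairobi') = f.day
--         AND t.end_time IS NOT NULL
--   WHERE f.day = CURRENT_DATE - 1
--   GROUP BY f.vehicle_key, f.total_trips
--   HAVING f.total_trips IS DISTINCT FROM COUNT(t.imei);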