fleetfuel/migrations/01_fuel_schema.sql
kianiadee a5380aece6 fix(schema): one device per fill in v_fuel_fills (LATERAL + LIMIT 1)
A plate can map to multiple tracksolid.devices rows (vehicle re-fitted with a
new tracker), so the plain LEFT JOIN fanned out and double-counted litres/spend
(3345 view rows for 1888 live records). LATERAL ... ORDER BY enabled_flag,
updated_at LIMIT 1 picks one device per fill. Applied to prod as migration 02;
01 updated so fresh installs are correct. Verified: 1888 rows, 1775 matched.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-11 23:49:37 +03:00

210 lines
10 KiB
PL/PgSQL

-- 01_fuel_schema.sql — fleetfuel · WhatsApp fuel-record store (raw-jsonb-first)
-- ─────────────────────────────────────────────────────────────────────────────
-- The `fuel` schema: one raw-jsonb row per fuel record (the source PK `id` + the
-- full record as `raw`), plus DB-derived NORMALIZED columns the trigger fills from
-- `raw`. The feed is WhatsApp fuel-update messages (n8n CDC of the client's
-- `logistics_department.fuel_records`), which is messy — plate spacing, fuel-type
-- typos, ~30 department spellings — so the normalizers below are the single source
-- of truth, kept in the DB so reads never re-implement them.
--
-- fuel.records id + raw + derived (plate, liters, amount, fuel_type,
-- department, odometer, deleted_at, …)
-- fuel.ingest_state CDC watermark for import_fuel.py --changes
-- reporting.v_fuel_fills read view (joins devices by normalized plate)
-- reporting.v_fuel_efficiency per-vehicle km/litre from odometer deltas
--
-- Lives in the shared `tracksolid_db` so the existing dashboard_api and the
-- FleetOps SPA keep working. Idempotent: safe on a fresh DB and re-appliable live.
-- ─────────────────────────────────────────────────────────────────────────────
-- `fuel` holds the record store (tables, normalizers, trigger) created below.
-- IF NOT EXISTS keeps both statements safe to re-run.
CREATE SCHEMA IF NOT EXISTS fuel;
CREATE SCHEMA IF NOT EXISTS reporting; -- shared read layer (the views live here for dashboard_api)
-- ── safe casts (bad WhatsApp values -> NULL, never error) ────────────────────
-- fuel.to_num(p): best-effort text -> numeric cast for free-typed values.
--   p        raw text; may be NULL, blank, padded, or not a number at all
--   returns  the parsed numeric, or NULL when p is NULL/blank, fails to parse,
--            or parses to a non-finite value (NaN / +-Infinity)
-- Never raises: any cast error is caught and mapped to NULL.
CREATE OR REPLACE FUNCTION fuel.to_num(p text)
RETURNS numeric LANGUAGE plpgsql IMMUTABLE AS $fn$
DECLARE
    v numeric;
BEGIN
    v := NULLIF(btrim(p), '')::numeric;
    -- 'NaN' (and 'Infinity' on servers whose numeric type accepts it) casts
    -- without error, yet NaN sorts above every number (so it passes `> 0`)
    -- and turns any SUM() it joins into NaN. Finite numerics always render
    -- with at least one digit and the special values never do, so a digit
    -- test rejects exactly those. A NULL v skips the branch and is returned.
    IF v::text !~ '[0-9]' THEN
        RETURN NULL;
    END IF;
    RETURN v;
EXCEPTION WHEN others THEN
    -- Unparseable input ('12 litres', '1,200', ...) is data, not an error.
    RETURN NULL;
END $fn$;
-- fuel.to_ts(p): best-effort text -> timestamptz cast for free-typed values.
--   p        raw text; may be NULL, blank, padded, or not a timestamp at all
--   returns  the parsed timestamptz, or NULL when p is NULL/blank or unparseable
-- Never raises: any cast error is caught and mapped to NULL.
-- Declared STABLE rather than IMMUTABLE because the text -> timestamptz cast
-- reads the session's TimeZone / DateStyle settings (an input without a UTC
-- offset is interpreted in the session zone) and accepts clock-relative words
-- such as 'now'. Labelling it IMMUTABLE would let the planner pre-evaluate a
-- call once and reuse a result that is only valid for that session/moment.
CREATE OR REPLACE FUNCTION fuel.to_ts(p text)
RETURNS timestamptz LANGUAGE plpgsql STABLE AS $fn$
BEGIN
    RETURN NULLIF(btrim(p), '')::timestamptz;
EXCEPTION WHEN others THEN
    -- Unparseable input is data, not an error.
    RETURN NULL;
END $fn$;
-- ── normalizers (the single source of truth for the messy feed) ──────────────
-- fuel.norm_plate(p): canonical join key for a vehicle plate.
--   p        plate text as typed ('KCA 542Q', 'kca-542q', 'N/A', ...)
--   returns  the plate upper-cased with every non-alphanumeric character
--            removed ('KCA 542Q' -> 'KCA542Q'); NULL when p is NULL, when
--            nothing is left after stripping, or when what is left is one of
--            the placeholder words ANYVEH / NA / NONE / NIL / NULL
-- Written as a single FROM-less expression: a NULL p makes the NOT IN test
-- NULL, so the CASE falls through to its implicit NULL result.
CREATE OR REPLACE FUNCTION fuel.norm_plate(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
    SELECT CASE
               WHEN upper(regexp_replace(p, '[^A-Za-z0-9]', '', 'g'))
                        NOT IN ('', 'ANYVEH', 'NA', 'NONE', 'NIL', 'NULL')
                   THEN upper(regexp_replace(p, '[^A-Za-z0-9]', '', 'g'))
           END
$fn$;
-- fuel.canon_fuel_type(p): fold free-typed fuel names into a fixed vocabulary.
--   p        fuel type as typed ('Petrol', 'disel', 'V-Power', ...)
--   returns  'VPOWER', 'PETROL', 'DIESEL' or 'OTHER'; NULL when p is NULL or
--            contains no letters at all
-- Only letters are considered (digits, spaces and punctuation are dropped
-- before matching). Branch order is significant: VPOWER wins over PETROL,
-- and PETROL wins over DIESEL, when a value mentions more than one.
CREATE OR REPLACE FUNCTION fuel.canon_fuel_type(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
    SELECT CASE
               WHEN letters.v IS NULL               THEN NULL
               WHEN strpos(letters.v, 'VPOWER') > 0 THEN 'VPOWER'
               -- 'PET' anywhere: PETROL, PETRO, PETROLI, /PETROL
               WHEN strpos(letters.v, 'PET') > 0    THEN 'PETROL'
               -- leading 'DI': DIESEL, DISEL, DISIEL, DISEIL, DIRSEL
               WHEN left(letters.v, 2) = 'DI'       THEN 'DIESEL'
               ELSE 'OTHER'
           END
    FROM (
        SELECT NULLIF(upper(regexp_replace(coalesce(p, ''), '[^A-Za-z]', '', 'g')), '') AS v
    ) AS letters
$fn$;
-- fuel.canon_department(p): fold department spellings into canonical names.
--   p        department as typed ('osp-patrol', 'Roll Out', 'Deliveries', ...)
--   returns  the canonical name for a recognised variant; otherwise the cleaned
--            value itself; NULL when p is NULL or has no letters/digits
-- Cleaning: upper-case, replace each run of non-alphanumerics with one space,
-- trim. The OSP PATROL rule must stay first: it matches 'OSP' and 'PATROL'
-- anywhere in the value, so it would otherwise be shadowed by the plain
-- prefix rules below it (e.g. 'OSP%').
CREATE OR REPLACE FUNCTION fuel.canon_department(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
    SELECT CASE
               WHEN dept.v IS NULL                                        THEN NULL
               WHEN dept.v LIKE '%OSP%' AND dept.v LIKE '%PATROL%'        THEN 'OSP PATROL'
               WHEN dept.v LIKE 'ROLL%OUT' OR dept.v IN ('ROLLOUT','ROLOUT') THEN 'ROLLOUT'
               -- plain prefix rules
               WHEN dept.v LIKE 'ISP%'      THEN 'ISP'
               WHEN dept.v LIKE 'PLANNING%' THEN 'PLANNING'
               WHEN dept.v LIKE 'OSP%'      THEN 'OSP'
               WHEN dept.v LIKE 'FDS%'      THEN 'FDS'
               WHEN dept.v LIKE 'DELIVER%'  THEN 'DELIVERIES'
               WHEN dept.v LIKE 'HUAWEI%'   THEN 'HUAWEI'
               WHEN dept.v LIKE 'AIRTEL%'   THEN 'AIRTEL'
               WHEN dept.v LIKE 'REGION%'   THEN 'REGIONAL'
               WHEN dept.v LIKE 'FTTH%'     THEN 'FTTH'
               WHEN dept.v LIKE 'QEHS%'     THEN 'QEHS'
               WHEN dept.v LIKE 'GENERAL%'  THEN 'GENERAL'
               WHEN dept.v LIKE 'LOGISTIC%' THEN 'LOGISTICS'
               -- unrecognised: keep the cleaned spelling rather than lose it
               ELSE dept.v
           END
    FROM (
        SELECT NULLIF(btrim(regexp_replace(upper(coalesce(p, '')), '[^A-Z0-9]+', ' ', 'g')), '') AS v
    ) AS dept
$fn$;
-- ── records: raw-jsonb-first + trigger-derived normalized columns ────────────
-- fuel.records: one row per source fuel record. Writers supply (id, raw); the
-- trg_records_derive trigger overwrites every other column except ingested_at
-- from `raw` on each INSERT and UPDATE, so values written to them directly do
-- not survive.
CREATE TABLE IF NOT EXISTS fuel.records (
-- Source primary key, and the full source record as received.
id bigint PRIMARY KEY,
raw jsonb NOT NULL,
-- raw->>'record_datetime' through fuel.to_ts (NULL when unparseable).
record_datetime timestamptz,
-- raw->>'car' verbatim, and its normalized form from fuel.norm_plate.
car_raw text,
plate text,
-- raw->>'liters' / raw->>'amount' through fuel.to_num (NULL when unparseable).
liters numeric,
amount numeric,
-- Canonical fuel type from fuel.canon_fuel_type, and raw->>'fuel_type' verbatim.
fuel_type text,
fuel_type_raw text,
-- Canonical department from fuel.canon_department(raw->>'department').
department text,
-- raw->>'driver', trimmed; blank becomes NULL.
driver text,
-- raw->>'odometer' through fuel.to_num.
odometer numeric,
-- raw->>'deleted_at' through fuel.to_ts; non-NULL marks a soft-deleted record.
deleted_at timestamptz,
-- raw->>'message_fingerprint' verbatim.
message_fingerprint text,
-- Set once by the column default when the row is first inserted.
ingested_at timestamptz NOT NULL DEFAULT now(),
-- Reset to now() by the trigger on every INSERT and UPDATE.
updated_at timestamptz NOT NULL DEFAULT now()
);
-- fuel.tg_records_derive(): row-level BEFORE trigger body for fuel.records.
-- Recomputes every derived column of NEW from NEW.raw, so a writer only has to
-- supply (id, raw). Whatever the writer put in a derived column is overwritten.
--   returns  NEW, with the derived columns and updated_at filled in
-- Parsing goes through the fuel.to_* / fuel.norm_* / fuel.canon_* helpers,
-- which return NULL for values they cannot interpret.
CREATE OR REPLACE FUNCTION fuel.tg_records_derive()
RETURNS trigger LANGUAGE plpgsql AS $fn$
DECLARE
    src       jsonb;  -- the incoming source record
    car_text  text;   -- raw->>'car', used for both the verbatim and normalized plate
    fuel_text text;   -- raw->>'fuel_type', used for both the verbatim and canonical type
BEGIN
    src       := NEW.raw;
    car_text  := src->>'car';
    fuel_text := src->>'fuel_type';

    -- verbatim copies
    NEW.car_raw             := car_text;
    NEW.fuel_type_raw       := fuel_text;
    NEW.message_fingerprint := src->>'message_fingerprint';

    -- normalized text
    NEW.plate      := fuel.norm_plate(car_text);
    NEW.fuel_type  := fuel.canon_fuel_type(fuel_text);
    NEW.department := fuel.canon_department(src->>'department');
    NEW.driver     := NULLIF(btrim(src->>'driver'), '');

    -- safe casts
    NEW.liters          := fuel.to_num(src->>'liters');
    NEW.amount          := fuel.to_num(src->>'amount');
    NEW.odometer        := fuel.to_num(src->>'odometer');
    NEW.record_datetime := fuel.to_ts(src->>'record_datetime');
    NEW.deleted_at      := fuel.to_ts(src->>'deleted_at');

    -- touched on every write, including the initial insert
    NEW.updated_at := now();

    RETURN NEW;
END $fn$;
-- Drop-then-create keeps the trigger definition re-appliable: the DROP is a
-- no-op on a fresh database and replaces any earlier definition on a live one.
DROP TRIGGER IF EXISTS trg_records_derive ON fuel.records;
-- BEFORE + FOR EACH ROW so the function can rewrite NEW before it is stored;
-- fires on UPDATE as well as INSERT so derived columns follow edits to `raw`.
CREATE TRIGGER trg_records_derive BEFORE INSERT OR UPDATE ON fuel.records
FOR EACH ROW EXECUTE FUNCTION fuel.tg_records_derive();
-- Lookup indexes on the derived columns (plate join key, time range, department).
CREATE INDEX IF NOT EXISTS ix_fuel_records_plate ON fuel.records (plate);
CREATE INDEX IF NOT EXISTS ix_fuel_records_datetime ON fuel.records (record_datetime);
CREATE INDEX IF NOT EXISTS ix_fuel_records_dept ON fuel.records (department);
-- Partial index covering only rows that are not soft-deleted.
CREATE INDEX IF NOT EXISTS ix_fuel_records_live ON fuel.records (record_datetime) WHERE deleted_at IS NULL;
-- ── CDC watermark for import_fuel.py --changes ───────────────────────────────
-- One row per named watermark (`key` is the primary key).
-- NOTE(review): presumably `last_key` is the last source position the importer
-- has processed; its format is decided by import_fuel.py — confirm there.
-- `updated_at` only has an insert-time default here; no trigger refreshes it in
-- this file, so writers must set it themselves on UPDATE.
CREATE TABLE IF NOT EXISTS fuel.ingest_state (
key text PRIMARY KEY,
last_key text,
updated_at timestamptz NOT NULL DEFAULT now()
);
-- ── read view: live fills joined to the fleet by normalized plate ────────────
-- Joins devices so the Fuel Log tab reuses the same dims as the rest of FleetOps
-- (cost_centre / assigned_city / assigned_driver / vehicle_number — the columns
-- dashboard_api's _dim_filters expects). Keeps the fuel-native `department` and
-- `driver` (from the WhatsApp message) for fuel-specific filtering. Soft-deleted
-- rows are excluded here. Encapsulating the devices join means the read-only
-- staging role only needs SELECT on this view, not on tracksolid.devices.
-- reporting.v_fuel_fills: one row per live (not soft-deleted) fuel record,
-- enriched with at most one matching tracksolid.devices row.
--   fuel-native columns   id, record_datetime, fuel_date, plate, department,
--                         driver, liters, amount, fuel_type, odometer
--   device columns        vehicle_number, cost_centre, assigned_city,
--                         assigned_driver (devices.driver_name), imei
--                         — all NULL when no device matches the plate
-- NOTE(review): fuel_date is record_datetime cast to date, which uses the
-- querying session's TimeZone — confirm readers run in the intended zone.
CREATE OR REPLACE VIEW reporting.v_fuel_fills AS
SELECT r.id,
       r.record_datetime,
       r.record_datetime::date AS fuel_date,
       r.plate,
       d.vehicle_number,
       d.cost_centre,
       d.assigned_city,
       d.assigned_driver,
       d.imei,
       r.department,
       r.driver,
       r.liters,
       r.amount,
       r.fuel_type,
       r.odometer
FROM fuel.records AS r
-- LATERAL + LIMIT 1: a plate can map to more than one devices row (vehicle
-- re-fitted with a new tracker over time). Pick exactly ONE so each fill yields
-- a single row; a plain LEFT JOIN fans out and double-counts litres/spend.
LEFT JOIN LATERAL (
    SELECT dv.vehicle_number,
           dv.cost_centre,
           dv.assigned_city,
           dv.driver_name AS assigned_driver,
           dv.imei
    FROM tracksolid.devices AS dv
    WHERE r.plate IS NOT NULL
      AND fuel.norm_plate(dv.vehicle_number) = r.plate
    -- Preference: enabled first, then most recently updated. imei is the final
    -- tiebreaker so the pick is repeatable when both of those are equal (or
    -- both NULL); without it the winner could change between executions.
    ORDER BY dv.enabled_flag DESC NULLS LAST,
             dv.updated_at DESC NULLS LAST,
             dv.imei
    LIMIT 1
) AS d ON true
WHERE r.deleted_at IS NULL;
-- ── per-vehicle fuel efficiency: km/litre from consecutive odometer readings ──
-- Built on reporting.v_fuel_fills. Only fills that have a plate and a positive
-- odometer reading take part. Each row carries the distance since the same
-- plate's previous such fill (ordered by record_datetime, then id).
--   km_since_last  odometer minus the previous reading; NULL for a plate's first row
--   km_per_litre   km_since_last / liters, rounded to 2 dp — only when liters > 0
--                  and 0 < km_since_last < 5000; NULL otherwise, so odometer
--                  typos and resets do not produce absurd efficiencies
CREATE OR REPLACE VIEW reporting.v_fuel_efficiency AS
WITH readings AS (
    SELECT f.id,
           f.plate,
           f.vehicle_number,
           f.cost_centre,
           f.assigned_city,
           f.record_datetime,
           f.fuel_date,
           f.odometer,
           f.liters,
           f.amount,
           f.fuel_type,
           f.odometer - lag(f.odometer) OVER per_plate AS km_since_last
    FROM reporting.v_fuel_fills AS f
    WHERE f.plate IS NOT NULL
      AND f.odometer IS NOT NULL
      AND f.odometer > 0
    WINDOW per_plate AS (PARTITION BY f.plate ORDER BY f.record_datetime, f.id)
)
SELECT readings.id,
       readings.plate,
       readings.vehicle_number,
       readings.cost_centre,
       readings.assigned_city,
       readings.record_datetime,
       readings.fuel_date,
       readings.odometer,
       readings.liters,
       readings.amount,
       readings.fuel_type,
       readings.km_since_last,
       CASE
           WHEN readings.liters > 0
            AND readings.km_since_last > 0
            AND readings.km_since_last < 5000
               THEN round(readings.km_since_last / readings.liters, 2)
       END AS km_per_litre
FROM readings;
-- ── read-only grant (staging dashboard_api connects as grafana_ro) ───────────
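-- Mirrors migrations/18_grant_reporting_ro.sql in the tracksolid repo. Guarded +
-- idempotent. Only the reporting views are exposed (not fuel.records) — the views
-- read their base tables with the view owner's rights, so grafana_ro needs no
-- grants on those tables. Functions the views call (e.g. fuel.norm_plate) are
-- still permission-checked against the querying role; that works here through
-- the default PUBLIC EXECUTE privilege on functions.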
-- Give the read-only role access to the reporting views, if that role exists.
-- GRANT is repeatable, so re-running this block is harmless.
DO $grants$
BEGIN
    -- Guard: on a database without the grafana_ro role there is nothing to do
    -- (a bare GRANT to a missing role would abort the migration).
    IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        RETURN;
    END IF;

    GRANT USAGE ON SCHEMA reporting TO grafana_ro;
    GRANT SELECT ON reporting.v_fuel_fills TO grafana_ro;
    GRANT SELECT ON reporting.v_fuel_efficiency TO grafana_ro;
END $grants$;