commit 9943932200e89964a6b46e2787a4c5b43ef4e5c6 Author: kianiadee Date: Thu Jun 11 23:24:33 2026 +0300 Initial fleetfuel: rustfs `fuel` bucket → DB → FleetOps Fuel Log Self-contained ingestion module (mirrors fleettickets) for the WhatsApp fuel-record feed in the rustfs `fuel` bucket: - import_fuel.py — snapshot/changes/file modes, raw-jsonb upsert on id - migrations/01_fuel_schema.sql — fuel schema, plate/fuel-type/department normalizers, trigger-derived columns, reporting.v_fuel_fills + v_fuel_efficiency, grafana_ro grants - s3util.py / shared.py / run_migrations.py — rustfs client + DB helpers - docs/plan.html — implementation plan Co-Authored-By: Claude Opus 4.8 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6c633fb --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# fleetfuel — copy to .env and fill in. NEVER commit the real .env. + +# Shared database (the `fuel` schema lives in tracksolid_db; internal Docker host) +DATABASE_URL=postgresql://tracksolid_owner:@timescale_db:5432/tracksolid_db + +# rustfs / S3 — source fuel records (fuel_records/latest.json + fuel_records/changes/*.json) +RUSTFS_ENDPOINT=https://s3.rahamafresh.com +RUSTFS_ACCESS_KEY= +RUSTFS_SECRET_KEY= +RUSTFS_REGION=us-east-1 +FUEL_BUCKET=fuel diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4a76a19 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.DS_Store +__pycache__/ +*.pyc +.venv/ +.ruff_cache/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..af7a788 --- /dev/null +++ b/README.md @@ -0,0 +1,75 @@ +# fleetfuel + +Fleet **fuel-spend** ingestion and read-schema that powers the **Fuel Log** tab in +FleetOps. Sibling of **fleettickets** (same self-contained module shape). + +The feed is WhatsApp fuel-update messages, extracted by an n8n CDC job from the client's +`logistics_department.fuel_records` table and dropped in the rustfs `fuel` bucket. 
Each +record is an *actual* fill: litres, KES amount, odometer, fuel type, driver, department, +keyed by number plate (`car`). + +## What this owns + +| Piece | What | +|---|---| +| `migrations/01_fuel_schema.sql` | The `fuel` schema: `fuel.records` (raw-jsonb-first + trigger-derived columns), `fuel.norm_plate` / `fuel.canon_fuel_type` / `fuel.canon_department` normalizers, `fuel.ingest_state` (CDC watermark), and `reporting.v_fuel_fills` / `reporting.v_fuel_efficiency` (the read views) | +| `import_fuel.py` | Pulls fuel records from the rustfs `fuel` bucket and upserts them on `id` | +| `s3util.py` | rustfs (S3-compatible) client factory — path-style, custom endpoint | +| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `fuel.schema_migrations`) | +| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | + +## What this does NOT own (stays where it is) + +- **The DB** — the `fuel` schema lives in the shared `tracksolid_db`. +- **The read-API** — `dashboard_api` (in the tracksolid stack) serves + `GET /analytics/fuel-fills`, which reads `reporting.v_fuel_fills` (defined here). +- **The frontend** — the Fuel Log map/panel is a tab in the **FleetOps** SPA (`fleetops` repo). + +## Data model (raw-first) + +Each row is `id` (the source PK) + `raw` (the full source record as `jsonb`) + derived, +*normalized* columns the DB trigger fills from `raw`. The WhatsApp feed is messy +(`KCA 542Q` vs `KCA542Q`, fuel-type typos like `DISIEL`, ~30 department spellings), so the +normalizers (`fuel.norm_plate`, `fuel.canon_fuel_type`, `fuel.canon_department`) are the single +source of truth. Soft-deletes (`deleted_at`) are kept on the row and excluded by the read views. + +The fuel record links to the fleet by plate: `reporting.v_fuel_fills` joins +`fuel.norm_plate(car)` to `fuel.norm_plate(devices.vehicle_number)` to pick up `cost_centre` / +`assigned_city` / `imei`. 
+ +## Bucket layout + +``` +fuel_records/latest.json full snapshot { metadata, records[] } +fuel_records/changes/.json hourly CDC delta (incl. soft-deletes) +``` + +## Setup + +```bash +uv sync +cp .env.example .env # fill in DATABASE_URL, RUSTFS_* +python run_migrations.py # apply the schema (idempotent) +``` + +## Run + +```bash +# full reconcile from the snapshot (self-healing; the default cron job) +python import_fuel.py --snapshot --apply + +# incremental, since the stored watermark (lower latency) +python import_fuel.py --changes --apply + +# from a local file instead of the bucket +python import_fuel.py --file latest.json --apply +``` + +Drop `--apply` for a dry-run (parses + logs counts, writes nothing). + +## Deploy + +Like fleettickets: a Coolify container/cron in the tracksolid stack runs +`python run_migrations.py` then `python import_fuel.py --snapshot --apply` hourly (matching the +n8n export cadence). Env from the Coolify `.env`. The `dashboard_api` and `fleetops` apps ride +their own existing pipelines. diff --git a/docs/plan.html b/docs/plan.html new file mode 100644 index 0000000..36ba358 --- /dev/null +++ b/docs/plan.html @@ -0,0 +1,223 @@ + + + + + +FleetFuel — Implementation Plan + + + +
+ +
+
Implementation Plan · 17_fleetfuel
+

FleetFuel

+

Ingest the RustFS fuel bucket → shared database → a new Fuel Log tab in FleetOps.

+
+ RustFS fuel bucket + 17_fleetfuel ingestion + fuel schema · tracksolid_db + dashboard_api + FleetOps “Fuel Log” tab +
+
+ +

·Context

+

FleetOps (15_fleetops) is the fleet operations analytics SPA. It already has a +trip-derived fuel panel (GET /analytics/fuel), but that data is effectively empty — estimated +fuel needs devices.fuel_100km (NULL fleet-wide) and actual oils is sparse.

+

A real fuel-spend feed now lands in the RustFS fuel bucket: WhatsApp fuel-update messages, +extracted by an n8n CDC job from logistics_department.fuel_records — 1,922 rows (Feb–Jun 2026) +of actual fills (litres, KES amount, odometer, fuel type, driver, department), keyed by number plate.

+

Goal (full vertical): a new 17_fleetfuel module pulls the bucket into the shared +tracksolid_db under its own fuel schema, exposes it via dashboard_api, and adds a +new “Fuel Log” tab to FleetOps. The existing trip-derived panel stays as-is — the two coexist. +This mirrors the proven 16_fleettickets module pattern exactly.

+
Credential note: the RUSTFS_* keys were pasted in chat. They go +only in a gitignored .env (never committed), and the shared secret should be +rotated after this work, since plaintext-in-chat counts as exposed.
+ +

·The data (confirmed by reading the bucket)

+

Bucket layout (s3.rahamafresh.com, path-style, region us-east-1):

+
    +
  • fuel_records/latest.json — full snapshot, envelope { metadata, records[] }, ~1.7 MB / 1922 rows.
  • +
  • fuel_records/changes/<ISO-ts>.json — hourly CDC deltas (same envelope, includes soft-deletes).
  • +
  • .csv siblings exist, but we ingest the JSON (richer, typed).
  • +
+

Record shape (stable PK id):

+
id, record_datetime, department, driver, car, liters, amount, fuel_type, odometer,
+sender_name, sender_phone, raw_message, source, source_instance, source_message_id,
+source_event_timestamp, message_fingerprint, deleted_at, deleted_by, delete_reason,
+delete_source, created_at
+

The data is messy (WhatsApp-sourced) → normalization is essential:

+
    +
  • car: KCA 542Q vs KCA542Q, plus junk (ANY VEH). 162 distinct.
  • +
  • fuel_type: PETROL/DIESEL + typos (DISIEL, DISEL, PETRO, /PETROL, VPOWER, null).
  • +
  • department: ~30 case/spelling variants of ~12 real departments (OSP/osp/Osp, ROLL-OUT/ROLLOUT).
  • +
  • deleted_at set on 34 rows (soft-deleted — must be excluded from reporting).
  • +
+ +

·Pattern mirrored: 16_fleettickets

+

Self-contained Python module → reads a RustFS bucket → upserts raw-jsonb rows into a namespaced schema in the +shared tracksolid_db → idempotent migrations with a schema_migrations ledger → a +reporting.* view consumed by dashboard_api → surfaced as a FleetOps tab. Reuse: +shared.py (DB ctx-mgr + clean), run_migrations.py (ledger runner), the +dry-run/--apply CLI convention, and the .env/pyproject/README layout.

+ +

A · Ingestion repo — 17_fleetfuel (new)

+

Created in /Users/kianiadee/Downloads/projects/17_fleetfuel, files mirroring fleettickets:

+
    +
  • pyproject.toml — psycopg2-binary, boto3 (the aws CLI isn’t available, and the CDC changes/ listing needs list_objects_v2 pagination, so boto3 beats CLI-subprocess). ruff dev dep.
  • +
  • shared.py — copy verbatim (get_conn, get_logger, clean); rename logger ns to fleetfuel.
  • +
  • run_migrations.py — copy; swap ledger to fuel.schema_migrations.
  • +
  • .env.example — DATABASE_URL, RUSTFS_ENDPOINT/ACCESS_KEY/SECRET_KEY/REGION, FUEL_BUCKET=fuel.
  • +
  • .gitignore — .env, __pycache__, .venv.
  • +
  • README.md — what it owns vs. not (DB schema = ours; read-API = dashboard_api; frontend = fleetops).
  • +
  • migrations/01_fuel_schema.sql — see Part B.
  • +
  • import_fuel.py — the loader (below).
  • +
  • s3util.py (optional) — thin boto3 client factory (endpoint_url + path-style addressing).
  • +
+

import_fuel.py

+
    +
  • boto3 S3 client from RUSTFS_* env.
  • +
  • Default --snapshot: GET fuel_records/latest.json, upsert all records on id. At 1922 rows / hourly cadence this full reconcile is trivial and self-healing (picks up edits + soft-deletes) → simplest correct design.
  • +
  • --changes (optional, lower-latency): list fuel_records/changes/, process files newer than a watermark in fuel.ingest_state.
  • +
  • --file <path>: local JSON for dev/testing.
  • +
  • --apply writes; default is a dry-run logging parsed/valid/skipped counts.
  • +
  • Upsert via execute_values: INSERT … ON CONFLICT (id) DO UPDATE SET raw=EXCLUDED.raw, updated_at=now(). Derived/normalized columns populated by a DB trigger reading raw. Scrub JSON NaN → null first.
  • +
+ +

B · migrations/01_fuel_schema.sql — the fuel schema (new)

+

Idempotent, lives in shared tracksolid_db:

+
    +
  • CREATE SCHEMA IF NOT EXISTS fuel; + reporting;
  • +
  • Normalizer functions (IMMUTABLE, single source of truth): +
      +
    • fuel.norm_plate(text) → upper, strip non-alphanumeric (KCA 542Q → KCA542Q); null out junk.
    • +
    • fuel.canon_fuel_type(text) → map typos to PETROL / DIESEL / VPOWER / OTHER / NULL.
    • +
    • fuel.canon_department(text) → upper + collapse-ws + variant map to canonical set.
    • +
    +
  • +
  • fuel.records — raw-first + derived columns populated by a BEFORE INSERT/UPDATE trigger from raw: id, raw, record_datetime, plate, car_raw, liters, amount, fuel_type, department, driver, odometer, deleted_at, message_fingerprint, ingested_at, updated_at. Indexes on plate, record_datetime, department, partial WHERE deleted_at IS NULL.
  • +
  • fuel.ingest_state — watermark for --changes mode.
  • +
  • reporting.v_fuel_fills — read view: fuel.records (deleted_at IS NULL) LEFT JOIN tracksolid.devices d ON fuel.norm_plate(d.vehicle_number) = r.plate, exposing fuel_date, plate, vehicle_number, cost_centre, assigned_city, imei, department, driver, liters, amount, fuel_type, odometer. Same filter contract as reporting.v_daily_summary.
  • +
  • reporting.v_fuel_efficiency (optional, high-value) — per-plate window over record_datetime: km = odometer − lag(odometer), km_per_litre = km / liters, with defensive bounds.
  • +
  • Grants — USAGE + SELECT on the views to grafana_ro (mirror tracksolid migration 18).
  • +
+ +

C · dashboard_api read endpoints (edit)

+

In dashboard_api_rev.py, add endpoints reusing _analytics_window + _dim_filters + RealDictCursor:

+
    +
  • GET /analytics/fuel-fills — period/start/end + dims + optional department, fuel_type. Returns: totals (litres, spend_kes, fills, avg_price_per_litre, vehicles_fuelled), rows (per-vehicle), by_department, trend (daily litres + spend), data_status (unmatched-plate count).
  • +
  • GET /analytics/fuel-fills/recent — recent N fills for the detail table.
  • +
  • Extend GET /analytics/filters to also return departments and fuel_types.
  • +
+

No business logic in the API — it only selects from the reporting.* views.

+ +

D · FleetOps SPA — new “Fuel Log” tab (edit)

+

In 15_fleetops/src/index.html (single-file SPA, inline JS + Chart.js). Leave the existing fuel panel untouched; add a “Fuel Log” tab:

+
    +
  • KPI strip: total litres, total KES spend, fills, avg KES/litre, vehicles fuelled.
  • +
  • Trend chart (spend + litres) — reuse the utilisation panel’s Chart.js setup.
  • +
  • Per-vehicle table (litres, spend, fills, last odometer, km/l) + by-department breakdown.
  • +
  • Recent-fills detail table from /analytics/fuel-fills/recent.
  • +
  • Wire to the shared filter state + new department / fuel-type dropdowns; calls go through the existing API_BASE mechanism.
  • +
+ +

E · Git & deploy

+
    +
  • git init in 17_fleetfuel, add repo.rahamafresh.com/kianiadee/fleetfuel.git as origin, push (repo is currently empty).
  • +
  • Deploy like fleettickets: a Coolify container/cron in the stack runs run_migrations.py then import_fuel.py --snapshot --apply hourly (matching the CDC cadence).
  • +
  • dashboard_api + fleetops ride their existing Coolify pipelines (feature → staging → main).
  • +
+ +

·Critical files

+ + + + + + + + +
File — Action
17_fleetfuel/import_fuel.py, shared.py, run_migrations.py, pyproject.toml, .env.example, README.md — new: mirror 16_fleettickets/*
17_fleetfuel/migrations/01_fuel_schema.sql — new: fuel schema + normalizers + reporting.v_fuel_fills
tracksolid_timescale_grafana_prod/dashboard_api_rev.py — edit: add /analytics/fuel-fills[/recent], extend /analytics/filters
15_fleetops/src/index.html — edit: add “Fuel Log” tab
+

Reuse: 16_fleettickets/shared.py, run_migrations.py, the _scrub_nan/upsert shape; dashboard_api_rev.py:444 _dim_filters, _analytics_window; the stdlib SigV4 reader already proven this session (fallback if boto3 is undesirable).

+ +

Verification (end-to-end)

+
    +
  1. Bucket read — already proven this session (listed 14 objects, parsed latest.json = 1922 rows).
  2. +
  3. Ingestion dry-run — python import_fuel.py --snapshot (no --apply): logs parsed/valid/skipped, no DB writes.
  4. +
  5. Migrate + apply — python run_migrations.py then import_fuel.py --snapshot --apply. Spot-check: SELECT count(*), count(*) FILTER (WHERE deleted_at IS NULL) FROM fuel.records; (≈1922 / ≈1888) and plate-match rate in reporting.v_fuel_fills.
  6. +
  7. API — curl "$API/analytics/fuel-fills?period=90d" → totals/rows/by_department/trend non-empty; /analytics/filters includes departments.
  8. +
  9. Frontend — build & run fleetops locally, open the Fuel Log tab, confirm KPIs/chart/tables render and filters drive the queries.
  10. +
  11. /verify the fleetops change once wired.
  12. +
+ +
FleetFuel implementation plan · generated 2026-06-11 · sibling of FleetOps / FleetTickets
+ +
+ + diff --git a/import_fuel.py b/import_fuel.py new file mode 100644 index 0000000..1f747f4 --- /dev/null +++ b/import_fuel.py @@ -0,0 +1,173 @@ +""" +import_fuel.py — Rahama Fresh · fleet fuel-record ingestion (raw-first) +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Loads WhatsApp fuel-update records into the `fuel` schema — the source of the +FleetOps "Fuel Log" tab. The feed is produced by an n8n CDC job that exports the +client's `logistics_department.fuel_records` table to the rustfs `fuel` bucket. + +RAW-FIRST: each row stores `id` (the source PK) + `raw` (the full record as jsonb). +A DB trigger (see migrations/01_fuel_schema.sql) derives the normalized columns +(plate, liters, amount, fuel_type, department, odometer, deleted_at, …) from `raw`, +so a change to the source schema needs no loader change. + +Bucket layout (envelope `{ "metadata": {...}, "records": [...] }`): + fuel_records/latest.json full snapshot (~1.9k rows) + fuel_records/changes/.json hourly CDC deltas (incl. soft-deletes) + +Modes (need DATABASE_URL + RUSTFS_* env; see .env.example): + python import_fuel.py --snapshot --apply # default: full reconcile (self-healing) + python import_fuel.py --changes --apply # incremental, since the stored watermark + python import_fuel.py --file latest.json --apply # local file (dev/testing) +Dry-run (no --apply) parses + logs counts without writing. + +Pre-requisite: migration applied (run_migrations.py) — fuel.records + fuel.ingest_state ++ reporting.v_fuel_fills. 
+━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import argparse +import json +import math + +import psycopg2.extras + +from s3util import bucket, get_s3 +from shared import get_conn, get_logger + +log = get_logger("import_fuel") + +SNAPSHOT_KEY = "fuel_records/latest.json" +CHANGES_PREFIX = "fuel_records/changes/" +_STATE_KEY = "changes" # row key in fuel.ingest_state for the changes watermark + + +# ── data loading ────────────────────────────────────────────────────────────── +def _records(payload: dict | list) -> list[dict]: + """Pull the records array out of the `{metadata, records}` envelope (or a bare list).""" + if isinstance(payload, dict): + recs = payload.get("records", []) + else: + recs = payload + return recs if isinstance(recs, list) else [] + + +def _load_local(path: str) -> list[dict]: + with open(path, encoding="utf-8") as f: + return _records(json.load(f)) # json.loads accepts NaN by default + + +def _load_s3_json(s3, key: str) -> list[dict]: + log.info("fetching s3://%s/%s", bucket(), key) + body = s3.get_object(Bucket=bucket(), Key=key)["Body"].read() + return _records(json.loads(body.decode("utf-8"))) + + +def _list_change_keys(s3, after: str | None) -> list[str]: + """Change-file keys (lexically > `after`), sorted. ISO-timestamp names sort chronologically.""" + keys: list[str] = [] + paginator = s3.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket(), Prefix=CHANGES_PREFIX): + for obj in page.get("Contents", []): + k = obj["Key"] + if k.endswith(".json") and (after is None or k > after): + keys.append(k) + return sorted(keys) + + +# ── upsert (raw-first) ──────────────────────────────────────────────────────── +def _scrub_nan(row: dict) -> dict: + # Postgres jsonb rejects the JSON `NaN` token — scrub to null. 
+ return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()} + + +def upsert(rows: list[dict], apply: bool) -> int: + payload = [ + (rid, psycopg2.extras.Json(_scrub_nan(r))) + for r in rows + if (rid := r.get("id")) is not None + ] + log.info("fuel.records: %d valid rows (skipped %d without id)", + len(payload), len(rows) - len(payload)) + if not apply: + log.info("DRY-RUN — nothing written. Use --apply.") + return len(payload) + if not payload: + return 0 + with get_conn() as conn: + with conn.cursor() as cur: + psycopg2.extras.execute_values( + cur, + "INSERT INTO fuel.records (id, raw) VALUES %s " + "ON CONFLICT (id) DO UPDATE SET raw = EXCLUDED.raw, updated_at = now()", + payload, page_size=500, + ) + log.info("upserted %d rows into fuel.records", len(payload)) + return len(payload) + + +# ── watermark (fuel.ingest_state) ───────────────────────────────────────────── +def _get_watermark() -> str | None: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT last_key FROM fuel.ingest_state WHERE key = %s", (_STATE_KEY,)) + row = cur.fetchone() + return row[0] if row else None + + +def _set_watermark(last_key: str) -> None: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO fuel.ingest_state (key, last_key) VALUES (%s, %s) " + "ON CONFLICT (key) DO UPDATE SET last_key = EXCLUDED.last_key, updated_at = now()", + (_STATE_KEY, last_key), + ) + + +# ── modes ───────────────────────────────────────────────────────────────────── +def ingest_snapshot(apply: bool) -> None: + s3 = get_s3() + upsert(_load_s3_json(s3, SNAPSHOT_KEY), apply) + + +def ingest_changes(apply: bool) -> None: + s3 = get_s3() + after = _get_watermark() if apply else None + keys = _list_change_keys(s3, after) + log.info("%d change file(s) to process (watermark=%s)", len(keys), after) + total = 0 + for key in keys: + total += upsert(_load_s3_json(s3, key), apply) + if apply: + _set_watermark(key) + 
log.info("changes: processed %d file(s), upserted %d rows", len(keys), total) + + +def ingest_file(path: str, apply: bool) -> None: + upsert(_load_local(path), apply) + + +# ── entrypoint ──────────────────────────────────────────────────────────────── +def main() -> None: + ap = argparse.ArgumentParser(description="Ingest fuel records (raw-first) from the rustfs bucket") + ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") + mode = ap.add_mutually_exclusive_group() + mode.add_argument("--snapshot", action="store_true", + help="Full reconcile from fuel_records/latest.json (default)") + mode.add_argument("--changes", action="store_true", + help="Incremental: change files newer than the stored watermark") + mode.add_argument("--file", default=None, help="Local JSON file (snapshot or changes envelope)") + args = ap.parse_args() + + if args.file: + ingest_file(args.file, args.apply) + elif args.changes: + ingest_changes(args.apply) + else: # default + ingest_snapshot(args.apply) + + +if __name__ == "__main__": + main() diff --git a/migrations/01_fuel_schema.sql b/migrations/01_fuel_schema.sql new file mode 100644 index 0000000..fa6689c --- /dev/null +++ b/migrations/01_fuel_schema.sql @@ -0,0 +1,198 @@ +-- 01_fuel_schema.sql — fleetfuel · WhatsApp fuel-record store (raw-jsonb-first) +-- ───────────────────────────────────────────────────────────────────────────── +-- The `fuel` schema: one raw-jsonb row per fuel record (the source PK `id` + the +-- full record as `raw`), plus DB-derived NORMALIZED columns the trigger fills from +-- `raw`. The feed is WhatsApp fuel-update messages (n8n CDC of the client's +-- `logistics_department.fuel_records`), which is messy — plate spacing, fuel-type +-- typos, ~30 department spellings — so the normalizers below are the single source +-- of truth, kept in the DB so reads never re-implement them. 
+-- +-- fuel.records id + raw + derived (plate, liters, amount, fuel_type, +-- department, odometer, deleted_at, …) +-- fuel.ingest_state CDC watermark for import_fuel.py --changes +-- reporting.v_fuel_fills read view (joins devices by normalized plate) +-- reporting.v_fuel_efficiency per-vehicle km/litre from odometer deltas +-- +-- Lives in the shared `tracksolid_db` so the existing dashboard_api and the +-- FleetOps SPA keep working. Idempotent: safe on a fresh DB and re-appliable live. +-- ───────────────────────────────────────────────────────────────────────────── + +CREATE SCHEMA IF NOT EXISTS fuel; +CREATE SCHEMA IF NOT EXISTS reporting; -- shared read layer (the views live here for dashboard_api) + +-- ── safe casts (bad WhatsApp values -> NULL, never error) ──────────────────── +CREATE OR REPLACE FUNCTION fuel.to_num(p text) + RETURNS numeric LANGUAGE plpgsql IMMUTABLE AS $fn$ +BEGIN + RETURN NULLIF(btrim(p), '')::numeric; +EXCEPTION WHEN others THEN + RETURN NULL; +END $fn$; + +CREATE OR REPLACE FUNCTION fuel.to_ts(p text) + RETURNS timestamptz LANGUAGE plpgsql IMMUTABLE AS $fn$ +BEGIN + RETURN NULLIF(btrim(p), '')::timestamptz; +EXCEPTION WHEN others THEN + RETURN NULL; +END $fn$; + +-- ── normalizers (the single source of truth for the messy feed) ────────────── +-- Plate: upper, strip every non-alphanumeric ('KCA 542Q' -> 'KCA542Q'); drop junk +-- placeholders ('ANY VEH', 'NA', …) so they don't pollute the join. +CREATE OR REPLACE FUNCTION fuel.norm_plate(p text) + RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$ + SELECT CASE + WHEN upper(regexp_replace(coalesce(p, ''), '[^A-Za-z0-9]', '', 'g')) + IN ('', 'ANYVEH', 'NA', 'NONE', 'NIL', 'NULL') THEN NULL + ELSE NULLIF(upper(regexp_replace(p, '[^A-Za-z0-9]', '', 'g')), '') + END +$fn$; + +-- Fuel type: collapse the typo zoo into PETROL / DIESEL / VPOWER / OTHER / NULL. 
+CREATE OR REPLACE FUNCTION fuel.canon_fuel_type(p text) + RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$ + WITH c AS (SELECT NULLIF(upper(regexp_replace(coalesce(p, ''), '[^A-Za-z]', '', 'g')), '') AS v) + SELECT CASE + WHEN v IS NULL THEN NULL + WHEN v LIKE '%VPOWER%' THEN 'VPOWER' + WHEN v LIKE '%PET%' THEN 'PETROL' -- PETROL, PETRO, PETROLI, /PETROL + WHEN v LIKE 'DI%' THEN 'DIESEL' -- DIESEL, DISEL, DISIEL, DISEIL, DIRSEL + ELSE 'OTHER' + END FROM c +$fn$; + +-- Department: upper, punctuation -> space, collapse, then map known variants. +CREATE OR REPLACE FUNCTION fuel.canon_department(p text) + RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$ + WITH c AS (SELECT NULLIF(btrim(regexp_replace(upper(coalesce(p, '')), '[^A-Z0-9]+', ' ', 'g')), '') AS v) + SELECT CASE + WHEN v IS NULL THEN NULL + WHEN v LIKE '%OSP%' AND v LIKE '%PATROL%' THEN 'OSP PATROL' + WHEN v LIKE 'ROLL%OUT' OR v IN ('ROLLOUT','ROLOUT') THEN 'ROLLOUT' + WHEN v LIKE 'ISP%' THEN 'ISP' + WHEN v LIKE 'PLANNING%' THEN 'PLANNING' + WHEN v LIKE 'OSP%' THEN 'OSP' + WHEN v LIKE 'FDS%' THEN 'FDS' + WHEN v LIKE 'DELIVER%' THEN 'DELIVERIES' + WHEN v LIKE 'HUAWEI%' THEN 'HUAWEI' + WHEN v LIKE 'AIRTEL%' THEN 'AIRTEL' + WHEN v LIKE 'REGION%' THEN 'REGIONAL' + WHEN v LIKE 'FTTH%' THEN 'FTTH' + WHEN v LIKE 'QEHS%' THEN 'QEHS' + WHEN v LIKE 'GENERAL%' THEN 'GENERAL' + WHEN v LIKE 'LOGISTIC%' THEN 'LOGISTICS' + ELSE v + END FROM c +$fn$; + +-- ── records: raw-jsonb-first + trigger-derived normalized columns ──────────── +CREATE TABLE IF NOT EXISTS fuel.records ( + id bigint PRIMARY KEY, + raw jsonb NOT NULL, + record_datetime timestamptz, + car_raw text, + plate text, + liters numeric, + amount numeric, + fuel_type text, + fuel_type_raw text, + department text, + driver text, + odometer numeric, + deleted_at timestamptz, + message_fingerprint text, + ingested_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +-- Derive every normalized column from 
`raw` on write — the loader only supplies (id, raw). +CREATE OR REPLACE FUNCTION fuel.tg_records_derive() + RETURNS trigger LANGUAGE plpgsql AS $fn$ +BEGIN + NEW.record_datetime := fuel.to_ts(NEW.raw->>'record_datetime'); + NEW.car_raw := NEW.raw->>'car'; + NEW.plate := fuel.norm_plate(NEW.raw->>'car'); + NEW.liters := fuel.to_num(NEW.raw->>'liters'); + NEW.amount := fuel.to_num(NEW.raw->>'amount'); + NEW.fuel_type := fuel.canon_fuel_type(NEW.raw->>'fuel_type'); + NEW.fuel_type_raw := NEW.raw->>'fuel_type'; + NEW.department := fuel.canon_department(NEW.raw->>'department'); + NEW.driver := NULLIF(btrim(NEW.raw->>'driver'), ''); + NEW.odometer := fuel.to_num(NEW.raw->>'odometer'); + NEW.deleted_at := fuel.to_ts(NEW.raw->>'deleted_at'); + NEW.message_fingerprint := NEW.raw->>'message_fingerprint'; + NEW.updated_at := now(); + RETURN NEW; +END $fn$; + +DROP TRIGGER IF EXISTS trg_records_derive ON fuel.records; +CREATE TRIGGER trg_records_derive BEFORE INSERT OR UPDATE ON fuel.records + FOR EACH ROW EXECUTE FUNCTION fuel.tg_records_derive(); + +CREATE INDEX IF NOT EXISTS ix_fuel_records_plate ON fuel.records (plate); +CREATE INDEX IF NOT EXISTS ix_fuel_records_datetime ON fuel.records (record_datetime); +CREATE INDEX IF NOT EXISTS ix_fuel_records_dept ON fuel.records (department); +CREATE INDEX IF NOT EXISTS ix_fuel_records_live ON fuel.records (record_datetime) WHERE deleted_at IS NULL; + +-- ── CDC watermark for import_fuel.py --changes ─────────────────────────────── +CREATE TABLE IF NOT EXISTS fuel.ingest_state ( + key text PRIMARY KEY, + last_key text, + updated_at timestamptz NOT NULL DEFAULT now() +); + +-- ── read view: live fills joined to the fleet by normalized plate ──────────── +-- Joins devices so the Fuel Log tab reuses the same dims as the rest of FleetOps +-- (cost_centre / assigned_city / assigned_driver / vehicle_number — the columns +-- dashboard_api's _dim_filters expects). 
Keeps the fuel-native `department` and +-- `driver` (from the WhatsApp message) for fuel-specific filtering. Soft-deleted +-- rows are excluded here. Encapsulating the devices join means the read-only +-- staging role only needs SELECT on this view, not on tracksolid.devices. +CREATE OR REPLACE VIEW reporting.v_fuel_fills AS + SELECT r.id, + r.record_datetime, + r.record_datetime::date AS fuel_date, + r.plate, + d.vehicle_number, + d.cost_centre, + d.assigned_city, + d.driver_name AS assigned_driver, + d.imei, + r.department, + r.driver, + r.liters, + r.amount, + r.fuel_type, + r.odometer + FROM fuel.records r + LEFT JOIN tracksolid.devices d ON fuel.norm_plate(d.vehicle_number) = r.plate + WHERE r.deleted_at IS NULL; + +-- ── per-vehicle fuel efficiency: km/litre from consecutive odometer readings ── +-- Defensive: only positive, plausible (<5000 km) odometer deltas yield km/litre. +CREATE OR REPLACE VIEW reporting.v_fuel_efficiency AS + WITH seq AS ( + SELECT id, plate, vehicle_number, cost_centre, assigned_city, + record_datetime, fuel_date, odometer, liters, amount, fuel_type, + odometer - lag(odometer) OVER (PARTITION BY plate ORDER BY record_datetime, id) AS km_since_last + FROM reporting.v_fuel_fills + WHERE plate IS NOT NULL AND odometer IS NOT NULL AND odometer > 0 + ) + SELECT seq.*, + CASE WHEN liters > 0 AND km_since_last > 0 AND km_since_last < 5000 + THEN round(km_since_last / liters, 2) + END AS km_per_litre + FROM seq; + +-- ── read-only grant (staging dashboard_api connects as grafana_ro) ─────────── +-- Mirrors migrations/18_grant_reporting_ro.sql in the tracksolid repo. Guarded + +-- idempotent. Only the reporting views are exposed (not fuel.records) — the views +-- run with the owner's rights, so grafana_ro needs nothing on the base tables. 
+DO $grants$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN + GRANT USAGE ON SCHEMA reporting TO grafana_ro; + GRANT SELECT ON reporting.v_fuel_fills, reporting.v_fuel_efficiency TO grafana_ro; + END IF; +END $grants$; diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..954f2d0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "fleetfuel" +version = "0.1.0" +description = "WhatsApp fuel-record ingestion (rustfs `fuel` bucket) + read-schema for the FleetOps Fuel Log tab" +requires-python = ">=3.12" +dependencies = [ + "psycopg2-binary>=2.9.9", # DB driver + "boto3>=1.34", # S3 client for the rustfs `fuel` bucket +] + +[project.optional-dependencies] +dev = [ + "ruff>=0.4", +] + +[tool.uv] +managed = true + +[tool.ruff] +target-version = "py312" +line-length = 100 +lint.select = ["E", "W", "F", "B", "UP"] diff --git a/run_migrations.py b/run_migrations.py new file mode 100644 index 0000000..005e579 --- /dev/null +++ b/run_migrations.py @@ -0,0 +1,56 @@ +""" +run_migrations.py — fleetfuel · apply SQL migrations in order. + +Applies migrations/*.sql (lexical order) against DATABASE_URL, tracking applied +files in fuel.schema_migrations. Migrations are idempotent, so re-running is +safe. Run: `python run_migrations.py`. 
+""" + +from __future__ import annotations + +import glob +import os + +import psycopg2 + +MIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations") + + +def main() -> None: + dsn = os.environ.get("DATABASE_URL") + if not dsn: + raise SystemExit("DATABASE_URL is not set") + conn = psycopg2.connect(dsn) + conn.autocommit = False + try: + with conn.cursor() as cur: + cur.execute("CREATE SCHEMA IF NOT EXISTS fuel") + cur.execute( + "CREATE TABLE IF NOT EXISTS fuel.schema_migrations " + "(filename text PRIMARY KEY, applied_at timestamptz NOT NULL DEFAULT now())" + ) + conn.commit() + cur.execute("SELECT filename FROM fuel.schema_migrations") + applied = {r[0] for r in cur.fetchall()} + + for path in sorted(glob.glob(os.path.join(MIG_DIR, "*.sql"))): + fn = os.path.basename(path) + if fn in applied: + print(f" skip {fn}") + continue + print(f" apply {fn}") + with open(path, encoding="utf-8") as f: + cur.execute(f.read()) + cur.execute( + "INSERT INTO fuel.schema_migrations (filename) VALUES (%s) " + "ON CONFLICT DO NOTHING", + (fn,), + ) + conn.commit() + print("migrations up to date.") + finally: + conn.close() + + +if __name__ == "__main__": + main() diff --git a/s3util.py b/s3util.py new file mode 100644 index 0000000..5e430bf --- /dev/null +++ b/s3util.py @@ -0,0 +1,31 @@ +""" +s3util.py — fleetfuel · rustfs (S3-compatible) client factory. + +rustfs needs path-style addressing and a custom endpoint. Credentials and +endpoint come from the RUSTFS_* env vars (see .env.example). Kept in one place +so import_fuel.py just asks for a ready client. 
+""" + +from __future__ import annotations + +import os + +import boto3 +from botocore.config import Config + + +def get_s3(): + """Return a boto3 S3 client configured for the rustfs endpoint (path-style).""" + endpoint = os.environ["RUSTFS_ENDPOINT"] + return boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"], + aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"], + region_name=os.getenv("RUSTFS_REGION", "us-east-1"), + config=Config(s3={"addressing_style": "path"}, signature_version="s3v4"), + ) + + +def bucket() -> str: + return os.getenv("FUEL_BUCKET", "fuel") diff --git a/shared.py b/shared.py new file mode 100644 index 0000000..2e0a43b --- /dev/null +++ b/shared.py @@ -0,0 +1,53 @@ +""" +shared.py — fleetfuel · minimal DB + helper utilities. + +Self-contained (mirrors 16_fleettickets/shared.py) so fleetfuel stands alone. +The DB connection comes from DATABASE_URL (points at the shared `tracksolid_db`, +where the `fuel` schema lives alongside `tracksolid` / `reporting` / `tickets`). +""" + +from __future__ import annotations + +import logging +import os +from contextlib import contextmanager +from typing import Any, Optional + +import psycopg2 + + +def get_logger(name: str) -> logging.Logger: + logger = logging.getLogger(f"fleetfuel.{name}") + if not logger.handlers: + h = logging.StreamHandler() + h.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s — %(message)s", + datefmt="%Y-%m-%d %H:%M:%S")) + logger.addHandler(h) + logger.setLevel(os.getenv("LOG_LEVEL", "INFO")) + return logger + + +@contextmanager +def get_conn(): + """DB connection context manager. 
Auto-commits on success, rolls back on error.""" + dsn = os.environ.get("DATABASE_URL") + if not dsn: + raise RuntimeError("DATABASE_URL is not set") + conn = psycopg2.connect(dsn) + try: + conn.autocommit = False + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + +def clean(v: Any) -> Optional[str]: + """Trimmed string, or None if empty/None.""" + if v is None: + return None + s = str(v).strip() + return s if s != "" else None