Initial fleetfuel: rustfs fuel bucket → DB → FleetOps Fuel Log

Self-contained ingestion module (mirrors fleettickets) for the WhatsApp
fuel-record feed in the rustfs `fuel` bucket:

- import_fuel.py — snapshot/changes/file modes, raw-jsonb upsert on id
- migrations/01_fuel_schema.sql — fuel schema, plate/fuel-type/department
  normalizers, trigger-derived columns, reporting.v_fuel_fills +
  v_fuel_efficiency, grafana_ro grants
- s3util.py / shared.py / run_migrations.py — rustfs client + DB helpers
- docs/plan.html — implementation plan

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
kianiadee 2026-06-11 23:24:33 +03:00
commit 9943932200
10 changed files with 848 additions and 0 deletions

11
.env.example Normal file
View file

@ -0,0 +1,11 @@
# fleetfuel — copy to .env and fill in. NEVER commit the real .env.
# Shared database (the `fuel` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# rustfs / S3 — source fuel records (fuel_records/latest.json + fuel_records/changes/*.json)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=<key>
RUSTFS_SECRET_KEY=<secret>
RUSTFS_REGION=us-east-1
FUEL_BUCKET=fuel

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
.env
.DS_Store
__pycache__/
*.pyc
.venv/
.ruff_cache/

75
README.md Normal file
View file

@ -0,0 +1,75 @@
# fleetfuel
Fleet **fuel-spend** ingestion and read-schema that powers the **Fuel Log** tab in
FleetOps. Sibling of **fleettickets** (same self-contained module shape).
The feed is WhatsApp fuel-update messages, extracted by an n8n CDC job from the client's
`logistics_department.fuel_records` table and dropped in the rustfs `fuel` bucket. Each
record is an *actual* fill: litres, KES amount, odometer, fuel type, driver, department,
keyed by number plate (`car`).
## What this owns
| Piece | What |
|---|---|
| `migrations/01_fuel_schema.sql` | The `fuel` schema: `fuel.records` (raw-jsonb-first + trigger-derived columns), `fuel.norm_plate` / `fuel.canon_fuel_type` / `fuel.canon_department` normalizers, `fuel.ingest_state` (CDC watermark), and `reporting.v_fuel_fills` / `reporting.v_fuel_efficiency` (the read views) |
| `import_fuel.py` | Pulls fuel records from the rustfs `fuel` bucket and upserts them on `id` |
| `s3util.py` | rustfs (S3-compatible) client factory — path-style, custom endpoint |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `fuel.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
## What this does NOT own (stays where it is)
- **The DB** — the `fuel` schema lives in the shared `tracksolid_db`.
- **The read-API**`dashboard_api` (in the tracksolid stack) serves
`GET /analytics/fuel-fills`, which reads `reporting.v_fuel_fills` (defined here).
- **The frontend** — the Fuel Log map/panel is a tab in the **FleetOps** SPA (`fleetops` repo).
## Data model (raw-first)
Each row is `id` (the source PK) + `raw` (the full source record as `jsonb`) + derived,
*normalized* columns the DB trigger fills from `raw`. The WhatsApp feed is messy
(`KCA 542Q` vs `KCA542Q`, fuel-type typos like `DISIEL`, ~30 department spellings), so the
normalizers (`fuel.norm_plate`, `fuel.canon_fuel_type`, `fuel.canon_department`) are the single
source of truth. Soft-deletes (`deleted_at`) are kept on the row and excluded by the read views.
The fuel record links to the fleet by plate: `reporting.v_fuel_fills` joins
`fuel.norm_plate(car)` to `fuel.norm_plate(devices.vehicle_number)` to pick up `cost_centre` /
`assigned_city` / `imei`.
## Bucket layout
```
fuel_records/latest.json full snapshot { metadata, records[] }
fuel_records/changes/<ISO-ts>.json hourly CDC delta (incl. soft-deletes)
```
## Setup
```bash
uv sync
cp .env.example .env # fill in DATABASE_URL, RUSTFS_*
python run_migrations.py # apply the schema (idempotent)
```
## Run
```bash
# full reconcile from the snapshot (self-healing; the default cron job)
python import_fuel.py --snapshot --apply
# incremental, since the stored watermark (lower latency)
python import_fuel.py --changes --apply
# from a local file instead of the bucket
python import_fuel.py --file latest.json --apply
```
Drop `--apply` for a dry-run (parses + logs counts, writes nothing).
## Deploy
Like fleettickets: a Coolify container/cron in the tracksolid stack runs
`python run_migrations.py` then `python import_fuel.py --snapshot --apply` hourly (matching the
n8n export cadence). Env from the Coolify `.env`. The `dashboard_api` and `fleetops` apps ride
their own existing pipelines.

223
docs/plan.html Normal file
View file

@ -0,0 +1,223 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>FleetFuel — Implementation Plan</title>
<style>
:root{
--bg:#15110c; --panel:#1d1812; --panel2:#241d15; --ink:#f3ebdd; --muted:#b8a98f;
--line:#3a2f22; --accent:#e8913c; --accent2:#f0b454; --green:#7fb069; --code:#120e09;
}
*{box-sizing:border-box}
html{scroll-behavior:smooth}
body{
margin:0; background:var(--bg); color:var(--ink);
font:16px/1.65 -apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif;
-webkit-font-smoothing:antialiased;
}
.wrap{max-width:920px; margin:0 auto; padding:48px 24px 96px}
header.hero{
border:1px solid var(--line); border-radius:16px; padding:32px 32px 28px;
background:linear-gradient(135deg,#221a11,#1a140d);
box-shadow:0 10px 40px rgba(0,0,0,.35);
}
.kicker{color:var(--accent2); font-weight:700; letter-spacing:.14em; text-transform:uppercase; font-size:12px}
h1{margin:.3em 0 .15em; font-size:34px; letter-spacing:-.02em}
.sub{color:var(--muted); font-size:16px; margin:0}
.pipeline{display:flex; flex-wrap:wrap; gap:8px; margin-top:20px}
.pipeline span{
background:var(--panel2); border:1px solid var(--line); border-radius:999px;
padding:6px 14px; font-size:13px; color:var(--ink); white-space:nowrap;
}
.pipeline .arrow{border:none; background:none; color:var(--accent); padding:6px 0}
h2{
margin:46px 0 14px; font-size:23px; padding-bottom:8px; border-bottom:1px solid var(--line);
display:flex; align-items:center; gap:12px;
}
h2 .num{
flex:0 0 auto; width:30px; height:30px; border-radius:8px; background:var(--accent);
color:#1a120a; font-size:15px; font-weight:800; display:grid; place-items:center;
}
h3{margin:26px 0 8px; font-size:17px; color:var(--accent2)}
p{margin:.6em 0}
code{
background:var(--code); color:#f0c992; padding:.12em .42em; border-radius:5px;
font:13.5px/1.5 ui-monospace,SFMono-Regular,Menlo,Consolas,monospace;
}
pre{
background:var(--code); border:1px solid var(--line); border-radius:10px; padding:16px;
overflow:auto; font:13px/1.6 ui-monospace,SFMono-Regular,Menlo,Consolas,monospace; color:#d8c7a6;
}
pre code{background:none; padding:0; color:inherit}
ul{margin:.5em 0; padding-left:1.3em}
li{margin:.3em 0}
a{color:var(--accent2)}
.note{
border-left:3px solid var(--accent); background:var(--panel2); padding:12px 16px;
border-radius:0 10px 10px 0; margin:18px 0; color:var(--ink);
}
.note.warn{border-left-color:#e0683c}
table{width:100%; border-collapse:collapse; margin:14px 0; font-size:14px}
th,td{text-align:left; padding:10px 12px; border-bottom:1px solid var(--line); vertical-align:top}
th{color:var(--accent2); font-size:12px; text-transform:uppercase; letter-spacing:.06em}
td code{font-size:12.5px}
.tag{display:inline-block; font-size:11px; font-weight:700; padding:2px 8px; border-radius:6px}
.tag.new{background:rgba(127,176,105,.18); color:var(--green)}
.tag.edit{background:rgba(232,145,60,.18); color:var(--accent2)}
footer{margin-top:60px; color:var(--muted); font-size:13px; text-align:center; border-top:1px solid var(--line); padding-top:20px}
.pill{color:var(--muted); font-size:13px}
</style>
</head>
<body>
<div class="wrap">
<header class="hero">
<div class="kicker">Implementation Plan · 17_fleetfuel</div>
<h1>FleetFuel</h1>
<p class="sub">Ingest the RustFS <code>fuel</code> bucket → shared database → a new <strong>Fuel Log</strong> tab in FleetOps.</p>
<div class="pipeline">
<span>RustFS <code>fuel</code> bucket</span><span class="arrow"></span>
<span><code>17_fleetfuel</code> ingestion</span><span class="arrow"></span>
<span><code>fuel</code> schema · tracksolid_db</span><span class="arrow"></span>
<span><code>dashboard_api</code></span><span class="arrow"></span>
<span>FleetOps “Fuel Log” tab</span>
</div>
</header>
<h2><span class="num">·</span>Context</h2>
<p>FleetOps (<code>15_fleetops</code>) is the fleet <strong>operations</strong> analytics SPA. It already has a
trip-<em>derived</em> fuel panel (<code>GET /analytics/fuel</code>), but that data is effectively empty — estimated
fuel needs <code>devices.fuel_100km</code> (NULL fleet-wide) and actual <code>oils</code> is sparse.</p>
<p>A real fuel-spend feed now lands in the RustFS <code>fuel</code> bucket: WhatsApp fuel-update messages,
extracted by an n8n CDC job from <code>logistics_department.fuel_records</code><strong>1,922 rows</strong> (FebJun 2026)
of <em>actual</em> fills (litres, KES amount, odometer, fuel type, driver, department), keyed by number plate.</p>
<p><strong>Goal (full vertical):</strong> a new <code>17_fleetfuel</code> module pulls the bucket into the shared
<code>tracksolid_db</code> under its own <code>fuel</code> schema, exposes it via <code>dashboard_api</code>, and adds a
<strong>new “Fuel Log” tab</strong> to FleetOps. The existing trip-derived panel stays as-is — the two coexist.
This mirrors the proven <code>16_fleettickets</code> module pattern exactly.</p>
<div class="note warn"><strong>Credential note:</strong> the <code>RUSTFS_*</code> keys were pasted in chat. They go
<strong>only</strong> in a gitignored <code>.env</code> (never committed), and the shared secret should be
<strong>rotated</strong> after this work, since plaintext-in-chat counts as exposed.</div>
<h2><span class="num">·</span>The data (confirmed by reading the bucket)</h2>
<p>Bucket layout (<code>s3.rahamafresh.com</code>, path-style, region <code>us-east-1</code>):</p>
<ul>
<li><code>fuel_records/latest.json</code> — full snapshot, envelope <code>{ metadata, records[] }</code>, ~1.7 MB / 1922 rows.</li>
<li><code>fuel_records/changes/&lt;ISO-ts&gt;.json</code> — hourly CDC deltas (same envelope, includes soft-deletes).</li>
<li><code>.csv</code> siblings exist, but we ingest the <strong>JSON</strong> (richer, typed).</li>
</ul>
<p>Record shape (stable PK <code>id</code>):</p>
<pre><code>id, record_datetime, department, driver, car, liters, amount, fuel_type, odometer,
sender_name, sender_phone, raw_message, source, source_instance, source_message_id,
source_event_timestamp, message_fingerprint, deleted_at, deleted_by, delete_reason,
delete_source, created_at</code></pre>
<p>The data is <strong>messy</strong> (WhatsApp-sourced) → normalization is essential:</p>
<ul>
<li><code>car</code>: <code>KCA 542Q</code> vs <code>KCA542Q</code>, plus junk (<code>ANY VEH</code>). 162 distinct.</li>
<li><code>fuel_type</code>: <code>PETROL/DIESEL</code> + typos (<code>DISIEL</code>, <code>DISEL</code>, <code>PETRO</code>, <code>/PETROL</code>, <code>VPOWER</code>, null).</li>
<li><code>department</code>: ~30 case/spelling variants of ~12 real departments (<code>OSP/osp/Osp</code>, <code>ROLL-OUT/ROLLOUT</code>).</li>
<li><code>deleted_at</code> set on 34 rows (soft-deleted — must be excluded from reporting).</li>
</ul>
<h2><span class="num">·</span>Pattern mirrored: <code>16_fleettickets</code></h2>
<p>Self-contained Python module → reads a RustFS bucket → upserts raw-jsonb rows into a namespaced schema in the
shared <code>tracksolid_db</code> → idempotent migrations with a <code>schema_migrations</code> ledger → a
<code>reporting.*</code> view consumed by <code>dashboard_api</code> → surfaced as a FleetOps tab. Reuse:
<code>shared.py</code> (DB ctx-mgr + <code>clean</code>), <code>run_migrations.py</code> (ledger runner), the
dry-run/<code>--apply</code> CLI convention, and the <code>.env</code>/<code>pyproject</code>/<code>README</code> layout.</p>
<h2><span class="num">A</span>Ingestion repo — <code>17_fleetfuel</code> <span class="tag new">new</span></h2>
<p>Created in <code>/Users/kianiadee/Downloads/projects/17_fleetfuel</code>, files mirroring fleettickets:</p>
<ul>
<li><strong><code>pyproject.toml</code></strong><code>psycopg2-binary</code>, <code>boto3</code> (the <code>aws</code> CLI isnt available, and the CDC <code>changes/</code> listing needs <code>list_objects_v2</code> pagination, so boto3 beats CLI-subprocess). ruff dev dep.</li>
<li><strong><code>shared.py</code></strong> — copy verbatim (<code>get_conn</code>, <code>get_logger</code>, <code>clean</code>); rename logger ns to <code>fleetfuel</code>.</li>
<li><strong><code>run_migrations.py</code></strong> — copy; swap ledger to <code>fuel.schema_migrations</code>.</li>
<li><strong><code>.env.example</code></strong><code>DATABASE_URL</code>, <code>RUSTFS_ENDPOINT/ACCESS_KEY/SECRET_KEY/REGION</code>, <code>FUEL_BUCKET=fuel</code>.</li>
<li><strong><code>.gitignore</code></strong><code>.env</code>, <code>__pycache__</code>, <code>.venv</code>.</li>
<li><strong><code>README.md</code></strong> — what it owns vs. not (DB schema = ours; read-API = dashboard_api; frontend = fleetops).</li>
<li><strong><code>migrations/01_fuel_schema.sql</code></strong> — see Part B.</li>
<li><strong><code>import_fuel.py</code></strong> — the loader (below).</li>
<li><strong><code>s3util.py</code></strong> (optional) — thin boto3 client factory (<code>endpoint_url</code> + path-style addressing).</li>
</ul>
<h3><code>import_fuel.py</code></h3>
<ul>
<li>boto3 S3 client from <code>RUSTFS_*</code> env.</li>
<li><strong>Default <code>--snapshot</code></strong>: GET <code>fuel_records/latest.json</code>, upsert all <code>records</code> on <code>id</code>. At 1922 rows / hourly cadence this full reconcile is trivial and <strong>self-healing</strong> (picks up edits + soft-deletes) → simplest correct design.</li>
<li><strong><code>--changes</code></strong> (optional, lower-latency): list <code>fuel_records/changes/</code>, process files newer than a watermark in <code>fuel.ingest_state</code>.</li>
<li><strong><code>--file &lt;path&gt;</code></strong>: local JSON for dev/testing.</li>
<li><strong><code>--apply</code></strong> writes; default is a dry-run logging parsed/valid/skipped counts.</li>
<li>Upsert via <code>execute_values</code>: <code>INSERT … ON CONFLICT (id) DO UPDATE SET raw=EXCLUDED.raw, updated_at=now()</code>. Derived/normalized columns populated by a DB trigger reading <code>raw</code>. Scrub JSON <code>NaN</code> → null first.</li>
</ul>
<h2><span class="num">B</span><code>migrations/01_fuel_schema.sql</code> — the <code>fuel</code> schema <span class="tag new">new</span></h2>
<p>Idempotent, lives in shared <code>tracksolid_db</code>:</p>
<ul>
<li><code>CREATE SCHEMA IF NOT EXISTS fuel;</code> + <code>reporting;</code></li>
<li><strong>Normalizer functions</strong> (IMMUTABLE, single source of truth):
<ul>
<li><code>fuel.norm_plate(text)</code> → upper, strip non-alphanumeric (<code>KCA 542Q</code><code>KCA542Q</code>); null out junk.</li>
<li><code>fuel.canon_fuel_type(text)</code> → map typos to <code>PETROL / DIESEL / VPOWER / OTHER / NULL</code>.</li>
<li><code>fuel.canon_department(text)</code> → upper + collapse-ws + variant map to canonical set.</li>
</ul>
</li>
<li><strong><code>fuel.records</code></strong> — raw-first + derived columns populated by a <code>BEFORE INSERT/UPDATE</code> trigger from <code>raw</code>: <code>id, raw, record_datetime, plate, car_raw, liters, amount, fuel_type, department, driver, odometer, deleted_at, message_fingerprint, ingested_at, updated_at</code>. Indexes on <code>plate</code>, <code>record_datetime</code>, <code>department</code>, partial <code>WHERE deleted_at IS NULL</code>.</li>
<li><strong><code>fuel.ingest_state</code></strong> — watermark for <code>--changes</code> mode.</li>
<li><strong><code>reporting.v_fuel_fills</code></strong> — read view: <code>fuel.records</code> (<code>deleted_at IS NULL</code>) LEFT JOIN <code>tracksolid.devices d ON fuel.norm_plate(d.vehicle_number) = r.plate</code>, exposing <code>fuel_date, plate, vehicle_number, cost_centre, assigned_city, imei, department, driver, liters, amount, fuel_type, odometer</code>. Same filter contract as <code>reporting.v_daily_summary</code>.</li>
<li><strong><code>reporting.v_fuel_efficiency</code></strong> (optional, high-value) — per-<code>plate</code> window over <code>record_datetime</code>: <code>km = odometer lag(odometer)</code>, <code>km_per_litre = km / liters</code>, with defensive bounds.</li>
<li><strong>Grants</strong><code>USAGE</code> + <code>SELECT</code> on the views to <code>dashboard_ro</code> (mirror tracksolid migration 18).</li>
</ul>
<h2><span class="num">C</span><code>dashboard_api</code> read endpoints <span class="tag edit">edit</span></h2>
<p>In <code>dashboard_api_rev.py</code>, add endpoints reusing <code>_analytics_window</code> + <code>_dim_filters</code> + <code>RealDictCursor</code>:</p>
<ul>
<li><strong><code>GET /analytics/fuel-fills</code></strong><code>period/start/end</code> + dims + optional <code>department</code>, <code>fuel_type</code>. Returns: <code>totals</code> (litres, spend_kes, fills, avg_price_per_litre, vehicles_fuelled), <code>rows</code> (per-vehicle), <code>by_department</code>, <code>trend</code> (daily litres + spend), <code>data_status</code> (unmatched-plate count).</li>
<li><strong><code>GET /analytics/fuel-fills/recent</code></strong> — recent N fills for the detail table.</li>
<li><strong>Extend <code>GET /analytics/filters</code></strong> to also return <code>departments</code> and <code>fuel_types</code>.</li>
</ul>
<p>No business logic in the API — it only selects from the <code>reporting.*</code> views.</p>
<h2><span class="num">D</span>FleetOps SPA — new “Fuel Log” tab <span class="tag edit">edit</span></h2>
<p>In <code>15_fleetops/src/index.html</code> (single-file SPA, inline JS + Chart.js). Leave the existing fuel panel untouched; add a “Fuel Log” tab:</p>
<ul>
<li>KPI strip: total litres, total KES spend, fills, avg KES/litre, vehicles fuelled.</li>
<li>Trend chart (spend + litres) — reuse the utilisation panels Chart.js setup.</li>
<li>Per-vehicle table (litres, spend, fills, last odometer, km/l) + by-department breakdown.</li>
<li>Recent-fills detail table from <code>/analytics/fuel-fills/recent</code>.</li>
<li>Wire to the shared filter state + new department / fuel-type dropdowns; calls go through the existing <code>API_BASE</code> mechanism.</li>
</ul>
<h2><span class="num">E</span>Git &amp; deploy</h2>
<ul>
<li><code>git init</code> in <code>17_fleetfuel</code>, add <code>repo.rahamafresh.com/kianiadee/fleetfuel.git</code> as origin, push (repo is currently empty).</li>
<li>Deploy like fleettickets: a Coolify container/cron in the stack runs <code>run_migrations.py</code> then <code>import_fuel.py --snapshot --apply</code> hourly (matching the CDC cadence).</li>
<li><code>dashboard_api</code> + <code>fleetops</code> ride their existing Coolify pipelines (feature → <code>staging</code><code>main</code>).</li>
</ul>
<h2><span class="num">·</span>Critical files</h2>
<table>
<thead><tr><th>File</th><th>Action</th></tr></thead>
<tbody>
<tr><td><code>17_fleetfuel/import_fuel.py</code>, <code>shared.py</code>, <code>run_migrations.py</code>, <code>pyproject.toml</code>, <code>.env.example</code>, <code>README.md</code></td><td><span class="tag new">new</span> mirror <code>16_fleettickets/*</code></td></tr>
<tr><td><code>17_fleetfuel/migrations/01_fuel_schema.sql</code></td><td><span class="tag new">new</span> <code>fuel</code> schema + normalizers + <code>reporting.v_fuel_fills</code></td></tr>
<tr><td><code>tracksolid_timescale_grafana_prod/dashboard_api_rev.py</code></td><td><span class="tag edit">edit</span> add <code>/analytics/fuel-fills[/recent]</code>, extend <code>/analytics/filters</code></td></tr>
<tr><td><code>15_fleetops/src/index.html</code></td><td><span class="tag edit">edit</span> add “Fuel Log” tab</td></tr>
</tbody>
</table>
<p class="pill">Reuse: <code>16_fleettickets/shared.py</code>, <code>run_migrations.py</code>, the <code>_scrub_nan</code>/<code>upsert</code> shape; <code>dashboard_api_rev.py:444</code> <code>_dim_filters</code>, <code>_analytics_window</code>; the stdlib SigV4 reader already proven this session (fallback if boto3 is undesirable).</p>
<h2><span class="num"></span>Verification (end-to-end)</h2>
<ol>
<li><strong>Bucket read</strong> — already proven this session (listed 14 objects, parsed <code>latest.json</code> = 1922 rows).</li>
<li><strong>Ingestion dry-run</strong><code>python import_fuel.py --snapshot</code> (no <code>--apply</code>): logs parsed/valid/skipped, no DB writes.</li>
<li><strong>Migrate + apply</strong><code>python run_migrations.py</code> then <code>import_fuel.py --snapshot --apply</code>. Spot-check: <code>SELECT count(*), count(*) FILTER (WHERE deleted_at IS NULL) FROM fuel.records;</code> (≈1922 / ≈1888) and plate-match rate in <code>reporting.v_fuel_fills</code>.</li>
<li><strong>API</strong><code>curl "$API/analytics/fuel-fills?period=90d"</code> → totals/rows/by_department/trend non-empty; <code>/analytics/filters</code> includes <code>departments</code>.</li>
<li><strong>Frontend</strong> — build &amp; run fleetops locally, open the Fuel Log tab, confirm KPIs/chart/tables render and filters drive the queries.</li>
<li><strong><code>/verify</code></strong> the fleetops change once wired.</li>
</ol>
<footer>FleetFuel implementation plan · generated 2026-06-11 · sibling of FleetOps / FleetTickets</footer>
</div>
</body>
</html>

173
import_fuel.py Normal file
View file

@ -0,0 +1,173 @@
"""
import_fuel.py Rahama Fresh · fleet fuel-record ingestion (raw-first)
Loads WhatsApp fuel-update records into the `fuel` schema the source of the
FleetOps "Fuel Log" tab. The feed is produced by an n8n CDC job that exports the
client's `logistics_department.fuel_records` table to the rustfs `fuel` bucket.
RAW-FIRST: each row stores `id` (the source PK) + `raw` (the full record as jsonb).
A DB trigger (see migrations/01_fuel_schema.sql) derives the normalized columns
(plate, liters, amount, fuel_type, department, odometer, deleted_at, ) from `raw`,
so a change to the source schema needs no loader change.
Bucket layout (envelope `{ "metadata": {...}, "records": [...] }`):
fuel_records/latest.json full snapshot (~1.9k rows)
fuel_records/changes/<ISO-ts>.json hourly CDC deltas (incl. soft-deletes)
Modes (need DATABASE_URL + RUSTFS_* env; see .env.example):
python import_fuel.py --snapshot --apply # default: full reconcile (self-healing)
python import_fuel.py --changes --apply # incremental, since the stored watermark
python import_fuel.py --file latest.json --apply # local file (dev/testing)
Dry-run (no --apply) parses + logs counts without writing.
Pre-requisite: migration applied (run_migrations.py) fuel.records + fuel.ingest_state
+ reporting.v_fuel_fills.
"""
from __future__ import annotations
import argparse
import json
import math
import psycopg2.extras
from s3util import bucket, get_s3
from shared import get_conn, get_logger
log = get_logger("import_fuel")
SNAPSHOT_KEY = "fuel_records/latest.json"
CHANGES_PREFIX = "fuel_records/changes/"
_STATE_KEY = "changes" # row key in fuel.ingest_state for the changes watermark
# ── data loading ──────────────────────────────────────────────────────────────
def _records(payload: dict | list) -> list[dict]:
"""Pull the records array out of the `{metadata, records}` envelope (or a bare list)."""
if isinstance(payload, dict):
recs = payload.get("records", [])
else:
recs = payload
return recs if isinstance(recs, list) else []
def _load_local(path: str) -> list[dict]:
with open(path, encoding="utf-8") as f:
return _records(json.load(f)) # json.loads accepts NaN by default
def _load_s3_json(s3, key: str) -> list[dict]:
log.info("fetching s3://%s/%s", bucket(), key)
body = s3.get_object(Bucket=bucket(), Key=key)["Body"].read()
return _records(json.loads(body.decode("utf-8")))
def _list_change_keys(s3, after: str | None) -> list[str]:
"""Change-file keys (lexically > `after`), sorted. ISO-timestamp names sort chronologically."""
keys: list[str] = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket(), Prefix=CHANGES_PREFIX):
for obj in page.get("Contents", []):
k = obj["Key"]
if k.endswith(".json") and (after is None or k > after):
keys.append(k)
return sorted(keys)
# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _scrub_nan(row: dict) -> dict:
# Postgres jsonb rejects the JSON `NaN` token — scrub to null.
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
def upsert(rows: list[dict], apply: bool) -> int:
payload = [
(rid, psycopg2.extras.Json(_scrub_nan(r)))
for r in rows
if (rid := r.get("id")) is not None
]
log.info("fuel.records: %d valid rows (skipped %d without id)",
len(payload), len(rows) - len(payload))
if not apply:
log.info("DRY-RUN — nothing written. Use --apply.")
return len(payload)
if not payload:
return 0
with get_conn() as conn:
with conn.cursor() as cur:
psycopg2.extras.execute_values(
cur,
"INSERT INTO fuel.records (id, raw) VALUES %s "
"ON CONFLICT (id) DO UPDATE SET raw = EXCLUDED.raw, updated_at = now()",
payload, page_size=500,
)
log.info("upserted %d rows into fuel.records", len(payload))
return len(payload)
# ── watermark (fuel.ingest_state) ─────────────────────────────────────────────
def _get_watermark() -> str | None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT last_key FROM fuel.ingest_state WHERE key = %s", (_STATE_KEY,))
row = cur.fetchone()
return row[0] if row else None
def _set_watermark(last_key: str) -> None:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO fuel.ingest_state (key, last_key) VALUES (%s, %s) "
"ON CONFLICT (key) DO UPDATE SET last_key = EXCLUDED.last_key, updated_at = now()",
(_STATE_KEY, last_key),
)
# ── modes ─────────────────────────────────────────────────────────────────────
def ingest_snapshot(apply: bool) -> None:
s3 = get_s3()
upsert(_load_s3_json(s3, SNAPSHOT_KEY), apply)
def ingest_changes(apply: bool) -> None:
s3 = get_s3()
after = _get_watermark() if apply else None
keys = _list_change_keys(s3, after)
log.info("%d change file(s) to process (watermark=%s)", len(keys), after)
total = 0
for key in keys:
total += upsert(_load_s3_json(s3, key), apply)
if apply:
_set_watermark(key)
log.info("changes: processed %d file(s), upserted %d rows", len(keys), total)
def ingest_file(path: str, apply: bool) -> None:
upsert(_load_local(path), apply)
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest fuel records (raw-first) from the rustfs bucket")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
mode = ap.add_mutually_exclusive_group()
mode.add_argument("--snapshot", action="store_true",
help="Full reconcile from fuel_records/latest.json (default)")
mode.add_argument("--changes", action="store_true",
help="Incremental: change files newer than the stored watermark")
mode.add_argument("--file", default=None, help="Local JSON file (snapshot or changes envelope)")
args = ap.parse_args()
if args.file:
ingest_file(args.file, args.apply)
elif args.changes:
ingest_changes(args.apply)
else: # default
ingest_snapshot(args.apply)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,198 @@
-- 01_fuel_schema.sql — fleetfuel · WhatsApp fuel-record store (raw-jsonb-first)
-- ─────────────────────────────────────────────────────────────────────────────
-- The `fuel` schema: one raw-jsonb row per fuel record (the source PK `id` + the
-- full record as `raw`), plus DB-derived NORMALIZED columns the trigger fills from
-- `raw`. The feed is WhatsApp fuel-update messages (n8n CDC of the client's
-- `logistics_department.fuel_records`), which is messy — plate spacing, fuel-type
-- typos, ~30 department spellings — so the normalizers below are the single source
-- of truth, kept in the DB so reads never re-implement them.
--
-- fuel.records id + raw + derived (plate, liters, amount, fuel_type,
-- department, odometer, deleted_at, …)
-- fuel.ingest_state CDC watermark for import_fuel.py --changes
-- reporting.v_fuel_fills read view (joins devices by normalized plate)
-- reporting.v_fuel_efficiency per-vehicle km/litre from odometer deltas
--
-- Lives in the shared `tracksolid_db` so the existing dashboard_api and the
-- FleetOps SPA keep working. Idempotent: safe on a fresh DB and re-appliable live.
-- ─────────────────────────────────────────────────────────────────────────────
CREATE SCHEMA IF NOT EXISTS fuel;
CREATE SCHEMA IF NOT EXISTS reporting; -- shared read layer (the views live here for dashboard_api)
-- ── safe casts (bad WhatsApp values -> NULL, never error) ────────────────────
CREATE OR REPLACE FUNCTION fuel.to_num(p text)
RETURNS numeric LANGUAGE plpgsql IMMUTABLE AS $fn$
BEGIN
RETURN NULLIF(btrim(p), '')::numeric;
EXCEPTION WHEN others THEN
RETURN NULL;
END $fn$;
CREATE OR REPLACE FUNCTION fuel.to_ts(p text)
RETURNS timestamptz LANGUAGE plpgsql IMMUTABLE AS $fn$
BEGIN
RETURN NULLIF(btrim(p), '')::timestamptz;
EXCEPTION WHEN others THEN
RETURN NULL;
END $fn$;
-- ── normalizers (the single source of truth for the messy feed) ──────────────
-- Plate: upper, strip every non-alphanumeric ('KCA 542Q' -> 'KCA542Q'); drop junk
-- placeholders ('ANY VEH', 'NA', …) so they don't pollute the join.
CREATE OR REPLACE FUNCTION fuel.norm_plate(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
SELECT CASE
WHEN upper(regexp_replace(coalesce(p, ''), '[^A-Za-z0-9]', '', 'g'))
IN ('', 'ANYVEH', 'NA', 'NONE', 'NIL', 'NULL') THEN NULL
ELSE NULLIF(upper(regexp_replace(p, '[^A-Za-z0-9]', '', 'g')), '')
END
$fn$;
-- Fuel type: collapse the typo zoo into PETROL / DIESEL / VPOWER / OTHER / NULL.
CREATE OR REPLACE FUNCTION fuel.canon_fuel_type(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
WITH c AS (SELECT NULLIF(upper(regexp_replace(coalesce(p, ''), '[^A-Za-z]', '', 'g')), '') AS v)
SELECT CASE
WHEN v IS NULL THEN NULL
WHEN v LIKE '%VPOWER%' THEN 'VPOWER'
WHEN v LIKE '%PET%' THEN 'PETROL' -- PETROL, PETRO, PETROLI, /PETROL
WHEN v LIKE 'DI%' THEN 'DIESEL' -- DIESEL, DISEL, DISIEL, DISEIL, DIRSEL
ELSE 'OTHER'
END FROM c
$fn$;
-- Department: upper, punctuation -> space, collapse, then map known variants.
CREATE OR REPLACE FUNCTION fuel.canon_department(p text)
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$
WITH c AS (SELECT NULLIF(btrim(regexp_replace(upper(coalesce(p, '')), '[^A-Z0-9]+', ' ', 'g')), '') AS v)
SELECT CASE
WHEN v IS NULL THEN NULL
WHEN v LIKE '%OSP%' AND v LIKE '%PATROL%' THEN 'OSP PATROL'
WHEN v LIKE 'ROLL%OUT' OR v IN ('ROLLOUT','ROLOUT') THEN 'ROLLOUT'
WHEN v LIKE 'ISP%' THEN 'ISP'
WHEN v LIKE 'PLANNING%' THEN 'PLANNING'
WHEN v LIKE 'OSP%' THEN 'OSP'
WHEN v LIKE 'FDS%' THEN 'FDS'
WHEN v LIKE 'DELIVER%' THEN 'DELIVERIES'
WHEN v LIKE 'HUAWEI%' THEN 'HUAWEI'
WHEN v LIKE 'AIRTEL%' THEN 'AIRTEL'
WHEN v LIKE 'REGION%' THEN 'REGIONAL'
WHEN v LIKE 'FTTH%' THEN 'FTTH'
WHEN v LIKE 'QEHS%' THEN 'QEHS'
WHEN v LIKE 'GENERAL%' THEN 'GENERAL'
WHEN v LIKE 'LOGISTIC%' THEN 'LOGISTICS'
ELSE v
END FROM c
$fn$;
-- ── records: raw-jsonb-first + trigger-derived normalized columns ────────────
CREATE TABLE IF NOT EXISTS fuel.records (
id bigint PRIMARY KEY,
raw jsonb NOT NULL,
record_datetime timestamptz,
car_raw text,
plate text,
liters numeric,
amount numeric,
fuel_type text,
fuel_type_raw text,
department text,
driver text,
odometer numeric,
deleted_at timestamptz,
message_fingerprint text,
ingested_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
-- Derive every normalized column from `raw` on write — the loader only supplies (id, raw).
CREATE OR REPLACE FUNCTION fuel.tg_records_derive()
RETURNS trigger LANGUAGE plpgsql AS $fn$
BEGIN
NEW.record_datetime := fuel.to_ts(NEW.raw->>'record_datetime');
NEW.car_raw := NEW.raw->>'car';
NEW.plate := fuel.norm_plate(NEW.raw->>'car');
NEW.liters := fuel.to_num(NEW.raw->>'liters');
NEW.amount := fuel.to_num(NEW.raw->>'amount');
NEW.fuel_type := fuel.canon_fuel_type(NEW.raw->>'fuel_type');
NEW.fuel_type_raw := NEW.raw->>'fuel_type';
NEW.department := fuel.canon_department(NEW.raw->>'department');
NEW.driver := NULLIF(btrim(NEW.raw->>'driver'), '');
NEW.odometer := fuel.to_num(NEW.raw->>'odometer');
NEW.deleted_at := fuel.to_ts(NEW.raw->>'deleted_at');
NEW.message_fingerprint := NEW.raw->>'message_fingerprint';
NEW.updated_at := now();
RETURN NEW;
END $fn$;
DROP TRIGGER IF EXISTS trg_records_derive ON fuel.records;
CREATE TRIGGER trg_records_derive BEFORE INSERT OR UPDATE ON fuel.records
FOR EACH ROW EXECUTE FUNCTION fuel.tg_records_derive();
CREATE INDEX IF NOT EXISTS ix_fuel_records_plate ON fuel.records (plate);
CREATE INDEX IF NOT EXISTS ix_fuel_records_datetime ON fuel.records (record_datetime);
CREATE INDEX IF NOT EXISTS ix_fuel_records_dept ON fuel.records (department);
CREATE INDEX IF NOT EXISTS ix_fuel_records_live ON fuel.records (record_datetime) WHERE deleted_at IS NULL;
-- ── CDC watermark for import_fuel.py --changes ───────────────────────────────
CREATE TABLE IF NOT EXISTS fuel.ingest_state (
key text PRIMARY KEY,
last_key text,
updated_at timestamptz NOT NULL DEFAULT now()
);
-- ── read view: live fills joined to the fleet by normalized plate ────────────
-- Joins devices so the Fuel Log tab reuses the same dims as the rest of FleetOps
-- (cost_centre / assigned_city / assigned_driver / vehicle_number — the columns
-- dashboard_api's _dim_filters expects). Keeps the fuel-native `department` and
-- `driver` (from the WhatsApp message) for fuel-specific filtering. Soft-deleted
-- rows are excluded here. Encapsulating the devices join means the read-only
-- staging role only needs SELECT on this view, not on tracksolid.devices.
CREATE OR REPLACE VIEW reporting.v_fuel_fills AS
SELECT r.id,
r.record_datetime,
r.record_datetime::date AS fuel_date,
r.plate,
d.vehicle_number,
d.cost_centre,
d.assigned_city,
d.driver_name AS assigned_driver,
d.imei,
r.department,
r.driver,
r.liters,
r.amount,
r.fuel_type,
r.odometer
FROM fuel.records r
LEFT JOIN tracksolid.devices d ON fuel.norm_plate(d.vehicle_number) = r.plate
WHERE r.deleted_at IS NULL;
-- ── per-vehicle fuel efficiency: km/litre from consecutive odometer readings ──
-- Defensive: only positive, plausible (<5000 km) odometer deltas yield km/litre.
CREATE OR REPLACE VIEW reporting.v_fuel_efficiency AS
WITH seq AS (
SELECT id, plate, vehicle_number, cost_centre, assigned_city,
record_datetime, fuel_date, odometer, liters, amount, fuel_type,
odometer - lag(odometer) OVER (PARTITION BY plate ORDER BY record_datetime, id) AS km_since_last
FROM reporting.v_fuel_fills
WHERE plate IS NOT NULL AND odometer IS NOT NULL AND odometer > 0
)
SELECT seq.*,
CASE WHEN liters > 0 AND km_since_last > 0 AND km_since_last < 5000
THEN round(km_since_last / liters, 2)
END AS km_per_litre
FROM seq;
-- ── read-only grant (staging dashboard_api connects as grafana_ro) ───────────
-- Mirrors migrations/18_grant_reporting_ro.sql in the tracksolid repo. Guarded +
-- idempotent. Only the reporting views are exposed (not fuel.records) — the views
-- run with the owner's rights, so grafana_ro needs nothing on the base tables.
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT USAGE ON SCHEMA reporting TO grafana_ro;
GRANT SELECT ON reporting.v_fuel_fills, reporting.v_fuel_efficiency TO grafana_ro;
END IF;
END $grants$;

22
pyproject.toml Normal file
View file

@ -0,0 +1,22 @@
[project]
name = "fleetfuel"
version = "0.1.0"
description = "WhatsApp fuel-record ingestion (rustfs `fuel` bucket) + read-schema for the FleetOps Fuel Log tab"
requires-python = ">=3.12"
dependencies = [
"psycopg2-binary>=2.9.9", # DB driver
"boto3>=1.34", # S3 client for the rustfs `fuel` bucket
]
[project.optional-dependencies]
dev = [
"ruff>=0.4",
]
[tool.uv]
managed = true
[tool.ruff]
target-version = "py312"
line-length = 100
lint.select = ["E", "W", "F", "B", "UP"]

56
run_migrations.py Normal file
View file

@ -0,0 +1,56 @@
"""
run_migrations.py fleetfuel · apply SQL migrations in order.
Applies migrations/*.sql (lexical order) against DATABASE_URL, tracking applied
files in fuel.schema_migrations. Migrations are idempotent, so re-running is
safe. Run: `python run_migrations.py`.
"""
from __future__ import annotations
import glob
import os
import psycopg2
MIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")
def main() -> None:
dsn = os.environ.get("DATABASE_URL")
if not dsn:
raise SystemExit("DATABASE_URL is not set")
conn = psycopg2.connect(dsn)
conn.autocommit = False
try:
with conn.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS fuel")
cur.execute(
"CREATE TABLE IF NOT EXISTS fuel.schema_migrations "
"(filename text PRIMARY KEY, applied_at timestamptz NOT NULL DEFAULT now())"
)
conn.commit()
cur.execute("SELECT filename FROM fuel.schema_migrations")
applied = {r[0] for r in cur.fetchall()}
for path in sorted(glob.glob(os.path.join(MIG_DIR, "*.sql"))):
fn = os.path.basename(path)
if fn in applied:
print(f" skip {fn}")
continue
print(f" apply {fn}")
with open(path, encoding="utf-8") as f:
cur.execute(f.read())
cur.execute(
"INSERT INTO fuel.schema_migrations (filename) VALUES (%s) "
"ON CONFLICT DO NOTHING",
(fn,),
)
conn.commit()
print("migrations up to date.")
finally:
conn.close()
if __name__ == "__main__":
main()

31
s3util.py Normal file
View file

@ -0,0 +1,31 @@
"""
s3util.py fleetfuel · rustfs (S3-compatible) client factory.
rustfs needs path-style addressing and a custom endpoint. Credentials and
endpoint come from the RUSTFS_* env vars (see .env.example). Kept in one place
so import_fuel.py just asks for a ready client.
"""
from __future__ import annotations
import os
import boto3
from botocore.config import Config
def get_s3():
"""Return a boto3 S3 client configured for the rustfs endpoint (path-style)."""
endpoint = os.environ["RUSTFS_ENDPOINT"]
return boto3.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"],
aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"],
region_name=os.getenv("RUSTFS_REGION", "us-east-1"),
config=Config(s3={"addressing_style": "path"}, signature_version="s3v4"),
)
def bucket() -> str:
return os.getenv("FUEL_BUCKET", "fuel")

53
shared.py Normal file
View file

@ -0,0 +1,53 @@
"""
shared.py fleetfuel · minimal DB + helper utilities.
Self-contained (mirrors 16_fleettickets/shared.py) so fleetfuel stands alone.
The DB connection comes from DATABASE_URL (points at the shared `tracksolid_db`,
where the `fuel` schema lives alongside `tracksolid` / `reporting` / `tickets`).
"""
from __future__ import annotations
import logging
import os
from contextlib import contextmanager
from typing import Any, Optional
import psycopg2
def get_logger(name: str) -> logging.Logger:
logger = logging.getLogger(f"fleetfuel.{name}")
if not logger.handlers:
h = logging.StreamHandler()
h.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(h)
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
return logger
@contextmanager
def get_conn():
"""DB connection context manager. Auto-commits on success, rolls back on error."""
dsn = os.environ.get("DATABASE_URL")
if not dsn:
raise RuntimeError("DATABASE_URL is not set")
conn = psycopg2.connect(dsn)
try:
conn.autocommit = False
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def clean(v: Any) -> Optional[str]:
"""Trimmed string, or None if empty/None."""
if v is None:
return None
s = str(v).strip()
return s if s != "" else None