""" import_fuel.py — Rahama Fresh · fleet fuel-record ingestion (raw-first) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Loads WhatsApp fuel-update records into the `fuel` schema — the source of the FleetOps "Fuel Log" tab. The feed is produced by an n8n CDC job that exports the client's `logistics_department.fuel_records` table to the rustfs `fuel` bucket. RAW-FIRST: each row stores `id` (the source PK) + `raw` (the full record as jsonb). A DB trigger (see migrations/01_fuel_schema.sql) derives the normalized columns (plate, liters, amount, fuel_type, department, odometer, deleted_at, …) from `raw`, so a change to the source schema needs no loader change. Bucket layout (envelope `{ "metadata": {...}, "records": [...] }`): fuel_records/latest.json full snapshot (~1.9k rows) fuel_records/changes/.json hourly CDC deltas (incl. soft-deletes) Modes (need DATABASE_URL + RUSTFS_* env; see .env.example): python import_fuel.py --snapshot --apply # default: full reconcile (self-healing) python import_fuel.py --changes --apply # incremental, since the stored watermark python import_fuel.py --file latest.json --apply # local file (dev/testing) Dry-run (no --apply) parses + logs counts without writing. Pre-requisite: migration applied (run_migrations.py) — fuel.records + fuel.ingest_state + reporting.v_fuel_fills. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ from __future__ import annotations import argparse import json import math import psycopg2.extras from s3util import bucket, get_s3 from shared import get_conn, get_logger log = get_logger("import_fuel") SNAPSHOT_KEY = "fuel_records/latest.json" CHANGES_PREFIX = "fuel_records/changes/" _STATE_KEY = "changes" # row key in fuel.ingest_state for the changes watermark # ── data loading ────────────────────────────────────────────────────────────── def _records(payload: dict | list) -> list[dict]: """Pull the records array out of the `{metadata, records}` envelope (or a bare list).""" if isinstance(payload, dict): recs = payload.get("records", []) else: recs = payload return recs if isinstance(recs, list) else [] def _load_local(path: str) -> list[dict]: with open(path, encoding="utf-8") as f: return _records(json.load(f)) # json.loads accepts NaN by default def _load_s3_json(s3, key: str) -> list[dict]: log.info("fetching s3://%s/%s", bucket(), key) body = s3.get_object(Bucket=bucket(), Key=key)["Body"].read() return _records(json.loads(body.decode("utf-8"))) def _list_change_keys(s3, after: str | None) -> list[str]: """Change-file keys (lexically > `after`), sorted. ISO-timestamp names sort chronologically.""" keys: list[str] = [] paginator = s3.get_paginator("list_objects_v2") for page in paginator.paginate(Bucket=bucket(), Prefix=CHANGES_PREFIX): for obj in page.get("Contents", []): k = obj["Key"] if k.endswith(".json") and (after is None or k > after): keys.append(k) return sorted(keys) # ── upsert (raw-first) ──────────────────────────────────────────────────────── def _scrub_nan(row: dict) -> dict: # Postgres jsonb rejects the JSON `NaN` token — scrub to null. return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()} def upsert(rows: list[dict], apply: bool) -> int: payload = [ (rid, psycopg2.extras.Json(_scrub_nan(r))) for r in rows if (rid := r.get("id")) is not None ] log.info("fuel.records: %d valid rows (skipped %d without id)", len(payload), len(rows) - len(payload)) if not apply: log.info("DRY-RUN — nothing written. Use --apply.") return len(payload) if not payload: return 0 with get_conn() as conn: with conn.cursor() as cur: psycopg2.extras.execute_values( cur, "INSERT INTO fuel.records (id, raw) VALUES %s " "ON CONFLICT (id) DO UPDATE SET raw = EXCLUDED.raw, updated_at = now()", payload, page_size=500, ) log.info("upserted %d rows into fuel.records", len(payload)) return len(payload) # ── watermark (fuel.ingest_state) ───────────────────────────────────────────── def _get_watermark() -> str | None: with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT last_key FROM fuel.ingest_state WHERE key = %s", (_STATE_KEY,)) row = cur.fetchone() return row[0] if row else None def _set_watermark(last_key: str) -> None: with get_conn() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO fuel.ingest_state (key, last_key) VALUES (%s, %s) " "ON CONFLICT (key) DO UPDATE SET last_key = EXCLUDED.last_key, updated_at = now()", (_STATE_KEY, last_key), ) # ── modes ───────────────────────────────────────────────────────────────────── def ingest_snapshot(apply: bool) -> None: s3 = get_s3() upsert(_load_s3_json(s3, SNAPSHOT_KEY), apply) def ingest_changes(apply: bool) -> None: s3 = get_s3() after = _get_watermark() if apply else None keys = _list_change_keys(s3, after) log.info("%d change file(s) to process (watermark=%s)", len(keys), after) total = 0 for key in keys: total += upsert(_load_s3_json(s3, key), apply) if apply: _set_watermark(key) log.info("changes: processed %d file(s), upserted %d rows", len(keys), total) def ingest_file(path: str, apply: bool) -> None: upsert(_load_local(path), apply) # ── entrypoint ──────────────────────────────────────────────────────────────── def main() -> None: ap = argparse.ArgumentParser(description="Ingest fuel records (raw-first) from the rustfs bucket") ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") mode = ap.add_mutually_exclusive_group() mode.add_argument("--snapshot", action="store_true", help="Full reconcile from fuel_records/latest.json (default)") mode.add_argument("--changes", action="store_true", help="Incremental: change files newer than the stored watermark") mode.add_argument("--file", default=None, help="Local JSON file (snapshot or changes envelope)") args = ap.parse_args() if args.file: ingest_file(args.file, args.apply) elif args.changes: ingest_changes(args.apply) else: # default ingest_snapshot(args.apply) if __name__ == "__main__": main()