Self-contained ingestion module (mirrors fleettickets) for the WhatsApp fuel-record feed in the rustfs `fuel` bucket: - import_fuel.py — snapshot/changes/file modes, raw-jsonb upsert on id - migrations/01_fuel_schema.sql — fuel schema, plate/fuel-type/department normalizers, trigger-derived columns, reporting.v_fuel_fills + v_fuel_efficiency, grafana_ro grants - s3util.py / shared.py / run_migrations.py — rustfs client + DB helpers - docs/plan.html — implementation plan Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
173 lines
7.5 KiB
Python
173 lines
7.5 KiB
Python
"""
|
|
import_fuel.py — Rahama Fresh · fleet fuel-record ingestion (raw-first)
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
Loads WhatsApp fuel-update records into the `fuel` schema — the source of the
|
|
FleetOps "Fuel Log" tab. The feed is produced by an n8n CDC job that exports the
|
|
client's `logistics_department.fuel_records` table to the rustfs `fuel` bucket.
|
|
|
|
RAW-FIRST: each row stores `id` (the source PK) + `raw` (the full record as jsonb).
|
|
A DB trigger (see migrations/01_fuel_schema.sql) derives the normalized columns
|
|
(plate, liters, amount, fuel_type, department, odometer, deleted_at, …) from `raw`,
|
|
so a change to the source schema needs no loader change.
|
|
|
|
Bucket layout (envelope `{ "metadata": {...}, "records": [...] }`):
|
|
fuel_records/latest.json full snapshot (~1.9k rows)
|
|
fuel_records/changes/<ISO-ts>.json hourly CDC deltas (incl. soft-deletes)
|
|
|
|
Modes (need DATABASE_URL + RUSTFS_* env; see .env.example):
|
|
python import_fuel.py --snapshot --apply # default: full reconcile (self-healing; upsert-only, never deletes rows)
|
|
python import_fuel.py --changes --apply # incremental, since the stored watermark
|
|
python import_fuel.py --file latest.json --apply # local file (dev/testing)
|
|
Dry-run (no --apply) parses + logs counts without writing; in --changes mode it also skips the stored watermark, so every change file is listed.
|
|
|
|
Pre-requisite: migration applied (run_migrations.py) — fuel.records + fuel.ingest_state
|
|
+ reporting.v_fuel_fills.
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
|
|
import psycopg2.extras
|
|
|
|
from s3util import bucket, get_s3
|
|
from shared import get_conn, get_logger
|
|
|
|
log = get_logger("import_fuel")  # module-wide logger


# Object keys inside the bucket returned by s3util.bucket().
SNAPSHOT_KEY: str = "fuel_records/latest.json"  # full-snapshot envelope
CHANGES_PREFIX: str = "fuel_records/changes/"  # prefix under which change files (*.json) are listed

_STATE_KEY: str = "changes"  # row key in fuel.ingest_state for the changes watermark
|
|
|
|
|
|
# ── data loading ──────────────────────────────────────────────────────────────
|
|
def _records(payload: dict | list) -> list[dict]:
|
|
"""Pull the records array out of the `{metadata, records}` envelope (or a bare list)."""
|
|
if isinstance(payload, dict):
|
|
recs = payload.get("records", [])
|
|
else:
|
|
recs = payload
|
|
return recs if isinstance(recs, list) else []
|
|
|
|
|
|
def _load_local(path: str) -> list[dict]:
    """Read a JSON export from the local filesystem and return its records.

    The file may hold the `{metadata, records}` envelope or a bare list.
    json.load accepts the non-standard `NaN` token by default; NaN values are
    scrubbed to null later, in upsert().
    """
    with open(path, encoding="utf-8") as handle:
        document = json.load(handle)
    return _records(document)
|
|
|
|
|
|
def _load_s3_json(s3, key: str) -> list[dict]:
    """Download `key` from the configured bucket, decode it as UTF-8 JSON, return its records."""
    log.info("fetching s3://%s/%s", bucket(), key)
    response = s3.get_object(Bucket=bucket(), Key=key)
    raw_bytes = response["Body"].read()
    document = json.loads(raw_bytes.decode("utf-8"))
    return _records(document)
|
|
|
|
|
|
def _list_change_keys(s3, after: str | None) -> list[str]:
    """List change-file keys under CHANGES_PREFIX that sort strictly after `after`.

    Only `*.json` objects are returned, in ascending lexical order, which is
    chronological for ISO-timestamp file names. `after=None` returns every
    change file.
    """
    def wanted(candidate: str) -> bool:
        return candidate.endswith(".json") and (after is None or candidate > after)

    listing = s3.get_paginator("list_objects_v2").paginate(
        Bucket=bucket(), Prefix=CHANGES_PREFIX
    )
    return sorted(
        obj["Key"]
        for page in listing
        for obj in page.get("Contents", [])
        if wanted(obj["Key"])
    )
|
|
|
|
|
|
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
|
def _scrub_nan(row: dict) -> dict:
|
|
# Postgres jsonb rejects the JSON `NaN` token — scrub to null.
|
|
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
|
|
|
|
|
|
def upsert(rows: list[dict], apply: bool) -> int:
    """Upsert raw records into fuel.records, keyed on the source `id` (raw-first).

    Only `id` and the whole record (as jsonb `raw`) are written; the DB trigger
    derives the normalized columns from `raw`.

    Rows that are not JSON objects, or whose `id` is missing/null, are skipped
    and counted. If an id occurs more than once in `rows` (e.g. a record edited
    twice inside one CDC window), only its LAST occurrence is kept. Postgres
    aborts an `INSERT ... ON CONFLICT DO UPDATE` that would affect the same row
    twice in one statement, and execute_values packs up to `page_size` rows into
    a single statement. Last-wins matches what separate statements would have
    produced.

    Args:
        rows: decoded records, e.g. from `_records`.
        apply: write to the DB when True; otherwise only log counts (dry-run).

    Returns:
        The number of distinct ids upserted (or that would be upserted in dry-run).
    """
    latest: dict = {}  # id -> most recent record seen for that id
    skipped = 0
    for row in rows:
        rid = row.get("id") if isinstance(row, dict) else None
        if rid is None:
            skipped += 1
            continue
        latest[rid] = row
    duplicates = len(rows) - skipped - len(latest)

    payload = [(rid, psycopg2.extras.Json(_scrub_nan(row))) for rid, row in latest.items()]
    log.info("fuel.records: %d valid rows (skipped %d without id)", len(payload), skipped)
    if duplicates:
        log.info("fuel.records: collapsed %d duplicate id(s) (last occurrence wins)", duplicates)

    if not apply:
        log.info("DRY-RUN — nothing written. Use --apply.")
        return len(payload)
    if not payload:
        return 0

    # NOTE(review): if shared.get_conn() returns a bare psycopg2 connection,
    # `with conn:` only commits/rolls back the transaction and does not close
    # the connection — confirm get_conn() takes care of closing it.
    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                "INSERT INTO fuel.records (id, raw) VALUES %s "
                "ON CONFLICT (id) DO UPDATE SET raw = EXCLUDED.raw, updated_at = now()",
                payload,
                page_size=500,
            )
    log.info("upserted %d rows into fuel.records", len(payload))
    return len(payload)
|
|
|
|
|
|
# ── watermark (fuel.ingest_state) ─────────────────────────────────────────────
|
|
def _get_watermark() -> str | None:
    """Return the change-file key stored under `_STATE_KEY`, or None if no row exists."""
    query = "SELECT last_key FROM fuel.ingest_state WHERE key = %s"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (_STATE_KEY,))
        found = cur.fetchone()
    return None if found is None else found[0]
|
|
|
|
|
|
def _set_watermark(last_key: str) -> None:
    """Persist `last_key` as the changes watermark, inserting or overwriting the `_STATE_KEY` row."""
    statement = (
        "INSERT INTO fuel.ingest_state (key, last_key) VALUES (%s, %s) "
        "ON CONFLICT (key) DO UPDATE SET last_key = EXCLUDED.last_key, updated_at = now()"
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(statement, (_STATE_KEY, last_key))
|
|
|
|
|
|
# ── modes ─────────────────────────────────────────────────────────────────────
|
|
def ingest_snapshot(apply: bool) -> None:
    """Upsert every record from the full snapshot object (SNAPSHOT_KEY).

    Upsert-only: rows already in fuel.records but absent from the snapshot are left as-is.
    """
    client = get_s3()
    snapshot_rows = _load_s3_json(client, SNAPSHOT_KEY)
    upsert(snapshot_rows, apply)
|
|
|
|
|
|
def ingest_changes(apply: bool) -> None:
    """Incrementally upsert the change files that sort after the stored watermark.

    Files are handled in ascending key order. With `apply`, the watermark is
    advanced to each file's key right after that file is upserted, so a run that
    dies midway restarts from the first file not yet recorded; re-upserting a
    file simply overwrites the same rows. In dry-run the watermark is neither
    read nor written, so every change file is listed.
    """
    client = get_s3()
    watermark = _get_watermark() if apply else None
    pending = _list_change_keys(client, watermark)
    log.info("%d change file(s) to process (watermark=%s)", len(pending), watermark)

    upserted = 0
    for change_key in pending:
        change_rows = _load_s3_json(client, change_key)
        upserted += upsert(change_rows, apply)
        if apply:
            _set_watermark(change_key)

    log.info("changes: processed %d file(s), upserted %d rows", len(pending), upserted)
|
|
|
|
|
|
def ingest_file(path: str, apply: bool) -> None:
    """Upsert the records of a local JSON export (envelope or bare list); meant for dev/testing."""
    local_rows = _load_local(path)
    upsert(local_rows, apply)
|
|
|
|
|
|
# ── entrypoint ────────────────────────────────────────────────────────────────
|
|
def main() -> None:
    """CLI entry point: parse the mode flags and dispatch to the matching ingest_* function."""
    parser = argparse.ArgumentParser(
        description="Ingest fuel records (raw-first) from the rustfs bucket"
    )
    parser.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")

    modes = parser.add_mutually_exclusive_group()
    modes.add_argument(
        "--snapshot",
        action="store_true",
        help="Full reconcile from fuel_records/latest.json (default)",
    )
    modes.add_argument(
        "--changes",
        action="store_true",
        help="Incremental: change files newer than the stored watermark",
    )
    modes.add_argument(
        "--file",
        default=None,
        help="Local JSON file (snapshot or changes envelope)",
    )
    opts = parser.parse_args()

    # The mode flags are mutually exclusive; with none given, the snapshot mode runs.
    if opts.file:
        ingest_file(opts.file, opts.apply)
        return
    if opts.changes:
        ingest_changes(opts.apply)
        return
    ingest_snapshot(opts.apply)


if __name__ == "__main__":
    main()
|