fleetfuel/import_fuel.py

174 lines
7.5 KiB
Python
Raw Permalink Normal View History

"""
import_fuel.py — Rahama Fresh · fleet fuel-record ingestion (raw-first)
Loads WhatsApp fuel-update records into the `fuel` schema — the source of the
FleetOps "Fuel Log" tab. The feed is produced by an n8n CDC job that exports the
client's `logistics_department.fuel_records` table to the rustfs `fuel` bucket.
RAW-FIRST: each row stores `id` (the source PK) + `raw` (the full record as jsonb).
A DB trigger (see migrations/01_fuel_schema.sql) derives the normalized columns
(plate, liters, amount, fuel_type, department, odometer, deleted_at, …) from `raw`,
so a change to the source schema needs no loader change.
Bucket layout (envelope `{ "metadata": {...}, "records": [...] }`):
fuel_records/latest.json — full snapshot (~1.9k rows)
fuel_records/changes/<ISO-ts>.json — hourly CDC deltas (incl. soft-deletes)
Modes (need DATABASE_URL + RUSTFS_* env; see .env.example):
python import_fuel.py --snapshot --apply # default: full reconcile (self-healing)
python import_fuel.py --changes --apply # incremental, since the stored watermark
python import_fuel.py --file latest.json --apply # local file (dev/testing)
Dry-run (no --apply) parses + logs counts without writing.
Prerequisite: migrations applied (run_migrations.py) so that fuel.records,
fuel.ingest_state and reporting.v_fuel_fills exist.
"""
from __future__ import annotations
import argparse
import json
import math
import psycopg2.extras
from s3util import bucket, get_s3
from shared import get_conn, get_logger
# Module logger for this loader.
log = get_logger("import_fuel")

# Object keys inside the rustfs bucket.
SNAPSHOT_KEY = "fuel_records/latest.json"  # full snapshot of the source table
CHANGES_PREFIX = "fuel_records/changes/"  # prefix of the hourly CDC delta files

# Row key in fuel.ingest_state under which the changes-mode watermark is kept.
_STATE_KEY = "changes"
# ── data loading ──────────────────────────────────────────────────────────────
def _records(payload: dict | list) -> list[dict]:
"""Pull the records array out of the `{metadata, records}` envelope (or a bare list)."""
if isinstance(payload, dict):
recs = payload.get("records", [])
else:
recs = payload
return recs if isinstance(recs, list) else []
def _load_local(path: str) -> list[dict]:
    """Read the JSON file at `path` and return the records it contains.

    `json.load` accepts the non-standard NaN/Infinity tokens by default, so such
    values arrive here as Python floats.
    """
    with open(path, encoding="utf-8") as fh:
        document = json.load(fh)
    return _records(document)
def _load_s3_json(s3, key: str) -> list[dict]:
    """Fetch `key` from the configured bucket, decode it as UTF-8 JSON, and return its records."""
    log.info("fetching s3://%s/%s", bucket(), key)
    response = s3.get_object(Bucket=bucket(), Key=key)
    text = response["Body"].read().decode("utf-8")
    return _records(json.loads(text))
def _list_change_keys(s3, after: str | None) -> list[str]:
    """Return the `.json` keys under CHANGES_PREFIX, sorted ascending.

    When `after` is given, only keys that compare lexically greater than it are
    kept. Change files are named by ISO timestamp, so lexical order is
    chronological order.
    """
    paginator = s3.get_paginator("list_objects_v2")
    every_key = (
        entry["Key"]
        for page in paginator.paginate(Bucket=bucket(), Prefix=CHANGES_PREFIX)
        for entry in page.get("Contents", [])
    )
    return sorted(
        candidate
        for candidate in every_key
        if candidate.endswith(".json") and (after is None or candidate > after)
    )
# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _scrub_nan(row: dict) -> dict:
# Postgres jsonb rejects the JSON `NaN` token — scrub to null.
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
def upsert(rows: list[dict], apply: bool) -> int:
    """Upsert `rows` into fuel.records, storing each whole record in `raw`.

    Rows that are not dicts, or that have no `id`, are skipped. If several rows
    share an `id`, only the last one is sent. One `INSERT ... ON CONFLICT DO
    UPDATE` statement may not touch the same row twice, so a duplicate id within
    a batch would make Postgres reject the entire statement.

    Args:
        rows: parsed records, e.g. the contents of one snapshot or change file.
        apply: when True, write to the database; otherwise only log counts.

    Returns:
        Number of distinct ids upserted (or that would be upserted in a dry run).
    """
    latest: dict = {}
    skipped = 0
    for r in rows:
        rid = r.get("id") if isinstance(r, dict) else None
        if rid is None:
            skipped += 1
            continue
        # Last occurrence wins. NOTE(review): this assumes rows within one file
        # are in chronological order; confirm with the n8n export.
        latest[rid] = r
    duplicates = len(rows) - skipped - len(latest)

    payload = [(rid, psycopg2.extras.Json(_scrub_nan(r))) for rid, r in latest.items()]
    log.info(
        "fuel.records: %d valid rows (skipped %d without id, collapsed %d duplicate-id rows)",
        len(payload), skipped, duplicates,
    )
    if not apply:
        log.info("DRY-RUN — nothing written. Use --apply.")
        return len(payload)
    if not payload:
        return 0
    # NOTE(review): if get_conn() returns a plain psycopg2 connection, `with`
    # commits on success but does not close it. Confirm how `shared` handles that.
    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                "INSERT INTO fuel.records (id, raw) VALUES %s "
                "ON CONFLICT (id) DO UPDATE SET raw = EXCLUDED.raw, updated_at = now()",
                payload, page_size=500,
            )
    log.info("upserted %d rows into fuel.records", len(payload))
    return len(payload)
# ── watermark (fuel.ingest_state) ─────────────────────────────────────────────
def _get_watermark() -> str | None:
    """Return the last change-file key stored in fuel.ingest_state, or None if no row exists yet."""
    query = "SELECT last_key FROM fuel.ingest_state WHERE key = %s"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (_STATE_KEY,))
        found = cur.fetchone()
    return None if found is None else found[0]
def _set_watermark(last_key: str) -> None:
    """Store `last_key` as the changes watermark, inserting the state row or overwriting it."""
    statement = (
        "INSERT INTO fuel.ingest_state (key, last_key) VALUES (%s, %s) "
        "ON CONFLICT (key) DO UPDATE SET last_key = EXCLUDED.last_key, updated_at = now()"
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(statement, (_STATE_KEY, last_key))
# ── modes ─────────────────────────────────────────────────────────────────────
def ingest_snapshot(apply: bool) -> None:
    """Upsert every record from the bucket's full snapshot object (SNAPSHOT_KEY)."""
    snapshot_rows = _load_s3_json(get_s3(), SNAPSHOT_KEY)
    upsert(snapshot_rows, apply)
def ingest_changes(apply: bool) -> None:
    """Upsert the change files newer than the stored watermark, oldest first.

    A dry run does not read the watermark, because it never touches the
    database. It therefore lists and parses every change file in the bucket.
    """
    s3 = get_s3()
    watermark = None
    if apply:
        watermark = _get_watermark()
    pending = _list_change_keys(s3, watermark)
    log.info("%d change file(s) to process (watermark=%s)", len(pending), watermark)

    upserted = 0
    for change_key in pending:
        upserted += upsert(_load_s3_json(s3, change_key), apply)
        if apply:
            # Advance after every file, so a crash resumes at the first
            # unprocessed file. Re-running a file is harmless because upsert
            # overwrites by id.
            _set_watermark(change_key)
    log.info("changes: processed %d file(s), upserted %d rows", len(pending), upserted)
def ingest_file(path: str, apply: bool) -> None:
    """Upsert the records from a local JSON file (snapshot or changes envelope, or a bare list)."""
    local_rows = _load_local(path)
    upsert(local_rows, apply)
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest fuel records (raw-first) from the rustfs bucket")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
mode = ap.add_mutually_exclusive_group()
mode.add_argument("--snapshot", action="store_true",
help="Full reconcile from fuel_records/latest.json (default)")
mode.add_argument("--changes", action="store_true",
help="Incremental: change files newer than the stored watermark")
mode.add_argument("--file", default=None, help="Local JSON file (snapshot or changes envelope)")
args = ap.parse_args()
if args.file:
ingest_file(args.file, args.apply)
elif args.changes:
ingest_changes(args.apply)
else: # default
ingest_snapshot(args.apply)
if __name__ == "__main__":
main()