feat(api): add /analytics/fuel-fills endpoints for the FleetOps Fuel Log
Some checks failed
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
Static Analysis / static (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled

Reads reporting.v_fuel_fills / v_fuel_efficiency (owned by the fleetfuel module,
which ingests the rustfs `fuel` bucket). Adds GET /analytics/fuel-fills (totals,
per-vehicle rows incl. km/L, by_department, daily trend, unmatched-plate status)
and /analytics/fuel-fills/recent, reusing _analytics_window + _dim_filters.
Extends /analytics/filters with departments + fuel_types, savepoint-guarded so a
not-yet-migrated v_fuel_fills can't break the existing trips dropdowns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
kianiadee 2026-06-12 00:01:36 +03:00
parent 868960c39c
commit c42a500d4b

View file

@ -691,8 +691,187 @@ def analytics_fuel(
return _analytics_error("analytics/fuel") return _analytics_error("analytics/fuel")
# ── Fuel Log (#fuelfuel) — actual fills from the WhatsApp feed ───────────────
# Backed by reporting.v_fuel_fills / v_fuel_efficiency (owned by the `fleetfuel`
# repo, which ingests the rustfs `fuel` bucket). Separate from /analytics/fuel
# above (that one is the trip-derived estimate); this is real litres + KES spend.
def _fuel_filters(cost_centre, assigned_city, vehicle_number, driver, department, fuel_type):
"""Shared dims (_dim_filters) plus the fuel-native department / fuel_type."""
clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver)
for col, val in (("department", department), ("fuel_type", fuel_type)):
v = _clean(val)
if v is not None:
clauses.append(f"{col} = %({col})s")
params[col] = v
return clauses, params
@app.get("/analytics/fuel-fills")
def analytics_fuel_fills(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
cost_centre: str | None = None, assigned_city: str | None = None,
vehicle_number: str | None = None, driver: str | None = None,
department: str | None = None, fuel_type: str | None = None,
):
start, end = _analytics_window(period or "90d", start_date, end_date)
clauses, params = _fuel_filters(cost_centre, assigned_city, vehicle_number, driver,
department, fuel_type)
params |= {"start": start, "end": end}
where = " AND ".join(["fuel_date BETWEEN %(start)s AND %(end)s"] + clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT round(sum(liters), 1) AS litres,
round(sum(amount), 0) AS spend_kes,
count(*) AS fills,
round(sum(amount) / NULLIF(sum(liters), 0), 1) AS avg_price_per_litre,
count(DISTINCT plate) AS vehicles_fuelled,
count(*) FILTER (WHERE vehicle_number IS NULL) AS unmatched_fills
FROM reporting.v_fuel_fills
WHERE {where}
""",
params,
)
totals = cur.fetchone() or {}
cur.execute(
f"""
SELECT f.plate, f.vehicle_number, f.cost_centre, f.assigned_city,
round(sum(f.liters), 1) AS litres,
round(sum(f.amount), 0) AS spend_kes,
count(*) AS fills,
max(f.odometer) AS last_odometer,
round(sum(f.amount) / NULLIF(sum(f.liters), 0), 1) AS avg_price_per_litre,
eff.km_per_litre
FROM reporting.v_fuel_fills f
LEFT JOIN (
SELECT plate, round(avg(km_per_litre), 2) AS km_per_litre
FROM reporting.v_fuel_efficiency
WHERE fuel_date BETWEEN %(start)s AND %(end)s
AND km_per_litre IS NOT NULL
GROUP BY plate
) eff ON eff.plate = f.plate
WHERE {where}
GROUP BY f.plate, f.vehicle_number, f.cost_centre, f.assigned_city, eff.km_per_litre
ORDER BY spend_kes DESC NULLS LAST
""",
params,
)
rows = cur.fetchall()
cur.execute(
f"""
SELECT coalesce(department, '(unspecified)') AS department,
round(sum(liters), 1) AS litres,
round(sum(amount), 0) AS spend_kes,
count(*) AS fills
FROM reporting.v_fuel_fills
WHERE {where}
GROUP BY department
ORDER BY spend_kes DESC NULLS LAST
""",
params,
)
by_department = cur.fetchall()
cur.execute(
f"""
SELECT fuel_date,
round(sum(liters), 1) AS litres,
round(sum(amount), 0) AS spend_kes,
count(*) AS fills
FROM reporting.v_fuel_fills
WHERE {where}
GROUP BY fuel_date
ORDER BY fuel_date
""",
params,
)
trend = cur.fetchall()
data_status = {
"matched_to_fleet": (totals.get("fills") or 0) - (totals.get("unmatched_fills") or 0),
"unmatched_fills": totals.get("unmatched_fills") or 0,
"notes": [
"Fills are real WhatsApp fuel-update records (litres + KES amount).",
"unmatched_fills are records whose plate didn't match a known vehicle "
"in tracksolid.devices — they still count in totals.",
"km_per_litre is derived from consecutive odometer readings; sparse where "
"odometer is missing or implausible.",
],
}
return _json({"window": {"start": str(start), "end": str(end)},
"data_status": data_status, "totals": totals, "rows": rows,
"by_department": by_department, "trend": trend})
except Exception:
return _analytics_error("analytics/fuel-fills")
@app.get("/analytics/fuel-fills/recent")
def analytics_fuel_fills_recent(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
cost_centre: str | None = None, assigned_city: str | None = None,
vehicle_number: str | None = None, driver: str | None = None,
department: str | None = None, fuel_type: str | None = None,
limit: int = 50,
):
start, end = _analytics_window(period or "90d", start_date, end_date)
clauses, params = _fuel_filters(cost_centre, assigned_city, vehicle_number, driver,
department, fuel_type)
params |= {"start": start, "end": end, "lim": max(1, min(limit, 500))}
where = " AND ".join(["fuel_date BETWEEN %(start)s AND %(end)s"] + clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT record_datetime, plate, vehicle_number, cost_centre, assigned_city,
department, driver, liters, amount, fuel_type, odometer
FROM reporting.v_fuel_fills
WHERE {where}
ORDER BY record_datetime DESC NULLS LAST
LIMIT %(lim)s
""",
params,
)
rows = cur.fetchall()
return _json({"window": {"start": str(start), "end": str(end)}, "rows": rows})
except Exception:
return _analytics_error("analytics/fuel-fills/recent")
@app.get("/analytics/filters") @app.get("/analytics/filters")
def analytics_filters(): def analytics_filters():
# Same dropdown options the trips dashboard uses (drivers / cost_centres / # Trips dropdowns (drivers / cost_centres / cities / vehicles) plus the fuel
# cities / vehicles). Aliased so FleetOps has a single /analytics/* surface. # dropdowns (departments / fuel_types), so FleetOps has a single /analytics/*
return fleet_filter_options() # filter surface for every tab including Fuel Log.
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(_FILTER_OPTIONS_SQL)
row = cur.fetchone() or {}
# Fuel dims are best-effort: a missing reporting.v_fuel_fills (fleetfuel
# migration not yet applied) must NOT break the trips dropdowns, so query
# it in its own savepoint and degrade to empty lists if it isn't there.
fuel = {}
try:
cur.execute("SAVEPOINT fuel_dims")
cur.execute(
"SELECT array_agg(DISTINCT department) FILTER (WHERE department IS NOT NULL) AS departments,"
" array_agg(DISTINCT fuel_type) FILTER (WHERE fuel_type IS NOT NULL) AS fuel_types"
" FROM reporting.v_fuel_fills"
)
fuel = cur.fetchone() or {}
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT fuel_dims")
log.warning("fuel filter dims unavailable (reporting.v_fuel_fills missing?)")
return JSONResponse({
"drivers": row.get("drivers") or [],
"cost_centres": row.get("cost_centres") or [],
"cities": row.get("cities") or [],
"vehicles": row.get("vehicles") or [],
"departments": sorted(fuel.get("departments") or []),
"fuel_types": sorted(fuel.get("fuel_types") or []),
})
except Exception:
log.exception("analytics/filters failed")
return JSONResponse({"drivers": [], "cost_centres": [], "cities": [],
"vehicles": [], "departments": [], "fuel_types": []})