feat(dashboard_api): FleetOps analytics endpoints + fuel view (Phase 3)

Adds the read-only /analytics/* surface the FleetOps SPA will consume, plus
the migration that backs the fuel roll-up. All endpoints SELECT the indexed
reporting.* / tracksolid.v_* views and never write, so the forthcoming staging
instance can serve them against the prod DB as grafana_ro.

dashboard_api_rev.py:
  - GET /analytics/fleet-summary     per-vehicle + per-cost-centre roll-up
  - GET /analytics/utilisation       per-vehicle utilisation + daily fleet trend
  - GET /analytics/driver-behaviour  per-driver speeding / harsh index
  - GET /analytics/fuel              actual vs estimated litres (data-gated flags)
  - GET /analytics/filters           dropdown options (alias of GET /webhook/fleet-dashboard)
  - responses run through jsonable_encoder (Decimal->float, date->ISO)
  - VTRIPS_REFRESH_INTERVAL_S<=0 now DISABLES the v_trips refresher, so a
    read-only staging instance never attempts REFRESH (prod still owns it).

migrations/17_fleetops_fuel_view.sql:
  - reporting.v_fuel_daily encapsulates the v_trips->devices join (so the
    read-only role needs SELECT only on the view) and grants it to grafana_ro.

Registered 17 in run_migrations.py. Note: live migration head is 16, not 13
as CLAUDE.md implies. Endpoints are unit-compilable but untested live until
the staging bridge (Phase 1) exists.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-10 12:12:00 +03:00
parent e01b50a8b1
commit 6cf0905b31
4 changed files with 373 additions and 15 deletions

View file

@ -42,6 +42,7 @@ from urllib.parse import parse_qs
import psycopg2
import psycopg2.extras
from fastapi import FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
@ -68,6 +69,10 @@ _ALLOWED_ORIGINS = [
# uvicorn workers (only one worker refreshes per tick); the work runs in a
# thread so the async event loop never blocks on the ~9s REFRESH.
_DATABASE_URL = os.environ["DATABASE_URL"]
# VTRIPS_REFRESH_INTERVAL_S <= 0 disables the in-process refresher entirely.
# Staging sets it to 0: it connects read-only and prod owns the refresh, so a
# staging instance must never attempt REFRESH (it would only log permission
# errors). Prod keeps the 300s default.
_REFRESH_INTERVAL_S = int(os.getenv("VTRIPS_REFRESH_INTERVAL_S", "300"))
_REFRESH_LOCK_KEY = 920_145 # arbitrary, stable advisory-lock key for this job
@ -119,12 +124,21 @@ async def _refresh_loop():
@asynccontextmanager
async def lifespan(app: FastAPI):
refresher = None
if _REFRESH_INTERVAL_S > 0:
log.info(
"Dashboard API starting (v1.1). Origins=%s. v_trips refresh every %ss.",
"Dashboard API starting (v1.2). Origins=%s. v_trips refresh every %ss.",
_ALLOWED_ORIGINS, _REFRESH_INTERVAL_S,
)
refresher = asyncio.create_task(_refresh_loop())
else:
log.info(
"Dashboard API starting (v1.2). Origins=%s. v_trips refresher DISABLED "
"(VTRIPS_REFRESH_INTERVAL_S<=0) — read-only / staging mode.",
_ALLOWED_ORIGINS,
)
yield
if refresher is not None:
refresher.cancel()
try:
await refresher
@ -322,3 +336,288 @@ async def fleet_trips(request: Request):
return JSONResponse(
{"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}}
)
# ── FleetOps analytics (#15) ─────────────────────────────────────────────────
# Read-only roll-ups powering the FleetOps SPA (fleetops.rahamafresh.com):
# utilisation, distance, driver behaviour and fuel. Every query SELECTs the
# indexed reporting.* / tracksolid.v_* views and never writes — so the staging
# instance serves them against the prod DB as a read-only role. Numeric/date
# values come back as Decimal/date from psycopg2, so responses pass through
# jsonable_encoder (Decimal→float, date→ISO) before JSONResponse.
#
# GET /analytics/fleet-summary per-vehicle + per-cost-centre roll-up
# GET /analytics/utilisation per-vehicle utilisation + daily fleet trend
# GET /analytics/driver-behaviour per-driver speeding / harsh index
# GET /analytics/fuel actual vs estimated litres (data-gated)
# GET /analytics/filters dropdown options (alias of GET /webhook/fleet-dashboard)
#
# Shared query params: period (today|7d|30d|custom, default 30d), start_date,
# end_date, and optional dims cost_centre / assigned_city / vehicle_number /
# driver.
def _json(obj):
"""Serialise dicts that may carry Decimal / date values from psycopg2."""
return JSONResponse(jsonable_encoder(obj))
def _analytics_window(period, start_date, end_date):
"""Date range for analytics — defaults to a 30-day window (vs the 7d trips default)."""
return _preset_to_range(period or "30d", start_date, end_date)
def _dim_filters(cost_centre=None, assigned_city=None, vehicle_number=None, driver=None):
"""Optional WHERE fragments shared by the reporting.* analytics views.
Column names are fixed literals (not user input); only the values are
parameterised, so interpolating the fragments into the query is injection-safe.
"""
clauses, params = [], {}
for col, val in (
("cost_centre", cost_centre),
("assigned_city", assigned_city),
("vehicle_number", vehicle_number),
("assigned_driver", driver),
):
v = _clean(val)
if v is not None:
clauses.append(f"{col} = %({col})s")
params[col] = v
return clauses, params
def _analytics_error(name):
log.exception("%s failed", name)
return JSONResponse(
{"error": {"type": "unknown",
"message": "Analytics feed is unavailable. Try again in a few seconds."}}
)
@app.get("/analytics/fleet-summary")
def analytics_fleet_summary(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
cost_centre: str | None = None, assigned_city: str | None = None,
vehicle_number: str | None = None, driver: str | None = None,
):
start, end = _analytics_window(period, start_date, end_date)
clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver)
params |= {"start": start, "end": end}
where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT vehicle_number, cost_centre, assigned_city, assigned_driver,
count(DISTINCT trip_date) AS active_days,
sum(trip_count) AS trips,
round(sum(total_km), 1) AS total_km,
round(sum(driving_hours), 1) AS driving_hours,
round(sum(idle_hours), 1) AS idle_hours,
round(100.0 * sum(idle_hours)
/ NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct,
round(max(max_speed_kmh)) AS max_speed_kmh
FROM reporting.v_daily_summary
WHERE {where}
GROUP BY vehicle_number, cost_centre, assigned_city, assigned_driver
ORDER BY total_km DESC NULLS LAST
""",
params,
)
rows = cur.fetchall()
cur.execute(
f"""
SELECT cost_centre,
count(DISTINCT vehicle_number) AS vehicles,
sum(trip_count) AS trips,
round(sum(total_km), 1) AS total_km,
round(sum(driving_hours), 1) AS driving_hours,
round(sum(idle_hours), 1) AS idle_hours,
round(100.0 * sum(idle_hours)
/ NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct
FROM reporting.v_daily_summary
WHERE {where}
GROUP BY cost_centre
ORDER BY total_km DESC NULLS LAST
""",
params,
)
by_cc = cur.fetchall()
totals = {
"vehicles": len({r["vehicle_number"] for r in rows}),
"trips": sum(int(r["trips"] or 0) for r in rows),
"total_km": round(sum(float(r["total_km"] or 0) for r in rows), 1),
"driving_hours": round(sum(float(r["driving_hours"] or 0) for r in rows), 1),
"idle_hours": round(sum(float(r["idle_hours"] or 0) for r in rows), 1),
}
return _json({"window": {"start": str(start), "end": str(end)},
"totals": totals, "rows": rows, "by_cost_centre": by_cc})
except Exception:
return _analytics_error("analytics/fleet-summary")
@app.get("/analytics/utilisation")
def analytics_utilisation(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
cost_centre: str | None = None, assigned_city: str | None = None,
vehicle_number: str | None = None, driver: str | None = None,
):
start, end = _analytics_window(period, start_date, end_date)
clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver)
params |= {"start": start, "end": end}
where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT vehicle_number, cost_centre, assigned_city,
count(DISTINCT trip_date) AS active_days,
round(sum(total_km), 1) AS total_km,
round(sum(total_km)
/ NULLIF(count(DISTINCT trip_date), 0), 1) AS km_per_active_day,
round(sum(driving_hours), 1) AS driving_hours,
round(sum(idle_hours), 1) AS idle_hours,
round(100.0 * sum(idle_hours)
/ NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct
FROM reporting.v_daily_summary
WHERE {where}
GROUP BY vehicle_number, cost_centre, assigned_city
ORDER BY total_km DESC NULLS LAST
""",
params,
)
by_vehicle = cur.fetchall()
cur.execute(
f"""
SELECT trip_date,
count(DISTINCT vehicle_number) AS active_vehicles,
round(sum(total_km), 1) AS total_km,
round(sum(driving_hours), 1) AS driving_hours,
round(sum(idle_hours), 1) AS idle_hours,
round(100.0 * sum(idle_hours)
/ NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct
FROM reporting.v_daily_summary
WHERE {where}
GROUP BY trip_date
ORDER BY trip_date
""",
params,
)
daily_trend = cur.fetchall()
return _json({"window": {"start": str(start), "end": str(end)},
"by_vehicle": by_vehicle, "daily_trend": daily_trend})
except Exception:
return _analytics_error("analytics/utilisation")
@app.get("/analytics/driver-behaviour")
def analytics_driver_behaviour(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
assigned_city: str | None = None, driver: str | None = None,
):
start, end = _analytics_window(period, start_date, end_date)
clauses = ["day BETWEEN %(start)s AND %(end)s", "driver_name IS NOT NULL"]
params = {"start": start, "end": end}
city, drv = _clean(assigned_city), _clean(driver)
if city is not None:
clauses.append("assigned_city = %(city)s")
params["city"] = city
if drv is not None:
clauses.append("driver_name = %(drv)s")
params["drv"] = drv
where = " AND ".join(clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT driver_name, assigned_city,
count(DISTINCT day) AS active_days,
round(sum(km), 1) AS total_km,
sum(trips) AS trips,
sum(events_80) AS events_80,
sum(events_100) AS events_100,
sum(events_120) AS events_120,
sum(harsh_events) AS harsh_events,
round(sum(events_80)::numeric
/ NULLIF(sum(km), 0) * 100, 2) AS speeding_per_100km,
round(sum(harsh_events)::numeric
/ NULLIF(sum(km), 0) * 100, 2) AS harsh_per_100km
FROM tracksolid.v_driver_aggregates_daily
WHERE {where}
GROUP BY driver_name, assigned_city
ORDER BY speeding_per_100km DESC NULLS LAST
""",
params,
)
rows = cur.fetchall()
# driver_name is 0/63 populated until import_drivers_csv.py --apply runs,
# so this legitimately returns [] today; it fills in once drivers land.
return _json({"window": {"start": str(start), "end": str(end)},
"drivers_populated": bool(rows), "rows": rows})
except Exception:
return _analytics_error("analytics/driver-behaviour")
@app.get("/analytics/fuel")
def analytics_fuel(
period: str | None = None, start_date: str | None = None, end_date: str | None = None,
cost_centre: str | None = None, assigned_city: str | None = None,
vehicle_number: str | None = None, driver: str | None = None,
):
start, end = _analytics_window(period, start_date, end_date)
clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver)
params |= {"start": start, "end": end}
where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses)
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
f"""
SELECT vehicle_number, cost_centre, assigned_city,
round(sum(distance_km), 1) AS total_km,
round(sum(actual_fuel_l), 2) AS actual_fuel_l,
round(sum(estimated_fuel_l), 2) AS estimated_fuel_l,
count(*) FILTER (WHERE actual_fuel_l IS NOT NULL) AS trips_with_actual,
count(*) AS trips
FROM reporting.v_fuel_daily
WHERE {where}
GROUP BY vehicle_number, cost_centre, assigned_city
ORDER BY total_km DESC NULLS LAST
""",
params,
)
rows = cur.fetchall()
cur.execute(
f"""
SELECT bool_or(actual_fuel_l IS NOT NULL) AS actual_available,
bool_or(estimated_fuel_l IS NOT NULL) AS estimated_available
FROM reporting.v_fuel_daily
WHERE {where}
""",
params,
)
flags = cur.fetchone() or {}
data_status = {
"actual_fuel_available": bool(flags.get("actual_available")),
"estimated_fuel_available": bool(flags.get("estimated_available")),
"notes": [
"actual_fuel_l comes from trips.fuel_consumed_l (/pushtripreport webhook).",
"estimated_fuel_l needs devices.fuel_100km set per vehicle "
"(currently NULL fleet-wide — see CLAUDE.md Open Items).",
"Fuel-cost monetisation is unavailable: ops.cost_rates was purged 2026-06-05.",
],
}
return _json({"window": {"start": str(start), "end": str(end)},
"data_status": data_status, "rows": rows})
except Exception:
return _analytics_error("analytics/fuel")
@app.get("/analytics/filters")
def analytics_filters():
# Same dropdown options the trips dashboard uses (drivers / cost_centres /
# cities / vehicles). Aliased so FleetOps has a single /analytics/* surface.
return fleet_filter_options()

View file

@ -101,11 +101,13 @@ the JSONB/GeoJSON return style of the existing `/webhook/*` routes:
| `GET /analytics/fleet-summary` | `reporting.v_daily_summary` / `v_weekly_summary` / `v_monthly_summary` + `v_daily_cost_centre` |
| `GET /analytics/utilisation` | derived from the `reporting` summaries (idle_pct, km/day) |
| `GET /analytics/driver-behaviour` | `tracksolid.v_driver_aggregates_daily` |
| `GET /analytics/fuel` | `reporting.v_trips.fuel_consumed_l` + `devices.fuel_100km`**data-gated** (returns "needs data" flags until populated) |
| `GET /analytics/filters` | `reporting.v_filter_*` (mirrors `GET /webhook/fleet-dashboard`) |
| `GET /analytics/fuel` | `reporting.v_fuel_daily` (migration 17 — wraps `v_trips.fuel_consumed_l` + `devices.fuel_100km`)**data-gated** (returns "needs data" flags until populated) |
| `GET /analytics/filters` | `reporting.v_filter_*` (alias of `GET /webhook/fleet-dashboard`) |
Any aggregation that isn't a thin wrapper becomes a **new numbered migration**
(`migrations/15_*.sql`) — never edit an applied migration.
Aggregations that aren't thin wrappers get a **new numbered migration** — never edit an applied
one. The fuel roll-up ships as `migrations/17_fleetops_fuel_view.sql` (the live migration head is
**16**, not 13 as older docs imply); it also `GRANT`s `SELECT` to `grafana_ro` for the staging
read-only role.
> **Reuse the existing reporting layer.** The analytics building blocks are `reporting.*`
> (migrations 11/14) and the surviving `tracksolid.v_*` views (migration 07). The `ops.*` and
@ -183,7 +185,7 @@ client's production domains **last**.
| **0 — Foundation** | This document; migrate all Coolify apps to Forgejo webhook deploys; provision the read-only DB role | Every existing Coolify app redeploys via webhook; read-only role can `SELECT` `reporting.*` + `tracksolid.v_*` and nothing else |
| **1 — Staging backbone** | Staging `dashboard_api` bridge (`deploy_dashboard_api_staging.sh`, 8891, `fleetapi.fivetitude.com`, read-only, refresher off, staging CORS) | `curl https://fleetapi.fivetitude.com/health` ok; verifiably read-only; no staging rows in `reporting.refresh_log` |
| **2 — FleetNow staging** | FleetNow repo: `staging` branch + parameterized API base + `fleetnow.fivetitude.com` Coolify app | Renders against staging API; `staging` push deploys staging only, `main` merge deploys prod only; prod FleetNow untouched |
| **3 — FleetOps backend** | `/analytics/*` endpoints in `dashboard_api_rev.py`; `migrations/15_*` if needed; tested on the staging API | Every route returns correct shape on `fleetapi.fivetitude.com`; fuel route returns "needs data" flags |
| **3 — FleetOps backend** | `/analytics/*` endpoints in `dashboard_api_rev.py` + `migrations/17_fleetops_fuel_view.sql`; refresher made disable-able (`VTRIPS_REFRESH_INTERVAL_S<=0`); tested on the staging API | Every route returns correct shape on `fleetapi.fivetitude.com`; fuel route returns "needs data" flags |
| **4 — FleetOps SPA** | Scaffold `15_fleetops` (git init + remote + SPA/Dockerfile); `fleetops.fivetitude.com` Coolify app | Renders fuel/analytics/utilisation/driver panels from staging endpoints; CORS clean |
| **5 — Production cutover** | Promote API to prod + prod CORS add; `fleetops.rahamafresh.com` Coolify app; prod DNS record; update `CLAUDE.md` / `CONNECTIONS.md` / `PLATFORM_OVERVIEW.html` | FleetOps live on prod; prod FleetNow/API otherwise unchanged; docs current |

View file

@ -0,0 +1,56 @@
-- 17_fleetops_fuel_view.sql
-- FleetOps fuel roll-up source: reporting.v_fuel_daily.
--
-- Backs GET /analytics/fuel in dashboard_api_rev.py (the FleetOps SPA). It pairs
-- ACTUAL fuel (trips.fuel_consumed_l, from the /pushtripreport webhook) with an
-- ESTIMATED figure (distance_km * devices.fuel_100km / 100) so the SPA can show
-- both and flag the gap.
--
-- Why a view (not a direct join in the API): it encapsulates the
-- reporting.v_trips -> tracksolid.devices join so the read-only staging role only
-- needs SELECT on this one reporting.* object, not on tracksolid.devices. It reuses
-- the same per-trip grain + is_meaningful_route filter as the other reporting
-- summaries (migration 11), and the same imei key v_trips already exposes.
--
-- Data state (2026-06-10): devices.fuel_100km is NULL fleet-wide and the /pushoil
-- + /pushobd webhooks are unregistered, so estimated_fuel_l is NULL today and
-- actual_fuel_l is sparse. The view is correct now and fills in as data lands —
-- the API surfaces availability flags rather than faking numbers. Fuel-cost
-- monetisation is intentionally absent: ops.cost_rates was purged 2026-06-05
-- (migration 12).
--
-- CREATE OR REPLACE + guarded grant -> safe to re-apply.
SET search_path = reporting, tracksolid, public;
CREATE OR REPLACE VIEW reporting.v_fuel_daily AS
SELECT t.trip_date,
t.vehicle_number,
t.cost_centre,
t.assigned_city,
t.assigned_driver,
t.imei,
t.distance_km,
t.fuel_consumed_l AS actual_fuel_l,
CASE
WHEN d.fuel_100km IS NOT NULL AND t.distance_km IS NOT NULL
THEN round(t.distance_km * d.fuel_100km / 100.0, 3)
ELSE NULL::numeric
END AS estimated_fuel_l
FROM reporting.v_trips t
LEFT JOIN tracksolid.devices d ON d.imei = t.imei
WHERE t.is_meaningful_route;
COMMENT ON VIEW reporting.v_fuel_daily IS
'Per-trip fuel: actual (trips.fuel_consumed_l) vs estimated (distance_km * devices.fuel_100km/100). '
'Source for dashboard_api GET /analytics/fuel. Encapsulates the v_trips->devices join so the '
'read-only staging role needs SELECT only on this view. fuel_100km is NULL fleet-wide as of 2026-06-10.';
-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT USAGE ON SCHEMA reporting TO grafana_ro;
GRANT SELECT ON reporting.v_fuel_daily TO grafana_ro;
END IF;
END $grants$;

View file

@ -40,6 +40,7 @@ MIGRATIONS = [
"14_fleet_segment_and_vehicles_view.sql", # reporting.fn_fleet_segment + reporting.v_vehicles roster
"15_map_exclude_cost_centres.sql", # hide personal/management/mtn vehicles from the live map
"16_live_feed_vehicle_type.sql", # add vehicle_type + fleet_segment to fn_live_positions feed
"17_fleetops_fuel_view.sql", # reporting.v_fuel_daily — FleetOps GET /analytics/fuel source
]
# ── Tables that must exist before the service is allowed to start ─────────────