From 6cf0905b3195da3bece23d4c2be88f37c183c588 Mon Sep 17 00:00:00 2001 From: david kiania Date: Wed, 10 Jun 2026 12:12:00 +0300 Subject: [PATCH] feat(dashboard_api): FleetOps analytics endpoints + fuel view (Phase 3) Adds the read-only /analytics/* surface the FleetOps SPA will consume, plus the migration that backs the fuel roll-up. All endpoints SELECT the indexed reporting.* / tracksolid.v_* views and never write, so the forthcoming staging instance can serve them against the prod DB as grafana_ro. dashboard_api_rev.py: - GET /analytics/fleet-summary per-vehicle + per-cost-centre roll-up - GET /analytics/utilisation per-vehicle utilisation + daily fleet trend - GET /analytics/driver-behaviour per-driver speeding / harsh index - GET /analytics/fuel actual vs estimated litres (data-gated flags) - GET /analytics/filters dropdown options (alias of GET /webhook/fleet-dashboard) - responses run through jsonable_encoder (Decimal->float, date->ISO) - VTRIPS_REFRESH_INTERVAL_S<=0 now DISABLES the v_trips refresher, so a read-only staging instance never attempts REFRESH (prod still owns it). migrations/17_fleetops_fuel_view.sql: - reporting.v_fuel_daily encapsulates the v_trips->devices join (so the read-only role needs SELECT only on the view) and grants it to grafana_ro. Registered 17 in run_migrations.py. Note: live migration head is 16, not 13 as CLAUDE.md implies. Endpoints are unit-compilable but untested live until the staging bridge (Phase 1) exists. Co-Authored-By: Claude Opus 4.8 --- dashboard_api_rev.py | 319 +++++++++++++++++++++++++- docs/STAGING_FLEETOPS_ARCHITECTURE.md | 12 +- migrations/17_fleetops_fuel_view.sql | 56 +++++ run_migrations.py | 1 + 4 files changed, 373 insertions(+), 15 deletions(-) create mode 100644 migrations/17_fleetops_fuel_view.sql diff --git a/dashboard_api_rev.py b/dashboard_api_rev.py index 5cf0539..c764219 100644 --- a/dashboard_api_rev.py +++ b/dashboard_api_rev.py @@ -42,6 +42,7 @@ from urllib.parse import parse_qs import psycopg2 import psycopg2.extras from fastapi import FastAPI, Request +from fastapi.encoders import jsonable_encoder from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse @@ -68,6 +69,10 @@ _ALLOWED_ORIGINS = [ # uvicorn workers (only one worker refreshes per tick); the work runs in a # thread so the async event loop never blocks on the ~9s REFRESH. _DATABASE_URL = os.environ["DATABASE_URL"] +# VTRIPS_REFRESH_INTERVAL_S <= 0 disables the in-process refresher entirely. +# Staging sets it to 0: it connects read-only and prod owns the refresh, so a +# staging instance must never attempt REFRESH (it would only log permission +# errors). Prod keeps the 300s default. _REFRESH_INTERVAL_S = int(os.getenv("VTRIPS_REFRESH_INTERVAL_S", "300")) _REFRESH_LOCK_KEY = 920_145 # arbitrary, stable advisory-lock key for this job @@ -119,17 +124,26 @@ async def _refresh_loop(): @asynccontextmanager async def lifespan(app: FastAPI): - log.info( - "Dashboard API starting (v1.1). Origins=%s. v_trips refresh every %ss.", - _ALLOWED_ORIGINS, _REFRESH_INTERVAL_S, - ) - refresher = asyncio.create_task(_refresh_loop()) + refresher = None + if _REFRESH_INTERVAL_S > 0: + log.info( + "Dashboard API starting (v1.2). Origins=%s. v_trips refresh every %ss.", + _ALLOWED_ORIGINS, _REFRESH_INTERVAL_S, + ) + refresher = asyncio.create_task(_refresh_loop()) + else: + log.info( + "Dashboard API starting (v1.2). Origins=%s. v_trips refresher DISABLED " + "(VTRIPS_REFRESH_INTERVAL_S<=0) — read-only / staging mode.", + _ALLOWED_ORIGINS, + ) yield - refresher.cancel() - try: - await refresher - except asyncio.CancelledError: - pass + if refresher is not None: + refresher.cancel() + try: + await refresher + except asyncio.CancelledError: + pass close_pool() @@ -322,3 +336,288 @@ async def fleet_trips(request: Request): return JSONResponse( {"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}} ) + + +# ── FleetOps analytics (#15) ───────────────────────────────────────────────── +# Read-only roll-ups powering the FleetOps SPA (fleetops.rahamafresh.com): +# utilisation, distance, driver behaviour and fuel. Every query SELECTs the +# indexed reporting.* / tracksolid.v_* views and never writes — so the staging +# instance serves them against the prod DB as a read-only role. Numeric/date +# values come back as Decimal/date from psycopg2, so responses pass through +# jsonable_encoder (Decimal→float, date→ISO) before JSONResponse. +# +# GET /analytics/fleet-summary per-vehicle + per-cost-centre roll-up +# GET /analytics/utilisation per-vehicle utilisation + daily fleet trend +# GET /analytics/driver-behaviour per-driver speeding / harsh index +# GET /analytics/fuel actual vs estimated litres (data-gated) +# GET /analytics/filters dropdown options (alias of GET /webhook/fleet-dashboard) +# +# Shared query params: period (today|7d|30d|custom, default 30d), start_date, +# end_date, and optional dims cost_centre / assigned_city / vehicle_number / +# driver. + +def _json(obj): + """Serialise dicts that may carry Decimal / date values from psycopg2.""" + return JSONResponse(jsonable_encoder(obj)) + + +def _analytics_window(period, start_date, end_date): + """Date range for analytics — defaults to a 30-day window (vs the 7d trips default).""" + return _preset_to_range(period or "30d", start_date, end_date) + + +def _dim_filters(cost_centre=None, assigned_city=None, vehicle_number=None, driver=None): + """Optional WHERE fragments shared by the reporting.* analytics views. + + Column names are fixed literals (not user input); only the values are + parameterised, so interpolating the fragments into the query is injection-safe. + """ + clauses, params = [], {} + for col, val in ( + ("cost_centre", cost_centre), + ("assigned_city", assigned_city), + ("vehicle_number", vehicle_number), + ("assigned_driver", driver), + ): + v = _clean(val) + if v is not None: + clauses.append(f"{col} = %({col})s") + params[col] = v + return clauses, params + + +def _analytics_error(name): + log.exception("%s failed", name) + return JSONResponse( + {"error": {"type": "unknown", + "message": "Analytics feed is unavailable. Try again in a few seconds."}} + ) + + +@app.get("/analytics/fleet-summary") +def analytics_fleet_summary( + period: str | None = None, start_date: str | None = None, end_date: str | None = None, + cost_centre: str | None = None, assigned_city: str | None = None, + vehicle_number: str | None = None, driver: str | None = None, +): + start, end = _analytics_window(period, start_date, end_date) + clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver) + params |= {"start": start, "end": end} + where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses) + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + f""" + SELECT vehicle_number, cost_centre, assigned_city, assigned_driver, + count(DISTINCT trip_date) AS active_days, + sum(trip_count) AS trips, + round(sum(total_km), 1) AS total_km, + round(sum(driving_hours), 1) AS driving_hours, + round(sum(idle_hours), 1) AS idle_hours, + round(100.0 * sum(idle_hours) + / NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct, + round(max(max_speed_kmh)) AS max_speed_kmh + FROM reporting.v_daily_summary + WHERE {where} + GROUP BY vehicle_number, cost_centre, assigned_city, assigned_driver + ORDER BY total_km DESC NULLS LAST + """, + params, + ) + rows = cur.fetchall() + cur.execute( + f""" + SELECT cost_centre, + count(DISTINCT vehicle_number) AS vehicles, + sum(trip_count) AS trips, + round(sum(total_km), 1) AS total_km, + round(sum(driving_hours), 1) AS driving_hours, + round(sum(idle_hours), 1) AS idle_hours, + round(100.0 * sum(idle_hours) + / NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct + FROM reporting.v_daily_summary + WHERE {where} + GROUP BY cost_centre + ORDER BY total_km DESC NULLS LAST + """, + params, + ) + by_cc = cur.fetchall() + totals = { + "vehicles": len({r["vehicle_number"] for r in rows}), + "trips": sum(int(r["trips"] or 0) for r in rows), + "total_km": round(sum(float(r["total_km"] or 0) for r in rows), 1), + "driving_hours": round(sum(float(r["driving_hours"] or 0) for r in rows), 1), + "idle_hours": round(sum(float(r["idle_hours"] or 0) for r in rows), 1), + } + return _json({"window": {"start": str(start), "end": str(end)}, + "totals": totals, "rows": rows, "by_cost_centre": by_cc}) + except Exception: + return _analytics_error("analytics/fleet-summary") + + +@app.get("/analytics/utilisation") +def analytics_utilisation( + period: str | None = None, start_date: str | None = None, end_date: str | None = None, + cost_centre: str | None = None, assigned_city: str | None = None, + vehicle_number: str | None = None, driver: str | None = None, +): + start, end = _analytics_window(period, start_date, end_date) + clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver) + params |= {"start": start, "end": end} + where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses) + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + f""" + SELECT vehicle_number, cost_centre, assigned_city, + count(DISTINCT trip_date) AS active_days, + round(sum(total_km), 1) AS total_km, + round(sum(total_km) + / NULLIF(count(DISTINCT trip_date), 0), 1) AS km_per_active_day, + round(sum(driving_hours), 1) AS driving_hours, + round(sum(idle_hours), 1) AS idle_hours, + round(100.0 * sum(idle_hours) + / NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct + FROM reporting.v_daily_summary + WHERE {where} + GROUP BY vehicle_number, cost_centre, assigned_city + ORDER BY total_km DESC NULLS LAST + """, + params, + ) + by_vehicle = cur.fetchall() + cur.execute( + f""" + SELECT trip_date, + count(DISTINCT vehicle_number) AS active_vehicles, + round(sum(total_km), 1) AS total_km, + round(sum(driving_hours), 1) AS driving_hours, + round(sum(idle_hours), 1) AS idle_hours, + round(100.0 * sum(idle_hours) + / NULLIF(sum(idle_hours + driving_hours), 0), 1) AS idle_pct + FROM reporting.v_daily_summary + WHERE {where} + GROUP BY trip_date + ORDER BY trip_date + """, + params, + ) + daily_trend = cur.fetchall() + return _json({"window": {"start": str(start), "end": str(end)}, + "by_vehicle": by_vehicle, "daily_trend": daily_trend}) + except Exception: + return _analytics_error("analytics/utilisation") + + +@app.get("/analytics/driver-behaviour") +def analytics_driver_behaviour( + period: str | None = None, start_date: str | None = None, end_date: str | None = None, + assigned_city: str | None = None, driver: str | None = None, +): + start, end = _analytics_window(period, start_date, end_date) + clauses = ["day BETWEEN %(start)s AND %(end)s", "driver_name IS NOT NULL"] + params = {"start": start, "end": end} + city, drv = _clean(assigned_city), _clean(driver) + if city is not None: + clauses.append("assigned_city = %(city)s") + params["city"] = city + if drv is not None: + clauses.append("driver_name = %(drv)s") + params["drv"] = drv + where = " AND ".join(clauses) + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + f""" + SELECT driver_name, assigned_city, + count(DISTINCT day) AS active_days, + round(sum(km), 1) AS total_km, + sum(trips) AS trips, + sum(events_80) AS events_80, + sum(events_100) AS events_100, + sum(events_120) AS events_120, + sum(harsh_events) AS harsh_events, + round(sum(events_80)::numeric + / NULLIF(sum(km), 0) * 100, 2) AS speeding_per_100km, + round(sum(harsh_events)::numeric + / NULLIF(sum(km), 0) * 100, 2) AS harsh_per_100km + FROM tracksolid.v_driver_aggregates_daily + WHERE {where} + GROUP BY driver_name, assigned_city + ORDER BY speeding_per_100km DESC NULLS LAST + """, + params, + ) + rows = cur.fetchall() + # driver_name is 0/63 populated until import_drivers_csv.py --apply runs, + # so this legitimately returns [] today; it fills in once drivers land. + return _json({"window": {"start": str(start), "end": str(end)}, + "drivers_populated": bool(rows), "rows": rows}) + except Exception: + return _analytics_error("analytics/driver-behaviour") + + +@app.get("/analytics/fuel") +def analytics_fuel( + period: str | None = None, start_date: str | None = None, end_date: str | None = None, + cost_centre: str | None = None, assigned_city: str | None = None, + vehicle_number: str | None = None, driver: str | None = None, +): + start, end = _analytics_window(period, start_date, end_date) + clauses, params = _dim_filters(cost_centre, assigned_city, vehicle_number, driver) + params |= {"start": start, "end": end} + where = " AND ".join(["trip_date BETWEEN %(start)s AND %(end)s"] + clauses) + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + f""" + SELECT vehicle_number, cost_centre, assigned_city, + round(sum(distance_km), 1) AS total_km, + round(sum(actual_fuel_l), 2) AS actual_fuel_l, + round(sum(estimated_fuel_l), 2) AS estimated_fuel_l, + count(*) FILTER (WHERE actual_fuel_l IS NOT NULL) AS trips_with_actual, + count(*) AS trips + FROM reporting.v_fuel_daily + WHERE {where} + GROUP BY vehicle_number, cost_centre, assigned_city + ORDER BY total_km DESC NULLS LAST + """, + params, + ) + rows = cur.fetchall() + cur.execute( + f""" + SELECT bool_or(actual_fuel_l IS NOT NULL) AS actual_available, + bool_or(estimated_fuel_l IS NOT NULL) AS estimated_available + FROM reporting.v_fuel_daily + WHERE {where} + """, + params, + ) + flags = cur.fetchone() or {} + data_status = { + "actual_fuel_available": bool(flags.get("actual_available")), + "estimated_fuel_available": bool(flags.get("estimated_available")), + "notes": [ + "actual_fuel_l comes from trips.fuel_consumed_l (/pushtripreport webhook).", + "estimated_fuel_l needs devices.fuel_100km set per vehicle " + "(currently NULL fleet-wide — see CLAUDE.md Open Items).", + "Fuel-cost monetisation is unavailable: ops.cost_rates was purged 2026-06-05.", + ], + } + return _json({"window": {"start": str(start), "end": str(end)}, + "data_status": data_status, "rows": rows}) + except Exception: + return _analytics_error("analytics/fuel") + + +@app.get("/analytics/filters") +def analytics_filters(): + # Same dropdown options the trips dashboard uses (drivers / cost_centres / + # cities / vehicles). Aliased so FleetOps has a single /analytics/* surface. + return fleet_filter_options() diff --git a/docs/STAGING_FLEETOPS_ARCHITECTURE.md b/docs/STAGING_FLEETOPS_ARCHITECTURE.md index 1db1450..de9239e 100644 --- a/docs/STAGING_FLEETOPS_ARCHITECTURE.md +++ b/docs/STAGING_FLEETOPS_ARCHITECTURE.md @@ -101,11 +101,13 @@ the JSONB/GeoJSON return style of the existing `/webhook/*` routes: | `GET /analytics/fleet-summary` | `reporting.v_daily_summary` / `v_weekly_summary` / `v_monthly_summary` + `v_daily_cost_centre` | | `GET /analytics/utilisation` | derived from the `reporting` summaries (idle_pct, km/day) | | `GET /analytics/driver-behaviour` | `tracksolid.v_driver_aggregates_daily` | -| `GET /analytics/fuel` | `reporting.v_trips.fuel_consumed_l` + `devices.fuel_100km` — **data-gated** (returns "needs data" flags until populated) | -| `GET /analytics/filters` | `reporting.v_filter_*` (mirrors `GET /webhook/fleet-dashboard`) | +| `GET /analytics/fuel` | `reporting.v_fuel_daily` (migration 17 — wraps `v_trips.fuel_consumed_l` + `devices.fuel_100km`) — **data-gated** (returns "needs data" flags until populated) | +| `GET /analytics/filters` | `reporting.v_filter_*` (alias of `GET /webhook/fleet-dashboard`) | -Any aggregation that isn't a thin wrapper becomes a **new numbered migration** -(`migrations/15_*.sql`) — never edit an applied migration. +Aggregations that aren't thin wrappers get a **new numbered migration** — never edit an applied +one. The fuel roll-up ships as `migrations/17_fleetops_fuel_view.sql` (the live migration head is +**16**, not 13 as older docs imply); it also `GRANT`s `SELECT` to `grafana_ro` for the staging +read-only role. > **Reuse the existing reporting layer.** The analytics building blocks are `reporting.*` > (migrations 11/14) and the surviving `tracksolid.v_*` views (migration 07). The `ops.*` and @@ -183,7 +185,7 @@ client's production domains **last**. | **0 — Foundation** | This document; migrate all Coolify apps to Forgejo webhook deploys; provision the read-only DB role | Every existing Coolify app redeploys via webhook; read-only role can `SELECT` `reporting.*` + `tracksolid.v_*` and nothing else | | **1 — Staging backbone** | Staging `dashboard_api` bridge (`deploy_dashboard_api_staging.sh`, 8891, `fleetapi.fivetitude.com`, read-only, refresher off, staging CORS) | `curl https://fleetapi.fivetitude.com/health` ok; verifiably read-only; no staging rows in `reporting.refresh_log` | | **2 — FleetNow staging** | FleetNow repo: `staging` branch + parameterized API base + `fleetnow.fivetitude.com` Coolify app | Renders against staging API; `staging` push deploys staging only, `main` merge deploys prod only; prod FleetNow untouched | -| **3 — FleetOps backend** | `/analytics/*` endpoints in `dashboard_api_rev.py`; `migrations/15_*` if needed; tested on the staging API | Every route returns correct shape on `fleetapi.fivetitude.com`; fuel route returns "needs data" flags | +| **3 — FleetOps backend** | `/analytics/*` endpoints in `dashboard_api_rev.py` + `migrations/17_fleetops_fuel_view.sql`; refresher made disable-able (`VTRIPS_REFRESH_INTERVAL_S<=0`); tested on the staging API | Every route returns correct shape on `fleetapi.fivetitude.com`; fuel route returns "needs data" flags | | **4 — FleetOps SPA** | Scaffold `15_fleetops` (git init + remote + SPA/Dockerfile); `fleetops.fivetitude.com` Coolify app | Renders fuel/analytics/utilisation/driver panels from staging endpoints; CORS clean | | **5 — Production cutover** | Promote API to prod + prod CORS add; `fleetops.rahamafresh.com` Coolify app; prod DNS record; update `CLAUDE.md` / `CONNECTIONS.md` / `PLATFORM_OVERVIEW.html` | FleetOps live on prod; prod FleetNow/API otherwise unchanged; docs current | diff --git a/migrations/17_fleetops_fuel_view.sql b/migrations/17_fleetops_fuel_view.sql new file mode 100644 index 0000000..611130e --- /dev/null +++ b/migrations/17_fleetops_fuel_view.sql @@ -0,0 +1,56 @@ +-- 17_fleetops_fuel_view.sql +-- FleetOps fuel roll-up source: reporting.v_fuel_daily. +-- +-- Backs GET /analytics/fuel in dashboard_api_rev.py (the FleetOps SPA). It pairs +-- ACTUAL fuel (trips.fuel_consumed_l, from the /pushtripreport webhook) with an +-- ESTIMATED figure (distance_km * devices.fuel_100km / 100) so the SPA can show +-- both and flag the gap. +-- +-- Why a view (not a direct join in the API): it encapsulates the +-- reporting.v_trips -> tracksolid.devices join so the read-only staging role only +-- needs SELECT on this one reporting.* object, not on tracksolid.devices. It reuses +-- the same per-trip grain + is_meaningful_route filter as the other reporting +-- summaries (migration 11), and the same imei key v_trips already exposes. +-- +-- Data state (2026-06-10): devices.fuel_100km is NULL fleet-wide and the /pushoil +-- + /pushobd webhooks are unregistered, so estimated_fuel_l is NULL today and +-- actual_fuel_l is sparse. The view is correct now and fills in as data lands — +-- the API surfaces availability flags rather than faking numbers. Fuel-cost +-- monetisation is intentionally absent: ops.cost_rates was purged 2026-06-05 +-- (migration 12). +-- +-- CREATE OR REPLACE + guarded grant -> safe to re-apply. + +SET search_path = reporting, tracksolid, public; + +CREATE OR REPLACE VIEW reporting.v_fuel_daily AS + SELECT t.trip_date, + t.vehicle_number, + t.cost_centre, + t.assigned_city, + t.assigned_driver, + t.imei, + t.distance_km, + t.fuel_consumed_l AS actual_fuel_l, + CASE + WHEN d.fuel_100km IS NOT NULL AND t.distance_km IS NOT NULL + THEN round(t.distance_km * d.fuel_100km / 100.0, 3) + ELSE NULL::numeric + END AS estimated_fuel_l + FROM reporting.v_trips t + LEFT JOIN tracksolid.devices d ON d.imei = t.imei + WHERE t.is_meaningful_route; + +COMMENT ON VIEW reporting.v_fuel_daily IS + 'Per-trip fuel: actual (trips.fuel_consumed_l) vs estimated (distance_km * devices.fuel_100km/100). ' + 'Source for dashboard_api GET /analytics/fuel. Encapsulates the v_trips->devices join so the ' + 'read-only staging role needs SELECT only on this view. fuel_100km is NULL fleet-wide as of 2026-06-10.'; + +-- ── grants (guarded: roles may not exist on a fresh DB) ─────────────────────── +DO $grants$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN + GRANT USAGE ON SCHEMA reporting TO grafana_ro; + GRANT SELECT ON reporting.v_fuel_daily TO grafana_ro; + END IF; +END $grants$; diff --git a/run_migrations.py b/run_migrations.py index 8e1e939..41a7b27 100644 --- a/run_migrations.py +++ b/run_migrations.py @@ -40,6 +40,7 @@ MIGRATIONS = [ "14_fleet_segment_and_vehicles_view.sql", # reporting.fn_fleet_segment + reporting.v_vehicles roster "15_map_exclude_cost_centres.sql", # hide personal/management/mtn vehicles from the live map "16_live_feed_vehicle_type.sql", # add vehicle_type + fleet_segment to fn_live_positions feed + "17_fleetops_fuel_view.sql", # reporting.v_fuel_daily — FleetOps GET /analytics/fuel source ] # ── Tables that must exist before the service is allowed to start ─────────────