From 5703d70aa6fa4ab57f183fe87144b4725f3e034f Mon Sep 17 00:00:00 2001 From: david kiania Date: Mon, 1 Jun 2026 04:23:37 +0300 Subject: [PATCH] feat(api): dedicated FastAPI read-API for map dashboards (replaces n8n) n8n was a thin HTTP->SQL proxy for the Live Position and Fleet Trips maps and proved fragile (credential reloads, :latest drift, shared connection limits). This service calls the same proven reporting.* functions directly, reusing the existing psycopg2 pool / Docker image / Coolify deploy. Endpoints mirror the n8n webhook paths so the only frontend change is N8N_BASE: GET /webhook/live-positions -> {summary, geojson} (fn_live_positions) GET /webhook/vehicle-track -> GeoJSON Feature (fn_vehicle_track) GET /webhook/fleet-dashboard -> filter options POST /webhook/fleet-dashboard -> trips payload (fn_trips_for_map) Response shapes replicate the n8n "Build response JSON" nodes exactly; empty filters/sentinels ('', null, undefined) normalize to SQL wildcards. CORS limited to the dashboard origins. Added dashboard_api service to docker-compose (port 8890, Coolify-routed). SQL contracts validated against prod. Co-Authored-By: Claude Opus 4.8 --- dashboard_api_rev.py | 238 +++++++++++++++++++++++++++++++++++++++++++ docker-compose.yaml | 25 +++++ 2 files changed, 263 insertions(+) create mode 100644 dashboard_api_rev.py diff --git a/dashboard_api_rev.py b/dashboard_api_rev.py new file mode 100644 index 0000000..d4dfc4c --- /dev/null +++ b/dashboard_api_rev.py @@ -0,0 +1,238 @@ +""" +dashboard_api_rev.py — Fireside Communications · Map Dashboard Read API +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Stable replacement for the n8n webhooks that fed the Live Position and Fleet +Trips map dashboards. n8n was acting only as a thin HTTP→SQL proxy; this +service does the same job directly against the proven reporting.* functions, +removing n8n's credential-management / reload / version-drift fragility from +the live-data path. + +It REUSES the existing stack: ts_shared_rev's psycopg2 pool and DATABASE_URL, +the same Docker image, the same Coolify deploy. The reporting.* functions +(already verified to return correct GeoJSON) are the single source of truth. + +Endpoints mirror the original n8n webhook paths so the only frontend change +is the base URL (the `N8N_BASE` constant in each dashboard SPA): + + GET /webhook/live-positions?cost_centre=&acc_status= + → { summary, geojson } (reporting.fn_live_positions) + GET /webhook/vehicle-track?vehicle_number=&hours= + → GeoJSON Feature (reporting.fn_vehicle_track) + GET /webhook/fleet-dashboard + → { drivers, cost_centres, cities, vehicles } (filter options) + POST /webhook/fleet-dashboard body: {period, vehicle_numbers, driver, + cost_centre, assigned_city, + start_date, end_date} + → trips payload (reporting.fn_trips_for_map) + GET /health +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import os +from contextlib import asynccontextmanager +from datetime import date, datetime, timedelta, timezone + +import psycopg2.extras +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +from ts_shared_rev import close_pool, get_conn, get_logger + +log = get_logger("dashboard_api") + +# Comma-separated list of allowed browser origins (the dashboard domains). +_ALLOWED_ORIGINS = [ + o.strip() + for o in os.getenv( + "DASHBOARD_CORS_ORIGINS", + "https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com", + ).split(",") + if o.strip() +] + + +@asynccontextmanager +async def lifespan(app: FastAPI): + log.info("Dashboard API starting (v1.0). Origins=%s", _ALLOWED_ORIGINS) + yield + close_pool() + + +app = FastAPI(title="Fireside Map Dashboard API", lifespan=lifespan) +app.add_middleware( + CORSMiddleware, + allow_origins=_ALLOWED_ORIGINS, + allow_methods=["GET", "POST", "OPTIONS"], + allow_headers=["*"], +) + +_EMPTY_GEOJSON = {"type": "FeatureCollection", "features": []} + + +def _clean(v): + """Treat missing / blank / sentinel values as None (= SQL wildcard).""" + if v is None: + return None + s = str(v).strip() + return s if s and s.lower() not in ("null", "undefined") else None + + +# ── Health ──────────────────────────────────────────────────────────────────── + +@app.get("/health") +def health(): + return {"status": "ok"} + + +# ── Live positions (#004) ─────────────────────────────────────────────────── + +@app.get("/webhook/live-positions") +def live_positions(cost_centre: str | None = None, acc_status: str | None = None): + try: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT reporting.fn_live_positions(%s, %s)", + (_clean(cost_centre), _clean(acc_status)), + ) + payload = cur.fetchone()[0] or {} + return JSONResponse( + { + "summary": payload.get("summary") or {}, + "geojson": payload.get("geojson") or _EMPTY_GEOJSON, + } + ) + except Exception: + log.exception("live-positions failed") + return JSONResponse( + { + "error": { + "type": "unknown", + "message": "Live-position feed is unavailable. Try again in a few seconds.", + } + } + ) + + +@app.get("/webhook/vehicle-track") +def vehicle_track(vehicle_number: str | None = None, hours: int = 1): + veh = _clean(vehicle_number) + if not veh: + return JSONResponse({"error": "vehicle_number is required"}) + hours = max(1, min(24, hours or 1)) + try: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT reporting.fn_vehicle_track(%s, %s::int)", (veh, hours) + ) + feature = cur.fetchone()[0] + return JSONResponse( + feature + or {"type": "Feature", "geometry": {"type": "LineString", "coordinates": []}, "properties": {}} + ) + except Exception: + log.exception("vehicle-track failed for %s", veh) + return JSONResponse({"error": "vehicle-track unavailable"}) + + +# ── Fleet trips (#002) ─────────────────────────────────────────────────────── + +_FILTER_OPTIONS_SQL = """ +SELECT + (SELECT array_agg(driver ORDER BY driver) FROM reporting.v_filter_drivers) AS drivers, + (SELECT array_agg(cost_centre ORDER BY cost_centre) FROM reporting.v_filter_cost_centres) AS cost_centres, + (SELECT array_agg(assigned_city ORDER BY assigned_city) FROM reporting.v_filter_cities) AS cities, + (SELECT jsonb_agg(jsonb_build_object( + 'vehicle_number', vehicle_number, 'drivers', drivers, + 'cost_centre', cost_centre, 'assigned_city', assigned_city) + ORDER BY vehicle_number) FROM reporting.v_filter_vehicles) AS vehicles +""" + + +@app.get("/webhook/fleet-dashboard") +def fleet_filter_options(): + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(_FILTER_OPTIONS_SQL) + row = cur.fetchone() or {} + return JSONResponse( + { + "drivers": row.get("drivers") or [], + "cost_centres": row.get("cost_centres") or [], + "cities": row.get("cities") or [], + "vehicles": row.get("vehicles") or [], + } + ) + except Exception: + log.exception("fleet-dashboard filter options failed") + return JSONResponse({"drivers": [], "cost_centres": [], "cities": [], "vehicles": []}) + + +def _preset_to_range(period: str | None, start_date, end_date): + """Mirror of the n8n preset_to_range node.""" + today = datetime.now(timezone.utc).date() + p = (period or "").strip() + if p == "today": + return today, today + if p == "30d": + return today - timedelta(days=29), today + if p == "custom": + def _d(v, default): + v = _clean(v) + if not v: + return default + try: + return date.fromisoformat(v) + except ValueError: + return default + return _d(start_date, today), _d(end_date, today) + # default + '7d' + return today - timedelta(days=6), today + + +@app.post("/webhook/fleet-dashboard") +async def fleet_trips(request: Request): + try: + body = await request.json() + except Exception: + body = {} + if not isinstance(body, dict): + body = {} + body = body.get("body", body) if isinstance(body.get("body"), dict) else body + + start, end = _preset_to_range( + body.get("period"), body.get("start_date"), body.get("end_date") + ) + veh = _clean(body.get("vehicle_numbers")) # comma-separated string or None + try: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT reporting.fn_trips_for_map( + CASE WHEN %(veh)s IS NULL THEN NULL + ELSE string_to_array(%(veh)s, ',') END, + %(driver)s, %(cc)s, %(city)s, %(start)s::date, %(end)s::date + ) + """, + { + "veh": veh, + "driver": _clean(body.get("driver")), + "cc": _clean(body.get("cost_centre")), + "city": _clean(body.get("assigned_city")), + "start": start, + "end": end, + }, + ) + payload = cur.fetchone()[0] + return JSONResponse(payload if payload is not None else {}) + except Exception: + log.exception("fleet-dashboard trips failed") + return JSONResponse( + {"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}} + ) diff --git a/docker-compose.yaml b/docker-compose.yaml index 67d10c5..5d80f6b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -59,6 +59,31 @@ services: timeout: 5s retries: 3 + dashboard_api: + # Stable read-API for the Live Position + Fleet Trips map dashboards. + # Replaces the n8n webhooks (n8n was only a thin HTTP->SQL proxy). + # Calls reporting.fn_live_positions / fn_vehicle_track / fn_trips_for_map. + build: + context: . + dockerfile: Dockerfile + command: sh -c "uvicorn dashboard_api_rev:app --host 0.0.0.0 --port 8890 --workers 2" + restart: always + depends_on: + timescale_db: + condition: service_healthy + env_file: .env + environment: + # Browser origins allowed to call this API (the dashboard domains). + - DASHBOARD_CORS_ORIGINS=${DASHBOARD_CORS_ORIGINS:-https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com} + # No host port binding — set a domain (e.g. fleetapi.rahamafresh.com) in the + # Coolify UI pointing to this service on port 8890. The dashboards then point + # their N8N_BASE at that domain; paths (/webhook/...) are unchanged. + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8890/health"] + interval: 30s + timeout: 5s + retries: 3 + grafana: build: context: ./grafana