feat(api): dedicated FastAPI read-API for map dashboards (replaces n8n)
Some checks failed
Static Analysis / static (push) Has been cancelled
Tests / test (push) Has been cancelled

n8n was a thin HTTP->SQL proxy for the Live Position and Fleet Trips maps and
proved fragile (credential reloads, :latest drift, shared connection limits).
This service calls the same proven reporting.* functions directly, reusing the
existing psycopg2 pool / Docker image / Coolify deploy.

Endpoints mirror the n8n webhook paths so the only frontend change is N8N_BASE:
  GET  /webhook/live-positions  -> {summary, geojson}   (fn_live_positions)
  GET  /webhook/vehicle-track   -> GeoJSON Feature       (fn_vehicle_track)
  GET  /webhook/fleet-dashboard -> filter options
  POST /webhook/fleet-dashboard -> trips payload         (fn_trips_for_map)

Response shapes replicate the n8n "Build response JSON" nodes exactly; empty
filters/sentinels ('', null, undefined) normalize to SQL wildcards. CORS limited
to the dashboard origins. Added dashboard_api service to docker-compose (port
8890, Coolify-routed). SQL contracts validated against prod.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-01 04:23:37 +03:00
parent e5b0e192d8
commit 5703d70aa6
2 changed files with 263 additions and 0 deletions

238
dashboard_api_rev.py Normal file
View file

@ -0,0 +1,238 @@
"""
dashboard_api_rev.py Fireside Communications · Map Dashboard Read API
Stable replacement for the n8n webhooks that fed the Live Position and Fleet
Trips map dashboards. n8n was acting only as a thin HTTPSQL proxy; this
service does the same job directly against the proven reporting.* functions,
removing n8n's credential-management / reload / version-drift fragility from
the live-data path.
It REUSES the existing stack: ts_shared_rev's psycopg2 pool and DATABASE_URL,
the same Docker image, the same Coolify deploy. The reporting.* functions
(already verified to return correct GeoJSON) are the single source of truth.
Endpoints mirror the original n8n webhook paths so the only frontend change
is the base URL (the `N8N_BASE` constant in each dashboard SPA):
GET /webhook/live-positions?cost_centre=&acc_status=
{ summary, geojson } (reporting.fn_live_positions)
GET /webhook/vehicle-track?vehicle_number=&hours=
GeoJSON Feature (reporting.fn_vehicle_track)
GET /webhook/fleet-dashboard
{ drivers, cost_centres, cities, vehicles } (filter options)
POST /webhook/fleet-dashboard body: {period, vehicle_numbers, driver,
cost_centre, assigned_city,
start_date, end_date}
trips payload (reporting.fn_trips_for_map)
GET /health
"""
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta, timezone
import psycopg2.extras
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from ts_shared_rev import close_pool, get_conn, get_logger
log = get_logger("dashboard_api")
# Comma-separated list of allowed browser origins (the dashboard domains).
_ALLOWED_ORIGINS = [
o.strip()
for o in os.getenv(
"DASHBOARD_CORS_ORIGINS",
"https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com",
).split(",")
if o.strip()
]
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info("Dashboard API starting (v1.0). Origins=%s", _ALLOWED_ORIGINS)
yield
close_pool()
app = FastAPI(title="Fireside Map Dashboard API", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=_ALLOWED_ORIGINS,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
)
_EMPTY_GEOJSON = {"type": "FeatureCollection", "features": []}
def _clean(v):
"""Treat missing / blank / sentinel values as None (= SQL wildcard)."""
if v is None:
return None
s = str(v).strip()
return s if s and s.lower() not in ("null", "undefined") else None
# ── Health ────────────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok"}
# ── Live positions (#004) ───────────────────────────────────────────────────
@app.get("/webhook/live-positions")
def live_positions(cost_centre: str | None = None, acc_status: str | None = None):
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT reporting.fn_live_positions(%s, %s)",
(_clean(cost_centre), _clean(acc_status)),
)
payload = cur.fetchone()[0] or {}
return JSONResponse(
{
"summary": payload.get("summary") or {},
"geojson": payload.get("geojson") or _EMPTY_GEOJSON,
}
)
except Exception:
log.exception("live-positions failed")
return JSONResponse(
{
"error": {
"type": "unknown",
"message": "Live-position feed is unavailable. Try again in a few seconds.",
}
}
)
@app.get("/webhook/vehicle-track")
def vehicle_track(vehicle_number: str | None = None, hours: int = 1):
veh = _clean(vehicle_number)
if not veh:
return JSONResponse({"error": "vehicle_number is required"})
hours = max(1, min(24, hours or 1))
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT reporting.fn_vehicle_track(%s, %s::int)", (veh, hours)
)
feature = cur.fetchone()[0]
return JSONResponse(
feature
or {"type": "Feature", "geometry": {"type": "LineString", "coordinates": []}, "properties": {}}
)
except Exception:
log.exception("vehicle-track failed for %s", veh)
return JSONResponse({"error": "vehicle-track unavailable"})
# ── Fleet trips (#002) ───────────────────────────────────────────────────────
_FILTER_OPTIONS_SQL = """
SELECT
(SELECT array_agg(driver ORDER BY driver) FROM reporting.v_filter_drivers) AS drivers,
(SELECT array_agg(cost_centre ORDER BY cost_centre) FROM reporting.v_filter_cost_centres) AS cost_centres,
(SELECT array_agg(assigned_city ORDER BY assigned_city) FROM reporting.v_filter_cities) AS cities,
(SELECT jsonb_agg(jsonb_build_object(
'vehicle_number', vehicle_number, 'drivers', drivers,
'cost_centre', cost_centre, 'assigned_city', assigned_city)
ORDER BY vehicle_number) FROM reporting.v_filter_vehicles) AS vehicles
"""
@app.get("/webhook/fleet-dashboard")
def fleet_filter_options():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(_FILTER_OPTIONS_SQL)
row = cur.fetchone() or {}
return JSONResponse(
{
"drivers": row.get("drivers") or [],
"cost_centres": row.get("cost_centres") or [],
"cities": row.get("cities") or [],
"vehicles": row.get("vehicles") or [],
}
)
except Exception:
log.exception("fleet-dashboard filter options failed")
return JSONResponse({"drivers": [], "cost_centres": [], "cities": [], "vehicles": []})
def _preset_to_range(period: str | None, start_date, end_date):
"""Mirror of the n8n preset_to_range node."""
today = datetime.now(timezone.utc).date()
p = (period or "").strip()
if p == "today":
return today, today
if p == "30d":
return today - timedelta(days=29), today
if p == "custom":
def _d(v, default):
v = _clean(v)
if not v:
return default
try:
return date.fromisoformat(v)
except ValueError:
return default
return _d(start_date, today), _d(end_date, today)
# default + '7d'
return today - timedelta(days=6), today
@app.post("/webhook/fleet-dashboard")
async def fleet_trips(request: Request):
try:
body = await request.json()
except Exception:
body = {}
if not isinstance(body, dict):
body = {}
body = body.get("body", body) if isinstance(body.get("body"), dict) else body
start, end = _preset_to_range(
body.get("period"), body.get("start_date"), body.get("end_date")
)
veh = _clean(body.get("vehicle_numbers")) # comma-separated string or None
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT reporting.fn_trips_for_map(
CASE WHEN %(veh)s IS NULL THEN NULL
ELSE string_to_array(%(veh)s, ',') END,
%(driver)s, %(cc)s, %(city)s, %(start)s::date, %(end)s::date
)
""",
{
"veh": veh,
"driver": _clean(body.get("driver")),
"cc": _clean(body.get("cost_centre")),
"city": _clean(body.get("assigned_city")),
"start": start,
"end": end,
},
)
payload = cur.fetchone()[0]
return JSONResponse(payload if payload is not None else {})
except Exception:
log.exception("fleet-dashboard trips failed")
return JSONResponse(
{"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}}
)

View file

@ -59,6 +59,31 @@ services:
timeout: 5s timeout: 5s
retries: 3 retries: 3
dashboard_api:
# Stable read-API for the Live Position + Fleet Trips map dashboards.
# Replaces the n8n webhooks (n8n was only a thin HTTP->SQL proxy).
# Calls reporting.fn_live_positions / fn_vehicle_track / fn_trips_for_map.
build:
context: .
dockerfile: Dockerfile
command: sh -c "uvicorn dashboard_api_rev:app --host 0.0.0.0 --port 8890 --workers 2"
restart: always
depends_on:
timescale_db:
condition: service_healthy
env_file: .env
environment:
# Browser origins allowed to call this API (the dashboard domains).
- DASHBOARD_CORS_ORIGINS=${DASHBOARD_CORS_ORIGINS:-https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com}
# No host port binding — set a domain (e.g. fleetapi.rahamafresh.com) in the
# Coolify UI pointing to this service on port 8890. The dashboards then point
# their N8N_BASE at that domain; paths (/webhook/...) are unchanged.
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8890/health"]
interval: 30s
timeout: 5s
retries: 3
grafana: grafana:
build: build:
context: ./grafana context: ./grafana