tracksolid_timescale_grafan.../dashboard_api_rev.py
david kiania 831f683b83
Some checks are pending
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
fix(api): expose /webhook/live-positions/track so map trail matches SPA path
The Live Positions SPA calls GET /webhook/live-positions/track, but the
read-API only exposed /webhook/vehicle-track. Clicking a vehicle to view its
1-hour trail therefore 404'd even after repointing N8N_BASE. Register the SPA's
actual path as a route alias to the same handler (vehicle-track kept as alias),
so the only frontend change remains the base URL. Docstring updated to match.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 00:54:58 +03:00

243 lines
9.5 KiB
Python

"""
dashboard_api_rev.py — Fireside Communications · Map Dashboard Read API
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Stable replacement for the n8n webhooks that fed the Live Position and Fleet
Trips map dashboards. n8n was acting only as a thin HTTP→SQL proxy; this
service does the same job directly against the proven reporting.* functions,
removing n8n's credential-management / reload / version-drift fragility from
the live-data path.
It REUSES the existing stack: ts_shared_rev's psycopg2 pool and DATABASE_URL,
the same Docker image, the same Coolify deploy. The reporting.* functions
(already verified to return correct GeoJSON) are the single source of truth.
Endpoints mirror the original n8n webhook paths so the only frontend change
is the base URL (the `N8N_BASE` constant in each dashboard SPA):
    GET  /webhook/live-positions?cost_centre=&acc_status=
         → { summary, geojson }                        (reporting.fn_live_positions)
    GET  /webhook/live-positions/track?vehicle_number=&hours=
         (alias: /webhook/vehicle-track)
         → GeoJSON Feature                             (reporting.fn_vehicle_track)
    GET  /webhook/fleet-dashboard
         → { drivers, cost_centres, cities, vehicles } (filter options)
    POST /webhook/fleet-dashboard   body: {period, vehicle_numbers, driver,
                                           cost_centre, assigned_city,
                                           start_date, end_date}
         → trips payload                               (reporting.fn_trips_for_map)
    GET  /health
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta, timezone
import psycopg2.extras
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from ts_shared_rev import close_pool, get_conn, get_logger
# Module-level logger, obtained from the shared ts_shared_rev helper and used
# by every handler below to record failures.
log = get_logger("dashboard_api")
# Allowed browser origins (the dashboard domains), read once at import time
# from the comma-separated DASHBOARD_CORS_ORIGINS environment variable.
# Entries are whitespace-trimmed and empty entries are discarded.
_ALLOWED_ORIGINS = [
    origin
    for origin in map(
        str.strip,
        os.getenv(
            "DASHBOARD_CORS_ORIGINS",
            "https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com",
        ).split(","),
    )
    if origin
]
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: log on startup, close the DB pool on shutdown.

    Args:
        app: The FastAPI application this hook is attached to (not used here).

    Yields:
        None. Control is handed back to the server while the app is running.
    """
    log.info("Dashboard API starting (v1.0). Origins=%s", _ALLOWED_ORIGINS)
    try:
        yield
    finally:
        # Shutdown can reach this generator as an exception or cancellation
        # thrown in at the ``yield``; ``finally`` guarantees the shared pool is
        # still closed in that case instead of being silently skipped.
        close_pool()
# The ASGI application; `lifespan` is registered as its startup/shutdown hook.
app = FastAPI(title="Fireside Map Dashboard API", lifespan=lifespan)
# CORS: only the origins in _ALLOWED_ORIGINS may call this API from a browser,
# with the GET/POST/OPTIONS methods and any request header.
app.add_middleware(
CORSMiddleware,
allow_origins=_ALLOWED_ORIGINS,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
)
# Fallback `geojson` value returned by live_positions when the payload has none.
_EMPTY_GEOJSON = {"type": "FeatureCollection", "features": []}
def _clean(v):
    """Normalise an optional filter value to a trimmed string or None.

    None, blank strings and the literal sentinels "null" / "undefined"
    (compared case-insensitively) all collapse to None, which the callers pass
    to SQL as the "no filter" value. Anything else is returned as its
    whitespace-trimmed string form.
    """
    if v is None:
        return None
    text = str(v).strip()
    if not text:
        return None
    if text.lower() in ("null", "undefined"):
        return None
    return text
# ── Health ────────────────────────────────────────────────────────────────────
@app.get("/health")
def health():
    """Return a static OK payload; does not touch the database."""
    status_payload = {"status": "ok"}
    return status_payload
# ── Live positions (#004) ───────────────────────────────────────────────────
@app.get("/webhook/live-positions")
def live_positions(cost_centre: str | None = None, acc_status: str | None = None):
    """Serve the live-position feed produced by reporting.fn_live_positions.

    Args:
        cost_centre: Optional filter; blank / "null" / "undefined" become None.
        acc_status: Optional filter, normalised the same way.

    Returns:
        JSONResponse with ``summary`` and ``geojson`` keys (empty defaults are
        substituted for missing or falsy values). On any exception the failure
        is logged and an ``{"error": {...}}`` payload is returned instead; no
        explicit status code is set in either case.
    """
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT reporting.fn_live_positions(%s, %s)",
                (_clean(cost_centre), _clean(acc_status)),
            )
            row = cur.fetchone()
            result = row[0] or {}
            body = {
                "summary": result.get("summary") or {},
                "geojson": result.get("geojson") or _EMPTY_GEOJSON,
            }
            return JSONResponse(body)
    except Exception:
        log.exception("live-positions failed")
        failure = {
            "type": "unknown",
            "message": "Live-position feed is unavailable. Try again in a few seconds.",
        }
        return JSONResponse({"error": failure})
# `/webhook/live-positions/track` is the path the Live Positions SPA actually
# calls; `/webhook/vehicle-track` is kept as an alias. Both hit the same handler
# so the only frontend change is the base URL (N8N_BASE).
@app.get("/webhook/live-positions/track")
@app.get("/webhook/vehicle-track")
def vehicle_track(vehicle_number: str | None = None, hours: int = 1):
    """Return one vehicle's recent track from reporting.fn_vehicle_track.

    Args:
        vehicle_number: Vehicle to look up; required after normalisation.
        hours: Look-back window; a falsy value counts as 1 and the result is
            clamped to the range 1..24.

    Returns:
        JSONResponse carrying the value returned by the SQL function, or a
        Feature with an empty LineString when that value is falsy. A missing
        vehicle number or any exception yields an ``{"error": ...}`` payload.
    """
    veh = _clean(vehicle_number)
    if veh is None:
        return JSONResponse({"error": "vehicle_number is required"})

    window = min(24, max(1, hours or 1))
    try:
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT reporting.fn_vehicle_track(%s, %s::int)", (veh, window)
            )
            track = cur.fetchone()[0]
            if not track:
                # Nothing to draw: hand the map a valid but empty line.
                track = {
                    "type": "Feature",
                    "geometry": {"type": "LineString", "coordinates": []},
                    "properties": {},
                }
            return JSONResponse(track)
    except Exception:
        log.exception("vehicle-track failed for %s", veh)
        return JSONResponse({"error": "vehicle-track unavailable"})
# ── Fleet trips (#002) ───────────────────────────────────────────────────────
# One-row query: each column aggregates one of the reporting.v_filter_* views.
_FILTER_OPTIONS_SQL = """
SELECT
(SELECT array_agg(driver ORDER BY driver) FROM reporting.v_filter_drivers) AS drivers,
(SELECT array_agg(cost_centre ORDER BY cost_centre) FROM reporting.v_filter_cost_centres) AS cost_centres,
(SELECT array_agg(assigned_city ORDER BY assigned_city) FROM reporting.v_filter_cities) AS cities,
(SELECT jsonb_agg(jsonb_build_object(
'vehicle_number', vehicle_number, 'drivers', drivers,
'cost_centre', cost_centre, 'assigned_city', assigned_city)
ORDER BY vehicle_number) FROM reporting.v_filter_vehicles) AS vehicles
"""


@app.get("/webhook/fleet-dashboard")
def fleet_filter_options():
    """Return the option lists that populate the fleet dashboard filters.

    Returns:
        JSONResponse with ``drivers``, ``cost_centres``, ``cities`` and
        ``vehicles``; a missing or NULL column becomes an empty list. If the
        query fails the error is logged and every list is returned empty.
    """
    option_keys = ("drivers", "cost_centres", "cities", "vehicles")
    try:
        with get_conn() as conn, conn.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor
        ) as cur:
            cur.execute(_FILTER_OPTIONS_SQL)
            record = cur.fetchone() or {}
            return JSONResponse({key: record.get(key) or [] for key in option_keys})
    except Exception:
        log.exception("fleet-dashboard filter options failed")
        return JSONResponse({key: [] for key in option_keys})
def _preset_to_range(period: str | None, start_date, end_date):
    """Translate a dashboard period preset into an inclusive date range.

    Mirror of the n8n preset_to_range node. "Today" is the current UTC date.

    Args:
        period: Preset name. Recognised values (after trimming whitespace) are
            "today", "30d" and "custom"; anything else, including None, "7d"
            or a non-string value, selects the default 7-day window.
        start_date: ISO date used only for "custom"; falls back to today when
            blank or unparseable.
        end_date: ISO date used only for "custom"; same fallback.

    Returns:
        A ``(start, end)`` tuple of ``datetime.date`` objects.
    """
    today = datetime.now(timezone.utc).date()
    # The value comes straight from a JSON request body, so it may be a number,
    # list, etc. Treat anything that is not a string as "no preset" rather
    # than failing on a missing ``.strip()``.
    preset = period.strip() if isinstance(period, str) else ""

    if preset == "today":
        return today, today
    if preset == "30d":
        return today - timedelta(days=29), today
    if preset == "custom":

        def _parse(value, fallback):
            """Parse an ISO date, returning *fallback* when absent or invalid."""
            text = _clean(value)
            if not text:
                return fallback
            try:
                return date.fromisoformat(text)
            except ValueError:
                return fallback

        return _parse(start_date, today), _parse(end_date, today)

    # default + '7d'
    return today - timedelta(days=6), today
# Statement executed by fleet_trips, kept as a module constant in the same way
# as _FILTER_OPTIONS_SQL. A NULL vehicle list is passed through as NULL.
_TRIPS_FOR_MAP_SQL = """
SELECT reporting.fn_trips_for_map(
CASE WHEN %(veh)s IS NULL THEN NULL
ELSE string_to_array(%(veh)s, ',') END,
%(driver)s, %(cc)s, %(city)s, %(start)s::date, %(end)s::date
)
"""


def _vehicle_csv(raw):
    """Normalise the ``vehicle_numbers`` filter to a comma-separated string.

    Accepts either a comma-separated string or a JSON array. Each element is
    trimmed and blank / "null" / "undefined" elements are dropped, so padding
    around commas never reaches ``string_to_array``. Returns None when nothing
    usable remains.
    """
    if isinstance(raw, (list, tuple)):
        parts = raw
    else:
        text = _clean(raw)
        parts = text.split(",") if text else []
    cleaned = [item for item in (_clean(part) for part in parts) if item]
    return ",".join(cleaned) or None


@app.post("/webhook/fleet-dashboard")
async def fleet_trips(request: Request):
    """Return the trips payload from reporting.fn_trips_for_map.

    The JSON body may carry ``period``, ``vehicle_numbers``, ``driver``,
    ``cost_centre``, ``assigned_city``, ``start_date`` and ``end_date``, either
    at the top level or nested under a ``body`` key. An unreadable or non-object
    body is treated as empty, so every filter falls back to its default.

    Args:
        request: Incoming request whose JSON body holds the filters.

    Returns:
        JSONResponse with the SQL function's payload (``{}`` when it is NULL),
        or an ``{"error": {...}}`` payload after logging if the query fails.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    if not isinstance(body, dict):
        body = {}
    # Some callers wrap the real filters in an outer {"body": {...}} envelope.
    nested = body.get("body")
    if isinstance(nested, dict):
        body = nested

    # Normalise ``period`` first: a non-string JSON value would otherwise raise
    # here, outside the try block, and surface as an unhandled server error.
    start, end = _preset_to_range(
        _clean(body.get("period")), body.get("start_date"), body.get("end_date")
    )
    params = {
        "veh": _vehicle_csv(body.get("vehicle_numbers")),
        "driver": _clean(body.get("driver")),
        "cc": _clean(body.get("cost_centre")),
        "city": _clean(body.get("assigned_city")),
        "start": start,
        "end": end,
    }
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(_TRIPS_FOR_MAP_SQL, params)
                payload = cur.fetchone()[0]
                return JSONResponse(payload if payload is not None else {})
    except Exception:
        log.exception("fleet-dashboard trips failed")
        return JSONResponse(
            {"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}}
        )