tracksolid_timescale_grafan.../dashboard_api_rev.py
david kiania 30b351576c
Some checks are pending
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
feat(api): self-refresh reporting.v_trips in dashboard_api
The Fleet Trips dashboard reads reporting.v_trips (a materialized view).
Its refresh was a scheduled n8n workflow; when n8n was retired the matview
froze (last refresh 2026-06-01) so the dashboard showed no recent trips
even though tracksolid.trips kept ingesting live.

Move the refresh into the owned stack: a background loop in dashboard_api
runs REFRESH MATERIALIZED VIEW CONCURRENTLY reporting.v_trips every
VTRIPS_REFRESH_INTERVAL_S (default 300s). Safe across uvicorn --workers
via a pg advisory lock (one worker refreshes per tick); runs in a thread
so the ~9s refresh never blocks the event loop; logs to
reporting.refresh_log (source='dashboard_api') for continuity. Uses a
dedicated autocommit connection because REFRESH ... CONCURRENTLY cannot
run inside a transaction block.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 16:25:45 +03:00

324 lines
13 KiB
Python

"""
dashboard_api_rev.py — Fireside Communications · Map Dashboard Read API
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Stable replacement for the n8n webhooks that fed the Live Position and Fleet
Trips map dashboards. n8n was acting only as a thin HTTP→SQL proxy; this
service does the same job directly against the proven reporting.* functions,
removing n8n's credential-management / reload / version-drift fragility from
the live-data path.
It REUSES the existing stack: ts_shared_rev's psycopg2 pool and DATABASE_URL,
the same Docker image, the same Coolify deploy. The reporting.* functions
(already verified to return correct GeoJSON) are the single source of truth.
Endpoints mirror the original n8n webhook paths so the only frontend change
is the base URL (the `N8N_BASE` constant in each dashboard SPA):
GET /webhook/live-positions?cost_centre=&acc_status=
{ summary, geojson } (reporting.fn_live_positions)
GET /webhook/live-positions/track?vehicle_number=&hours=
(alias: /webhook/vehicle-track)
→ GeoJSON Feature (reporting.fn_vehicle_track)
GET /webhook/fleet-dashboard
{ drivers, cost_centres, cities, vehicles } (filter options)
POST /webhook/fleet-dashboard body: {period, vehicle_numbers, driver,
cost_centre, assigned_city,
start_date, end_date}
→ trips payload (reporting.fn_trips_for_map)
GET /health
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import asyncio
import json
import os
import time
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta, timezone
from urllib.parse import parse_qs
import psycopg2
import psycopg2.extras
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from ts_shared_rev import close_pool, get_conn, get_logger
log = get_logger("dashboard_api")
# Comma-separated list of allowed browser origins (the dashboard domains).
_ALLOWED_ORIGINS = [
o.strip()
for o in os.getenv(
"DASHBOARD_CORS_ORIGINS",
"https://liveposition.rahamafresh.com,https://fleetintelligence.rahamafresh.com",
).split(",")
if o.strip()
]
# ── v_trips materialized-view refresher ─────────────────────────────────────
# The Fleet Trips dashboard reads reporting.v_trips (a materialized view). Its
# refresh used to be a scheduled n8n workflow; when n8n was retired the matview
# went stale (data froze). We now keep it fresh in-process: a background loop
# refreshes it on an interval. A Postgres advisory lock makes this safe across
# uvicorn workers (only one worker refreshes per tick); the work runs in a
# thread so the async event loop never blocks on the ~9s REFRESH.
_DATABASE_URL = os.environ["DATABASE_URL"]
_REFRESH_INTERVAL_S = int(os.getenv("VTRIPS_REFRESH_INTERVAL_S", "300"))
_REFRESH_LOCK_KEY = 920_145 # arbitrary, stable advisory-lock key for this job
def _refresh_v_trips_once() -> str:
"""Refresh reporting.v_trips. Blocking — call via asyncio.to_thread.
Uses a dedicated autocommit connection: REFRESH ... CONCURRENTLY cannot run
inside a transaction block (so the pooled get_conn, which wraps a txn, won't
do). DATABASE_URL connects as a superuser, which may REFRESH the matview
even though reporting_refresher owns it.
"""
conn = psycopg2.connect(_DATABASE_URL, connect_timeout=10)
try:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT pg_try_advisory_lock(%s)", (_REFRESH_LOCK_KEY,))
if not cur.fetchone()[0]:
return "skipped (another worker holds the lock)"
try:
t0 = time.monotonic()
cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY reporting.v_trips")
dur_ms = int((time.monotonic() - t0) * 1000)
cur.execute(
"INSERT INTO reporting.refresh_log"
"(refreshed_at, source, duration_ms, row_count, notes) "
"VALUES (now(), 'dashboard_api', %s,"
" (SELECT count(*) FROM reporting.v_trips), 'scheduled')",
(dur_ms,),
)
return f"refreshed in {dur_ms}ms"
finally:
cur.execute("SELECT pg_advisory_unlock(%s)", (_REFRESH_LOCK_KEY,))
finally:
conn.close()
async def _refresh_loop():
# Brief startup delay so the first refresh doesn't race container init.
await asyncio.sleep(15)
while True:
try:
result = await asyncio.to_thread(_refresh_v_trips_once)
log.info("v_trips refresh: %s", result)
except Exception:
log.exception("v_trips refresh failed (will retry next interval)")
await asyncio.sleep(_REFRESH_INTERVAL_S)
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info(
"Dashboard API starting (v1.1). Origins=%s. v_trips refresh every %ss.",
_ALLOWED_ORIGINS, _REFRESH_INTERVAL_S,
)
refresher = asyncio.create_task(_refresh_loop())
yield
refresher.cancel()
try:
await refresher
except asyncio.CancelledError:
pass
close_pool()
app = FastAPI(title="Fireside Map Dashboard API", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=_ALLOWED_ORIGINS,
allow_methods=["GET", "POST", "OPTIONS"],
allow_headers=["*"],
)
_EMPTY_GEOJSON = {"type": "FeatureCollection", "features": []}
def _clean(v):
"""Treat missing / blank / sentinel values as None (= SQL wildcard)."""
if v is None:
return None
s = str(v).strip()
return s if s and s.lower() not in ("null", "undefined") else None
# ── Health ────────────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok"}
# ── Live positions (#004) ───────────────────────────────────────────────────
@app.get("/webhook/live-positions")
def live_positions(cost_centre: str | None = None, acc_status: str | None = None):
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT reporting.fn_live_positions(%s, %s)",
(_clean(cost_centre), _clean(acc_status)),
)
payload = cur.fetchone()[0] or {}
return JSONResponse(
{
"summary": payload.get("summary") or {},
"geojson": payload.get("geojson") or _EMPTY_GEOJSON,
}
)
except Exception:
log.exception("live-positions failed")
return JSONResponse(
{
"error": {
"type": "unknown",
"message": "Live-position feed is unavailable. Try again in a few seconds.",
}
}
)
# `/webhook/live-positions/track` is the path the Live Positions SPA actually
# calls; `/webhook/vehicle-track` is kept as an alias. Both hit the same handler
# so the only frontend change is the base URL (N8N_BASE).
@app.get("/webhook/live-positions/track")
@app.get("/webhook/vehicle-track")
def vehicle_track(vehicle_number: str | None = None, hours: int = 1):
veh = _clean(vehicle_number)
if not veh:
return JSONResponse({"error": "vehicle_number is required"})
hours = max(1, min(24, hours or 1))
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT reporting.fn_vehicle_track(%s, %s::int)", (veh, hours)
)
feature = cur.fetchone()[0]
return JSONResponse(
feature
or {"type": "Feature", "geometry": {"type": "LineString", "coordinates": []}, "properties": {}}
)
except Exception:
log.exception("vehicle-track failed for %s", veh)
return JSONResponse({"error": "vehicle-track unavailable"})
# ── Fleet trips (#002) ───────────────────────────────────────────────────────
_FILTER_OPTIONS_SQL = """
SELECT
(SELECT array_agg(driver ORDER BY driver) FROM reporting.v_filter_drivers) AS drivers,
(SELECT array_agg(cost_centre ORDER BY cost_centre) FROM reporting.v_filter_cost_centres) AS cost_centres,
(SELECT array_agg(assigned_city ORDER BY assigned_city) FROM reporting.v_filter_cities) AS cities,
(SELECT jsonb_agg(jsonb_build_object(
'vehicle_number', vehicle_number, 'drivers', drivers,
'cost_centre', cost_centre, 'assigned_city', assigned_city)
ORDER BY vehicle_number) FROM reporting.v_filter_vehicles) AS vehicles
"""
@app.get("/webhook/fleet-dashboard")
def fleet_filter_options():
try:
with get_conn() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(_FILTER_OPTIONS_SQL)
row = cur.fetchone() or {}
return JSONResponse(
{
"drivers": row.get("drivers") or [],
"cost_centres": row.get("cost_centres") or [],
"cities": row.get("cities") or [],
"vehicles": row.get("vehicles") or [],
}
)
except Exception:
log.exception("fleet-dashboard filter options failed")
return JSONResponse({"drivers": [], "cost_centres": [], "cities": [], "vehicles": []})
def _preset_to_range(period: str | None, start_date, end_date):
"""Mirror of the n8n preset_to_range node."""
today = datetime.now(timezone.utc).date()
p = (period or "").strip()
if p == "today":
return today, today
if p == "30d":
return today - timedelta(days=29), today
if p == "custom":
def _d(v, default):
v = _clean(v)
if not v:
return default
try:
return date.fromisoformat(v)
except ValueError:
return default
return _d(start_date, today), _d(end_date, today)
# default + '7d'
return today - timedelta(days=6), today
@app.post("/webhook/fleet-dashboard")
async def fleet_trips(request: Request):
# The dashboard SPA posts application/x-www-form-urlencoded (not JSON), so
# parse by content-type. Reading the raw body + parse_qs avoids pulling in
# python-multipart. JSON is still accepted defensively (n8n-compat callers).
body: dict = {}
ctype = request.headers.get("content-type", "").lower()
try:
raw = await request.body()
if "application/json" in ctype:
parsed = json.loads(raw or b"{}")
body = parsed if isinstance(parsed, dict) else {}
else:
# x-www-form-urlencoded — parse_qs yields lists; keep the last value.
body = {k: v[-1] for k, v in parse_qs(raw.decode("utf-8", "replace")).items()}
except Exception:
body = {}
if isinstance(body.get("body"), dict):
body = body["body"]
start, end = _preset_to_range(
body.get("period"), body.get("start_date"), body.get("end_date")
)
veh = _clean(body.get("vehicle_numbers")) # comma-separated string or None
try:
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT reporting.fn_trips_for_map(
CASE WHEN %(veh)s IS NULL THEN NULL
ELSE string_to_array(%(veh)s, ',') END,
%(driver)s, %(cc)s, %(city)s, %(start)s::date, %(end)s::date
)
""",
{
"veh": veh,
"driver": _clean(body.get("driver")),
"cc": _clean(body.get("cost_centre")),
"city": _clean(body.get("assigned_city")),
"start": start,
"end": end,
},
)
payload = cur.fetchone()[0]
return JSONResponse(payload if payload is not None else {})
except Exception:
log.exception("fleet-dashboard trips failed")
return JSONResponse(
{"error": {"type": "unknown", "message": "Fleet feed is unavailable. Try again in a few seconds."}}
)