diff --git a/dashboard_api_rev.py b/dashboard_api_rev.py index 92985c4..98530e1 100644 --- a/dashboard_api_rev.py +++ b/dashboard_api_rev.py @@ -31,12 +31,15 @@ is the base URL (the `N8N_BASE` constant in each dashboard SPA): from __future__ import annotations +import asyncio import json import os +import time from contextlib import asynccontextmanager from datetime import date, datetime, timedelta, timezone from urllib.parse import parse_qs +import psycopg2 import psycopg2.extras from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware @@ -57,10 +60,76 @@ _ALLOWED_ORIGINS = [ ] +# ── v_trips materialized-view refresher ───────────────────────────────────── +# The Fleet Trips dashboard reads reporting.v_trips (a materialized view). Its +# refresh used to be a scheduled n8n workflow; when n8n was retired the matview +# went stale (data froze). We now keep it fresh in-process: a background loop +# refreshes it on an interval. A Postgres advisory lock makes this safe across +# uvicorn workers (only one worker refreshes per tick); the work runs in a +# thread so the async event loop never blocks on the ~9s REFRESH. +_DATABASE_URL = os.environ["DATABASE_URL"] +_REFRESH_INTERVAL_S = int(os.getenv("VTRIPS_REFRESH_INTERVAL_S", "300")) +_REFRESH_LOCK_KEY = 920_145 # arbitrary, stable advisory-lock key for this job + + +def _refresh_v_trips_once() -> str: + """Refresh reporting.v_trips. Blocking — call via asyncio.to_thread. + + Uses a dedicated autocommit connection: REFRESH ... CONCURRENTLY cannot run + inside a transaction block (so the pooled get_conn, which wraps a txn, won't + do). DATABASE_URL connects as a superuser, which may REFRESH the matview + even though reporting_refresher owns it. + """ + conn = psycopg2.connect(_DATABASE_URL, connect_timeout=10) + try: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT pg_try_advisory_lock(%s)", (_REFRESH_LOCK_KEY,)) + if not cur.fetchone()[0]: + return "skipped (another worker holds the lock)" + try: + t0 = time.monotonic() + cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY reporting.v_trips") + dur_ms = int((time.monotonic() - t0) * 1000) + cur.execute( + "INSERT INTO reporting.refresh_log" + "(refreshed_at, source, duration_ms, row_count, notes) " + "VALUES (now(), 'dashboard_api', %s," + " (SELECT count(*) FROM reporting.v_trips), 'scheduled')", + (dur_ms,), + ) + return f"refreshed in {dur_ms}ms" + finally: + cur.execute("SELECT pg_advisory_unlock(%s)", (_REFRESH_LOCK_KEY,)) + finally: + conn.close() + + +async def _refresh_loop(): + # Brief startup delay so the first refresh doesn't race container init. + await asyncio.sleep(15) + while True: + try: + result = await asyncio.to_thread(_refresh_v_trips_once) + log.info("v_trips refresh: %s", result) + except Exception: + log.exception("v_trips refresh failed (will retry next interval)") + await asyncio.sleep(_REFRESH_INTERVAL_S) + + @asynccontextmanager async def lifespan(app: FastAPI): - log.info("Dashboard API starting (v1.0). Origins=%s", _ALLOWED_ORIGINS) + log.info( + "Dashboard API starting (v1.1). Origins=%s. v_trips refresh every %ss.", + _ALLOWED_ORIGINS, _REFRESH_INTERVAL_S, + ) + refresher = asyncio.create_task(_refresh_loop()) yield + refresher.cancel() + try: + await refresher + except asyncio.CancelledError: + pass close_pool()