feat(api): self-refresh reporting.v_trips in dashboard_api
The Fleet Trips dashboard reads reporting.v_trips (a materialized view). Its refresh was a scheduled n8n workflow; when n8n was retired the matview froze (last refresh 2026-06-01) so the dashboard showed no recent trips even though tracksolid.trips kept ingesting live. Move the refresh into the owned stack: a background loop in dashboard_api runs REFRESH MATERIALIZED VIEW CONCURRENTLY reporting.v_trips every VTRIPS_REFRESH_INTERVAL_S (default 300s). Safe across uvicorn --workers via a pg advisory lock (one worker refreshes per tick); runs in a thread so the ~9s refresh never blocks the event loop; logs to reporting.refresh_log (source='dashboard_api') for continuity. Uses a dedicated autocommit connection because REFRESH ... CONCURRENTLY cannot run inside a transaction block. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
f1387d1476
commit
30b351576c
1 changed files with 70 additions and 1 deletions
|
|
@ -31,12 +31,15 @@ is the base URL (the `N8N_BASE` constant in each dashboard SPA):
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
from fastapi import FastAPI, Request
|
from fastapi import FastAPI, Request
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
@ -57,10 +60,76 @@ _ALLOWED_ORIGINS = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# ── v_trips materialized-view refresher ─────────────────────────────────────
|
||||||
|
# The Fleet Trips dashboard reads reporting.v_trips (a materialized view). Its
|
||||||
|
# refresh used to be a scheduled n8n workflow; when n8n was retired the matview
|
||||||
|
# went stale (data froze). We now keep it fresh in-process: a background loop
|
||||||
|
# refreshes it on an interval. A Postgres advisory lock makes this safe across
|
||||||
|
# uvicorn workers (only one worker refreshes per tick); the work runs in a
|
||||||
|
# thread so the async event loop never blocks on the ~9s REFRESH.
|
||||||
|
_DATABASE_URL = os.environ["DATABASE_URL"]
|
||||||
|
_REFRESH_INTERVAL_S = int(os.getenv("VTRIPS_REFRESH_INTERVAL_S", "300"))
|
||||||
|
_REFRESH_LOCK_KEY = 920_145 # arbitrary, stable advisory-lock key for this job
|
||||||
|
|
||||||
|
|
||||||
|
def _refresh_v_trips_once() -> str:
|
||||||
|
"""Refresh reporting.v_trips. Blocking — call via asyncio.to_thread.
|
||||||
|
|
||||||
|
Uses a dedicated autocommit connection: REFRESH ... CONCURRENTLY cannot run
|
||||||
|
inside a transaction block (so the pooled get_conn, which wraps a txn, won't
|
||||||
|
do). DATABASE_URL connects as a superuser, which may REFRESH the matview
|
||||||
|
even though reporting_refresher owns it.
|
||||||
|
"""
|
||||||
|
conn = psycopg2.connect(_DATABASE_URL, connect_timeout=10)
|
||||||
|
try:
|
||||||
|
conn.autocommit = True
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("SELECT pg_try_advisory_lock(%s)", (_REFRESH_LOCK_KEY,))
|
||||||
|
if not cur.fetchone()[0]:
|
||||||
|
return "skipped (another worker holds the lock)"
|
||||||
|
try:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY reporting.v_trips")
|
||||||
|
dur_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
cur.execute(
|
||||||
|
"INSERT INTO reporting.refresh_log"
|
||||||
|
"(refreshed_at, source, duration_ms, row_count, notes) "
|
||||||
|
"VALUES (now(), 'dashboard_api', %s,"
|
||||||
|
" (SELECT count(*) FROM reporting.v_trips), 'scheduled')",
|
||||||
|
(dur_ms,),
|
||||||
|
)
|
||||||
|
return f"refreshed in {dur_ms}ms"
|
||||||
|
finally:
|
||||||
|
cur.execute("SELECT pg_advisory_unlock(%s)", (_REFRESH_LOCK_KEY,))
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def _refresh_loop():
|
||||||
|
# Brief startup delay so the first refresh doesn't race container init.
|
||||||
|
await asyncio.sleep(15)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
result = await asyncio.to_thread(_refresh_v_trips_once)
|
||||||
|
log.info("v_trips refresh: %s", result)
|
||||||
|
except Exception:
|
||||||
|
log.exception("v_trips refresh failed (will retry next interval)")
|
||||||
|
await asyncio.sleep(_REFRESH_INTERVAL_S)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
log.info("Dashboard API starting (v1.0). Origins=%s", _ALLOWED_ORIGINS)
|
log.info(
|
||||||
|
"Dashboard API starting (v1.1). Origins=%s. v_trips refresh every %ss.",
|
||||||
|
_ALLOWED_ORIGINS, _REFRESH_INTERVAL_S,
|
||||||
|
)
|
||||||
|
refresher = asyncio.create_task(_refresh_loop())
|
||||||
yield
|
yield
|
||||||
|
refresher.cancel()
|
||||||
|
try:
|
||||||
|
await refresher
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
close_pool()
|
close_pool()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue