diff --git a/dashboard_api_rev.py b/dashboard_api_rev.py index 8be9e5b..e39d0fc 100644 --- a/dashboard_api_rev.py +++ b/dashboard_api_rev.py @@ -180,6 +180,33 @@ def health(): return {"status": "ok"} +# ── Ingest pipeline freshness ──────────────────────────────────────────────── +# Replaces the Grafana pipeline-health panels (Grafana removed 2026-06-10). +# Reads reporting.v_ingest_health (migration 19) — one row per ingest endpoint +# with last-run age + freshness verdict (ok|stale|error). Lets FleetOps show +# whether the ingest_worker pollers are alive without a separate dashboard product. +@app.get("/health/ingest") +def ingest_health(): + try: + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT * FROM reporting.v_ingest_health") + rows = cur.fetchall() + worst = ( + "error" if any(r["freshness"] == "error" for r in rows) + else "stale" if any(r["freshness"] == "stale" for r in rows) + else "ok" + ) if rows else "unknown" + return JSONResponse({"overall": worst, "endpoints": rows}) + except Exception: + log.exception("ingest-health failed") + return JSONResponse( + {"overall": "unknown", "endpoints": [], + "error": {"type": "unknown", + "message": "Ingest-health feed is unavailable. Try again in a few seconds."}} + ) + + # ── Live positions (#004) ─────────────────────────────────────────────────── @app.get("/webhook/live-positions") diff --git a/docker-compose.yaml b/docker-compose.yaml index 5d80f6b..bfe2018 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -19,22 +19,14 @@ services: timeout: 5s retries: 5 - ingest_movement: + ingest_worker: + # Merged movement + events pollers (was ingest_movement + ingest_events). + # Both pipelines run in one process via ingest_worker_rev.py — same image, + # same shared connection pool, one `schedule` loop. See ingest_worker_rev.py. build: context: . dockerfile: Dockerfile - command: sh -c "python run_migrations.py && python ingest_movement_rev.py" - restart: always - depends_on: - timescale_db: - condition: service_healthy - env_file: .env - - ingest_events: - build: - context: . - dockerfile: Dockerfile - command: sh -c "python run_migrations.py && python ingest_events_rev.py" + command: sh -c "python run_migrations.py && python ingest_worker_rev.py" restart: always depends_on: timescale_db: @@ -84,61 +76,19 @@ services: timeout: 5s retries: 3 - grafana: - build: - context: ./grafana - dockerfile: Dockerfile - restart: always - depends_on: - timescale_db: - condition: service_healthy - env_file: .env - environment: - - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD} - - GF_USERS_DEFAULT_THEME=dark - - GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/etc/grafana/provisioning/dashboards-json/noc_fleet_dashboard.json - volumes: - - grafana-data:/var/lib/grafana - # Provisioning is baked into the image via grafana/Dockerfile — no bind mount needed. - # COOLIFY DOMAIN LOGIC: - # You will set the actual URL in the Coolify UI, - # but the service needs to expose port 3000 internally. + # grafana — REMOVED 2026-06-10. Fleet visualisation/KPIs are now served by the + # FleetOps SPA (own repo) via the dashboard_api read layer. Pipeline freshness + # (the one thing only Grafana surfaced) is replaced by reporting.v_ingest_health + # (migration 19) exposed on the read-API. The grafana_ro role + reporting.* + # grants are retained (harmless, reusable). Provisioning kept in ./grafana for + # reference. To restore, re-add this service block. - pgbouncer: - # Connection pooler in front of timescale_db. - # Runbook: docs/reference/260507_pgbouncer_deployment.md - # Internal Docker network only — no host port. SCRAM passthrough via - # auth_query against the public.user_lookup() function (migration 10). - image: edoburu/pgbouncer - restart: always - depends_on: - timescale_db: - condition: service_healthy - env_file: .env - environment: - - DB_HOST=timescale_db - - DB_PORT=5432 - - DB_USER=${POSTGRES_USER} - - DB_PASSWORD=${POSTGRES_PASSWORD} - - DB_NAME=${POSTGRES_DB} - - POOL_MODE=transaction - - AUTH_TYPE=scram-sha-256 - - AUTH_USER=pgbouncer - # $$1 escapes docker-compose interpolation; pgbouncer sees literal $1. - - AUTH_QUERY=SELECT uname, phash FROM public.user_lookup($$1) - - MAX_CLIENT_CONN=200 - - DEFAULT_POOL_SIZE=15 - - MIN_POOL_SIZE=2 - - RESERVE_POOL_SIZE=5 - - SERVER_RESET_QUERY=DISCARD ALL - - SERVER_IDLE_TIMEOUT=600 - - ADMIN_USERS=${POSTGRES_USER} - - LISTEN_PORT=6432 - healthcheck: - test: ["CMD-SHELL", "pg_isready -h 127.0.0.1 -p 6432 -U ${POSTGRES_USER}"] - interval: 30s - timeout: 5s - retries: 3 + # pgbouncer — REMOVED 2026-06-10. It was deployed but dormant (zero clients + # pointed at :6432; every service connects directly to timescale_db:5432). + # In-process pooling (ts_shared_rev ThreadedConnectionPool) is more than + # sufficient at this scale, and transaction-mode pooling is unsafe for the + # advisory-lock'd v_trips refresher (FIX-D02). Migration 10 (pgbouncer role + + # user_lookup()) is left applied but inert. To restore, re-add this service block. db_backup: build: @@ -164,5 +114,4 @@ services: volumes: timescale-data: name: timescale-data - grafana-data: - name: grafana-data + # grafana-data removed with the grafana service (2026-06-10). diff --git a/ingest_events_rev.py b/ingest_events_rev.py index 627b932..5fa0c1c 100644 --- a/ingest_events_rev.py +++ b/ingest_events_rev.py @@ -105,16 +105,24 @@ def poll_alarms(): # ── Main Loop ───────────────────────────────────────────────────────────────── -def main(): - log.info("Starting EVENTS PIPELINE (v2.1)...") - # OBD removed: Data arrives via webhook push (/pushobd), not polling. - - # Startup catch-up +def startup_catchup(): + """Run the alarm poll once on boot. Split out of main() so the merged + ingest_worker can reuse it (DRY). + OBD removed: data arrives via webhook push (/pushobd), not polling.""" safe_task(poll_alarms, log)() - # Schedule + +def register_jobs(): + """Register the events jobs on the global `schedule` scheduler. + Reused by both this module's main() and ingest_worker_rev.main().""" schedule.every(5).minutes.do(safe_task(poll_alarms, log)) + +def main(): + log.info("Starting EVENTS PIPELINE (v2.1)...") + startup_catchup() + register_jobs() + while True: schedule.run_pending() time.sleep(1) diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index a618520..dc77fe6 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -671,10 +671,9 @@ def poll_stale_locations(): # ── Main Loop ───────────────────────────────────────────────────────────────── -def main(): - log.info("Starting MOVEMENT PIPELINE (v2.2)...") - - # Startup catch-up +def startup_catchup(): + """Run every movement task once on boot so the DB is warm immediately. + Split out of main() so the merged ingest_worker can reuse it (DRY).""" safe_task(sync_devices, log)() safe_task(poll_live_positions, log)() safe_task(poll_trips, log)() @@ -682,7 +681,10 @@ def main(): safe_task(poll_track_list, log)() safe_task(poll_stale_locations, log)() - # Schedule + +def register_jobs(): + """Register the movement jobs on the global `schedule` scheduler. + Reused by both this module's main() and ingest_worker_rev.main().""" schedule.every(60).seconds.do(safe_task(poll_live_positions, log)) schedule.every(15).minutes.do(safe_task(poll_trips, log)) schedule.every(15).minutes.do(safe_task(poll_parking, log)) @@ -690,6 +692,12 @@ def main(): schedule.every(10).minutes.do(safe_task(poll_stale_locations, log)) # [FIX-M21] schedule.every().day.at("02:00").do(safe_task(sync_devices, log)) + +def main(): + log.info("Starting MOVEMENT PIPELINE (v2.2)...") + startup_catchup() + register_jobs() + while True: schedule.run_pending() time.sleep(1) diff --git a/ingest_worker_rev.py b/ingest_worker_rev.py new file mode 100644 index 0000000..64d1281 --- /dev/null +++ b/ingest_worker_rev.py @@ -0,0 +1,52 @@ +""" +ingest_worker_rev.py — Fireside Communications · Merged Ingest Worker +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +RESPONSIBILITY: Run the movement and events polling pipelines in a single +process. Consolidates the former `ingest_movement` and `ingest_events` +containers into one `ingest_worker` service. + +WHY ONE PROCESS: both pipelines were identical in shape — blocking +`while True: schedule.run_pending()` daemons that register jobs onto the +`schedule` library's module-global default scheduler and share the same +ts_shared_rev ThreadedConnectionPool. Driving every job from one +run_pending() loop is strictly equivalent to running them separately, with +one fewer container, one log stream, and one connection pool. + +The inbound `webhook_receiver` is deliberately NOT merged here: pushed +device data is unrecoverable, so it stays isolated from poller faults. + +Standalone entrypoints (`python ingest_movement_rev.py`, +`python ingest_events_rev.py`) remain intact for local debugging — this +module only reuses their startup_catchup()/register_jobs() helpers. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +import time +import schedule + +from ts_shared_rev import get_logger, setup_shutdown +import ingest_movement_rev as mv +import ingest_events_rev as ev + +log = get_logger("ingest_worker") + + +def main(): + log.info("Starting INGEST WORKER — merged MOVEMENT + EVENTS pipelines") + setup_shutdown(log) # one SIGTERM/SIGINT handler for the shared DB pool + + # Startup catch-up — warm both pipelines immediately. + mv.startup_catchup() + ev.startup_catchup() + + # Register every job onto the shared global `schedule` scheduler. + mv.register_jobs() + ev.register_jobs() + + while True: + schedule.run_pending() + time.sleep(1) + + +if __name__ == "__main__": + main() diff --git a/migrations/19_v_ingest_health.sql b/migrations/19_v_ingest_health.sql new file mode 100644 index 0000000..49046b4 --- /dev/null +++ b/migrations/19_v_ingest_health.sql @@ -0,0 +1,79 @@ +-- 19_v_ingest_health.sql +-- reporting.v_ingest_health — per-endpoint ingest freshness for FleetOps. +-- +-- CONTEXT: Grafana was removed (2026-06-10) as redundant with the FleetOps SPA. +-- The one signal only Grafana surfaced was pipeline freshness. This view replaces +-- it with a read-API-friendly surface derived from the existing +-- tracksolid.ingestion_log (every poll already writes a row via log_ingestion()), +-- so FleetOps can show "is the ingest pipeline alive / stale / erroring" per +-- endpoint without a separate dashboard product. Exposed by dashboard_api as +-- GET /health/ingest. +-- +-- One row per endpoint: the latest run, how long ago, last success/error, 1-hour +-- run + failure counts, and a coarse freshness verdict. Each endpoint's expected +-- cadence mirrors the ingest_worker schedule (ingest_worker_rev.py); 'stale' fires +-- only past 3x that cadence so daily/low-frequency jobs aren't false-flagged. +-- Guarded + idempotent -> safe to re-apply. + +CREATE OR REPLACE VIEW reporting.v_ingest_health AS +WITH last_run AS ( + SELECT DISTINCT ON (endpoint) + endpoint, run_at, success, error_code, error_message, + rows_inserted, rows_upserted, imei_count, duration_ms + FROM tracksolid.ingestion_log + ORDER BY endpoint, run_at DESC +), +agg AS ( + SELECT endpoint, + count(*) FILTER (WHERE run_at > now() - interval '1 hour') AS runs_1h, + count(*) FILTER (WHERE run_at > now() - interval '1 hour' AND NOT success) AS failures_1h + FROM tracksolid.ingestion_log + GROUP BY endpoint +) +SELECT + lr.endpoint, + lr.run_at AS last_run_at, + EXTRACT(EPOCH FROM (now() - lr.run_at))::int AS seconds_ago, + lr.success AS last_success, + lr.error_code, + lr.error_message, + lr.rows_inserted, + lr.rows_upserted, + COALESCE(a.runs_1h, 0) AS runs_1h, + COALESCE(a.failures_1h, 0) AS failures_1h, + ex.expected_interval_s, + CASE + WHEN EXTRACT(EPOCH FROM (now() - lr.run_at)) > 3 * ex.expected_interval_s THEN 'stale' + WHEN NOT lr.success THEN 'error' + ELSE 'ok' + END AS freshness +FROM last_run lr +LEFT JOIN agg a USING (endpoint) +CROSS JOIN LATERAL ( + SELECT CASE lr.endpoint + WHEN 'jimi.user.device.location.list' THEN 60 -- live sweep (60s) + WHEN 'jimi.device.alarm.list' THEN 300 -- alarms (5m) + WHEN 'jimi.device.track.mileage' THEN 900 -- trips (15m) + WHEN 'jimi.open.platform.report.parking' THEN 900 -- parking (15m) + WHEN 'jimi.device.track.list' THEN 1800 -- high-res trail (30m) + ELSE 3600 -- default (1h) + END AS expected_interval_s +) ex +ORDER BY seconds_ago DESC; + +COMMENT ON VIEW reporting.v_ingest_health IS + 'Per-endpoint ingest freshness from tracksolid.ingestion_log. Replaces the ' + 'Grafana pipeline-health panels (Grafana removed 2026-06-10). Surfaced by ' + 'dashboard_api GET /health/ingest. freshness = ok|stale|error (stale = past 3x ' + 'the ingest_worker_rev.py cadence).'; + +-- Read-only access for the dashboard roles (guarded; idempotent). +DO $grants$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN + GRANT SELECT ON reporting.v_ingest_health TO grafana_ro; + END IF; + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN + GRANT SELECT ON reporting.v_ingest_health TO dashboard_ro; + END IF; +END $grants$; diff --git a/run_migrations.py b/run_migrations.py index 3514363..1bab5b5 100644 --- a/run_migrations.py +++ b/run_migrations.py @@ -42,6 +42,7 @@ MIGRATIONS = [ "16_live_feed_vehicle_type.sql", # add vehicle_type + fleet_segment to fn_live_positions feed "17_fleetops_fuel_view.sql", # reporting.v_fuel_daily — FleetOps GET /analytics/fuel source "18_grant_reporting_ro.sql", # grant SELECT on reporting.* to grafana_ro (staging read-only role) + "19_v_ingest_health.sql", # reporting.v_ingest_health — pipeline freshness (replaces Grafana panels) ] # ── Tables that must exist before the service is allowed to start ─────────────