From 764dee986f3dc689125e6bbe7f9bdef3f3954072 Mon Sep 17 00:00:00 2001 From: david kiania Date: Tue, 16 Jun 2026 01:19:23 +0300 Subject: [PATCH] =?UTF-8?q?feat:=20history=20capture=20=E2=80=94=20closure?= =?UTF-8?q?=5Fevents=20+=20daily=20backlog=20snapshot=20(migration=2010)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tickets.closure_events: append-only observed closures (PK ticket_id, closed_at; observed_at = first sighting; survives row churn). - tickets.inc_daily_snapshot: one row per EAT day — open backlog (+ SLA split, by cluster/status) and created/closed flow; upserted each run. - tickets.capture_history(): appends new closures + upserts today's snapshot. - import_tickets calls it after each --apply run (ingest or skip); add --capture-history CLI flag for standalone runs. Verified: backfilled 21,282 closures; today's snapshot recorded (open_total 30). Co-Authored-By: Claude Opus 4.8 --- README.md | 12 ++- docs/implementation.md | 23 +++-- import_tickets.py | 17 +++- migrations/10_inc_history_capture.sql | 119 ++++++++++++++++++++++++++ 4 files changed, 160 insertions(+), 11 deletions(-) create mode 100644 migrations/10_inc_history_capture.sql diff --git a/README.md b/README.md index 4f5a724..dcbb81d 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the | `migrations/07_inc_drop_service_type.sql` | Drops the constant `service_type` column (always `inc`; kept in `raw`) | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | +| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | @@ -155,6 +156,10 @@ FROM tickets.inc_open_sla GROUP BY 1 ORDER BY 2 DESC; SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) FROM tickets.inc WHERE closed_at IS NOT NULL GROUP BY 1 ORDER BY 1 DESC; +-- open-backlog-over-time (accrues from first capture; one row per EAT day) +SELECT snapshot_date, open_total, open_breached, closed_today +FROM tickets.inc_daily_snapshot ORDER BY snapshot_date DESC; + -- nearest open tickets to a vehicle (lng, lat) — metres, index-accelerated KNN SELECT ticket_id, cluster, hours_open, round(ST_Distance(geog, ST_SetSRID(ST_MakePoint(:lng,:lat),4326)::geography))::int AS metres @@ -179,8 +184,11 @@ Findings to keep in mind (see the PRD for detail): trails ~2 days (the underlying `…wm_task.xlsx` source), so creation/closure dates run a couple of days behind wall-clock. - **History:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR - *event* series work directly; **open-backlog-over-time** needs a history capture - (append-only `closure_events` or daily snapshots) — not yet built. + *event* series work directly off `closed_at`/`created_at_service`. **Backlog-over-time** + now accrues via `tickets.inc_daily_snapshot` (one row per EAT day, written by + `tickets.capture_history()` each ingest); observed closures log to + `tickets.closure_events`. Past backlog can't be reconstructed — the series builds + from the first capture onward. ## Status / roadmap diff --git a/docs/implementation.md b/docs/implementation.md index 1af7c57..f26fc11 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -19,8 +19,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to - **Geocoding** (keyed LocationIQ): `--geocode-clusters` (coarse, per cluster) and `--geocode-locations` (precise, actionable INC; strips network codes; 25 km wrong-city guard). Results cache in `tickets.geo_clusters` / `tickets.geo_locations`. +- **History capture:** after each `--apply` run (ingest or skip), calls + `tickets.capture_history()` → appends new closures + upserts today's backlog + snapshot. - CLI: `--from-bucket` (newest INC csv), `--inc-csv ` (local dev), `--apply` - (else dry-run), `--geocode-clusters`, `--geocode-locations`. + (else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`. ## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`) @@ -35,6 +38,7 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to | 07_inc_drop_service_type | drop constant `service_type` | | 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) | | 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` | +| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time | `tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth), `normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/ @@ -71,13 +75,16 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to - Lifecycle timestamps = `created_at_service`→`closed_at`; the `*_seen_at` / `source_*` ones are export bookkeeping (don't use for SLA/closure-time). - Content lag ~2 days behind wall-clock. -- **History gap:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR - *event* series work directly; **open-backlog-over-time** needs an append-only - history capture (not yet built). +- **History:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR event + series work directly; **backlog-over-time** now accrues via + `tickets.inc_daily_snapshot` + `tickets.closure_events` (written by + `tickets.capture_history()` each ingest) — builds forward from the first capture. ## Roadmap -Phase 2: `fn_inc_dashboard` read-API → FleetOps live map (open + closed overlay + -metrics). Then FleetNow **dispatch** off `geog`, **team closure attribution**, and -**history capture** for backlog trends. **CRQ** = separate future project reusing -this machinery against `automations/crq/`. +Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + closed +overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for +backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other +repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`, +**team closure attribution**. **CRQ** = separate future project reusing this +machinery against `automations/crq/`. diff --git a/import_tickets.py b/import_tickets.py index dd34d82..d8263d6 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -237,6 +237,14 @@ def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: return len(payload) +def _capture_history() -> None: + """Append new closures + upsert today's backlog snapshot (tickets.capture_history).""" + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT tickets.capture_history()") + log.info("history: %s", cur.fetchone()[0]) + + def ingest(args) -> None: # Local-file path (dev): ingest a single CSV, no bucket / no archive. if args.inc_csv: @@ -266,6 +274,7 @@ def ingest(args) -> None: log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) if args.apply: _move_processed(s3, all_keys) + _capture_history() # still record today's snapshot even when unchanged else: log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) return @@ -279,6 +288,7 @@ def ingest(args) -> None: upsert(rows, args.apply, meta=meta) if args.apply: _move_processed(s3, all_keys) + _capture_history() else: log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) @@ -516,6 +526,8 @@ def main() -> None: help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") ap.add_argument("--geocode-locations", action="store_true", help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve") + ap.add_argument("--capture-history", action="store_true", + help="Run tickets.capture_history() standalone (closure_events + daily snapshot)") args = ap.parse_args() if args.geocode_clusters: @@ -524,8 +536,11 @@ def main() -> None: if args.geocode_locations: geocode_locations(apply=args.apply) return + if args.capture_history: + _capture_history() + return if not (args.from_bucket or args.inc_csv): - ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations") + ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history") ingest(args) diff --git a/migrations/10_inc_history_capture.sql b/migrations/10_inc_history_capture.sql new file mode 100644 index 0000000..30938d3 --- /dev/null +++ b/migrations/10_inc_history_capture.sql @@ -0,0 +1,119 @@ +-- 10_inc_history_capture.sql — fleettickets · history capture for time-series +-- ───────────────────────────────────────────────────────────────────────────── +-- tickets.inc is current-state (upsert), so it can't answer "how many were OPEN on +-- day X". This adds two durable history artifacts, populated by the ingest via +-- tickets.capture_history(): +-- +-- tickets.closure_events — append-only log of observed ticket closures +-- (keyed (ticket_id, closed_at); survives row churn, +-- records observed_at = when WE first saw the closure). +-- tickets.inc_daily_snapshot— one row per EAT day: open backlog (+ SLA split, +-- by cluster/status) and the day's created/closed flow. +-- Upserted each run → backlog-over-time accrues going +-- forward (past backlog can't be reconstructed). +-- +-- Idempotent: safe on a fresh DB and re-appliable. The first capture backfills +-- closure_events from existing closed rows (observed_at = first-run time). +-- ───────────────────────────────────────────────────────────────────────────── + +SET search_path = tickets, public; + +CREATE TABLE IF NOT EXISTS tickets.closure_events ( + ticket_id text NOT NULL, + closed_at timestamptz NOT NULL, + cluster text, + region text, + assigned_team text, + owner text, + mttr numeric, -- minutes + sla_status text, + observed_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (ticket_id, closed_at) +); +CREATE INDEX IF NOT EXISTS ix_closure_events_closed_at ON tickets.closure_events (closed_at); +CREATE INDEX IF NOT EXISTS ix_closure_events_team ON tickets.closure_events (assigned_team); +CREATE INDEX IF NOT EXISTS ix_closure_events_cluster ON tickets.closure_events (cluster); + +CREATE TABLE IF NOT EXISTS tickets.inc_daily_snapshot ( + snapshot_date date PRIMARY KEY, -- EAT calendar day + open_total integer, + open_breached integer, + open_at_risk integer, + open_ok integer, + open_unknown integer, + created_today integer, -- created_at_service on this EAT day + closed_today integer, -- closed_at on this EAT day + by_cluster_open jsonb, + by_status_open jsonb, + captured_at timestamptz NOT NULL DEFAULT now() +); + +-- capture: append new closures + upsert today's backlog snapshot. Called by the +-- ingest after each --apply run (and available standalone). +CREATE OR REPLACE FUNCTION tickets.capture_history() + RETURNS jsonb LANGUAGE plpgsql AS $fn$ +DECLARE + v_today date := (now() AT TIME ZONE 'Africa/Nairobi')::date; + v_events integer; + v_open integer; +BEGIN + -- 1) append newly-observed closures (closed_at is authoritative; dedup by id+closed_at) + INSERT INTO tickets.closure_events + (ticket_id, closed_at, cluster, region, assigned_team, owner, mttr, sla_status) + SELECT ticket_id, closed_at, cluster, region, assigned_team, owner, mttr, sla_status + FROM tickets.inc + WHERE closed_at IS NOT NULL + ON CONFLICT (ticket_id, closed_at) DO NOTHING; + GET DIAGNOSTICS v_events = ROW_COUNT; + + -- 2) upsert today's backlog snapshot (open metrics from inc_open_sla) + INSERT INTO tickets.inc_daily_snapshot + (snapshot_date, open_total, open_breached, open_at_risk, open_ok, open_unknown, + created_today, closed_today, by_cluster_open, by_status_open, captured_at) + SELECT + v_today, + (SELECT count(*) FROM tickets.inc_open_sla), + (SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'breached'), + (SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'at_risk'), + (SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'ok'), + (SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'unknown'), + (SELECT count(*) FROM tickets.inc + WHERE (created_at_service AT TIME ZONE 'Africa/Nairobi')::date = v_today), + (SELECT count(*) FROM tickets.inc + WHERE (closed_at AT TIME ZONE 'Africa/Nairobi')::date = v_today), + (SELECT COALESCE(jsonb_object_agg(cl, c), '{}'::jsonb) + FROM (SELECT COALESCE(cluster, '(none)') cl, count(*) c + FROM tickets.inc_open_sla GROUP BY 1) z), + (SELECT COALESCE(jsonb_object_agg(st, c), '{}'::jsonb) + FROM (SELECT COALESCE(normalized_status, '(none)') st, count(*) c + FROM tickets.inc_open_sla GROUP BY 1) z), + now() + ON CONFLICT (snapshot_date) DO UPDATE SET + open_total = EXCLUDED.open_total, open_breached = EXCLUDED.open_breached, + open_at_risk = EXCLUDED.open_at_risk, open_ok = EXCLUDED.open_ok, + open_unknown = EXCLUDED.open_unknown, created_today = EXCLUDED.created_today, + closed_today = EXCLUDED.closed_today, by_cluster_open = EXCLUDED.by_cluster_open, + by_status_open = EXCLUDED.by_status_open, captured_at = now(); + + SELECT open_total INTO v_open FROM tickets.inc_daily_snapshot WHERE snapshot_date = v_today; + RETURN jsonb_build_object('closure_events_added', v_events, + 'snapshot_date', v_today, 'open_total', v_open); +END $fn$; + +COMMENT ON FUNCTION tickets.capture_history() IS + 'Append new closures to tickets.closure_events + upsert today''s tickets.inc_daily_snapshot. fleettickets 10.'; + +-- grants (guarded: roles may not exist on a fresh DB) +DO $grants$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN + GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.closure_events, tickets.inc_daily_snapshot TO tracksolid_owner; + GRANT EXECUTE ON FUNCTION tickets.capture_history() TO tracksolid_owner; + END IF; + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN + GRANT SELECT ON tickets.closure_events, tickets.inc_daily_snapshot TO dashboard_ro; + END IF; + IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN + GRANT SELECT ON tickets.closure_events, tickets.inc_daily_snapshot TO grafana_ro; + END IF; +END $grants$;