feat: history capture — closure_events + daily backlog snapshot (migration 10)

- tickets.closure_events: append-only observed closures (PK ticket_id, closed_at;
  observed_at = first sighting; survives row churn).
- tickets.inc_daily_snapshot: one row per EAT day — open backlog (+ SLA split, by
  cluster/status) and created/closed flow; upserted each run.
- tickets.capture_history(): appends new closures + upserts today's snapshot.
- import_tickets calls it after each --apply run (ingest or skip); add
  --capture-history CLI flag for standalone runs.
Verified: backfilled 21,282 closures; today's snapshot recorded (open_total 30).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-16 01:19:23 +03:00
parent da6da9d26f
commit 764dee986f
4 changed files with 160 additions and 11 deletions

View file

@ -20,6 +20,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/07_inc_drop_service_type.sql` | Drops the constant `service_type` column (always `inc`; kept in `raw`) |
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -155,6 +156,10 @@ FROM tickets.inc_open_sla GROUP BY 1 ORDER BY 2 DESC;
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*)
FROM tickets.inc WHERE closed_at IS NOT NULL GROUP BY 1 ORDER BY 1 DESC;
-- open-backlog-over-time (accrues from first capture; one row per EAT day)
SELECT snapshot_date, open_total, open_breached, closed_today
FROM tickets.inc_daily_snapshot ORDER BY snapshot_date DESC;
-- nearest open tickets to a vehicle (lng, lat) — metres, index-accelerated KNN
SELECT ticket_id, cluster, hours_open,
round(ST_Distance(geog, ST_SetSRID(ST_MakePoint(:lng,:lat),4326)::geography))::int AS metres
@ -179,8 +184,11 @@ Findings to keep in mind (see the PRD for detail):
trails ~2 days (the underlying `…wm_task.xlsx` source), so creation/closure dates
run a couple of days behind wall-clock.
- **History:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR
*event* series work directly; **open-backlog-over-time** needs a history capture
(append-only `closure_events` or daily snapshots) — not yet built.
*event* series work directly off `closed_at`/`created_at_service`. **Backlog-over-time**
now accrues via `tickets.inc_daily_snapshot` (one row per EAT day, written by
`tickets.capture_history()` each ingest); observed closures log to
`tickets.closure_events`. Past backlog can't be reconstructed — the series builds
from the first capture onward.
## Status / roadmap

View file

@ -19,8 +19,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- **Geocoding** (keyed LocationIQ): `--geocode-clusters` (coarse, per cluster) and
`--geocode-locations` (precise, actionable INC; strips network codes; 25 km
wrong-city guard). Results cache in `tickets.geo_clusters` / `tickets.geo_locations`.
- **History capture:** after each `--apply` run (ingest or skip), calls
`tickets.capture_history()` → appends new closures + upserts today's backlog
snapshot.
- CLI: `--from-bucket` (newest INC csv), `--inc-csv <file>` (local dev), `--apply`
(else dry-run), `--geocode-clusters`, `--geocode-locations`.
(else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
@ -35,6 +38,7 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
| 07_inc_drop_service_type | drop constant `service_type` |
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
| 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
| 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
@ -71,13 +75,16 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- Lifecycle timestamps = `created_at_service`→`closed_at`; the `*_seen_at` / `source_*`
ones are export bookkeeping (don't use for SLA/closure-time).
- Content lag ~2 days behind wall-clock.
- **History gap:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR
*event* series work directly; **open-backlog-over-time** needs an append-only
history capture (not yet built).
- **History:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR event
series work directly; **backlog-over-time** now accrues via
`tickets.inc_daily_snapshot` + `tickets.closure_events` (written by
`tickets.capture_history()` each ingest) — builds forward from the first capture.
## Roadmap
Phase 2: `fn_inc_dashboard` read-API → FleetOps live map (open + closed overlay +
metrics). Then FleetNow **dispatch** off `geog`, **team closure attribution**, and
**history capture** for backlog trends. **CRQ** = separate future project reusing
this machinery against `automations/crq/`.
Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + closed
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
**team closure attribution**. **CRQ** = separate future project reusing this
machinery against `automations/crq/`.

View file

@ -237,6 +237,14 @@ def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
return len(payload)
def _capture_history() -> None:
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT tickets.capture_history()")
log.info("history: %s", cur.fetchone()[0])
def ingest(args) -> None:
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
if args.inc_csv:
@ -266,6 +274,7 @@ def ingest(args) -> None:
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
if args.apply:
_move_processed(s3, all_keys)
_capture_history() # still record today's snapshot even when unchanged
else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
return
@ -279,6 +288,7 @@ def ingest(args) -> None:
upsert(rows, args.apply, meta=meta)
if args.apply:
_move_processed(s3, all_keys)
_capture_history()
else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
@ -516,6 +526,8 @@ def main() -> None:
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
@ -524,8 +536,11 @@ def main() -> None:
if args.geocode_locations:
geocode_locations(apply=args.apply)
return
if args.capture_history:
_capture_history()
return
if not (args.from_bucket or args.inc_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations")
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
ingest(args)

View file

@ -0,0 +1,119 @@
-- 10_inc_history_capture.sql — fleettickets · history capture for time-series
-- ─────────────────────────────────────────────────────────────────────────────
-- tickets.inc is current-state (upsert), so it can't answer "how many were OPEN on
-- day X". This adds two durable history artifacts, populated by the ingest via
-- tickets.capture_history():
--
-- tickets.closure_events — append-only log of observed ticket closures
-- (keyed (ticket_id, closed_at); survives row churn,
-- records observed_at = when WE first saw the closure).
-- tickets.inc_daily_snapshot— one row per EAT day: open backlog (+ SLA split,
-- by cluster/status) and the day's created/closed flow.
-- Upserted each run → backlog-over-time accrues going
-- forward (past backlog can't be reconstructed).
--
-- Idempotent: safe on a fresh DB and re-appliable. The first capture backfills
-- closure_events from existing closed rows (observed_at = first-run time).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE TABLE IF NOT EXISTS tickets.closure_events (
ticket_id text NOT NULL,
closed_at timestamptz NOT NULL,
cluster text,
region text,
assigned_team text,
owner text,
mttr numeric, -- minutes
sla_status text,
observed_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (ticket_id, closed_at)
);
CREATE INDEX IF NOT EXISTS ix_closure_events_closed_at ON tickets.closure_events (closed_at);
CREATE INDEX IF NOT EXISTS ix_closure_events_team ON tickets.closure_events (assigned_team);
CREATE INDEX IF NOT EXISTS ix_closure_events_cluster ON tickets.closure_events (cluster);
CREATE TABLE IF NOT EXISTS tickets.inc_daily_snapshot (
snapshot_date date PRIMARY KEY, -- EAT calendar day
open_total integer,
open_breached integer,
open_at_risk integer,
open_ok integer,
open_unknown integer,
created_today integer, -- created_at_service on this EAT day
closed_today integer, -- closed_at on this EAT day
by_cluster_open jsonb,
by_status_open jsonb,
captured_at timestamptz NOT NULL DEFAULT now()
);
-- capture: append new closures + upsert today's backlog snapshot. Called by the
-- ingest after each --apply run (and available standalone).
CREATE OR REPLACE FUNCTION tickets.capture_history()
RETURNS jsonb LANGUAGE plpgsql AS $fn$
DECLARE
v_today date := (now() AT TIME ZONE 'Africa/Nairobi')::date;
v_events integer;
v_open integer;
BEGIN
-- 1) append newly-observed closures (closed_at is authoritative; dedup by id+closed_at)
INSERT INTO tickets.closure_events
(ticket_id, closed_at, cluster, region, assigned_team, owner, mttr, sla_status)
SELECT ticket_id, closed_at, cluster, region, assigned_team, owner, mttr, sla_status
FROM tickets.inc
WHERE closed_at IS NOT NULL
ON CONFLICT (ticket_id, closed_at) DO NOTHING;
GET DIAGNOSTICS v_events = ROW_COUNT;
-- 2) upsert today's backlog snapshot (open metrics from inc_open_sla)
INSERT INTO tickets.inc_daily_snapshot
(snapshot_date, open_total, open_breached, open_at_risk, open_ok, open_unknown,
created_today, closed_today, by_cluster_open, by_status_open, captured_at)
SELECT
v_today,
(SELECT count(*) FROM tickets.inc_open_sla),
(SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'breached'),
(SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'at_risk'),
(SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'ok'),
(SELECT count(*) FROM tickets.inc_open_sla WHERE sla_state = 'unknown'),
(SELECT count(*) FROM tickets.inc
WHERE (created_at_service AT TIME ZONE 'Africa/Nairobi')::date = v_today),
(SELECT count(*) FROM tickets.inc
WHERE (closed_at AT TIME ZONE 'Africa/Nairobi')::date = v_today),
(SELECT COALESCE(jsonb_object_agg(cl, c), '{}'::jsonb)
FROM (SELECT COALESCE(cluster, '(none)') cl, count(*) c
FROM tickets.inc_open_sla GROUP BY 1) z),
(SELECT COALESCE(jsonb_object_agg(st, c), '{}'::jsonb)
FROM (SELECT COALESCE(normalized_status, '(none)') st, count(*) c
FROM tickets.inc_open_sla GROUP BY 1) z),
now()
ON CONFLICT (snapshot_date) DO UPDATE SET
open_total = EXCLUDED.open_total, open_breached = EXCLUDED.open_breached,
open_at_risk = EXCLUDED.open_at_risk, open_ok = EXCLUDED.open_ok,
open_unknown = EXCLUDED.open_unknown, created_today = EXCLUDED.created_today,
closed_today = EXCLUDED.closed_today, by_cluster_open = EXCLUDED.by_cluster_open,
by_status_open = EXCLUDED.by_status_open, captured_at = now();
SELECT open_total INTO v_open FROM tickets.inc_daily_snapshot WHERE snapshot_date = v_today;
RETURN jsonb_build_object('closure_events_added', v_events,
'snapshot_date', v_today, 'open_total', v_open);
END $fn$;
COMMENT ON FUNCTION tickets.capture_history() IS
'Append new closures to tickets.closure_events + upsert today''s tickets.inc_daily_snapshot. fleettickets 10.';
-- grants (guarded: roles may not exist on a fresh DB)
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.closure_events, tickets.inc_daily_snapshot TO tracksolid_owner;
GRANT EXECUTE ON FUNCTION tickets.capture_history() TO tracksolid_owner;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.closure_events, tickets.inc_daily_snapshot TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.closure_events, tickets.inc_daily_snapshot TO grafana_ro;
END IF;
END $grants$;