fleettickets/README.md
david kiania 7d3bba8d78 chore(schedule): INC ingest cron -> every 20 min, 06:00-20:40 EAT
Was hourly at :15 (15 7-19 * * *); now */20 6-20 * * * for fresher ticket
data through the working day. Updates the documented schedule in the Coolify
Scheduled Task command, run_ingest.sh, Dockerfile, README, and implementation
notes (the live schedule is set in the Coolify UI).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:23:17 +03:00

# fleettickets
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
## What this owns
| Piece | What |
|---|---|
| `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) |
| `migrations/02_import_meta.sql` | `tickets.import_meta` (per-dataset snapshot envelope metadata) + `fn_tickets_for_map` re-defined to expose it as `summary.freshness` (same signature — dashboard_api unchanged) |
| `migrations/03_inc_columns.sql` | Unpacks `tickets.inc.raw` into **typed STORED generated columns** (status, cluster, region, team, owner, sla_status, mttr, lat/lng, is_* booleans, and EAT→`timestamptz` timestamps via `tickets.eat_ts()`). Computed for all rows + auto-populated on every ingest; `raw` stays the source of truth |
| `migrations/04_inc_latlng.sql` | Redefines `latitude`/`longitude` to `COALESCE(feed, ST_Y/ST_X(geom))` so they're **populated from the geocoded position** (feed is always empty); precision per `geo_source` (`location` vs `cluster` centroid) |
| `migrations/05_inc_geography.sql` | Adds `geog geography(Point,4326)` (= `geom::geography`) + GiST index for **routing**: `ST_Distance`/`ST_DWithin`/KNN in real metres (nearest-vehicle, radius search) |
| `migrations/06_inc_mttr_minutes.sql` | `mttr` generated column → integer **minutes** (source is decimal hours); drops the constant `is_alarm`/`is_auto_created`/`is_auto_closed` columns (kept in `raw`). `is_actionable` retained |
| `migrations/07_inc_drop_service_type.sql` | Drops the constant `service_type` column (always `inc`; kept in `raw`) |
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view: open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service`, falling back to `first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
## What this does NOT own (stays where it is)
- **The DB** — the `tickets` schema lives in the shared `tracksolid_db`.
- **The read-API** — `dashboard_api` (in the tracksolid stack) serves
`GET /webhook/tickets`, which calls `reporting.fn_tickets_for_map` (defined here).
- **The frontend** — the Tickets map is a tab in the **FleetOps** SPA (`fleetops` repo).
## Data model (raw-first)
Each row is `ticket_id` + `raw` (the full source record as `jsonb`) + a derived
`geom` / `geo_source`. Everything reads from `raw`, so a change to the source schema
needs no migration. For convenient typed/indexable access, `raw` is also **unpacked
into STORED generated columns** (migration 03) — e.g. `normalized_status`, `cluster`,
`region`, `assigned_team`, `owner`, `sla_status`, `mttr`, `is_actionable`,
`created_at_service`/`closed_at` (as EAT→`timestamptz`). These stay in lock-step with
`raw` automatically (no loader change); `raw` remains the source of truth.

`geom` is resolved in order: **feed** coords (`raw` lat/lng) → **location**
(geocoded `location_name`) → **cluster** centroid → **none**.
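
The precedence above can be illustrated in a few lines of Python. This is only a sketch of the ordering, not the database trigger itself; the function name `resolve_geom` and the `(lat, lng)` tuple representation are assumptions made for the example.

```python
from typing import Optional, Tuple

Point = Tuple[float, float]  # (lat, lng); representation assumed for this sketch


def resolve_geom(
    feed: Optional[Point],
    location: Optional[Point],
    cluster: Optional[Point],
) -> Tuple[Optional[Point], str]:
    """Return (point, geo_source), trying feed, then location, then cluster."""
    candidates = (("feed", feed), ("location", location), ("cluster", cluster))
    for source, point in candidates:
        if point is not None:
            return point, source
    # Nothing resolved: the row has no position and is tagged "none".
    return None, "none"
```

With an empty feed and no per-location geocode, the cluster centroid wins and `geo_source` records the coarser precision.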
Source coordinates are empty in the feed, so geocoding is required:
- `--geocode-clusters`: one coordinate per cluster (coarse fallback).
- `--geocode-locations`: precise per-location coordinates for **actionable INC** tickets. It strips
the network codes from `location_name` (e.g. `NW_`, `ADR_MNT_`, `FDT<n>`, `SDUS`), geocodes
the real place via a **keyed** provider (LocationIQ / OpenCage), and **rejects any result
more than 25 km from the cluster centroid** (wrong-city guard). Results are cached in
`tickets.geo_locations`.
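
A minimal sketch of the two steps described above: stripping network codes from a location name and applying the 25 km wrong-city guard. It is not the code in `import_tickets.py`. The token pattern covers only the example codes listed here, the sample names and coordinates are made up, and the helper names (`clean_location_name`, `haversine_km`, `accept_result`) are hypothetical.

```python
import math
import re

# Assumption: codes are separated by underscores or spaces; only the
# example codes from this README are recognised.
CODE_TOKEN = re.compile(r"^(NW|ADR|MNT|SDUS|FDT\d+)$", re.IGNORECASE)


def clean_location_name(name: str) -> str:
    """Drop network-code tokens and return the remaining place name."""
    tokens = re.split(r"[_\s]+", name.strip())
    return " ".join(t for t in tokens if t and not CODE_TOKEN.match(t))


def haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance in kilometres on a spherical Earth."""
    radius_km = 6371.0088
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(a))


def accept_result(result, centroid, max_km: float = 25.0) -> bool:
    """Wrong-city guard: keep a geocode only if it is within max_km of the cluster."""
    return haversine_km(result[0], result[1], centroid[0], centroid[1]) <= max_km
```

For example, `clean_location_name("NW_FDT3_Kilimani")` leaves `"Kilimani"` to send to the geocoder, and a result several hundred kilometres from the cluster centroid is rejected.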
### Columns on `tickets.inc`
| Column | Type | Notes |
|---|---|---|
| `ticket_id` | text (PK) | e.g. `WOT0715527` |
| `raw` | jsonb | full source record — the source of truth |
| `normalized_status` · `raw_status` | text | use `normalized_status` for filtering (canonical) |
| `bucket` | text | lifecycle: `closed` / `pending` |
| `is_actionable` | boolean | the open/closed flag (open = `true`) |
| `cluster` · `region` · `location_name` | text | `region` lowercased; `cluster` feeds the gazetteer |
| `assigned_team` · `owner` | text | closure attribution dimensions |
| `sla_status` | text | source `Compliant`/`Breached` — **only meaningful once closed** |
| `mttr` | numeric | **minutes** (source is decimal hours); null until closed |
| `created_at_service` · `scheduled_at` · `closed_at` · `first_seen_at` · `last_seen_at` · `source_created_at` · `source_updated_at` | timestamptz | EAT→UTC via `tickets.eat_ts()`. **lifecycle** = `created_at_service`→`closed_at`; **export bookkeeping** = `first_seen_at`/`last_seen_at`/`source_*` |
| `latitude` · `longitude` | double precision | `COALESCE(feed, geocoded)` — populated from `geom` |
| `geom` | geometry(Point,4326) | display / the map |
| `geog` | geography(Point,4326) | **routing** — metres-accurate distance (GiST indexed) |
| `geo_source` | text | precision: `feed` / `location` / `cluster` / `none` |
| `ingested_at` | timestamptz | when we last upserted this row |
Dropped from the unpacked columns (still in `raw`): `service_type`, `is_alarm`,
`is_auto_created`, `is_auto_closed` (all single-cardinality), plus the ingest-time
drops below. **`reporting.fn_tickets_for_map`** reads from `raw` and serves the map;
**`tickets.inc_open_sla`** is the open-ticket SLA view for dashboards/dispatch.
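
The two conversions noted in the table (EAT wall-clock text to UTC `timestamptz`, and decimal hours to minutes for `mttr`) can be sketched in Python as follows. This mirrors the idea only; the real work happens in SQL via `tickets.eat_ts()` and the generated columns. The input timestamp format and the rounding rule are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# East Africa Time is a fixed UTC+3 offset with no daylight saving.
EAT = timezone(timedelta(hours=3), "EAT")


def eat_to_utc(text: Optional[str]) -> Optional[datetime]:
    """Parse a naive EAT timestamp string and return an aware UTC datetime."""
    if not text:
        return None
    # Assumed source format; the actual feed format may differ.
    naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return naive.replace(tzinfo=EAT).astimezone(timezone.utc)


def mttr_minutes(hours: Optional[str]) -> Optional[int]:
    """Convert decimal hours to whole minutes; empty stays null (ticket not closed)."""
    if hours is None or hours == "":
        return None
    # Assumption: round to the nearest minute.
    return round(float(hours) * 60)
```

Here `eat_to_utc("2026-06-15 17:00:00")` yields 14:00 UTC on the same day, and `mttr_minutes("2.25")` yields `135`.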
## Setup
```bash
uv sync
cp .env.example .env # fill in DATABASE_URL, RUSTFS_*, GEOCODER_*
python run_migrations.py # apply the schema (idempotent)
```
## Run
```bash
# drain the incremental INC change stream (every new file oldest→newest, then archive)
python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY)
python import_tickets.py --geocode-clusters --apply # coarse, once
python import_tickets.py --geocode-locations --apply # precise, actionable INC
# from a local CSV instead of the bucket (dev)
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
```
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
## Deploy (Coolify)
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
runs as a **Scheduled Task**, not a system crontab:
- **Command:** `python import_tickets.py --from-bucket --apply`
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
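
To sanity-check the schedule, the short sketch below expands the minute and hour fields of `*/20 6-20 * * *` by hand. It is not a cron parser; it only enumerates the combinations that expression describes, and the function name `fire_times` is made up for the example.

```python
from typing import List


def fire_times(minute_step: int = 20, first_hour: int = 6, last_hour: int = 20) -> List[str]:
    """List the HH:MM run times for '*/<step> <first>-<last> * * *'."""
    return [
        f"{hour:02d}:{minute:02d}"
        for hour in range(first_hour, last_hour + 1)  # hour range is inclusive
        for minute in range(0, 60, minute_step)       # */20 -> 0, 20, 40
    ]


times = fire_times()
print(times[0], times[-1], len(times))  # 06:00 20:40 45
```

That gives 45 runs per day, with the last one at 20:40 EAT.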
### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
newer than the new bucket's first file — which would otherwise be skipped. Point the
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash
python import_tickets.py --from-bucket --reseed # dry-run first
python import_tickets.py --from-bucket --reseed --apply # commit + archive
```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
full-state re-emissions re-assert current state, so this is non-destructive and converges
even across the cutover gap. After it, the watermark is current — resume normal
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
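
The cutover problem and the effect of `--reseed` can be shown with a small file-selection sketch. This is an illustration rather than the loader's code: it assumes change files are compared by filename, that the `<EAT-timestamp>.csv` names sort chronologically as plain strings, and the keys and dates below are invented.

```python
from typing import Iterable, List, Optional


def filename(key: str) -> str:
    """Last path segment of an object key, e.g. '2026-06-12T09-00-00.csv'."""
    return key.rsplit("/", 1)[-1]


def files_to_drain(keys: Iterable[str], watermark: Optional[str], reseed: bool = False) -> List[str]:
    """Return the change files to apply, oldest to newest."""
    ordered = sorted(keys, key=filename)
    if reseed or watermark is None:
        # Reseed (or a first run) ignores the watermark and takes every file.
        return ordered
    mark = filename(watermark)
    # Normal run: skip anything at or older than the watermark.
    return [k for k in ordered if filename(k) > mark]


new_bucket = [
    "automations/inc/changes/2026-06-12T09-00-00.csv",
    "automations/inc/changes/2026-06-10T09-00-00.csv",
]
old_watermark = "automations/inc/changes/2026-06-11T18-20-00.csv"

print(files_to_drain(new_bucket, old_watermark))               # only the 06-12 file
print(files_to_drain(new_bucket, old_watermark, reseed=True))  # both files, oldest first
```

Without the reseed, the 06-10 file in the new bucket would be skipped because the stale watermark is newer than it.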
## Notes
- The n8n export writes an **incremental CDC change stream** to
`automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
holding only the rows that changed (with periodic full-state re-emissions). No `latest`
pointer, no metadata envelope. The loader drains **every not-yet-processed file
oldest→newest** — taking only the newest would drop intermediate deltas.
- **Watermark:** the newest file already applied is recorded in
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success each file is **moved** to
`automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
normalize `region` → lowercase and `raw_status` → UPPERCASE. `service_type` and `bucket`
(a `closed`/`pending` flag) are kept.
- `tickets.import_meta` captures snapshot freshness (surfaced as `summary.freshness` by
`fn_tickets_for_map`).
- The curated/geocoded coordinates are written `verified = false` — review
`tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked.
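
The ingest-time cleaning rules listed above could look roughly like the sketch below. It is an illustration under assumptions, not the loader's code: rows are taken to be plain dicts of CSV strings, `is_alarm` is assumed to arrive as the text `true`, and the sentinel is detected as any cell starting with `EXPORT STOPPED` (its full text and column are not stated here).

```python
from typing import Dict, Optional

DROP_EXACT = {"week_start", "week_end", "source_snapshot_id", "department", "source_type"}
DROP_PREFIXES = ("source_s3_",)
SENTINEL_PREFIX = "EXPORT STOPPED"  # assumption: matched as a prefix of any cell


def clean_row(row: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Return the cleaned row, or None if the row should be dropped entirely."""
    if str(row.get("is_alarm", "")).strip().lower() == "true":
        return None  # alarm rows are not ingested
    if any(str(value).startswith(SENTINEL_PREFIX) for value in row.values()):
        return None  # export sentinel line, not a ticket
    cleaned = {
        key: value
        for key, value in row.items()
        if key not in DROP_EXACT and not key.startswith(DROP_PREFIXES)
    }
    if cleaned.get("region"):
        cleaned["region"] = cleaned["region"].lower()
    if cleaned.get("raw_status"):
        cleaned["raw_status"] = cleaned["raw_status"].upper()
    return cleaned
```

`service_type` and `bucket` pass through untouched, matching the note above.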
## Querying
```sql
-- map payload (GeoJSON + summary, incl. summary.freshness) — what dashboard_api serves
SELECT reporting.fn_tickets_for_map(); -- open-only by default
SELECT reporting.fn_tickets_for_map(p_open_only := false); -- all geocoded tickets
-- open tickets by SLA (derived) + by cluster — via the view
SELECT sla_state, count(*) FROM tickets.inc_open_sla GROUP BY 1;
SELECT cluster, count(*), round(avg(hours_open),1) AS avg_hrs
FROM tickets.inc_open_sla GROUP BY 1 ORDER BY 2 DESC;
-- closures / creations per day (EAT)
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*)
FROM tickets.inc WHERE closed_at IS NOT NULL GROUP BY 1 ORDER BY 1 DESC;
-- open-backlog-over-time (accrues from first capture; one row per EAT day)
SELECT snapshot_date, open_total, open_breached, closed_today
FROM tickets.inc_daily_snapshot ORDER BY snapshot_date DESC;
-- nearest open tickets to a vehicle (lng, lat) — metres, index-accelerated KNN
SELECT ticket_id, cluster, hours_open,
round(ST_Distance(geog, ST_SetSRID(ST_MakePoint(:lng,:lat),4326)::geography))::int AS metres
FROM tickets.inc_open_sla
ORDER BY geog <-> ST_SetSRID(ST_MakePoint(:lng,:lat),4326)::geography
LIMIT 10;
```
## Data-quality & SLA notes
Findings to keep in mind (see the PRD for detail):
- **Source `sla_status` is only meaningful for *closed* tickets.** It reads
`Compliant` for essentially all *open* tickets, so for open work use the **derived**
state in `tickets.inc_open_sla` (`now() - created_at_service` vs the contract's 48h).
- **`created_at_service` is missing on ~30% of rows** (incl. most open ones); the SLA
view falls back to `first_seen_at` and flags it via `sla_clock_source`.
- **`mttr` is not wall-clock** `closed_at - created_at_service`, and the source's
`Breached`/`Compliant` does **not** match a plain 48h threshold, so pin the contract's
exact SLA definition before trusting cross-field SLA math.
- **Content lag:** the feed's *file* timestamps are current, but the ticket *content*
trails ~2 days (the underlying `…wm_task.xlsx` source), so creation/closure dates
run a couple of days behind wall-clock.
- **History:** `tickets.inc` is current-state (upsert). Closure/creation/MTTR
*event* series work directly off `closed_at`/`created_at_service`. **Backlog-over-time**
now accrues via `tickets.inc_daily_snapshot` (one row per EAT day, written by
`tickets.capture_history()` each ingest); observed closures log to
`tickets.closure_events`. Past backlog can't be reconstructed — the series builds
from the first capture onward.
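
The derived open-ticket SLA described in the first two bullets can be sketched as below. It is a Python rendering of the idea behind `tickets.inc_open_sla`, not the view's SQL. The state labels `breached` / `within_sla` and the strict `>` comparison are assumptions; the view's actual `sla_state` values may differ.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple

SLA_HOURS = 48  # the contract threshold quoted above


def derive_open_sla(
    created_at_service: Optional[datetime],
    first_seen_at: datetime,
    now: datetime,
) -> Tuple[float, str, str]:
    """Return (hours_open, sla_state, sla_clock_source) for an open ticket."""
    if created_at_service is not None:
        clock, source = created_at_service, "created_at_service"
    else:
        # Fallback when the service creation time is missing from the feed.
        clock, source = first_seen_at, "first_seen_at"
    hours_open = (now - clock).total_seconds() / 3600
    state = "breached" if hours_open > SLA_HOURS else "within_sla"
    return round(hours_open, 1), state, source


now = datetime(2026, 6, 25, 12, 0, tzinfo=timezone.utc)
seen = datetime(2026, 6, 22, 12, 0, tzinfo=timezone.utc)
print(derive_open_sla(None, seen, now))  # (72.0, 'breached', 'first_seen_at')
```

Carrying `sla_clock_source` alongside the state lets a dashboard distinguish tickets timed from the fallback clock.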
## Status / roadmap
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
separate future project that will reuse this machinery against `automations/crq/`.