Compare commits
3 commits
0787d3a185
...
c980f3edd0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c980f3edd0 | ||
|
|
066d866b90 | ||
|
|
5f5d71d500 |
18 changed files with 964 additions and 225 deletions
|
|
@ -3,7 +3,7 @@
|
||||||
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
||||||
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
||||||
|
|
||||||
# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
|
# S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv)
|
||||||
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
||||||
RUSTFS_ACCESS_KEY=isptickets
|
RUSTFS_ACCESS_KEY=isptickets
|
||||||
RUSTFS_SECRET_KEY=<secret>
|
RUSTFS_SECRET_KEY=<secret>
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
# fleettickets — INC ingestion image (Coolify-deployable).
|
# fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable).
|
||||||
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
||||||
# running (CMD below) and fires the ingest via a Scheduled Task:
|
# running (CMD below) and fires the ingests via two Scheduled Tasks:
|
||||||
# python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *)
|
# python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *)
|
||||||
|
# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *)
|
||||||
|
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
|
||||||
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
||||||
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
||||||
FROM python:3.12-slim
|
FROM python:3.12-slim
|
||||||
|
|
|
||||||
78
README.md
78
README.md
|
|
@ -1,11 +1,22 @@
|
||||||
# fleettickets
|
# fleettickets
|
||||||
|
|
||||||
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
Field-ops **ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
||||||
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
||||||
|
|
||||||
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
|
Two ticket types, identical 32-column source schema and CDC change stream, served
|
||||||
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
|
through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each:
|
||||||
|
|
||||||
|
- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
|
||||||
|
Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
|
||||||
|
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
|
||||||
|
layer + map** (typed columns, geocoding, appears on the Tickets map via
|
||||||
|
`fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
|
||||||
|
installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
|
||||||
|
**own FleetOps tab**, same look & feel as INC.
|
||||||
|
|
||||||
|
Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
|
||||||
|
and is driven from the INC entrypoint.
|
||||||
|
|
||||||
## What this owns
|
## What this owns
|
||||||
|
|
||||||
|
|
@ -21,7 +32,10 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
||||||
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
||||||
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
||||||
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
|
| `migrations/15_crq_table.sql` | **Materializes `tickets.crq`** (table + geom trigger + indexes — `01`'s crq section never ran on the live DB) and unpacks `raw` into the same **typed STORED generated columns** as INC's `03` (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity |
|
||||||
|
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
|
||||||
|
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
|
||||||
|
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
|
||||||
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
||||||
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
||||||
|
|
||||||
|
|
@ -86,38 +100,44 @@ python run_migrations.py # apply the schema (idempotent)
|
||||||
|
|
||||||
## Run
|
## Run
|
||||||
|
|
||||||
```bash
|
Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.
|
||||||
# drain the incremental INC change stream (every new file oldest→newest, then archive)
|
|
||||||
python import_tickets.py --from-bucket --apply
|
|
||||||
|
|
||||||
# geocode (needs GEOCODER_API_KEY)
|
```bash
|
||||||
python import_tickets.py --geocode-clusters --apply # coarse, once
|
# drain the incremental change streams (every new file oldest→newest, then archive)
|
||||||
python import_tickets.py --geocode-locations --apply # precise, actionable INC
|
python -m inc.import_inc --from-bucket --apply
|
||||||
|
python -m crq.import_crq --from-bucket --apply
|
||||||
|
|
||||||
|
# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY
|
||||||
|
python -m inc.import_inc --geocode-clusters --apply # coarse, once
|
||||||
|
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq
|
||||||
|
|
||||||
# from a local CSV instead of the bucket (dev)
|
# from a local CSV instead of the bucket (dev)
|
||||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||||
|
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
|
||||||
```
|
```
|
||||||
|
|
||||||
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
|
Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using
|
||||||
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
|
the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
|
||||||
|
|
||||||
## Deploy (Coolify)
|
## Deploy (Coolify)
|
||||||
|
|
||||||
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
|
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
|
||||||
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
|
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest
|
||||||
runs as a **Scheduled Task**, not a system crontab:
|
runs as its own **Scheduled Task**, not a system crontab:
|
||||||
|
|
||||||
- **Command:** `python import_tickets.py --from-bucket --apply`
|
- **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply`
|
||||||
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
|
- **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply`
|
||||||
|
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
|
||||||
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
||||||
conversion is needed.
|
conversion is needed.
|
||||||
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
||||||
`RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
|
`RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds
|
||||||
|
both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
|
||||||
|
|
||||||
The watermark makes a run with no new change files a cheap no-op.
|
The watermark makes a run with no new change files a cheap no-op.
|
||||||
|
|
||||||
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
||||||
and runs the ingest; schedule it with a crontab line
|
and runs **both** ingests; schedule it with a crontab line
|
||||||
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
|
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
|
||||||
|
|
||||||
Full operational runbook — container, env management (encrypted; via the UI or
|
Full operational runbook — container, env management (encrypted; via the UI or
|
||||||
|
|
@ -134,8 +154,8 @@ newer than the new bucket's first file — which would otherwise be skipped. Poi
|
||||||
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
|
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
python import_tickets.py --from-bucket --reseed # dry-run first
|
python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq)
|
||||||
python import_tickets.py --from-bucket --reseed --apply # commit + archive
|
python -m inc.import_inc --from-bucket --reseed --apply # commit + archive
|
||||||
```
|
```
|
||||||
|
|
||||||
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
|
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
|
||||||
|
|
@ -219,6 +239,16 @@ Findings to keep in mind (see the PRD for detail):
|
||||||
|
|
||||||
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
|
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
|
||||||
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
||||||
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
|
|
||||||
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
|
**CRQ (this milestone):** data layer + map — `tickets.crq` fed from
|
||||||
separate future project that will reuse this machinery against `automations/crq/`.
|
`automations/crq/changes/` by `crq/import_crq.py`, the `tickets.crq` table + typed columns (migration 15),
|
||||||
|
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
|
||||||
|
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
|
||||||
|
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
|
||||||
|
current state — then run the shared geocode once. See
|
||||||
|
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
|
||||||
|
|
||||||
|
Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
|
||||||
|
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
|
||||||
|
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
|
||||||
|
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.
|
||||||
|
|
|
||||||
0
crq/__init__.py
Normal file
0
crq/__init__.py
Normal file
61
crq/import_crq.py
Normal file
61
crq/import_crq.py
Normal file
|
|
@ -0,0 +1,61 @@
|
||||||
|
"""
|
||||||
|
crq/import_crq.py — Fireside Communications · CRQ (new-installation) ingestion.
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
|
||||||
|
tickets.crq — new-installation requests (FleetOps "Tickets" CRQ tab)
|
||||||
|
|
||||||
|
CRQ mirrors INC at the data layer — IDENTICAL 32-column CSV schema and the same
|
||||||
|
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
|
||||||
|
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
|
||||||
|
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
|
||||||
|
automations/crq/processed/. CRQ flows onto the existing Tickets map via
|
||||||
|
reporting.fn_tickets_for_map (which already unions tickets.crq).
|
||||||
|
|
||||||
|
Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
|
||||||
|
(installation-lifecycle SLA/backlog semantics differ from incidents — a future
|
||||||
|
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
|
||||||
|
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
|
||||||
|
shared gazetteer, which covers both inc and crq.
|
||||||
|
|
||||||
|
Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
|
||||||
|
python -m crq.import_crq --from-bucket --apply
|
||||||
|
python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
|
||||||
|
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
|
||||||
|
|
||||||
|
Pre-requisite: migrations applied (run_migrations.py) — tickets.crq + its typed
|
||||||
|
columns (15_crq_table.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
import pipeline
|
||||||
|
|
||||||
|
# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
|
||||||
|
DATASET = pipeline.make_dataset("crq", post_apply=None)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
ap = argparse.ArgumentParser(
|
||||||
|
description="Ingest CRQ (installation) tickets from CSV (raw-first)")
|
||||||
|
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
||||||
|
ap.add_argument("--from-bucket", action="store_true",
|
||||||
|
help="Drain the incremental CRQ change stream (automations/crq/changes/) "
|
||||||
|
"from the isptickets S3 bucket: every not-yet-processed file "
|
||||||
|
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
|
||||||
|
ap.add_argument("--reseed", action="store_true",
|
||||||
|
help="Ignore the stored watermark and drain every file in changes/ once "
|
||||||
|
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
|
||||||
|
ap.add_argument("--crq-csv", dest="local_csv", default=None,
|
||||||
|
help="Local CRQ tickets CSV file (dev)")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
if not (args.from_bucket or args.local_csv):
|
||||||
|
ap.error("provide --from-bucket or --crq-csv")
|
||||||
|
pipeline.ingest(DATASET, args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -1,20 +1,26 @@
|
||||||
# Deployment & Operations — fleettickets
|
# Deployment & Operations — fleettickets
|
||||||
|
|
||||||
Operational runbook for the INC ingest pipeline as deployed on **Coolify**
|
Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
|
||||||
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
|
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
|
||||||
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
|
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
|
||||||
procedure, and verification. Secrets are referenced by **where to retrieve them**,
|
procedure, and verification. Secrets are referenced by **where to retrieve them**,
|
||||||
never by value.
|
never by value.
|
||||||
|
|
||||||
|
> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
|
||||||
|
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
|
||||||
|
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
|
||||||
|
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.
|
||||||
|
|
||||||
## What's deployed
|
## What's deployed
|
||||||
|
|
||||||
| Thing | Detail |
|
| Thing | Detail |
|
||||||
|---|---|
|
|---|---|
|
||||||
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
|
||||||
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
|
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
|
||||||
| Ingest | a Coolify **Scheduled Task** `inc_tickets` running `python import_tickets.py --from-bucket --apply` |
|
| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets` → `python -m inc.import_inc --from-bucket --apply` |
|
||||||
|
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets` → `python -m crq.import_crq --from-bucket --apply` |
|
||||||
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
|
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
|
||||||
| Source | **`isptickets`** S3 bucket, `automations/inc/changes/<EAT-ts>.csv` CDC stream (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
|
| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
|
||||||
|
|
||||||
Resolve the live container name (Coolify appends a random suffix):
|
Resolve the live container name (Coolify appends a random suffix):
|
||||||
```bash
|
```bash
|
||||||
|
|
@ -24,17 +30,20 @@ ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
|
||||||
|
|
||||||
## Schedule (cron)
|
## Schedule (cron)
|
||||||
|
|
||||||
The Scheduled Task runs **`*/20 6-20 * * *`** — every 20 min, **06:00–20:40 EAT**.
|
Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
|
||||||
Coolify evaluates task cron in the server timezone (`server_settings.server_timezone`
|
20 min, **06:00–20:40 EAT**. Coolify evaluates task cron in the server timezone
|
||||||
= `Africa/Nairobi`), so **no UTC conversion** — write EAT directly. The `--from-bucket`
|
(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
|
||||||
run is a cheap no-op when no new change file has arrived (watermark guard), so a dense
|
EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
|
||||||
schedule is safe.
|
(watermark guard, per dataset), so a dense schedule is safe.
|
||||||
|
|
||||||
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
|
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
|
||||||
```sql
|
```sql
|
||||||
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
|
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
|
||||||
WHERE name = 'inc_tickets'; -- id 3
|
WHERE name IN ('inc_tickets', 'crq_tickets');
|
||||||
```
|
```
|
||||||
|
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
|
||||||
|
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
|
||||||
|
`fleettickets`, cron `*/20 6-20 * * *`.
|
||||||
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
|
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
|
||||||
without a redeploy. Execution history: `scheduled_task_executions`.
|
without a redeploy. Execution history: `scheduled_task_executions`.
|
||||||
|
|
||||||
|
|
@ -118,27 +127,53 @@ application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id`
|
||||||
If the provider moves the INC feed to a new bucket (as happened `tickets` → `isptickets`,
|
If the provider moves the INC feed to a new bucket (as happened `tickets` → `isptickets`,
|
||||||
2026-06-25):
|
2026-06-25):
|
||||||
|
|
||||||
1. **Inspect** the new bucket (read-only) — confirm `automations/inc/changes/` layout,
|
1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
|
||||||
timestamp range, schema parity. CRQ (`automations/crq/`) stays out of scope.
|
timestamp range, schema parity.
|
||||||
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
|
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
|
||||||
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged).
|
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
|
||||||
|
same bucket, so one env change serves both tasks.
|
||||||
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
|
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
|
||||||
watermark (`tickets.import_meta.metadata.source_max_key`), oldest→newest, upserting on
|
watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
|
||||||
`ticket_id`:
|
upserting on `ticket_id`:
|
||||||
- If the watermark **predates** the new bucket's first file, a normal
|
- If the watermark **predates** the new bucket's first file, a normal
|
||||||
`--from-bucket --apply` drains the whole new stream — no reseed needed.
|
`--from-bucket --apply` drains the whole new stream — no reseed needed.
|
||||||
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
|
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
|
||||||
`python import_tickets.py --from-bucket --reseed --apply` (see README "Bucket cutover").
|
`python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
|
||||||
The new stream's periodic full-state re-emissions make this converge even across the
|
The new stream's periodic full-state re-emissions make this converge even across the
|
||||||
cutover gap. Idempotent upserts + never-delete make it non-destructive.
|
cutover gap. Idempotent upserts + never-delete make it non-destructive.
|
||||||
- For a one-off, you can run it in the live container with the new creds inlined:
|
- For a one-off, you can run it in the live container with the new creds inlined:
|
||||||
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
|
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
|
||||||
sh -c "cd /app && python import_tickets.py --from-bucket --apply"`.
|
sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
|
||||||
4. **Re-geocode** new clusters/locations: `--geocode-clusters --apply` then
|
4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
|
||||||
`--geocode-locations --apply` (existing gazetteer persists; only new keys are looked up).
|
then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
|
||||||
|
keys are looked up).
|
||||||
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
|
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
|
||||||
or manual deploy). Old bucket is left untouched for rollback.
|
or manual deploy). Old bucket is left untouched for rollback.
|
||||||
|
|
||||||
|
## Bringing CRQ online (one-time seed)
|
||||||
|
|
||||||
|
CRQ was added 2026-06-25 (data layer + map). Migration `15_crq_table.sql` **creates**
|
||||||
|
`tickets.crq` (the live DB's `01` predated its crq section, so the table never existed)
|
||||||
|
plus the typed columns. To seed it from zero on the live DB — once the code + migration are
|
||||||
|
applied (`run_migrations.py`; on the live cutover it was applied out-of-band via the running
|
||||||
|
container, see below):
|
||||||
|
|
||||||
|
1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
|
||||||
|
filename='15_crq_table.sql';` and `\d tickets.crq` shows the table + typed columns.
|
||||||
|
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
|
||||||
|
files oldest→newest; the stream's periodic full-state snapshots converge to current
|
||||||
|
state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
|
||||||
|
```bash
|
||||||
|
python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
|
||||||
|
python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
|
||||||
|
```
|
||||||
|
(Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
|
||||||
|
--from-bucket --apply"`.)
|
||||||
|
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
|
||||||
|
`python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
|
||||||
|
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
|
||||||
|
non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.
|
||||||
|
|
||||||
## Verification
|
## Verification
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,18 @@
|
||||||
What is actually built and deployed, as of the Phase-1 completion. Companion to
|
What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
|
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
|
||||||
|
|
||||||
## Pipeline (`import_tickets.py`)
|
## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints)
|
||||||
|
|
||||||
- **Source:** the incremental CDC stream `automations/inc/changes/<EAT-timestamp>.csv`
|
The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small
|
||||||
|
`Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key
|
||||||
|
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
|
||||||
|
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
|
||||||
|
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
|
||||||
|
**identical 32-column source schema**, so the engine is fully shared; geocoding is
|
||||||
|
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
|
||||||
|
from the INC entrypoint.
|
||||||
|
|
||||||
|
- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
|
||||||
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
|
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
|
||||||
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
|
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
|
||||||
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
|
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
|
||||||
|
|
@ -24,9 +33,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
- **History capture:** after each `--apply` run (ingest or skip), calls
|
- **History capture:** after each `--apply` run (ingest or skip), calls
|
||||||
`tickets.capture_history()` → appends new closures + upserts today's backlog
|
`tickets.capture_history()` → appends new closures + upserts today's backlog
|
||||||
snapshot.
|
snapshot.
|
||||||
- CLI: `--from-bucket` (drain the INC change stream), `--reseed` (ignore the watermark;
|
- CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the
|
||||||
one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else dry-run),
|
watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else
|
||||||
`--geocode-clusters`, `--geocode-locations`, `--capture-history`.
|
dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
|
||||||
|
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
|
||||||
|
geocoding + history are not on the CRQ entrypoint).
|
||||||
|
|
||||||
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
|
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
|
||||||
|
|
||||||
|
|
@ -42,6 +53,8 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
|
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
|
||||||
| 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
|
| 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
|
||||||
| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
|
| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
|
||||||
|
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
|
||||||
|
| 15_crq_table | **built** — materializes `tickets.crq` (table + geom trigger + indexes; `01`'s crq section never ran on the live DB) + the typed STORED generated columns from `03` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |
|
||||||
|
|
||||||
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
|
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
|
||||||
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
|
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
|
||||||
|
|
@ -56,10 +69,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
|
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
|
||||||
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
||||||
web app (`fleet-ops-staging`).
|
web app (`fleet-ops-staging`).
|
||||||
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
|
- **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket
|
||||||
|
--apply` and `crq_tickets` → `python -m crq.import_crq --from-bucket --apply`, both cron
|
||||||
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
||||||
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
|
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
|
||||||
(`isptickets` bucket), `GEOCODER_*`.
|
(`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
|
||||||
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
||||||
|
|
||||||
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
|
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
|
||||||
|
|
@ -93,5 +107,12 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
|
||||||
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
|
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
|
||||||
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
|
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
|
||||||
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
|
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
|
||||||
**team closure attribution**. **CRQ** = separate future project reusing this
|
**team closure attribution**.
|
||||||
machinery against `automations/crq/`.
|
|
||||||
|
**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
|
||||||
|
`automations/crq/changes/` (`crq/import_crq.py`), with the `tickets.crq` table + typed columns (migration 15) and
|
||||||
|
cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
|
||||||
|
already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
|
||||||
|
installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
|
||||||
|
08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
|
||||||
|
currently has **no** `post_apply` hook).
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,11 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
|
||||||
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
|
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
|
||||||
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
|
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
|
||||||
|
|
||||||
|
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
|
||||||
|
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
|
||||||
|
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
|
||||||
|
> `implementation.md` and `deployment-and-operations.md`.
|
||||||
|
|
||||||
## Data-quality findings (carried into Phase 2)
|
## Data-quality findings (carried into Phase 2)
|
||||||
|
|
||||||
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the
|
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the
|
||||||
|
|
|
||||||
0
inc/__init__.py
Normal file
0
inc/__init__.py
Normal file
74
inc/import_inc.py
Normal file
74
inc/import_inc.py
Normal file
|
|
@ -0,0 +1,74 @@
|
||||||
|
"""
|
||||||
|
inc/import_inc.py — Fireside Communications · INC (incident / fault) ingestion.
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
|
||||||
|
tickets.inc — incidents / customer faults (FleetOps "Tickets" INC tab)
|
||||||
|
|
||||||
|
INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
|
||||||
|
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
|
||||||
|
watermark, archives each file to automations/inc/processed/, and — uniquely to
|
||||||
|
INC — runs tickets.capture_history() after each --apply run (closure_events +
|
||||||
|
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
|
||||||
|
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
|
||||||
|
|
||||||
|
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
||||||
|
python -m inc.import_inc --from-bucket --apply
|
||||||
|
python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
|
||||||
|
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||||
|
python -m inc.import_inc --geocode-clusters --apply
|
||||||
|
python -m inc.import_inc --geocode-locations --apply
|
||||||
|
|
||||||
|
Pre-requisite: migrations applied (run_migrations.py) — tickets.inc/crq +
|
||||||
|
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
import pipeline
|
||||||
|
|
||||||
|
# INC captures closure/backlog history after every --apply run (CRQ does not yet).
|
||||||
|
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
||||||
|
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
||||||
|
ap.add_argument("--from-bucket", action="store_true",
|
||||||
|
help="Drain the incremental INC change stream (automations/inc/changes/) "
|
||||||
|
"from the isptickets S3 bucket: every not-yet-processed file "
|
||||||
|
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
|
||||||
|
ap.add_argument("--reseed", action="store_true",
|
||||||
|
help="Ignore the stored watermark and drain every file in changes/ once "
|
||||||
|
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
|
||||||
|
ap.add_argument("--inc-csv", dest="local_csv", default=None,
|
||||||
|
help="Local INC tickets CSV file (dev)")
|
||||||
|
ap.add_argument("--geocode-clusters", action="store_true",
|
||||||
|
help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
|
||||||
|
ap.add_argument("--geocode-locations", action="store_true",
|
||||||
|
help="Geocode actionable inc+crq location_names precisely (keyed provider), "
|
||||||
|
"then re-resolve")
|
||||||
|
ap.add_argument("--capture-history", action="store_true",
|
||||||
|
help="Run tickets.capture_history() standalone "
|
||||||
|
"(closure_events + daily snapshot)")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
if args.geocode_clusters:
|
||||||
|
pipeline.geocode_clusters(apply=args.apply)
|
||||||
|
return
|
||||||
|
if args.geocode_locations:
|
||||||
|
pipeline.geocode_locations(apply=args.apply)
|
||||||
|
return
|
||||||
|
if args.capture_history:
|
||||||
|
pipeline.capture_history()
|
||||||
|
return
|
||||||
|
if not (args.from_bucket or args.local_csv):
|
||||||
|
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
|
||||||
|
"--geocode-locations, or --capture-history")
|
||||||
|
pipeline.ingest(DATASET, args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
89
migrations/13_inc_search_fn.sql
Normal file
89
migrations/13_inc_search_fn.sql
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
-- 13_inc_search_fn.sql — fleettickets · INC ticket explorer (search) function
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- reporting.fn_inc_search — ad-hoc ticket lookup by id / engineer / cluster /
|
||||||
|
-- status / state / time, for the FleetOps "Ticket explorer" card. Returns
|
||||||
|
-- { count, truncated, limit, state, rows }. Consumed by dashboard_api
|
||||||
|
-- GET /webhook/inc-search.
|
||||||
|
--
|
||||||
|
-- RECOVERED INTO VERSION CONTROL 2026-06-26: this migration was applied to the live
|
||||||
|
-- DB on 2026-06-19 but the file was never committed. Recovered verbatim from the live
|
||||||
|
-- definition (pg_get_functiondef) so a fresh DB rebuilds faithfully; the live ledger
|
||||||
|
-- already lists it, so run_migrations skips it there. The crq mirror is in 16.
|
||||||
|
-- Idempotent (CREATE OR REPLACE).
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_inc_search(
|
||||||
|
p_ticket_id text DEFAULT NULL,
|
||||||
|
p_owner text DEFAULT NULL,
|
||||||
|
p_cluster text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_state text DEFAULT 'closed',
|
||||||
|
p_from timestamptz DEFAULT NULL,
|
||||||
|
p_to timestamptz DEFAULT NULL,
|
||||||
|
p_limit integer DEFAULT 500
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
|
||||||
|
DECLARE
|
||||||
|
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
|
||||||
|
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
|
||||||
|
v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_ticket_id := NULLIF(trim(p_ticket_id), '');
|
||||||
|
p_owner := NULLIF(trim(p_owner), '');
|
||||||
|
p_cluster := NULLIF(p_cluster, '');
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
|
||||||
|
WITH hits AS (
|
||||||
|
SELECT ticket_id, normalized_status, cluster, region, location_name,
|
||||||
|
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
|
||||||
|
sla_status, mttr, closed_at, created_at_service, is_actionable,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
|
||||||
|
FROM tickets.inc
|
||||||
|
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
|
||||||
|
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
|
||||||
|
AND (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
AND CASE v_state
|
||||||
|
WHEN 'open' THEN COALESCE(is_actionable, false)
|
||||||
|
WHEN 'all' THEN COALESCE(is_actionable, false)
|
||||||
|
OR (closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to))
|
||||||
|
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
|
||||||
|
AND closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to)
|
||||||
|
END
|
||||||
|
),
|
||||||
|
total AS (SELECT count(*) AS n FROM hits),
|
||||||
|
page AS (
|
||||||
|
SELECT * FROM hits
|
||||||
|
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
|
||||||
|
LIMIT v_limit
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'count', (SELECT n FROM total),
|
||||||
|
'truncated', (SELECT n FROM total) > v_limit,
|
||||||
|
'limit', v_limit,
|
||||||
|
'state', v_state,
|
||||||
|
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
|
||||||
|
ORDER BY page.closed_at DESC NULLS LAST,
|
||||||
|
page.created_at_service DESC NULLS LAST)
|
||||||
|
FROM page), '[]'::jsonb)
|
||||||
|
) INTO v_result;
|
||||||
|
|
||||||
|
RETURN v_result;
|
||||||
|
END $function$;
|
||||||
|
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
37
migrations/14_inc_filter_options.sql
Normal file
37
migrations/14_inc_filter_options.sql
Normal file
|
|
@ -0,0 +1,37 @@
|
||||||
|
-- 14_inc_filter_options.sql — fleettickets · INC explorer dropdown options
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- reporting.fn_inc_filter_options — distinct engineers (owner), clusters, and the
|
||||||
|
-- ids of currently-open tickets, for the FleetOps "Ticket explorer" dropdowns.
|
||||||
|
-- Consumed by dashboard_api GET /webhook/inc-filter-options.
|
||||||
|
--
|
||||||
|
-- RECOVERED INTO VERSION CONTROL 2026-06-26: applied to the live DB 2026-06-19 but
|
||||||
|
-- never committed. Recovered verbatim from the live definition so a fresh DB rebuilds
|
||||||
|
-- faithfully; the live ledger already lists it (run_migrations skips it there). The crq
|
||||||
|
-- mirror is in 16. Idempotent (CREATE OR REPLACE).
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_inc_filter_options()
|
||||||
|
RETURNS jsonb LANGUAGE sql STABLE AS $function$
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
|
||||||
|
FROM tickets.inc WHERE NULLIF(owner, '') IS NOT NULL) s),
|
||||||
|
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT cluster AS c
|
||||||
|
FROM tickets.inc WHERE NULLIF(cluster, '') IS NOT NULL) s),
|
||||||
|
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
|
||||||
|
FROM tickets.inc WHERE COALESCE(is_actionable, false))
|
||||||
|
);
|
||||||
|
$function$;
|
||||||
|
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
101
migrations/15_crq_table.sql
Normal file
101
migrations/15_crq_table.sql
Normal file
|
|
@ -0,0 +1,101 @@
|
||||||
|
-- 15_crq_table.sql — fleettickets · materialize tickets.crq + typed columns
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- Why a NEW migration (not an edit to 01): `01_tickets_schema.sql` was applied to the
|
||||||
|
-- live DB on 2026-06-15 from a version that PREDATED its `tickets.crq` section, so the
|
||||||
|
-- IF-NOT-EXISTS ledger guard has kept crq from ever being created there — even though
|
||||||
|
-- the live `reporting.fn_tickets_for_map` and `tickets.resolve_ticket_geoms` already
|
||||||
|
-- reference it (they error if called until crq exists). This migration creates
|
||||||
|
-- `tickets.crq` self-containedly (table + geom trigger + indexes) and adds the same
|
||||||
|
-- typed STORED generated columns INC got in `03_inc_columns.sql`, bringing CRQ to
|
||||||
|
-- data-layer parity.
|
||||||
|
--
|
||||||
|
-- Deterministic + idempotent — converges to the same shape on BOTH:
|
||||||
|
-- • the live DB (crq missing) -> CREATE makes it, ALTER adds typed cols
|
||||||
|
-- • a fresh DB (crq minimal, from 01) -> CREATE skipped, ALTER adds typed cols
|
||||||
|
-- Reuses shared objects already present: tickets.tg_ticket_geom() (01),
|
||||||
|
-- tickets.norm_cluster() (01), tickets.eat_ts() (03).
|
||||||
|
--
|
||||||
|
-- NOTE: the live DB also carries un-versioned migrations 13_inc_search_fn.sql /
|
||||||
|
-- 14_inc_filter_options.sql (applied 2026-06-19, absent from this repo) — INC dashboard
|
||||||
|
-- functions, unrelated to CRQ. Numbered 15 here to sit cleanly after the live ledger.
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
-- ── table (base shape mirrors tickets.inc's original 01 base) ────────────────
|
||||||
|
CREATE TABLE IF NOT EXISTS tickets.crq (
|
||||||
|
ticket_id text PRIMARY KEY,
|
||||||
|
raw jsonb NOT NULL,
|
||||||
|
geom geometry(Point, 4326),
|
||||||
|
geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
|
||||||
|
ingested_at timestamptz NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- ── geom trigger — read from raw; shared tickets.tg_ticket_geom() (from 01) ───
|
||||||
|
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
|
||||||
|
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
|
||||||
|
|
||||||
|
-- ── raw-based indexes (mirror 01's inc/crq set) ──────────────────────────────
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
|
||||||
|
WHERE (raw->>'is_actionable')::boolean;
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
|
||||||
|
|
||||||
|
-- ── typed STORED generated columns (mirror of 03_inc_columns.sql) ────────────
|
||||||
|
-- Computed for ALL existing rows on creation + auto-recomputed on every insert/update;
|
||||||
|
-- `raw` stays the source of truth. tickets.eat_ts() (EAT->timestamptz, IMMUTABLE) is
|
||||||
|
-- reused from 03 — see that file's note on why IMMUTABLE is safe for Kenya (UTC+3, no DST).
|
||||||
|
ALTER TABLE tickets.crq
|
||||||
|
-- text
|
||||||
|
ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
|
||||||
|
-- numeric / float
|
||||||
|
ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
|
||||||
|
-- boolean
|
||||||
|
ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
|
||||||
|
-- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the EXPORT
|
||||||
|
-- pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
|
||||||
|
ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
|
||||||
|
ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;
|
||||||
|
|
||||||
|
-- ── typed-column indexes (serve cluster / team / closure queries) ────────────
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;
|
||||||
|
|
||||||
|
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
|
||||||
|
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.crq TO tracksolid_owner;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
297
migrations/16_crq_dashboard.sql
Normal file
297
migrations/16_crq_dashboard.sql
Normal file
|
|
@ -0,0 +1,297 @@
|
||||||
|
-- 16_crq_dashboard.sql — fleettickets · CRQ dashboard parity (view + read functions)
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- Brings CRQ to FleetOps-dashboard parity with INC, so the Tickets tab's CRQ
|
||||||
|
-- sub-tab works "just like INC". Mirrors, against tickets.crq:
|
||||||
|
-- tickets.crq_open_sla ← mirror of tickets.inc_open_sla (08)
|
||||||
|
-- reporting.fn_crq_dashboard ← mirror of reporting.fn_inc_dashboard (09/12)
|
||||||
|
-- reporting.fn_crq_search ← mirror of reporting.fn_inc_search (13)
|
||||||
|
-- reporting.fn_crq_filter_options ← mirror of reporting.fn_inc_filter_options (14)
|
||||||
|
-- consumed by dashboard_api GET /webhook/crq-dashboard | crq-search | crq-filter-options.
|
||||||
|
--
|
||||||
|
-- Differences from the INC view: tickets.crq has no `geog` column (mig 05 is INC-only)
|
||||||
|
-- and its latitude/longitude come from `raw` (empty in the feed), so crq_open_sla omits
|
||||||
|
-- geog and derives latitude/longitude from `geom`. The 48h SLA rule is reused verbatim
|
||||||
|
-- for layout parity — installation-lifecycle SLA semantics may be refined later.
|
||||||
|
--
|
||||||
|
-- Idempotent (CREATE OR REPLACE / VIEW). Requires migration 15 (tickets.crq + typed cols).
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
-- ── crq_open_sla — open CRQ tickets with derived SLA (mirror of inc_open_sla) ─
|
||||||
|
CREATE OR REPLACE VIEW tickets.crq_open_sla AS
|
||||||
|
SELECT
|
||||||
|
ticket_id,
|
||||||
|
normalized_status,
|
||||||
|
bucket,
|
||||||
|
cluster,
|
||||||
|
region,
|
||||||
|
location_name,
|
||||||
|
assigned_team,
|
||||||
|
owner,
|
||||||
|
sla_status AS source_sla_status,
|
||||||
|
mttr, -- minutes (null until closed)
|
||||||
|
COALESCE(created_at_service, first_seen_at) AS sla_clock,
|
||||||
|
CASE WHEN created_at_service IS NOT NULL THEN 'service' ELSE 'first_seen' END AS sla_clock_source,
|
||||||
|
round((EXTRACT(EPOCH FROM now() - COALESCE(created_at_service, first_seen_at)) / 3600)::numeric, 1) AS hours_open,
|
||||||
|
CASE
|
||||||
|
WHEN COALESCE(created_at_service, first_seen_at) IS NULL THEN 'unknown'
|
||||||
|
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '48h' THEN 'breached'
|
||||||
|
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '36h' THEN 'at_risk'
|
||||||
|
ELSE 'ok'
|
||||||
|
END AS sla_state,
|
||||||
|
created_at_service,
|
||||||
|
first_seen_at,
|
||||||
|
scheduled_at,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS latitude,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS longitude,
|
||||||
|
geo_source,
|
||||||
|
geom
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE is_actionable;
|
||||||
|
|
||||||
|
COMMENT ON VIEW tickets.crq_open_sla IS
|
||||||
|
'Open (is_actionable) CRQ tickets with derived SLA (48h rule; clock = created_at_service '
|
||||||
|
'or first_seen_at fallback). Mirror of inc_open_sla; no geog. fleettickets 16.';
|
||||||
|
|
||||||
|
-- ── fn_crq_dashboard — mirror of fn_inc_dashboard over tickets.crq ───────────
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_dashboard(
|
||||||
|
p_cluster text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_window text DEFAULT 'today',
|
||||||
|
p_from timestamptz DEFAULT NULL,
|
||||||
|
p_to timestamptz DEFAULT NULL
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
|
||||||
|
DECLARE
|
||||||
|
v_now_eat timestamp;
|
||||||
|
v_from timestamptz;
|
||||||
|
v_to timestamptz;
|
||||||
|
v_preset text;
|
||||||
|
v_days numeric;
|
||||||
|
v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_cluster := NULLIF(p_cluster, '');
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
|
||||||
|
-- ── resolve the window ──────────────────────────────────────────────────────
|
||||||
|
IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
|
||||||
|
v_preset := 'custom';
|
||||||
|
v_from := COALESCE(p_from, '-infinity'::timestamptz);
|
||||||
|
v_to := COALESCE(p_to, 'infinity'::timestamptz);
|
||||||
|
ELSE
|
||||||
|
v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
|
||||||
|
IF v_preset = 'week' THEN
|
||||||
|
v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
ELSIF v_preset = 'month' THEN
|
||||||
|
v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
ELSE
|
||||||
|
v_preset := 'today';
|
||||||
|
v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
|
||||||
|
v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
|
||||||
|
ELSE
|
||||||
|
v_days := NULL; -- open-ended custom window → per-day average not meaningful
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- ── build payload ───────────────────────────────────────────────────────────
|
||||||
|
WITH open_t AS (
|
||||||
|
SELECT * FROM tickets.crq_open_sla
|
||||||
|
WHERE (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
),
|
||||||
|
closed_t AS (
|
||||||
|
SELECT ticket_id, normalized_status, cluster, region, location_name,
|
||||||
|
assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE NOT COALESCE(is_actionable, false)
|
||||||
|
AND closed_at IS NOT NULL
|
||||||
|
AND closed_at >= v_from AND closed_at < v_to
|
||||||
|
AND (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),
|
||||||
|
|
||||||
|
'open', jsonb_build_object(
|
||||||
|
'type', 'FeatureCollection',
|
||||||
|
'features', COALESCE((
|
||||||
|
SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'type', 'Feature',
|
||||||
|
'properties', jsonb_build_object(
|
||||||
|
'ticket_id', ticket_id, 'normalized_status', normalized_status,
|
||||||
|
'cluster', cluster, 'region', region, 'location_name', location_name,
|
||||||
|
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
|
||||||
|
'geo_source', geo_source,
|
||||||
|
'sla_state', sla_state, 'hours_open', hours_open),
|
||||||
|
'geometry', ST_AsGeoJSON(geom)::jsonb))
|
||||||
|
FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
|
||||||
|
'closed', jsonb_build_object(
|
||||||
|
'type', 'FeatureCollection',
|
||||||
|
'features', COALESCE((
|
||||||
|
SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'type', 'Feature',
|
||||||
|
'properties', jsonb_build_object(
|
||||||
|
'ticket_id', ticket_id, 'normalized_status', normalized_status,
|
||||||
|
'cluster', cluster, 'region', region, 'location_name', location_name,
|
||||||
|
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
|
||||||
|
'geo_source', geo_source,
|
||||||
|
'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
|
||||||
|
'geometry', ST_AsGeoJSON(geom)::jsonb))
|
||||||
|
FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
|
||||||
|
'metrics', jsonb_build_object(
|
||||||
|
'open_now', (SELECT count(*) FROM open_t),
|
||||||
|
'closed_in_window', (SELECT count(*) FROM closed_t),
|
||||||
|
'sla', jsonb_build_object(
|
||||||
|
'open', (SELECT jsonb_build_object(
|
||||||
|
'breached', count(*) FILTER (WHERE sla_state = 'breached'),
|
||||||
|
'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
|
||||||
|
'ok', count(*) FILTER (WHERE sla_state = 'ok'),
|
||||||
|
'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
|
||||||
|
'closed', (SELECT jsonb_build_object(
|
||||||
|
'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
|
||||||
|
'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
|
||||||
|
),
|
||||||
|
'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
|
||||||
|
SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
|
||||||
|
SELECT normalized_status FROM open_t
|
||||||
|
UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
|
||||||
|
'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
|
||||||
|
SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
|
||||||
|
SELECT cluster FROM open_t
|
||||||
|
UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
|
||||||
|
-- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
|
||||||
|
'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
|
||||||
|
FROM (
|
||||||
|
SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
|
||||||
|
count(*) AS c,
|
||||||
|
count(*) FILTER (WHERE sla_status = 'Breached') AS b,
|
||||||
|
round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
|
||||||
|
FROM closed_t GROUP BY 1) z), '[]'::jsonb),
|
||||||
|
'closure_rate', jsonb_build_object(
|
||||||
|
'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
|
||||||
|
ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
|
||||||
|
'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
|
||||||
|
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
|
||||||
|
FROM closed_t GROUP BY 1) z), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
|
||||||
|
),
|
||||||
|
|
||||||
|
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
|
||||||
|
'export_type', export_type, 'exported_at', exported_at,
|
||||||
|
'records_ingested', records_ingested, 'ingested_at', ingested_at))
|
||||||
|
FROM tickets.import_meta)
|
||||||
|
) INTO v_result;
|
||||||
|
|
||||||
|
RETURN v_result;
|
||||||
|
END $function$;
|
||||||
|
|
||||||
|
-- ── fn_crq_search — mirror of fn_inc_search over tickets.crq ──────────────────
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_search(
|
||||||
|
p_ticket_id text DEFAULT NULL,
|
||||||
|
p_owner text DEFAULT NULL,
|
||||||
|
p_cluster text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_state text DEFAULT 'closed',
|
||||||
|
p_from timestamptz DEFAULT NULL,
|
||||||
|
p_to timestamptz DEFAULT NULL,
|
||||||
|
p_limit integer DEFAULT 500
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
|
||||||
|
DECLARE
|
||||||
|
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
|
||||||
|
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
|
||||||
|
v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_ticket_id := NULLIF(trim(p_ticket_id), '');
|
||||||
|
p_owner := NULLIF(trim(p_owner), '');
|
||||||
|
p_cluster := NULLIF(p_cluster, '');
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
|
||||||
|
WITH hits AS (
|
||||||
|
SELECT ticket_id, normalized_status, cluster, region, location_name,
|
||||||
|
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
|
||||||
|
sla_status, mttr, closed_at, created_at_service, is_actionable,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
|
||||||
|
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
|
||||||
|
AND (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
AND CASE v_state
|
||||||
|
WHEN 'open' THEN COALESCE(is_actionable, false)
|
||||||
|
WHEN 'all' THEN COALESCE(is_actionable, false)
|
||||||
|
OR (closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to))
|
||||||
|
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
|
||||||
|
AND closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to)
|
||||||
|
END
|
||||||
|
),
|
||||||
|
total AS (SELECT count(*) AS n FROM hits),
|
||||||
|
page AS (
|
||||||
|
SELECT * FROM hits
|
||||||
|
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
|
||||||
|
LIMIT v_limit
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'count', (SELECT n FROM total),
|
||||||
|
'truncated', (SELECT n FROM total) > v_limit,
|
||||||
|
'limit', v_limit,
|
||||||
|
'state', v_state,
|
||||||
|
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
|
||||||
|
ORDER BY page.closed_at DESC NULLS LAST,
|
||||||
|
page.created_at_service DESC NULLS LAST)
|
||||||
|
FROM page), '[]'::jsonb)
|
||||||
|
) INTO v_result;
|
||||||
|
|
||||||
|
RETURN v_result;
|
||||||
|
END $function$;
|
||||||
|
|
||||||
|
-- ── fn_crq_filter_options — mirror of fn_inc_filter_options over tickets.crq ──
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_filter_options()
|
||||||
|
RETURNS jsonb LANGUAGE sql STABLE AS $function$
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
|
||||||
|
FROM tickets.crq WHERE NULLIF(owner, '') IS NOT NULL) s),
|
||||||
|
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT cluster AS c
|
||||||
|
FROM tickets.crq WHERE NULLIF(cluster, '') IS NOT NULL) s),
|
||||||
|
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
|
||||||
|
FROM tickets.crq WHERE COALESCE(is_actionable, false))
|
||||||
|
);
|
||||||
|
$function$;
|
||||||
|
|
||||||
|
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq_open_sla TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq_open_sla TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
|
|
@ -61,10 +61,12 @@ Notes:
|
||||||
listings but contain **no objects** (leftover/legacy; ignore them). There is
|
listings but contain **no objects** (leftover/legacy; ignore them). There is
|
||||||
no `latest` pointer and no JSON/metadata envelope.
|
no `latest` pointer and no JSON/metadata envelope.
|
||||||
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
|
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
|
||||||
stream (with matching baseline timestamps and additional newer deltas). CRQ
|
stream (with matching baseline timestamps and additional newer deltas) and the
|
||||||
remains out of scope for `import_tickets.py` (INC-only), but the source-side
|
identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by
|
||||||
shape is the same. CRQ's old root snapshots (`automations/crq/<ts>.csv`) are
|
`crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same
|
||||||
still present because nothing archives them — they are not consumed.
|
way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
|
||||||
|
(`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
|
||||||
|
archives them — they are not consumed (the `changes/` stream is the source of truth).
|
||||||
|
|
||||||
## CSV Schema
|
## CSV Schema
|
||||||
|
|
||||||
|
|
@ -83,7 +85,7 @@ Each row is a complete record (not a partial diff): a delta row carries the
|
||||||
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
|
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
|
||||||
baseline still contains `is_alarm=true` rows and may include a leading
|
baseline still contains `is_alarm=true` rows and may include a leading
|
||||||
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
|
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
|
||||||
the consumer (see `import_tickets.py`).
|
the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints).
|
||||||
|
|
||||||
## Timestamp Format
|
## Timestamp Format
|
||||||
|
|
||||||
|
|
@ -96,11 +98,12 @@ Generated once at the start of each execution, formatted in `Africa/Nairobi`
|
||||||
incremental files appear whenever a change batch is exported (multiple within
|
incremental files appear whenever a change batch is exported (multiple within
|
||||||
the same hour are normal, e.g. `15-50-39` then `15-53-04`).
|
the same hour are normal, e.g. `15-50-39` then `15-53-04`).
|
||||||
|
|
||||||
## How `import_tickets.py` Consumes It
|
## How the loader Consumes It
|
||||||
|
|
||||||
`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`):
|
`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq
|
||||||
|
--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
|
||||||
|
|
||||||
1. Lists `automations/inc/changes/<ts>.csv`, sorts ascending by timestamp.
|
1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp.
|
||||||
2. Skips files at/older than the **watermark**
|
2. Skips files at/older than the **watermark**
|
||||||
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
|
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
|
||||||
applied); on a fresh stream it processes everything present.
|
applied); on a fresh stream it processes everything present.
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,17 @@
|
||||||
"""
|
"""
|
||||||
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
|
pipeline.py — Fireside Communications · generic ticket ingestion engine (raw-first)
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
|
The dataset-agnostic core shared by the per-type entrypoints:
|
||||||
source of the FleetOps "Tickets" map.
|
inc/import_inc.py -> tickets.inc (incidents / customer faults)
|
||||||
tickets.inc — incidents / customer faults
|
crq/import_crq.py -> tickets.crq (new-installation requests)
|
||||||
|
|
||||||
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
|
Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
|
||||||
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
|
so the only differences are the table, the S3 prefixes, the import_meta dataset
|
||||||
|
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
|
||||||
|
does not yet). Those are carried by the `Dataset` config; everything else here is
|
||||||
|
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
|
||||||
|
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
|
||||||
|
are driven from a single entrypoint (the INC one) — never duplicated per dataset.
|
||||||
|
|
||||||
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
||||||
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
||||||
|
|
@ -14,50 +19,33 @@ derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||||
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
||||||
|
|
||||||
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
|
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
|
||||||
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
|
automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
|
||||||
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
||||||
baseline, and every later file holds only the rows that CHANGED since the prior
|
baseline, and every later file holds only the rows that CHANGED since the prior
|
||||||
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
|
export (with periodic full-state re-emissions). Deletions are never emitted. Every
|
||||||
Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
|
file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
|
||||||
not-yet-processed file in ASCENDING timestamp order (baseline first, then each
|
in ASCENDING timestamp order (baseline first, then each delta) — taking only the
|
||||||
delta) — taking only the newest would silently drop the intermediate deltas:
|
newest would silently drop the intermediate deltas:
|
||||||
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
||||||
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
||||||
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
||||||
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
||||||
history accumulates), and advance the watermark in tickets.import_meta
|
history accumulates), and advance the watermark in tickets.import_meta
|
||||||
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
|
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
|
||||||
- on success, MOVE each file to automations/inc/processed/ (copy + delete).
|
- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
|
||||||
|
|
||||||
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
|
||||||
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
|
||||||
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
|
|
||||||
real place out of location_name (region+cluster+location_name,
|
|
||||||
network codes stripped), geocodes it, caches in
|
|
||||||
tickets.geo_locations, then re-resolves geoms.
|
|
||||||
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
|
|
||||||
|
|
||||||
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
|
||||||
python import_tickets.py --from-bucket --apply
|
|
||||||
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
|
|
||||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
|
||||||
python import_tickets.py --geocode-clusters --apply
|
|
||||||
python import_tickets.py --geocode-locations --apply
|
|
||||||
|
|
||||||
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
|
|
||||||
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
|
||||||
import csv
|
|
||||||
import io
|
import io
|
||||||
|
import csv
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
from collections.abc import Callable
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
|
|
@ -67,14 +55,10 @@ from botocore.config import Config as BotoConfig
|
||||||
|
|
||||||
from shared import clean, get_conn, get_logger
|
from shared import clean, get_conn, get_logger
|
||||||
|
|
||||||
log = get_logger("import_tickets")
|
log = get_logger("pipeline")
|
||||||
|
|
||||||
# ── INC ingestion config ──────────────────────────────────────────────────────
|
# ── shared ingestion config ─────────────────────────────────────────────────────
|
||||||
_TABLE = "tickets.inc"
|
|
||||||
_DATASET = "inc"
|
|
||||||
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
|
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
|
||||||
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
|
|
||||||
_PROCESSED_PREFIX = "automations/inc/processed/"
|
|
||||||
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
||||||
|
|
||||||
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
||||||
|
|
@ -91,11 +75,6 @@ DROP_FIELDS = frozenset({
|
||||||
"department", "source_type",
|
"department", "source_type",
|
||||||
})
|
})
|
||||||
|
|
||||||
# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
|
|
||||||
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
|
|
||||||
_CHANGE_KEY_RE = re.compile(
|
|
||||||
r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
|
|
||||||
|
|
||||||
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
||||||
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
||||||
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
||||||
|
|
@ -103,12 +82,34 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
||||||
_last_geocode_at = 0.0
|
_last_geocode_at = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── dataset config (per ticket type) ────────────────────────────────────────────
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Dataset:
|
||||||
|
"""All that distinguishes one ticket type from another in the generic engine."""
|
||||||
|
name: str # 'inc' | 'crq' (import_meta.dataset)
|
||||||
|
table: str # 'tickets.inc' | 'tickets.crq'
|
||||||
|
change_prefix: str # 'automations/<name>/changes/'
|
||||||
|
processed_prefix: str # 'automations/<name>/processed/'
|
||||||
|
key_regex: re.Pattern # matches a <prefix><EAT-ts>.csv key
|
||||||
|
post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only)
|
||||||
|
|
||||||
|
|
||||||
|
def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
|
||||||
|
"""Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
|
||||||
|
return Dataset(
|
||||||
|
name=name,
|
||||||
|
table=f"tickets.{name}",
|
||||||
|
change_prefix=f"automations/{name}/changes/",
|
||||||
|
processed_prefix=f"automations/{name}/processed/",
|
||||||
|
# only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
|
||||||
|
# (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
|
||||||
|
key_regex=re.compile(
|
||||||
|
rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
|
||||||
|
post_apply=post_apply,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
|
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
|
||||||
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
|
|
||||||
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
|
|
||||||
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
|
|
||||||
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
|
|
||||||
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
|
|
||||||
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
||||||
def _s3_client():
|
def _s3_client():
|
||||||
"""boto3 S3 client for the S3 endpoint (force path-style addressing)."""
|
"""boto3 S3 client for the S3 endpoint (force path-style addressing)."""
|
||||||
|
|
@ -123,9 +124,9 @@ def _s3_client():
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _ts_from_key(key: str) -> datetime | None:
|
def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
|
||||||
"""EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
|
"""EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
|
||||||
m = _CHANGE_KEY_RE.match(key)
|
m = ds.key_regex.match(key)
|
||||||
if not m:
|
if not m:
|
||||||
return None
|
return None
|
||||||
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
|
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
|
||||||
|
|
@ -134,12 +135,12 @@ def _ts_from_key(key: str) -> datetime | None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _list_inc_csvs(s3) -> list[tuple[str, str]]:
|
def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
|
||||||
"""[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
|
"""[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
|
||||||
out: list[tuple[str, str]] = []
|
out: list[tuple[str, str]] = []
|
||||||
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
|
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
|
||||||
for it in page.get("Contents", []):
|
for it in page.get("Contents", []):
|
||||||
if _CHANGE_KEY_RE.match(it["Key"]):
|
if ds.key_regex.match(it["Key"]):
|
||||||
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
|
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
@ -149,23 +150,22 @@ def _get_text(s3, key: str) -> str:
|
||||||
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
|
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
def _last_processed_ts() -> datetime | None:
|
def _last_processed_ts(ds: Dataset) -> datetime | None:
|
||||||
"""Watermark: EAT timestamp of the newest change file already ingested.
|
"""Watermark: EAT timestamp of the newest change file already ingested for this dataset.
|
||||||
|
|
||||||
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
|
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
|
||||||
we drain changes/ oldest→newest). None when nothing has been ingested via the
|
we drain changes/ oldest→newest). None when nothing has been ingested via the
|
||||||
changes stream yet (e.g. the first run after the source switched to incremental,
|
changes stream yet (e.g. a brand-new dataset, or the first run after the source
|
||||||
where the stored key is an old full-snapshot path) — then every file currently in
|
switched buckets) — then every file currently in changes/ is processed.
|
||||||
changes/ is processed.
|
|
||||||
"""
|
"""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
|
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
|
||||||
(_DATASET,),
|
(ds.name,),
|
||||||
)
|
)
|
||||||
row = cur.fetchone()
|
row = cur.fetchone()
|
||||||
return _ts_from_key(row[0]) if row and row[0] else None
|
return _ts_from_key(ds, row[0]) if row and row[0] else None
|
||||||
|
|
||||||
|
|
||||||
def _parse_csv(text: str) -> list[dict]:
|
def _parse_csv(text: str) -> list[dict]:
|
||||||
|
|
@ -177,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
|
||||||
return list(csv.DictReader(f))
|
return list(csv.DictReader(f))
|
||||||
|
|
||||||
|
|
||||||
def _move_processed(s3, keys: list[str]) -> None:
|
def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
|
||||||
"""Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
|
"""Archive listed csv objects to automations/<ds>/processed/ (copy + delete)."""
|
||||||
for key in keys:
|
for key in keys:
|
||||||
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
|
dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
|
||||||
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
|
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
|
||||||
s3.delete_object(Bucket=_BUCKET, Key=key)
|
s3.delete_object(Bucket=_BUCKET, Key=key)
|
||||||
log.info("archived %s -> %s", key, dst)
|
log.info("archived %s -> %s", key, dst)
|
||||||
|
|
@ -206,8 +206,8 @@ def _prepare(row: dict) -> dict:
|
||||||
|
|
||||||
|
|
||||||
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
||||||
def _record_meta(cur, meta: dict, records_ingested: int) -> None:
|
def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
|
||||||
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
|
"""Upsert the snapshot metadata (powers map freshness + holds source_max_key).
|
||||||
|
|
||||||
Runs on the caller's cursor so the row upsert and the meta write commit
|
Runs on the caller's cursor so the row upsert and the meta write commit
|
||||||
together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
|
together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
|
||||||
|
|
@ -225,54 +225,58 @@ def _record_meta(cur, meta: dict, records_ingested: int) -> None:
|
||||||
records_ingested = EXCLUDED.records_ingested,
|
records_ingested = EXCLUDED.records_ingested,
|
||||||
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
||||||
ingested_at = now()""",
|
ingested_at = now()""",
|
||||||
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
(ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
||||||
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
||||||
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
||||||
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
||||||
meta = meta or {}
|
meta = meta or {}
|
||||||
kept = [r for r in rows if _keep_row(r)]
|
kept = [r for r in rows if _keep_row(r)]
|
||||||
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
||||||
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
||||||
_TABLE, len(rows), len(payload), len(rows) - len(payload))
|
ds.table, len(rows), len(payload), len(rows) - len(payload))
|
||||||
if not apply:
|
if not apply:
|
||||||
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
|
log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
|
||||||
return len(payload)
|
return len(payload)
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
psycopg2.extras.execute_values(
|
psycopg2.extras.execute_values(
|
||||||
cur,
|
cur,
|
||||||
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
|
f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
|
||||||
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
||||||
payload, page_size=500,
|
payload, page_size=500,
|
||||||
)
|
)
|
||||||
# same transaction as the upsert: rows + snapshot meta commit atomically
|
# same transaction as the upsert: rows + snapshot meta commit atomically
|
||||||
_record_meta(cur, meta, len(payload))
|
_record_meta(cur, ds, meta, len(payload))
|
||||||
log.info("upserted %d rows into %s", len(payload), _TABLE)
|
log.info("upserted %d rows into %s", len(payload), ds.table)
|
||||||
return len(payload)
|
return len(payload)
|
||||||
|
|
||||||
|
|
||||||
def _capture_history() -> None:
|
def capture_history() -> None:
|
||||||
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
|
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history).
|
||||||
|
|
||||||
|
INC-only today (CRQ install-lifecycle history is a future migration); wired as
|
||||||
|
the INC Dataset's post_apply hook.
|
||||||
|
"""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("SELECT tickets.capture_history()")
|
cur.execute("SELECT tickets.capture_history()")
|
||||||
log.info("history: %s", cur.fetchone()[0])
|
log.info("history: %s", cur.fetchone()[0])
|
||||||
|
|
||||||
|
|
||||||
def ingest(args) -> None:
|
def ingest(ds: Dataset, args) -> None:
|
||||||
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
|
# Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
|
||||||
if args.inc_csv:
|
if args.local_csv:
|
||||||
rows = _load_csv_local(args.inc_csv)
|
rows = _load_csv_local(args.local_csv)
|
||||||
name = os.path.basename(args.inc_csv)
|
name = os.path.basename(args.local_csv)
|
||||||
ts = _ts_from_key(_INC_PREFIX + name)
|
ts = _ts_from_key(ds, ds.change_prefix + name)
|
||||||
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
||||||
if ts:
|
if ts:
|
||||||
meta["exported_at"] = ts.isoformat()
|
meta["exported_at"] = ts.isoformat()
|
||||||
upsert(rows, args.apply, meta=meta)
|
upsert(ds, rows, args.apply, meta=meta)
|
||||||
return
|
return
|
||||||
|
|
||||||
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
|
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
|
||||||
|
|
@ -280,11 +284,11 @@ def ingest(args) -> None:
|
||||||
# the file is archived PER file, so a mid-run failure leaves a consistent state
|
# the file is archived PER file, so a mid-run failure leaves a consistent state
|
||||||
# (folder state matches the watermark) and the next run resumes cleanly.
|
# (folder state matches the watermark) and the next run resumes cleanly.
|
||||||
s3 = _s3_client()
|
s3 = _s3_client()
|
||||||
listing = _list_inc_csvs(s3)
|
listing = _list_csvs(s3, ds)
|
||||||
if not listing:
|
if not listing:
|
||||||
log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
|
log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
|
||||||
return
|
return
|
||||||
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
||||||
|
|
||||||
# watermark: skip anything at/older than the newest file already applied. Archiving
|
# watermark: skip anything at/older than the newest file already applied. Archiving
|
||||||
# normally empties changes/, but this guards a failed archive from re-applying.
|
# normally empties changes/, but this guards a failed archive from re-applying.
|
||||||
|
|
@ -292,24 +296,25 @@ def ingest(args) -> None:
|
||||||
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
||||||
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
||||||
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
||||||
last_ts = None if args.reseed else _last_processed_ts()
|
last_ts = None if args.reseed else _last_processed_ts(ds)
|
||||||
_floor = datetime.min.replace(tzinfo=_EAT)
|
_floor = datetime.min.replace(tzinfo=_EAT)
|
||||||
pending = [(k, e) for k, e in listing
|
pending = [(k, e) for k, e in listing
|
||||||
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
|
if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
|
||||||
if not pending:
|
if not pending:
|
||||||
log.info("all %d change file(s) already processed (watermark %s) — nothing new",
|
log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
|
||||||
len(listing), last_ts and last_ts.isoformat())
|
len(listing), ds.name, last_ts and last_ts.isoformat())
|
||||||
if args.apply:
|
if args.apply:
|
||||||
_move_processed(s3, [k for k, _ in listing]) # archive any stragglers
|
_move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers
|
||||||
_capture_history()
|
if ds.post_apply:
|
||||||
|
ds.post_apply()
|
||||||
return
|
return
|
||||||
log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
|
log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
|
||||||
len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
|
len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
|
||||||
|
|
||||||
total = 0
|
total = 0
|
||||||
for i, (key, etag) in enumerate(pending):
|
for i, (key, etag) in enumerate(pending):
|
||||||
rows = _parse_csv(_get_text(s3, key))
|
rows = _parse_csv(_get_text(s3, key))
|
||||||
ts = _ts_from_key(key)
|
ts = _ts_from_key(ds, key)
|
||||||
# the first file applied onto an empty watermark is the full baseline; every
|
# the first file applied onto an empty watermark is the full baseline; every
|
||||||
# file after is a delta. export_type is informational (recorded in import_meta).
|
# file after is a delta. export_type is informational (recorded in import_meta).
|
||||||
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
|
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
|
||||||
|
|
@ -319,14 +324,14 @@ def ingest(args) -> None:
|
||||||
meta["exported_at"] = ts.isoformat()
|
meta["exported_at"] = ts.isoformat()
|
||||||
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
|
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
|
||||||
# then archive, so the changes/ folder state always matches the watermark.
|
# then archive, so the changes/ folder state always matches the watermark.
|
||||||
total += upsert(rows, args.apply, meta=meta)
|
total += upsert(ds, rows, args.apply, meta=meta)
|
||||||
if args.apply:
|
if args.apply:
|
||||||
_move_processed(s3, [key])
|
_move_processed(s3, ds, [key])
|
||||||
else:
|
else:
|
||||||
log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
|
log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
|
||||||
log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
|
log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
|
||||||
if args.apply:
|
if args.apply and ds.post_apply:
|
||||||
_capture_history()
|
ds.post_apply()
|
||||||
|
|
||||||
|
|
||||||
# ── place extraction (strip network codes, keep the real place) ───────────────
|
# ── place extraction (strip network codes, keep the real place) ───────────────
|
||||||
|
|
@ -475,7 +480,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
|
||||||
def geocode_clusters(apply: bool) -> None:
|
def geocode_clusters(apply: bool) -> None:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
|
|
@ -522,7 +527,7 @@ def geocode_clusters(apply: bool) -> None:
|
||||||
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
||||||
|
|
||||||
|
|
||||||
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
# ── per-location geocoding (precise; actionable inc + crq) ────────────────────
|
||||||
# A location geocode is only trusted if it lands within this radius of the
|
# A location geocode is only trusted if it lands within this radius of the
|
||||||
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
||||||
# place and we fall back to the cluster centroid.
|
# place and we fall back to the cluster centroid.
|
||||||
|
|
@ -537,17 +542,25 @@ def geocode_locations(apply: bool) -> None:
|
||||||
"""
|
"""
|
||||||
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
||||||
FROM (
|
FROM (
|
||||||
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
|
||||||
(array_agg(raw->>'location_name'))[1] AS location_name,
|
(array_agg(src.raw->>'location_name'))[1] AS location_name,
|
||||||
(array_agg(raw->>'cluster'))[1] AS cluster,
|
(array_agg(src.raw->>'cluster'))[1] AS cluster,
|
||||||
(array_agg(raw->>'region'))[1] AS region,
|
(array_agg(src.raw->>'region'))[1] AS region,
|
||||||
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
|
||||||
FROM tickets.inc
|
FROM (
|
||||||
WHERE (raw->>'is_actionable')::boolean
|
-- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
|
||||||
AND raw->>'location_name' IS NOT NULL
|
SELECT raw FROM tickets.inc
|
||||||
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
WHERE (raw->>'is_actionable')::boolean
|
||||||
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
AND raw->>'location_name' IS NOT NULL
|
||||||
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||||||
|
UNION ALL
|
||||||
|
SELECT raw FROM tickets.crq
|
||||||
|
WHERE (raw->>'is_actionable')::boolean
|
||||||
|
AND raw->>'location_name' IS NOT NULL
|
||||||
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||||||
|
) src
|
||||||
|
WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
||||||
|
WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
|
||||||
AND gl.geom IS NOT NULL)
|
AND gl.geom IS NOT NULL)
|
||||||
GROUP BY 1
|
GROUP BY 1
|
||||||
) t
|
) t
|
||||||
|
|
@ -555,7 +568,7 @@ def geocode_locations(apply: bool) -> None:
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
todo = cur.fetchall()
|
todo = cur.fetchall()
|
||||||
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
||||||
if not apply:
|
if not apply:
|
||||||
for key, loc, cluster, region, clat, clng in todo[:50]:
|
for key, loc, cluster, region, clat, clng in todo[:50]:
|
||||||
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
|
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
|
||||||
|
|
@ -609,41 +622,3 @@ def _resolve() -> int:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
||||||
return cur.fetchone()[0]
|
return cur.fetchone()[0]
|
||||||
|
|
||||||
|
|
||||||
# ── entrypoint ────────────────────────────────────────────────────────────────
|
|
||||||
def main() -> None:
|
|
||||||
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
|
||||||
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
|
||||||
ap.add_argument("--from-bucket", action="store_true",
|
|
||||||
help="Drain the incremental INC change stream (automations/inc/changes/) "
|
|
||||||
"from the isptickets S3 bucket: every not-yet-processed file "
|
|
||||||
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
|
|
||||||
ap.add_argument("--reseed", action="store_true",
|
|
||||||
help="Ignore the stored watermark and drain every file in changes/ once "
|
|
||||||
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
|
|
||||||
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
|
||||||
ap.add_argument("--geocode-clusters", action="store_true",
|
|
||||||
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
|
||||||
ap.add_argument("--geocode-locations", action="store_true",
|
|
||||||
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
|
|
||||||
ap.add_argument("--capture-history", action="store_true",
|
|
||||||
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
|
|
||||||
args = ap.parse_args()
|
|
||||||
|
|
||||||
if args.geocode_clusters:
|
|
||||||
geocode_clusters(apply=args.apply)
|
|
||||||
return
|
|
||||||
if args.geocode_locations:
|
|
||||||
geocode_locations(apply=args.apply)
|
|
||||||
return
|
|
||||||
if args.capture_history:
|
|
||||||
_capture_history()
|
|
||||||
return
|
|
||||||
if not (args.from_bucket or args.inc_csv):
|
|
||||||
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
|
|
||||||
ingest(args)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@ -18,10 +18,12 @@ dev = [
|
||||||
"ruff>=0.4",
|
"ruff>=0.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
# Flat-module project (no package dir) — list the top-level modules explicitly so
|
# Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type
|
||||||
# `pip install .` works (the Docker image installs the project to pull its deps).
|
# entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the
|
||||||
|
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
|
||||||
[tool.setuptools]
|
[tool.setuptools]
|
||||||
py-modules = ["import_tickets", "shared", "run_migrations"]
|
py-modules = ["pipeline", "shared", "run_migrations"]
|
||||||
|
packages = ["inc", "crq"]
|
||||||
|
|
||||||
[tool.uv]
|
[tool.uv]
|
||||||
managed = true
|
managed = true
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,17 @@
|
||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron.
|
# run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM).
|
||||||
#
|
#
|
||||||
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
|
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains
|
||||||
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
|
# both ticket change streams with --apply (watermark skip-if-unchanged + per-file
|
||||||
|
# archive are built in, so a run with no new files is a cheap no-op).
|
||||||
#
|
#
|
||||||
# Install on the instance (ingest every 20 min, 06:00–20:40 EAT):
|
# Install on the instance (every 20 min, 06:00–20:40 EAT):
|
||||||
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
|
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1
|
||||||
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
||||||
# the host/container TZ), since the export filenames and the schedule are EAT.
|
# the host/container TZ), since the export filenames and the schedule are EAT.
|
||||||
|
#
|
||||||
|
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
|
||||||
|
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
cd "$(dirname "$0")"
|
cd "$(dirname "$0")"
|
||||||
|
|
@ -24,4 +28,7 @@ fi
|
||||||
PY="python"
|
PY="python"
|
||||||
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"
|
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"
|
||||||
|
|
||||||
exec "$PY" import_tickets.py --from-bucket --apply
|
# Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq`
|
||||||
|
# resolve the packages alongside pipeline.py + shared.py.
|
||||||
|
"$PY" -m inc.import_inc --from-bucket --apply
|
||||||
|
"$PY" -m crq.import_crq --from-bucket --apply
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue