Compare commits
8 commits
fix/bug-re
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| | c980f3edd0 | | |
| | 066d866b90 | | |
| | 5f5d71d500 | | |
| | 0787d3a185 | | |
| | f06c11fd11 | | |
| | 7d3bba8d78 | | |
| | 509338c076 | | |
| | a4b90a33d8 | | |
19 changed files with 1320 additions and 375 deletions
|
|
@@ -3,12 +3,12 @@
|
||||||
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
||||||
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
||||||
|
|
||||||
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv)
|
# S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv)
|
||||||
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
||||||
RUSTFS_ACCESS_KEY=<key>
|
RUSTFS_ACCESS_KEY=isptickets
|
||||||
RUSTFS_SECRET_KEY=<secret>
|
RUSTFS_SECRET_KEY=<secret>
|
||||||
RUSTFS_REGION=us-east-1
|
RUSTFS_REGION=us-east-1
|
||||||
TICKETS_BUCKET=tickets
|
TICKETS_BUCKET=isptickets
|
||||||
|
|
||||||
# Geocoder (keyed — public Nominatim rate-limits bulk)
|
# Geocoder (keyed — public Nominatim rate-limits bulk)
|
||||||
GEOCODER_PROVIDER=locationiq # locationiq | opencage
|
GEOCODER_PROVIDER=locationiq # locationiq | opencage
|
||||||
|
|
|
||||||
|
|
@@ -1,7 +1,9 @@
|
||||||
# fleettickets — INC ingestion image (Coolify-deployable).
|
# fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable).
|
||||||
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
||||||
# running (CMD below) and fires the ingest via a Scheduled Task:
|
# running (CMD below) and fires the ingests via two Scheduled Tasks:
|
||||||
# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *)
|
# python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *)
|
||||||
|
# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *)
|
||||||
|
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
|
||||||
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
||||||
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
||||||
FROM python:3.12-slim
|
FROM python:3.12-slim
|
||||||
|
|
|
||||||
119
README.md
|
|
@@ -1,11 +1,22 @@
|
||||||
# fleettickets
|
# fleettickets
|
||||||
|
|
||||||
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
Field-ops **ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
||||||
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
||||||
|
|
||||||
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
|
Two ticket types, identical 32-column source schema and CDC change stream, served
|
||||||
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
|
through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each:
|
||||||
|
|
||||||
|
- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
|
||||||
|
Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
|
||||||
|
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
|
||||||
|
layer + map** (typed columns, geocoding, appears on the Tickets map via
|
||||||
|
`fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
|
||||||
|
installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
|
||||||
|
**own FleetOps tab**, same look & feel as INC.
|
||||||
|
|
||||||
|
Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
|
||||||
|
and is driven from the INC entrypoint.
|
||||||
|
|
||||||
## What this owns
|
## What this owns
|
||||||
|
|
||||||
|
|
@@ -21,7 +32,10 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
||||||
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
||||||
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
||||||
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
|
| `migrations/15_crq_table.sql` | **Materializes `tickets.crq`** (table + geom trigger + indexes — `01`'s crq section never ran on the live DB) and unpacks `raw` into the same **typed STORED generated columns** as INC's `03` (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity |
|
||||||
|
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
|
||||||
|
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
|
||||||
|
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
|
||||||
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
||||||
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
||||||
|
|
||||||
|
|
@@ -86,50 +100,81 @@ python run_migrations.py # apply the schema (idempotent)
|
||||||
|
|
||||||
## Run
|
## Run
|
||||||
|
|
||||||
```bash
|
Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.
|
||||||
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
|
|
||||||
python import_tickets.py --from-bucket --apply
|
|
||||||
|
|
||||||
# geocode (needs GEOCODER_API_KEY)
|
```bash
|
||||||
python import_tickets.py --geocode-clusters --apply # coarse, once
|
# drain the incremental change streams (every new file oldest→newest, then archive)
|
||||||
python import_tickets.py --geocode-locations --apply # precise, actionable INC
|
python -m inc.import_inc --from-bucket --apply
|
||||||
|
python -m crq.import_crq --from-bucket --apply
|
||||||
|
|
||||||
|
# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY
|
||||||
|
python -m inc.import_inc --geocode-clusters --apply # coarse, once
|
||||||
|
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq
|
||||||
|
|
||||||
# from a local CSV instead of the bucket (dev)
|
# from a local CSV instead of the bucket (dev)
|
||||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||||
|
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
|
||||||
```
|
```
|
||||||
|
|
||||||
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
|
Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using
|
||||||
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
|
the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
|
||||||
|
|
||||||
## Deploy (Coolify)
|
## Deploy (Coolify)
|
||||||
|
|
||||||
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
|
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
|
||||||
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
|
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest
|
||||||
runs as a **Scheduled Task**, not a system crontab:
|
runs as its own **Scheduled Task**, not a system crontab:
|
||||||
|
|
||||||
- **Command:** `python import_tickets.py --from-bucket --apply`
|
- **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply`
|
||||||
- **Frequency:** `15 7-19 * * *` (`:15` past each hour, **07:15–19:15 EAT**). This
|
- **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply`
|
||||||
|
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
|
||||||
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
||||||
conversion is needed.
|
conversion is needed.
|
||||||
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
||||||
`RUSTFS_*`, `GEOCODER_*`.
|
`RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds
|
||||||
|
both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
|
||||||
|
|
||||||
Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op.
|
The watermark makes a run with no new change files a cheap no-op.
|
||||||
|
|
||||||
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
||||||
and runs the ingest; schedule it with a crontab line
|
and runs **both** ingests; schedule it with a crontab line
|
||||||
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`).
|
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
|
||||||
|
|
||||||
|
Full operational runbook — container, env management (encrypted; via the UI or
|
||||||
|
`artisan tinker`), the **Forgejo → Coolify auto-deploy webhook**, manual deploys, and the
|
||||||
|
source-bucket cutover procedure — is in
|
||||||
|
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
|
||||||
|
|
||||||
|
### Bucket cutover (one-time reseed)
|
||||||
|
|
||||||
|
When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
|
||||||
|
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
|
||||||
|
newer than the new bucket's first file — which would otherwise be skipped. Point the
|
||||||
|
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
|
||||||
|
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq)
|
||||||
|
python -m inc.import_inc --from-bucket --reseed --apply # commit + archive
|
||||||
|
```
|
||||||
|
|
||||||
|
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
|
||||||
|
full-state re-emissions re-assert current state, so this is non-destructive and converges
|
||||||
|
even across the cutover gap. After it, the watermark is current — resume normal
|
||||||
|
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
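Why replaying the whole stream is harmless can be shown with a small stand-in. The sketch below is not the loader in `pipeline.py`: it uses an in-memory SQLite table instead of `tickets.inc` in PostgreSQL, and the helper names (`apply_change_file`, `drain`, `snapshot`) and sample rows are hypothetical. It only illustrates that an upsert keyed on `ticket_id`, applied oldest→newest, converges to the same state however many times the stream is drained.

```python
import json
import sqlite3


def apply_change_file(conn, rows):
    """Upsert one change file's rows keyed on ticket_id (last write wins)."""
    conn.executemany(
        "INSERT INTO inc (ticket_id, raw) VALUES (?, ?) "
        "ON CONFLICT(ticket_id) DO UPDATE SET raw = excluded.raw",
        [(row["ticket_id"], json.dumps(row)) for row in rows],
    )


def drain(conn, files):
    """Apply change files in the order given (oldest first), then commit."""
    for rows in files:
        apply_change_file(conn, rows)
    conn.commit()


def snapshot(conn):
    """Return {ticket_id: status} for every stored row."""
    cursor = conn.execute("SELECT ticket_id, raw FROM inc")
    return {ticket_id: json.loads(raw)["status"] for ticket_id, raw in cursor}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inc (ticket_id TEXT PRIMARY KEY, raw TEXT)")

# Made-up stream: a full-state baseline followed by one delta file.
baseline = [
    {"ticket_id": "INC-1", "status": "open"},
    {"ticket_id": "INC-2", "status": "open"},
]
delta = [{"ticket_id": "INC-1", "status": "closed"}]

drain(conn, [baseline, delta])   # normal ingest
first_pass = snapshot(conn)
drain(conn, [baseline, delta])   # reseed: the same stream replayed
second_pass = snapshot(conn)
```

Both passes leave two rows with `INC-1` closed, which is the property `--reseed` relies on. The ordering matters: replaying the delta before the baseline would briefly reopen `INC-1`.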
|
||||||
|
|
||||||
## Notes
|
## Notes
|
||||||
|
|
||||||
- The n8n export writes a **full current-state CSV per hour** to
|
- The n8n export writes an **incremental CDC change stream** to
|
||||||
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no
|
`automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
|
||||||
deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
|
holding only the rows that changed (with periodic full-state re-emissions). No `latest`
|
||||||
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
|
pointer, no metadata envelope. The loader drains **every not-yet-processed file
|
||||||
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
|
oldest→newest** — taking only the newest would drop intermediate deltas.
|
||||||
is skipped (the export re-emits byte-identical content most hours).
|
- **Watermark:** the newest file already applied is recorded in
|
||||||
|
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
|
||||||
|
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
|
||||||
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
|
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
|
||||||
deleted, so closed-ticket history accumulates. On success the file is **moved** to
|
deleted, so closed-ticket history accumulates. On success each file is **moved** to
|
||||||
`automations/inc/processed/`.
|
`automations/inc/processed/`.
|
||||||
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
||||||
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
|
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
|
||||||
|
|
@@ -192,8 +237,18 @@ Findings to keep in mind (see the PRD for detail):
|
||||||
|
|
||||||
## Status / roadmap
|
## Status / roadmap
|
||||||
|
|
||||||
Live: INC ingestion deployed on Coolify (hourly `15 7-19 * * *` EAT), schema +
|
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
|
||||||
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
||||||
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
|
|
||||||
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
|
**CRQ (this milestone):** data layer + map — `tickets.crq` fed from
|
||||||
separate future project that will reuse this machinery against `automations/crq/`.
|
`automations/crq/changes/` by `crq/import_crq.py`, the `tickets.crq` table + typed columns (migration 15),
|
||||||
|
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
|
||||||
|
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
|
||||||
|
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
|
||||||
|
current state — then run the shared geocode once. See
|
||||||
|
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
|
||||||
|
|
||||||
|
Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
|
||||||
|
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
|
||||||
|
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
|
||||||
|
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.
|
||||||
|
|
|
||||||
0
crq/__init__.py
Normal file
61
crq/import_crq.py
Normal file
|
|
@@ -0,0 +1,61 @@
"""
crq/import_crq.py — Fireside Communications · CRQ (new-installation) ingestion.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
  tickets.crq — new-installation requests (FleetOps "Tickets" CRQ tab)

CRQ mirrors INC at the data layer — IDENTICAL 32-column CSV schema and the same
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
automations/crq/processed/. CRQ flows onto the existing Tickets map via
reporting.fn_tickets_for_map (which already unions tickets.crq).

Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
(installation-lifecycle SLA/backlog semantics differ from incidents — a future
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
shared gazetteer, which covers both inc and crq.

Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
  python -m crq.import_crq --from-bucket --apply
  python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
  python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply

Pre-requisite: migrations applied (run_migrations.py) — tickets.crq + its typed
columns (15_crq_table.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import argparse

import pipeline

# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
DATASET = pipeline.make_dataset("crq", post_apply=None)


def main() -> None:
    ap = argparse.ArgumentParser(
        description="Ingest CRQ (installation) tickets from CSV (raw-first)")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain the incremental CRQ change stream (automations/crq/changes/) "
                         "from the isptickets S3 bucket: every not-yet-processed file "
                         "oldest→newest, upsert on ticket_id, advance the watermark, archive")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark and drain every file in changes/ once "
                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
    ap.add_argument("--crq-csv", dest="local_csv", default=None,
                    help="Local CRQ tickets CSV file (dev)")
    args = ap.parse_args()

    if not (args.from_bucket or args.local_csv):
        ap.error("provide --from-bucket or --crq-csv")
    pipeline.ingest(DATASET, args)


if __name__ == "__main__":
    main()
||||||
202
docs/deployment-and-operations.md
Normal file
|
|
@@ -0,0 +1,202 @@
|
||||||
|
# Deployment & Operations — fleettickets
|
||||||
|
|
||||||
|
Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
|
||||||
|
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
|
||||||
|
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
|
||||||
|
procedure, and verification. Secrets are referenced by **where to retrieve them**,
|
||||||
|
never by value.
|
||||||
|
|
||||||
|
> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
|
||||||
|
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
|
||||||
|
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
|
||||||
|
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.
|
||||||
|
|
||||||
|
## What's deployed
|
||||||
|
|
||||||
|
| Thing | Detail |
|
||||||
|
|---|---|
|
||||||
|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
|
||||||
|
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
|
||||||
|
| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets` → `python -m inc.import_inc --from-bucket --apply` |
|
||||||
|
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets` → `python -m crq.import_crq --from-bucket --apply` |
|
||||||
|
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
|
||||||
|
| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
|
||||||
|
|
||||||
|
Resolve the live container name (Coolify appends a random suffix):
|
||||||
|
```bash
|
||||||
|
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
|
||||||
|
'docker ps --filter name=g14mwzo73q20g70vc6fzumya --format "{{.Names}}" | head -1'
|
||||||
|
```
|
||||||
|
|
||||||
|
## Schedule (cron)
|
||||||
|
|
||||||
|
Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
|
||||||
|
20 min, **06:00–20:40 EAT**. Coolify evaluates task cron in the server timezone
|
||||||
|
(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
|
||||||
|
EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
|
||||||
|
(watermark guard, per dataset), so a dense schedule is safe.
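To sanity-check the window quoted above, the hour and minute fields of `*/20 6-20 * * *` can be expanded by hand. This is a minimal sketch, not a cron parser: `fire_times` is a hypothetical helper that hard-codes the two fields this schedule uses (a minute step and an inclusive hour range).

```python
def fire_times(minute_step=20, first_hour=6, last_hour=20):
    """List the HH:MM firing times for '*/<step> <first>-<last> * * *' on one day."""
    return [
        f"{hour:02d}:{minute:02d}"
        for hour in range(first_hour, last_hour + 1)   # cron hour ranges are inclusive
        for minute in range(0, 60, minute_step)        # */20 -> 0, 20, 40
    ]


times = fire_times()
print(times[0], times[-1], len(times))  # 06:00 20:40 45
```

That is 45 runs per task per day, which is why the no-op behaviour on an unchanged watermark matters.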
|
||||||
|
|
||||||
|
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
|
||||||
|
```sql
|
||||||
|
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
|
||||||
|
WHERE name IN ('inc_tickets', 'crq_tickets');
|
||||||
|
```
|
||||||
|
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
|
||||||
|
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
|
||||||
|
`fleettickets`, cron `*/20 6-20 * * *`.
|
||||||
|
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
|
||||||
|
without a redeploy. Execution history: `scheduled_task_executions`.
|
||||||
|
|
||||||
|
> The repo's `Dockerfile`, `run_ingest.sh`, and `README.md` document this same cron for
|
||||||
|
> the plain-host/VM fallback (`CRON_TZ=Africa/Nairobi`).
|
||||||
|
|
||||||
|
## Environment variables
|
||||||
|
|
||||||
|
Set on the Coolify app (Environment Variables). Names only — values live in Coolify:
|
||||||
|
|
||||||
|
| Var | Purpose |
|
||||||
|
|---|---|
|
||||||
|
| `DATABASE_URL` | `tracksolid_db` (internal `timescale_db:5432`) |
|
||||||
|
| `RUSTFS_ENDPOINT` | `https://s3.rahamafresh.com` |
|
||||||
|
| `RUSTFS_ACCESS_KEY` / `RUSTFS_SECRET_KEY` | `isptickets` bucket credentials |
|
||||||
|
| `RUSTFS_REGION` | `us-east-1` |
|
||||||
|
| `TICKETS_BUCKET` | `isptickets` |
|
||||||
|
| `GEOCODER_PROVIDER` / `GEOCODER_API_KEY` | keyed geocoder (LocationIQ/OpenCage) |
|
||||||
|
|
||||||
|
**Env vars are Laravel-encrypted in `coolify-db` — never raw-`UPDATE` them.** Change them
|
||||||
|
in the Coolify UI, or via `artisan tinker` (which re-encrypts on save):
|
||||||
|
```bash
|
||||||
|
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
|
||||||
|
$e = \App\Models\EnvironmentVariable::where('resourceable_type','App\\Models\\Application')
|
||||||
|
->where('resourceable_id',15)->where('key','TICKETS_BUCKET')->first();
|
||||||
|
$e->value = 'isptickets'; $e->save(); echo $e->value.PHP_EOL;
|
||||||
|
PHP
|
||||||
|
```
|
||||||
|
An env change only takes effect after the container is **recreated** (a redeploy — see below),
|
||||||
|
since Coolify injects env at container create time.
|
||||||
|
|
||||||
|
## Deploys
|
||||||
|
|
||||||
|
### Auto-deploy (Forgejo → Coolify webhook)
|
||||||
|
|
||||||
|
A push to `main` should auto-deploy. This needs **both** the Coolify per-app Auto-Deploy
|
||||||
|
toggle (Configuration → Advanced) **and** a webhook on the Forgejo repo. The webhook was
|
||||||
|
missing originally (the toggle alone is not enough); it now exists as hook id `3` on
|
||||||
|
`kianiadee/fleettickets`:
|
||||||
|
|
||||||
|
| Field | Value |
|
||||||
|
|---|---|
|
||||||
|
| URL | `https://stage.rahamafresh.com/webhooks/source/gitea/events/manual` |
|
||||||
|
| Type / content-type | `gitea` / `json` |
|
||||||
|
| Events / branch filter | `push` / `main` |
|
||||||
|
| Secret | the app's `manual_webhook_secret_gitea` (Coolify HMAC-validates `X-Hub-Signature-256`) |
|
||||||
|
|
||||||
|
Recreate / inspect it via the Forgejo API (auth: `git credential fill`, host
|
||||||
|
`repo.rahamafresh.com`, basic auth to `/api/v1` — no `tea`/`gh` needed). Get the secret by
|
||||||
|
decrypting it in Coolify:
|
||||||
|
```bash
|
||||||
|
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
|
||||||
|
"docker exec -i coolify php artisan tinker --execute=\"echo \\App\\Models\\Application::find(15)->manual_webhook_secret_gitea;\""
|
||||||
|
```
|
||||||
|
```bash
|
||||||
|
# list / test the webhook (USER:PASS from git credential fill)
|
||||||
|
curl -s -u "$USER:$PASS" https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks
|
||||||
|
curl -s -u "$USER:$PASS" -X POST https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks/3/tests
|
||||||
|
```
|
||||||
|
A successful test shows a webhook hit in `docker logs coolify` (no `invalid_signature`
|
||||||
|
audit) and a new row in `application_deployment_queues`.
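For debugging an `invalid_signature` audit, it can help to recompute the signature locally. The sketch below assumes the GitHub-style convention in which `X-Hub-Signature-256` carries `sha256=` followed by the hex HMAC-SHA256 of the raw request body, keyed with the webhook secret. That header format is an assumption here, not something confirmed from Coolify's source, and `sign` / `is_valid` are hypothetical helper names.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Return the assumed header value: 'sha256=' + hex HMAC-SHA256 of the raw body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return "sha256=" + digest


def is_valid(secret: str, body: bytes, header_value: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    return hmac.compare_digest(sign(secret, body), header_value)


# Made-up payload and secret, for illustration only.
body = b'{"ref": "refs/heads/main"}'
header = sign("example-secret", body)

ok = is_valid("example-secret", body, header)
tampered = is_valid("example-secret", body + b" ", header)
wrong_secret = is_valid("other-secret", body, header)
```

The signature covers the exact bytes sent, so re-serialising the JSON before hashing will produce a mismatch even when the secret is right.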
|
||||||
|
|
||||||
|
### Manual deploy (no push)
|
||||||
|
|
||||||
|
Trigger the same action as Coolify's Deploy button via tinker:
|
||||||
|
```bash
|
||||||
|
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
|
||||||
|
$app = \App\Models\Application::where('uuid','g14mwzo73q20g70vc6fzumya')->first();
|
||||||
|
$uuid = new \Visus\Cuid2\Cuid2;
|
||||||
|
echo json_encode(queue_application_deployment(
|
||||||
|
application: $app, deployment_uuid: $uuid, force_rebuild: false, is_api: true)).PHP_EOL;
|
||||||
|
echo $uuid.PHP_EOL;
|
||||||
|
PHP
|
||||||
|
```
|
||||||
|
Watch it: `SELECT id, status, created_at FROM application_deployment_queues WHERE
|
||||||
|
application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id` is the
|
||||||
|
**numeric id stored as text**).
|
||||||
|
|
||||||
|
## Source-bucket cutover (when the provider moves buckets)
|
||||||
|
|
||||||
|
If the provider moves the INC feed to a new bucket (as happened `tickets` → `isptickets`,
|
||||||
|
2026-06-25):
|
||||||
|
|
||||||
|
1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
|
||||||
|
timestamp range, schema parity.
|
||||||
|
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
|
||||||
|
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
|
||||||
|
same bucket, so one env change serves both tasks.
|
||||||
|
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
|
||||||
|
watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
|
||||||
|
upserting on `ticket_id`:
|
||||||
|
- If the watermark **predates** the new bucket's first file, a normal
|
||||||
|
`--from-bucket --apply` drains the whole new stream — no reseed needed.
|
||||||
|
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
|
||||||
|
`python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
|
||||||
|
The new stream's periodic full-state re-emissions make this converge even across the
|
||||||
|
cutover gap. Idempotent upserts + never-delete make it non-destructive.
|
||||||
|
- For a one-off, you can run it in the live container with the new creds inlined:
|
||||||
|
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
|
||||||
|
sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
|
||||||
|
4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
|
||||||
|
then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
|
||||||
|
keys are looked up).
|
||||||
|
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
|
||||||
|
or manual deploy). Old bucket is left untouched for rollback.
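The choice in step 3 between a normal run and `--reseed` comes down to how the watermark filters the key listing. The following is a sketch under stated assumptions, not the code in `pipeline.py`: `pending_keys` is a hypothetical helper, the keys are made up, and it assumes the `<EAT-ts>` file names sort lexicographically in chronological order (as `2026-06-15T17-00-00.csv` style names do).

```python
def pending_keys(keys, watermark=None, reseed=False):
    """Return the change files to ingest, oldest first.

    keys      -- object keys listed under the dataset prefix
    watermark -- newest key already applied (source_max_key), or None if empty
    reseed    -- ignore the watermark and return every change file
    """
    changes = sorted(k for k in keys if "/changes/" in k and k.endswith(".csv"))
    if reseed or watermark is None:
        return changes
    return [k for k in changes if k > watermark]


prefix = "automations/inc/changes/"
new_bucket = [
    prefix + "2026-06-25T09-00-00.csv",
    prefix + "2026-06-25T09-20-00.csv",
    "automations/inc/processed/2026-06-24T18-00-00.csv",  # archived, ignored
]

# A watermark left over from the old stream that is newer than the new
# bucket's first file: a normal run silently skips that first file.
stale = prefix + "2026-06-25T09-10-00.csv"

normal_run = pending_keys(new_bucket, watermark=stale)
reseed_run = pending_keys(new_bucket, watermark=stale, reseed=True)
fresh_run = pending_keys(new_bucket, watermark=None)
```

Here `normal_run` contains only the `09-20-00` file, while `reseed_run` and `fresh_run` both contain the two change files. The same comparison explains why an empty watermark needs no `--reseed`.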
|
||||||
|
|
||||||
|
## Bringing CRQ online (one-time seed)
|
||||||
|
|
||||||
|
CRQ was added 2026-06-25 (data layer + map). Migration `15_crq_table.sql` **creates**
|
||||||
|
`tickets.crq` (the live DB's `01` predated its crq section, so the table never existed)
|
||||||
|
plus the typed columns. To seed it from zero on the live DB — once the code + migration are
|
||||||
|
applied (`run_migrations.py`; on the live cutover it was applied out-of-band via the running
|
||||||
|
container, see below):
|
||||||
|
|
||||||
|
1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
|
||||||
|
filename='15_crq_table.sql';` and `\d tickets.crq` shows the table + typed columns.
|
||||||
|
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
|
||||||
|
files oldest→newest; the stream's periodic full-state snapshots converge to current
|
||||||
|
state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
|
||||||
|
```bash
|
||||||
|
python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
|
||||||
|
python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
|
||||||
|
```
|
||||||
|
(Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
|
||||||
|
--from-bucket --apply"`.)
|
||||||
|
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
|
||||||
|
`python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
|
||||||
|
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
|
||||||
|
non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.
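
The watermark-driven drain used by the seed step can be sketched roughly as follows. This is a minimal illustration, not the `pipeline.py` implementation: the helper name `pending_change_files` is hypothetical, and it assumes change-file keys end in a zero-padded EAT timestamp such as `2026-06-15T17-00-00.csv`, so plain string order matches chronological order.

```python
from __future__ import annotations


def pending_change_files(keys: list[str], watermark: str | None) -> list[str]:
    """Return change-file keys newer than the watermark, oldest first.

    Hypothetical helper: assumes all keys share one prefix and end in a
    zero-padded timestamp, so lexicographic order is chronological.
    An empty (None) watermark selects every file, as on a first seed.
    """
    csv_keys = sorted(k for k in keys if k.endswith(".csv"))
    if watermark is None:
        return csv_keys
    return [k for k in csv_keys if k > watermark]


if __name__ == "__main__":
    prefix = "automations/crq/changes/"
    keys = [
        prefix + "2026-06-25T09-00-00.csv",
        prefix + "2026-06-25T07-00-00.csv",
        prefix + "2026-06-25T08-00-00.csv",
    ]
    # Empty watermark: every file is pending, oldest first.
    print(pending_change_files(keys, None))
    # Watermark at 08:00: only the 09:00 file is still pending.
    print(pending_change_files(keys, prefix + "2026-06-25T08-00-00.csv"))
```

With an empty watermark every file is selected, which is why a first CRQ seed needs no `--reseed`; once the watermark sits at the newest key, the same call returns an empty list and the run is a no-op.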

## Verification

```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec -i "$DB" psql -U postgres -d tracksolid_db <<'SQL'
-- watermark + freshness
SELECT export_type, records_ingested, ingested_at, metadata->>'source_max_key'
FROM tickets.import_meta WHERE dataset='inc';
-- counts
SELECT count(*) total_inc,
       count(*) FILTER (WHERE (raw->>'is_actionable')::boolean) AS open
FROM tickets.inc;
-- map payload sanity
SELECT reporting.fn_tickets_for_map() -> 'summary' ->> 'ticket_count';
SQL
```

- New bucket `changes/` empties as files move to `automations/inc/processed/`.
- A plain `--from-bucket --apply` reports "nothing new" until the next change file lands.
- FleetOps Tickets map freshness reflects the new `ingested_at`.

## Rollback

- **Bucket:** revert the three env vars to the old bucket + creds and redeploy. The old
  bucket and its `processed/` history are untouched; upserts are idempotent and rows are
  never deleted, so re-running is safe.
- **Cron:** `UPDATE scheduled_tasks SET frequency = <old> WHERE name='inc_tickets';`
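
Why re-running is safe can be illustrated with a small stand-in. The sketch below uses an in-memory SQLite database in place of Postgres and a hypothetical two-column `inc` table with made-up ticket ids. The actual upsert statement in `pipeline.py` is not shown in this document, so treat this only as an illustration of an idempotent upsert keyed on `ticket_id`.

```python
from __future__ import annotations

import json
import sqlite3


def upsert_tickets(conn: sqlite3.Connection, rows: list[dict]) -> None:
    """Insert each row, or overwrite `raw` when the ticket_id already exists."""
    conn.executemany(
        "INSERT INTO inc (ticket_id, raw) VALUES (?, ?) "
        "ON CONFLICT(ticket_id) DO UPDATE SET raw = excluded.raw",
        [(r["ticket_id"], json.dumps(r, sort_keys=True)) for r in rows],
    )
    conn.commit()


def snapshot(conn: sqlite3.Connection) -> list[tuple]:
    """Return the full table contents in a stable order for comparison."""
    return conn.execute(
        "SELECT ticket_id, raw FROM inc ORDER BY ticket_id"
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE inc (ticket_id TEXT PRIMARY KEY, raw TEXT NOT NULL)")
    # Illustrative rows only; real rows carry the full source columns.
    batch = [
        {"ticket_id": "INC-1", "raw_status": "OPEN"},
        {"ticket_id": "INC-2", "raw_status": "CLOSED"},
    ]
    upsert_tickets(conn, batch)
    first = snapshot(conn)
    upsert_tickets(conn, batch)  # replaying the same file changes nothing
    assert snapshot(conn) == first
    print(len(first), "rows after two identical runs")
```

Because no statement deletes rows and a replayed row only rewrites the same values, pointing the task back at the old bucket and re-running converges to the same table state.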
@ -3,14 +3,25 @@
|
||||||
What is actually built and deployed, as of the Phase-1 completion. Companion to
|
What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
|
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
|
||||||
|
|
||||||
## Pipeline (`import_tickets.py`)
|
## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints)
|
||||||
|
|
||||||
- **Source:** newest `automations/inc/<EAT-timestamp>.csv` in the rustfs `tickets`
|
The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small
|
||||||
bucket (endpoint `https://s3.rahamafresh.com`, path-style, region `us-east-1`).
|
`Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key
|
||||||
|
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
|
||||||
|
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
|
||||||
|
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
|
||||||
|
**identical 32-column source schema**, so the engine is fully shared; geocoding is
|
||||||
|
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
|
||||||
|
from the INC entrypoint.
|
||||||
|
|
||||||
|
- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
|
||||||
|
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
|
||||||
|
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
|
||||||
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
|
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
|
||||||
`get_object`, `copy_object` + `delete_object` for archiving.
|
`get_object`, `copy_object` + `delete_object` for archiving.
|
||||||
- **Skip-if-unchanged:** newest S3 **ETag** vs `tickets.import_meta.metadata.source_etag`;
|
- **Watermark:** drains every `changes/` file newer than
|
||||||
equal → skip the DB write (the export re-emits identical content most hours).
|
`tickets.import_meta.metadata.source_max_key`, oldest→newest; reruns with no new file
|
||||||
|
are a cheap no-op. `--reseed` ignores the watermark for a one-time bucket cutover.
|
||||||
- **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
- **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
||||||
`week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`,
|
`week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`,
|
||||||
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE.
|
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE.
|
||||||
|
|
@ -22,8 +33,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
- **History capture:** after each `--apply` run (ingest or skip), calls
|
- **History capture:** after each `--apply` run (ingest or skip), calls
|
||||||
`tickets.capture_history()` → appends new closures + upserts today's backlog
|
`tickets.capture_history()` → appends new closures + upserts today's backlog
|
||||||
snapshot.
|
snapshot.
|
||||||
- CLI: `--from-bucket` (newest INC csv), `--inc-csv <file>` (local dev), `--apply`
|
- CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the
|
||||||
(else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
|
watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else
|
||||||
|
dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
|
||||||
|
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
|
||||||
|
geocoding + history are not on the CRQ entrypoint).
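
The cleaning rules listed above can be sketched as a small row filter. This is an illustration only: `clean_row` and `DROPPED_COLUMNS` are hypothetical names, rows are assumed to arrive as `csv.DictReader`-style dicts of strings, and how `pipeline.py` actually detects the `EXPORT STOPPED…` sentinel is not shown in this document, so the prefix check on cell values is an assumption.

```python
from __future__ import annotations

# Columns the cleaning step drops before the row is stored as `raw`.
DROPPED_COLUMNS = {
    "week_start", "week_end",
    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
    "department", "source_type",
}


def clean_row(row: dict[str, str]) -> dict[str, str] | None:
    """Return the cleaned row, or None when the row should be dropped."""
    # Assumption: the sentinel appears as a cell value starting with this text.
    if any((v or "").startswith("EXPORT STOPPED") for v in row.values()):
        return None
    # Assumption: booleans are serialized as the strings "true" / "false".
    if (row.get("is_alarm") or "").strip().lower() == "true":
        return None
    cleaned = {k: v for k, v in row.items() if k not in DROPPED_COLUMNS}
    if cleaned.get("region"):
        cleaned["region"] = cleaned["region"].lower()
    if cleaned.get("raw_status"):
        cleaned["raw_status"] = cleaned["raw_status"].upper()
    return cleaned


if __name__ == "__main__":
    row = {"ticket_id": "INC-1", "region": "Nairobi", "raw_status": "open",
           "is_alarm": "false", "department": "NOC"}
    print(clean_row(row))
```

Returning `None` for dropped rows keeps the filter usable in a single comprehension over the CSV reader.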
|
||||||
|
|
||||||
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
|
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
|
||||||
|
|
||||||
|
|
@ -39,6 +53,8 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
|
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
|
||||||
| 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
|
| 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
|
||||||
| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
|
| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
|
||||||
|
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
|
||||||
|
| 15_crq_table | **built** — materializes `tickets.crq` (table + geom trigger + indexes; `01`'s crq section never ran on the live DB) + the typed STORED generated columns from `03` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |
|
||||||
|
|
||||||
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
|
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
|
||||||
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
|
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
|
||||||
|
|
@ -53,11 +69,16 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
|
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
|
||||||
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
||||||
web app (`fleet-ops-staging`).
|
web app (`fleet-ops-staging`).
|
||||||
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
|
- **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket
|
||||||
`15 7-19 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
--apply` and `crq_tickets` → `python -m crq.import_crq --from-bucket --apply`, both cron
|
||||||
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
|
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
||||||
|
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
|
||||||
|
(`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
|
||||||
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
||||||
|
|
||||||
|
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
|
||||||
|
deploys, bucket cutover, verification): **`docs/deployment-and-operations.md`**.
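
To make the `*/20 6-20 * * *` schedule concrete, the sketch below expands the minute and hour fields by hand. It is a hand-rolled illustration of standard five-field cron semantics, not Coolify's scheduler, and `daily_run_times` is a hypothetical helper name.

```python
from __future__ import annotations


def daily_run_times(minute_step: int = 20, first_hour: int = 6,
                    last_hour: int = 20) -> list[str]:
    """Expand `*/minute_step first_hour-last_hour * * *` into HH:MM strings."""
    return [
        f"{hour:02d}:{minute:02d}"
        for hour in range(first_hour, last_hour + 1)  # cron hour ranges are inclusive
        for minute in range(0, 60, minute_step)       # */20 -> minutes 0, 20, 40
    ]


if __name__ == "__main__":
    times = daily_run_times()
    print(times[0], times[-1], len(times))  # 06:00 20:40 45
```

Under that reading each task fires 45 times a day, from 06:00 to 20:40 EAT, compared with 13 runs a day for the earlier `15 7-19 * * *` schedule.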
|
||||||
|
|
||||||
## State at hand-off
|
## State at hand-off
|
||||||
|
|
||||||
- `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows);
|
- `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows);
|
||||||
|
|
@ -86,5 +107,12 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
|
||||||
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
|
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
|
||||||
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
|
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
|
||||||
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
|
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
|
||||||
**team closure attribution**. **CRQ** = separate future project reusing this
|
**team closure attribution**.
|
||||||
machinery against `automations/crq/`.
|
|
||||||
|
**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
|
||||||
|
`automations/crq/changes/` (`crq/import_crq.py`), with the `tickets.crq` table + typed columns (migration 15) and
|
||||||
|
cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
|
||||||
|
already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
|
||||||
|
installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
|
||||||
|
08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
|
||||||
|
currently has **no** `post_apply` hook).
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,9 @@ tickets to our S3-compatible bucket **every hour**:
|
||||||
- `automations/inc/<EAT-timestamp>.csv` — **incidents / customer faults** *(in scope)*
|
- `automations/inc/<EAT-timestamp>.csv` — **incidents / customer faults** *(in scope)*
|
||||||
- `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)*
|
- `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)*
|
||||||
|
|
||||||
(See `n8n-hourly-s3-full-data-exports.md`. Sample: `2026-06-15T17-00-00.csv`.)
|
(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the
|
||||||
|
source later switched to an incremental `automations/inc/changes/` stream — that
|
||||||
|
doc has the current layout; this PRD records the original Phase-1 model.)
|
||||||
|
|
||||||
`fleettickets` owns the **downstream**: the `tickets` schema in the shared
|
`fleettickets` owns the **downstream**: the `tickets` schema in the shared
|
||||||
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and
|
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and
|
||||||
|
|
@ -80,6 +82,11 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
|
||||||
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
|
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
|
||||||
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
|
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
|
||||||
|
|
||||||
|
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
|
||||||
|
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
|
||||||
|
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
|
||||||
|
> `implementation.md` and `deployment-and-operations.md`.
|
||||||
|
|
||||||
## Data-quality findings (carried into Phase 2)
|
## Data-quality findings (carried into Phase 2)
|
||||||
|
|
||||||
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the
|
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the
|
||||||
|
|
|
||||||
0
inc/__init__.py
Normal file
74
inc/import_inc.py
Normal file
@ -0,0 +1,74 @@
"""
inc/import_inc.py — Fireside Communications · INC (incident / fault) ingestion.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
    tickets.inc — incidents / customer faults (FleetOps "Tickets" INC tab)

INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
watermark, archives each file to automations/inc/processed/, and — uniquely to
INC — runs tickets.capture_history() after each --apply run (closure_events +
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).

Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
    python -m inc.import_inc --from-bucket --apply
    python -m inc.import_inc --from-bucket --reseed --apply   # one-time bucket cutover
    python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
    python -m inc.import_inc --geocode-clusters --apply
    python -m inc.import_inc --geocode-locations --apply

Pre-requisite: migrations applied (run_migrations.py) — tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import argparse

import pipeline

# INC captures closure/backlog history after every --apply run (CRQ does not yet).
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)


def main() -> None:
    ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain the incremental INC change stream (automations/inc/changes/) "
                         "from the isptickets S3 bucket: every not-yet-processed file "
                         "oldest→newest, upsert on ticket_id, advance the watermark, archive")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark and drain every file in changes/ once "
                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
    ap.add_argument("--inc-csv", dest="local_csv", default=None,
                    help="Local INC tickets CSV file (dev)")
    ap.add_argument("--geocode-clusters", action="store_true",
                    help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
    ap.add_argument("--geocode-locations", action="store_true",
                    help="Geocode actionable inc+crq location_names precisely (keyed provider), "
                         "then re-resolve")
    ap.add_argument("--capture-history", action="store_true",
                    help="Run tickets.capture_history() standalone "
                         "(closure_events + daily snapshot)")
    args = ap.parse_args()

    if args.geocode_clusters:
        pipeline.geocode_clusters(apply=args.apply)
        return
    if args.geocode_locations:
        pipeline.geocode_locations(apply=args.apply)
        return
    if args.capture_history:
        pipeline.capture_history()
        return
    if not (args.from_bucket or args.local_csv):
        ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
                 "--geocode-locations, or --capture-history")
    pipeline.ingest(DATASET, args)


if __name__ == "__main__":
    main()
89
migrations/13_inc_search_fn.sql
Normal file
@ -0,0 +1,89 @@
-- 13_inc_search_fn.sql — fleettickets · INC ticket explorer (search) function
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_search — ad-hoc ticket lookup by id / engineer / cluster /
-- status / state / time, for the FleetOps "Ticket explorer" card. Returns
-- { count, truncated, limit, state, rows }. Consumed by dashboard_api
-- GET /webhook/inc-search.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: this migration was applied to the live
-- DB on 2026-06-19 but the file was never committed. Recovered verbatim from the live
-- definition (pg_get_functiondef) so a fresh DB rebuilds faithfully; the live ledger
-- already lists it, so run_migrations skips it there. The crq mirror is in 16.
-- Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

CREATE OR REPLACE FUNCTION reporting.fn_inc_search(
    p_ticket_id text DEFAULT NULL,
    p_owner text DEFAULT NULL,
    p_cluster text DEFAULT NULL,
    p_status text DEFAULT NULL,
    p_state text DEFAULT 'closed',
    p_from timestamptz DEFAULT NULL,
    p_to timestamptz DEFAULT NULL,
    p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
    v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
    v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
    v_result jsonb;
BEGIN
    p_ticket_id := NULLIF(trim(p_ticket_id), '');
    p_owner := NULLIF(trim(p_owner), '');
    p_cluster := NULLIF(p_cluster, '');
    p_status := NULLIF(p_status, '');

    WITH hits AS (
        SELECT ticket_id, normalized_status, cluster, region, location_name,
               initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
               sla_status, mttr, closed_at, created_at_service, is_actionable,
               CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
               CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
        FROM tickets.inc
        WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
          AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
          AND (p_cluster IS NULL OR cluster = p_cluster)
          AND (p_status IS NULL OR normalized_status = p_status)
          AND CASE v_state
                  WHEN 'open' THEN COALESCE(is_actionable, false)
                  WHEN 'all' THEN COALESCE(is_actionable, false)
                      OR (closed_at IS NOT NULL
                          AND (p_from IS NULL OR closed_at >= p_from)
                          AND (p_to IS NULL OR closed_at < p_to))
                  ELSE NOT COALESCE(is_actionable, false) -- 'closed'
                      AND closed_at IS NOT NULL
                      AND (p_from IS NULL OR closed_at >= p_from)
                      AND (p_to IS NULL OR closed_at < p_to)
              END
    ),
    total AS (SELECT count(*) AS n FROM hits),
    page AS (
        SELECT * FROM hits
        ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
        LIMIT v_limit
    )
    SELECT jsonb_build_object(
        'count', (SELECT n FROM total),
        'truncated', (SELECT n FROM total) > v_limit,
        'limit', v_limit,
        'state', v_state,
        'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
                                           ORDER BY page.closed_at DESC NULLS LAST,
                                                    page.created_at_service DESC NULLS LAST)
                          FROM page), '[]'::jsonb)
    ) INTO v_result;

    RETURN v_result;
END $function$;

DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
    END IF;
END $grants$;
37
migrations/14_inc_filter_options.sql
Normal file
@ -0,0 +1,37 @@
-- 14_inc_filter_options.sql — fleettickets · INC explorer dropdown options
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_filter_options — distinct engineers (owner), clusters, and the
-- ids of currently-open tickets, for the FleetOps "Ticket explorer" dropdowns.
-- Consumed by dashboard_api GET /webhook/inc-filter-options.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: applied to the live DB 2026-06-19 but
-- never committed. Recovered verbatim from the live definition so a fresh DB rebuilds
-- faithfully; the live ledger already lists it (run_migrations skips it there). The crq
-- mirror is in 16. Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

CREATE OR REPLACE FUNCTION reporting.fn_inc_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
    SELECT jsonb_build_object(
        'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
                   FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
                         FROM tickets.inc WHERE NULLIF(owner, '') IS NOT NULL) s),
        'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
                     FROM (SELECT DISTINCT cluster AS c
                           FROM tickets.inc WHERE NULLIF(cluster, '') IS NOT NULL) s),
        'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
                            FROM tickets.inc WHERE COALESCE(is_actionable, false))
    );
$function$;

DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO grafana_ro;
    END IF;
END $grants$;
101
migrations/15_crq_table.sql
Normal file
@ -0,0 +1,101 @@
-- 15_crq_table.sql — fleettickets · materialize tickets.crq + typed columns
-- ─────────────────────────────────────────────────────────────────────────────
-- Why a NEW migration (not an edit to 01): `01_tickets_schema.sql` was applied to the
-- live DB on 2026-06-15 from a version that PREDATED its `tickets.crq` section, so the
-- IF-NOT-EXISTS ledger guard has kept crq from ever being created there — even though
-- the live `reporting.fn_tickets_for_map` and `tickets.resolve_ticket_geoms` already
-- reference it (they error if called until crq exists). This migration creates
-- `tickets.crq` self-containedly (table + geom trigger + indexes) and adds the same
-- typed STORED generated columns INC got in `03_inc_columns.sql`, bringing CRQ to
-- data-layer parity.
--
-- Deterministic + idempotent — converges to the same shape on BOTH:
--   • the live DB (crq missing)          -> CREATE makes it, ALTER adds typed cols
--   • a fresh DB (crq minimal, from 01)  -> CREATE skipped, ALTER adds typed cols
-- Reuses shared objects already present: tickets.tg_ticket_geom() (01),
-- tickets.norm_cluster() (01), tickets.eat_ts() (03).
--
-- NOTE: the live DB also carries un-versioned migrations 13_inc_search_fn.sql /
-- 14_inc_filter_options.sql (applied 2026-06-19, absent from this repo) — INC dashboard
-- functions, unrelated to CRQ. Numbered 15 here to sit cleanly after the live ledger.
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

-- ── table (base shape mirrors tickets.inc's original 01 base) ────────────────
CREATE TABLE IF NOT EXISTS tickets.crq (
    ticket_id text PRIMARY KEY,
    raw jsonb NOT NULL,
    geom geometry(Point, 4326),
    geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
    ingested_at timestamptz NOT NULL DEFAULT now()
);

-- ── geom trigger — read from raw; shared tickets.tg_ticket_geom() (from 01) ───
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();

-- ── raw-based indexes (mirror 01's inc/crq set) ──────────────────────────────
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
    WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);

-- ── typed STORED generated columns (mirror of 03_inc_columns.sql) ────────────
-- Computed for ALL existing rows on creation + auto-recomputed on every insert/update;
-- `raw` stays the source of truth. tickets.eat_ts() (EAT->timestamptz, IMMUTABLE) is
-- reused from 03 — see that file's note on why IMMUTABLE is safe for Kenya (UTC+3, no DST).
ALTER TABLE tickets.crq
    -- text
    ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
    ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
    ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
    ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
    ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
    ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
    ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
    ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
    ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
    ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
    -- numeric / float
    ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
    ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
    ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
    -- boolean
    ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
    -- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the EXPORT
    -- pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
    ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
    ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
    ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
    ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
    ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
    ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
    ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;

-- ── typed-column indexes (serve cluster / team / closure queries) ────────────
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;

-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
        GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.crq TO tracksolid_owner;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT SELECT ON tickets.crq TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT SELECT ON tickets.crq TO grafana_ro;
    END IF;
END $grants$;
297
migrations/16_crq_dashboard.sql
Normal file
|
|
@ -0,0 +1,297 @@
|
||||||
|
-- 16_crq_dashboard.sql — fleettickets · CRQ dashboard parity (view + read functions)
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- Brings CRQ to FleetOps-dashboard parity with INC, so the Tickets tab's CRQ
|
||||||
|
-- sub-tab works "just like INC". Mirrors, against tickets.crq:
|
||||||
|
-- tickets.crq_open_sla ← mirror of tickets.inc_open_sla (08)
|
||||||
|
-- reporting.fn_crq_dashboard ← mirror of reporting.fn_inc_dashboard (09/12)
|
||||||
|
-- reporting.fn_crq_search ← mirror of reporting.fn_inc_search (13)
|
||||||
|
-- reporting.fn_crq_filter_options ← mirror of reporting.fn_inc_filter_options (14)
|
||||||
|
-- consumed by dashboard_api GET /webhook/crq-dashboard | crq-search | crq-filter-options.
|
||||||
|
--
|
||||||
|
-- Differences from the INC view: tickets.crq has no `geog` column (mig 05 is INC-only)
|
||||||
|
-- and its latitude/longitude come from `raw` (empty in the feed), so crq_open_sla omits
|
||||||
|
-- geog and derives latitude/longitude from `geom`. The 48h SLA rule is reused verbatim
|
||||||
|
-- for layout parity — installation-lifecycle SLA semantics may be refined later.
|
||||||
|
--
|
||||||
|
-- Idempotent (CREATE OR REPLACE / VIEW). Requires migration 15 (tickets.crq + typed cols).
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
-- ── crq_open_sla — open CRQ tickets with derived SLA (mirror of inc_open_sla) ─
|
||||||
|
CREATE OR REPLACE VIEW tickets.crq_open_sla AS
|
||||||
|
SELECT
|
||||||
|
ticket_id,
|
||||||
|
normalized_status,
|
||||||
|
bucket,
|
||||||
|
cluster,
|
||||||
|
region,
|
||||||
|
location_name,
|
||||||
|
assigned_team,
|
||||||
|
owner,
|
||||||
|
sla_status AS source_sla_status,
|
||||||
|
mttr, -- minutes (null until closed)
|
||||||
|
COALESCE(created_at_service, first_seen_at) AS sla_clock,
|
||||||
|
CASE WHEN created_at_service IS NOT NULL THEN 'service' ELSE 'first_seen' END AS sla_clock_source,
|
||||||
|
round((EXTRACT(EPOCH FROM now() - COALESCE(created_at_service, first_seen_at)) / 3600)::numeric, 1) AS hours_open,
|
||||||
|
CASE
|
||||||
|
WHEN COALESCE(created_at_service, first_seen_at) IS NULL THEN 'unknown'
|
||||||
|
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '48h' THEN 'breached'
|
||||||
|
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '36h' THEN 'at_risk'
|
||||||
|
ELSE 'ok'
|
||||||
|
END AS sla_state,
|
||||||
|
created_at_service,
|
||||||
|
first_seen_at,
|
||||||
|
scheduled_at,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS latitude,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS longitude,
|
||||||
|
geo_source,
|
||||||
|
geom
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE is_actionable;
|
||||||
|
|
||||||
|
COMMENT ON VIEW tickets.crq_open_sla IS
|
||||||
|
'Open (is_actionable) CRQ tickets with derived SLA (48h rule; clock = created_at_service '
|
||||||
|
'or first_seen_at fallback). Mirror of inc_open_sla; no geog. fleettickets 16.';
|
||||||
|
|
||||||
|
-- ── fn_crq_dashboard — mirror of fn_inc_dashboard over tickets.crq ───────────
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_dashboard(
|
||||||
|
p_cluster text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_window text DEFAULT 'today',
|
||||||
|
p_from timestamptz DEFAULT NULL,
|
||||||
|
p_to timestamptz DEFAULT NULL
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
|
||||||
|
DECLARE
|
||||||
|
v_now_eat timestamp;
|
||||||
|
v_from timestamptz;
|
||||||
|
v_to timestamptz;
|
||||||
|
v_preset text;
|
||||||
|
v_days numeric;
|
||||||
|
v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_cluster := NULLIF(p_cluster, '');
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
|
||||||
|
-- ── resolve the window ──────────────────────────────────────────────────────
|
||||||
|
IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
|
||||||
|
v_preset := 'custom';
|
||||||
|
v_from := COALESCE(p_from, '-infinity'::timestamptz);
|
||||||
|
v_to := COALESCE(p_to, 'infinity'::timestamptz);
|
||||||
|
ELSE
|
||||||
|
v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
|
||||||
|
IF v_preset = 'week' THEN
|
||||||
|
v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
ELSIF v_preset = 'month' THEN
|
||||||
|
v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
ELSE
|
||||||
|
v_preset := 'today';
|
||||||
|
v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
|
||||||
|
v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
|
||||||
|
ELSE
|
||||||
|
v_days := NULL; -- open-ended custom window → per-day average not meaningful
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- ── build payload ───────────────────────────────────────────────────────────
|
||||||
|
WITH open_t AS (
|
||||||
|
SELECT * FROM tickets.crq_open_sla
|
||||||
|
WHERE (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
),
|
||||||
|
closed_t AS (
|
||||||
|
SELECT ticket_id, normalized_status, cluster, region, location_name,
|
||||||
|
assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE NOT COALESCE(is_actionable, false)
|
||||||
|
AND closed_at IS NOT NULL
|
||||||
|
AND closed_at >= v_from AND closed_at < v_to
|
||||||
|
AND (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),
|
||||||
|
|
||||||
|
'open', jsonb_build_object(
|
||||||
|
'type', 'FeatureCollection',
|
||||||
|
'features', COALESCE((
|
||||||
|
SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'type', 'Feature',
|
||||||
|
'properties', jsonb_build_object(
|
||||||
|
'ticket_id', ticket_id, 'normalized_status', normalized_status,
|
||||||
|
'cluster', cluster, 'region', region, 'location_name', location_name,
|
||||||
|
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
|
||||||
|
'geo_source', geo_source,
|
||||||
|
'sla_state', sla_state, 'hours_open', hours_open),
|
||||||
|
'geometry', ST_AsGeoJSON(geom)::jsonb))
|
||||||
|
FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
|
||||||
|
'closed', jsonb_build_object(
|
||||||
|
'type', 'FeatureCollection',
|
||||||
|
'features', COALESCE((
|
||||||
|
SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'type', 'Feature',
|
||||||
|
'properties', jsonb_build_object(
|
||||||
|
'ticket_id', ticket_id, 'normalized_status', normalized_status,
|
||||||
|
'cluster', cluster, 'region', region, 'location_name', location_name,
|
||||||
|
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
|
||||||
|
'geo_source', geo_source,
|
||||||
|
'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
|
||||||
|
'geometry', ST_AsGeoJSON(geom)::jsonb))
|
||||||
|
FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
|
||||||
|
'metrics', jsonb_build_object(
|
||||||
|
'open_now', (SELECT count(*) FROM open_t),
|
||||||
|
'closed_in_window', (SELECT count(*) FROM closed_t),
|
||||||
|
'sla', jsonb_build_object(
|
||||||
|
'open', (SELECT jsonb_build_object(
|
||||||
|
'breached', count(*) FILTER (WHERE sla_state = 'breached'),
|
||||||
|
'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
|
||||||
|
'ok', count(*) FILTER (WHERE sla_state = 'ok'),
|
||||||
|
'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
|
||||||
|
'closed', (SELECT jsonb_build_object(
|
||||||
|
'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
|
||||||
|
'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
|
||||||
|
),
|
||||||
|
'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
|
||||||
|
SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
|
||||||
|
SELECT normalized_status FROM open_t
|
||||||
|
UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
|
||||||
|
'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
|
||||||
|
SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
|
||||||
|
SELECT cluster FROM open_t
|
||||||
|
UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
|
||||||
|
-- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
|
||||||
|
'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
|
||||||
|
'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
|
||||||
|
FROM (
|
||||||
|
SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
|
||||||
|
count(*) AS c,
|
||||||
|
count(*) FILTER (WHERE sla_status = 'Breached') AS b,
|
||||||
|
round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
|
||||||
|
FROM closed_t GROUP BY 1) z), '[]'::jsonb),
|
||||||
|
'closure_rate', jsonb_build_object(
|
||||||
|
'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
|
||||||
|
ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
|
||||||
|
'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
|
||||||
|
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
|
||||||
|
FROM closed_t GROUP BY 1) z), '[]'::jsonb)
|
||||||
|
),
|
||||||
|
'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
|
||||||
|
),
|
||||||
|
|
||||||
|
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
|
||||||
|
'export_type', export_type, 'exported_at', exported_at,
|
||||||
|
'records_ingested', records_ingested, 'ingested_at', ingested_at))
|
||||||
|
FROM tickets.import_meta)
|
||||||
|
) INTO v_result;
|
||||||
|
|
||||||
|
RETURN v_result;
|
||||||
|
END $function$;
|
||||||
|
|
||||||
|
-- ── fn_crq_search — mirror of fn_inc_search over tickets.crq ──────────────────
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_search(
|
||||||
|
p_ticket_id text DEFAULT NULL,
|
||||||
|
p_owner text DEFAULT NULL,
|
||||||
|
p_cluster text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_state text DEFAULT 'closed',
|
||||||
|
p_from timestamptz DEFAULT NULL,
|
||||||
|
p_to timestamptz DEFAULT NULL,
|
||||||
|
p_limit integer DEFAULT 500
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
|
||||||
|
DECLARE
|
||||||
|
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
|
||||||
|
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
|
||||||
|
v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_ticket_id := NULLIF(trim(p_ticket_id), '');
|
||||||
|
p_owner := NULLIF(trim(p_owner), '');
|
||||||
|
p_cluster := NULLIF(p_cluster, '');
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
|
||||||
|
WITH hits AS (
|
||||||
|
SELECT ticket_id, normalized_status, cluster, region, location_name,
|
||||||
|
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
|
||||||
|
sla_status, mttr, closed_at, created_at_service, is_actionable,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
|
||||||
|
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
|
||||||
|
FROM tickets.crq
|
||||||
|
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
|
||||||
|
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
|
||||||
|
AND (p_cluster IS NULL OR cluster = p_cluster)
|
||||||
|
AND (p_status IS NULL OR normalized_status = p_status)
|
||||||
|
AND CASE v_state
|
||||||
|
WHEN 'open' THEN COALESCE(is_actionable, false)
|
||||||
|
WHEN 'all' THEN COALESCE(is_actionable, false)
|
||||||
|
OR (closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to))
|
||||||
|
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
|
||||||
|
AND closed_at IS NOT NULL
|
||||||
|
AND (p_from IS NULL OR closed_at >= p_from)
|
||||||
|
AND (p_to IS NULL OR closed_at < p_to)
|
||||||
|
END
|
||||||
|
),
|
||||||
|
total AS (SELECT count(*) AS n FROM hits),
|
||||||
|
page AS (
|
||||||
|
SELECT * FROM hits
|
||||||
|
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
|
||||||
|
LIMIT v_limit
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'count', (SELECT n FROM total),
|
||||||
|
'truncated', (SELECT n FROM total) > v_limit,
|
||||||
|
'limit', v_limit,
|
||||||
|
'state', v_state,
|
||||||
|
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
|
||||||
|
ORDER BY page.closed_at DESC NULLS LAST,
|
||||||
|
page.created_at_service DESC NULLS LAST)
|
||||||
|
FROM page), '[]'::jsonb)
|
||||||
|
) INTO v_result;
|
||||||
|
|
||||||
|
RETURN v_result;
|
||||||
|
END $function$;
|
||||||
|
|
||||||
|
-- ── fn_crq_filter_options — mirror of fn_inc_filter_options over tickets.crq ──
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_crq_filter_options()
|
||||||
|
RETURNS jsonb LANGUAGE sql STABLE AS $function$
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
|
||||||
|
FROM tickets.crq WHERE NULLIF(owner, '') IS NOT NULL) s),
|
||||||
|
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
|
||||||
|
FROM (SELECT DISTINCT cluster AS c
|
||||||
|
FROM tickets.crq WHERE NULLIF(cluster, '') IS NOT NULL) s),
|
||||||
|
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
|
||||||
|
FROM tickets.crq WHERE COALESCE(is_actionable, false))
|
||||||
|
);
|
||||||
|
$function$;
|
||||||
|
|
||||||
|
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq_open_sla TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT SELECT ON tickets.crq_open_sla TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
|
|
@ -1,156 +0,0 @@
|
||||||
# n8n Hourly S3 Full-Data Exports
|
|
||||||
|
|
||||||
Updated on June 15, 2026.
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
Two active n8n workflows export complete datasets to S3 every hour:
|
|
||||||
|
|
||||||
1. `FTTH Automation Ticket S3 Export`
|
|
||||||
2. `Fuel Records S3 Export`
|
|
||||||
|
|
||||||
Each execution creates CSV files only. Filenames use the actual execution time
|
|
||||||
in the `Africa/Nairobi` timezone.
|
|
||||||
|
|
||||||
No delta files, JSON files, `latest` files, `changes/` directories, `full/`
|
|
||||||
directories, or midnight-specific exports are created.
|
|
||||||
|
|
||||||
## Hourly Output
|
|
||||||
|
|
||||||
Together, the two workflows create exactly three files during their hourly
|
|
||||||
executions:
|
|
||||||
|
|
||||||
```text
|
|
||||||
automations/crq/YYYY-MM-DDTHH-mm-ss.csv
|
|
||||||
automations/inc/YYYY-MM-DDTHH-mm-ss.csv
|
|
||||||
fuel_records/YYYY-MM-DDTHH-mm-ss.csv
|
|
||||||
```
|
|
||||||
|
|
||||||
The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
|
|
||||||
uploaded to the `fuel` bucket.
|
|
||||||
|
|
||||||
## FTTH Automation Ticket S3 Export
|
|
||||||
|
|
||||||
Workflow ID: `JI3QkcJeHk9eYRsY`
|
|
||||||
|
|
||||||
The workflow:
|
|
||||||
|
|
||||||
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
|
|
||||||
2. Creates one execution timestamp.
|
|
||||||
3. Calls the existing authenticated Scoreboard export endpoint with
|
|
||||||
`export_type: full`.
|
|
||||||
4. Reads all CRQ and INC rows returned by the endpoint.
|
|
||||||
5. Converts each complete dataset to CSV.
|
|
||||||
6. Uploads exactly two files:
|
|
||||||
- `automations/crq/<execution-timestamp>.csv`
|
|
||||||
- `automations/inc/<execution-timestamp>.csv`
|
|
||||||
7. Fails the execution if exactly two successful upload results are not
|
|
||||||
returned.
|
|
||||||
|
|
||||||
The workflow still has its existing manual webhook for operational testing.
|
|
||||||
|
|
||||||
## Fuel Records S3 Export
|
|
||||||
|
|
||||||
Workflow ID: `IP2KNAfFazAjTesh`
|
|
||||||
|
|
||||||
The workflow:
|
|
||||||
|
|
||||||
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
|
|
||||||
2. Creates one execution timestamp.
|
|
||||||
3. Reads the complete `logistics_department.fuel_records` table.
|
|
||||||
4. Converts all returned rows to one CSV.
|
|
||||||
5. Uploads exactly one file:
|
|
||||||
- `fuel_records/<execution-timestamp>.csv`
|
|
||||||
6. Fails the execution if the S3 upload reports an error.
|
|
||||||
|
|
||||||
The workflow still has its existing manual webhook for operational testing.
|
|
||||||
|
|
||||||
## Timestamp Format
|
|
||||||
|
|
||||||
The timestamp format is:
|
|
||||||
|
|
||||||
```text
|
|
||||||
YYYY-MM-DDTHH-mm-ss
|
|
||||||
```
|
|
||||||
|
|
||||||
Example:
|
|
||||||
|
|
||||||
```text
|
|
||||||
2026-06-15T14-39-53
|
|
||||||
```
|
|
||||||
|
|
||||||
The timestamp is generated once at the start of each workflow execution and is
|
|
||||||
formatted in `Africa/Nairobi`.
|
|
||||||
|
|
||||||
## Credentials and Safety
|
|
||||||
|
|
||||||
- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
|
|
||||||
reused.
|
|
||||||
- No S3 credentials or API secrets are hard-coded in workflow code.
|
|
||||||
- Secrets are not included in workflow result messages.
|
|
||||||
- Source database queries are read-only.
|
|
||||||
- The workflows do not delete or update source database rows.
|
|
||||||
- S3 upload nodes retain retry handling. A failed hourly execution can also be
|
|
||||||
recovered naturally by the next full-data run.
|
|
||||||
|
|
||||||
## Removed Behavior
|
|
||||||
|
|
||||||
The workflows no longer contain:
|
|
||||||
|
|
||||||
- Delta export logic or stored delta pointers
|
|
||||||
- Midnight full-export schedules
|
|
||||||
- `latest.json` or `latest.csv`
|
|
||||||
- JSON output
|
|
||||||
- `changes/` keys
|
|
||||||
- `full/` keys
|
|
||||||
- Multipart or additional export files
|
|
||||||
- FTTH mark-sent state handling
|
|
||||||
|
|
||||||
## Deployment Status
|
|
||||||
|
|
||||||
Both workflows were saved, published, and activated on June 15, 2026.
|
|
||||||
|
|
||||||
Active versions:
|
|
||||||
|
|
||||||
```text
|
|
||||||
Fuel Records S3 Export:
|
|
||||||
60cf5824-9345-45bb-a2eb-3b20b877fd32
|
|
||||||
|
|
||||||
FTTH Automation Ticket S3 Export:
|
|
||||||
68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
|
|
||||||
```
|
|
||||||
|
|
||||||
## Manual Test Evidence
|
|
||||||
|
|
||||||
### Fuel Records S3 Export
|
|
||||||
|
|
||||||
Execution ID: `404079`
|
|
||||||
|
|
||||||
Rows exported: `2001`
|
|
||||||
|
|
||||||
Exact S3 key:
|
|
||||||
|
|
||||||
```text
|
|
||||||
fuel_records/2026-06-15T14-39-50.csv
|
|
||||||
```
|
|
||||||
|
|
||||||
### FTTH Automation Ticket S3 Export
|
|
||||||
|
|
||||||
Execution ID: `404080`
|
|
||||||
|
|
||||||
Rows exported:
|
|
||||||
|
|
||||||
```text
|
|
||||||
CRQ: 12680
|
|
||||||
INC: 31434
|
|
||||||
```
|
|
||||||
|
|
||||||
Exact S3 keys:
|
|
||||||
|
|
||||||
```text
|
|
||||||
automations/crq/2026-06-15T14-39-53.csv
|
|
||||||
automations/inc/2026-06-15T14-39-53.csv
|
|
||||||
```
|
|
||||||
|
|
||||||
Both manual tests completed successfully. Their upload builders generated one
|
|
||||||
Fuel item and exactly two FTTH items, matching the required three output files.
|
|
||||||
130
n8n-s3-ticket-exports.md
Normal file
|
|
@ -0,0 +1,130 @@
|
||||||
|
# n8n S3 Ticket Exports — Incremental (CDC) Stream
|
||||||
|
|
||||||
|
Updated on June 23, 2026.
|
||||||
|
|
||||||
|
> **History.** This doc previously described a full-snapshot-per-hour model
|
||||||
|
> ("No delta files … No `changes/` directories"). That is no longer accurate.
|
||||||
|
> As of the June 22, 2026 re-seed the source switched to an **incremental
|
||||||
|
> change-data-capture (CDC) stream** under `automations/<dataset>/changes/`.
|
||||||
|
> The structure below was verified by direct S3 inspection of the `tickets`
|
||||||
|
> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are
|
||||||
|
> carried over from the prior version and may be stale — trust the bucket.
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
The FTTH ticket export now writes an **incremental** CSV stream per dataset:
|
||||||
|
|
||||||
|
- The **first** file in a stream is a full current-state **baseline**.
|
||||||
|
- Every **later** file holds **only the rows that changed** since the prior
|
||||||
|
export — new and updated tickets, keyed by `ticket_id`.
|
||||||
|
- **Deletions are never emitted** (tickets are closed in place, not removed).
|
||||||
|
|
||||||
|
Consumers must ingest **every not-yet-processed file in ascending timestamp
|
||||||
|
order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking
|
||||||
|
only the newest file silently drops the intermediate deltas.
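
The ordering rule above can be illustrated with a minimal in-memory sketch. It assumes each file has already been parsed into a list of row dicts; the `apply_stream` helper and the `INC-1`/`INC-2` ticket ids are hypothetical and are not taken from `pipeline.py`. Because the timestamp in the key is zero-padded, sorting keys as strings also sorts them chronologically.

```python
def apply_stream(files):
    """Replay (key, rows) pairs oldest-first, upserting each row on ticket_id."""
    state = {}
    for _key, rows in sorted(files, key=lambda f: f[0]):
        for row in rows:
            # A delta row carries the full current state, so it simply replaces
            # whatever an earlier file stored for the same ticket_id.
            state[row["ticket_id"]] = row
    return state


# Hypothetical stream: one baseline followed by one delta (listed out of order).
files = [
    ("automations/inc/changes/2026-06-22T15-53-04.csv",
     [{"ticket_id": "INC-1", "normalized_status": "closed"}]),
    ("automations/inc/changes/2026-06-22T15-50-39.csv",
     [{"ticket_id": "INC-1", "normalized_status": "open"},
      {"ticket_id": "INC-2", "normalized_status": "open"}]),
]

state = apply_stream(files)

# Reading only the newest file would lose every ticket it does not mention.
newest_only = {r["ticket_id"]: r for r in max(files, key=lambda f: f[0])[1]}
```

Here `state` holds both tickets, with `INC-1` in its later state, while `newest_only` contains `INC-1` alone. That gap is the failure mode the paragraph above warns about.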
|
||||||
|
|
||||||
|
CSV files only. Filenames use the execution time in the `Africa/Nairobi`
|
||||||
|
timezone (format below). All files share one identical flat-CSV schema (header
|
||||||
|
+ rows) — the same column set the previous full snapshots used.
|
||||||
|
|
||||||
|
## Output Layout
|
||||||
|
|
||||||
|
The change stream lives under a `changes/` prefix per dataset in the `tickets`
|
||||||
|
bucket:
|
||||||
|
|
||||||
|
```text
|
||||||
|
automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv
|
||||||
|
automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv
|
||||||
|
```
|
||||||
|
|
||||||
|
Observed `tickets` bucket layout (June 23, 2026):
|
||||||
|
|
||||||
|
```text
|
||||||
|
automations/inc/
|
||||||
|
├── changes/ ← ACTIVE incremental stream (baseline + deltas)
|
||||||
|
│ ├── 2026-06-22T15-50-39.csv (baseline, ~15 MB, 34,849 rows)
|
||||||
|
│ ├── 2026-06-22T15-53-04.csv (delta, 1 row)
|
||||||
|
│ ├── 2026-06-22T17-10-41.csv (delta, 22 rows)
|
||||||
|
│ └── 2026-06-22T17-15-41.csv (delta, 131 rows)
|
||||||
|
├── processed/ ← our pipeline's archive of consumed files
|
||||||
|
├── full/ ← present but EMPTY (legacy prefix)
|
||||||
|
├── latest.csv/ ← present but EMPTY (legacy prefix)
|
||||||
|
└── latest.json/ ← present but EMPTY (legacy prefix)
|
||||||
|
```
|
||||||
|
|
||||||
|
Notes:
|
||||||
|
|
||||||
|
- There are **no longer any `automations/inc/<ts>.csv` files at the root** of
|
||||||
|
`inc/` — the last full snapshots (through `2026-06-18T17-00-05.csv`) were
|
||||||
|
archived to `processed/`. New data arrives **only** under `changes/`.
|
||||||
|
- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in
|
||||||
|
listings but contain **no objects** (leftover/legacy; ignore them). There is
|
||||||
|
no `latest` pointer and no JSON/metadata envelope.
|
||||||
|
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
|
||||||
|
stream (with matching baseline timestamps and additional newer deltas) and the
|
||||||
|
identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by
|
||||||
|
`crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same
|
||||||
|
way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
|
||||||
|
(`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
|
||||||
|
archives them — they are not consumed (the `changes/` stream is the source of truth).
|
||||||
|
|
||||||
|
## CSV Schema
|
||||||
|
|
||||||
|
Header (32 columns), identical across baseline and delta files:
|
||||||
|
|
||||||
|
```text
|
||||||
|
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
|
||||||
|
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
|
||||||
|
week_start, week_end, cluster, region, location_name, latitude, longitude,
|
||||||
|
department, assigned_team, owner, sla_status, mttr, is_auto_created,
|
||||||
|
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
|
||||||
|
source_snapshot_id, created_at, updated_at
|
||||||
|
```
|
||||||
|
|
||||||
|
Each row is a complete record (not a partial diff): a delta row carries the
|
||||||
|
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
|
||||||
|
baseline still contains `is_alarm=true` rows and may include a leading
|
||||||
|
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
|
||||||
|
the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints).
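
As a rough illustration of that filtering step, the sketch below drops alarm rows and the sentinel row while reading a CSV. It is not the code in `pipeline.py`: the `keep_row` helper, the prefix match on `EXPORT STOPPED`, and the assumption that `is_alarm` is encoded as the text `true` are all illustrative guesses, and the sample rows are invented.

```python
import csv
import io

# Assumption: the truncation sentinel can be recognised by this ticket_id prefix.
SENTINEL_PREFIX = "EXPORT STOPPED"


def keep_row(row):
    """Return False for the sentinel row and for is_alarm=true rows."""
    ticket_id = (row.get("ticket_id") or "").strip()
    if ticket_id.startswith(SENTINEL_PREFIX):
        return False
    # Assumption: booleans arrive as the strings "true"/"false" in the CSV.
    return (row.get("is_alarm") or "").strip().lower() != "true"


# Invented sample with only three of the 32 columns, for brevity.
sample = io.StringIO(
    "ticket_id,is_alarm,normalized_status\n"
    "EXPORT STOPPED (hypothetical remainder),,\n"
    "INC-1,true,open\n"
    "INC-2,false,open\n"
)

kept = [row for row in csv.DictReader(sample) if keep_row(row)]
```

Only the `INC-2` row survives in `kept`; the sentinel and the alarm row are discarded before any upsert would happen.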
|
||||||
|
|
||||||
|
## Timestamp Format
|
||||||
|
|
||||||
|
```text
|
||||||
|
YYYY-MM-DDTHH-mm-ss e.g. 2026-06-22T15-50-39
|
||||||
|
```
|
||||||
|
|
||||||
|
Generated once at the start of each execution, formatted in `Africa/Nairobi`
|
||||||
|
(EAT). Note this is the *execution* time, not a top-of-the-hour schedule — the
|
||||||
|
incremental files appear whenever a change batch is exported (multiple within
|
||||||
|
the same hour are normal, e.g. `15-50-39` then `15-53-04`).
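
A small sketch of turning a key into a timezone-aware timestamp may help when comparing files. The `parse_export_ts` helper is hypothetical. It models EAT as a fixed UTC+3 offset (East Africa Time has no daylight saving), which avoids depending on an installed tz database, and it needs Python 3.9+ for `str.removesuffix`.

```python
from datetime import datetime, timedelta, timezone

# Assumption: EAT is represented as a fixed UTC+3 offset.
EAT = timezone(timedelta(hours=3), "EAT")


def parse_export_ts(key):
    """Parse the YYYY-MM-DDTHH-mm-ss stem of an export key as an EAT datetime."""
    stem = key.rsplit("/", 1)[-1].removesuffix(".csv")
    return datetime.strptime(stem, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)


ts = parse_export_ts("automations/inc/changes/2026-06-22T15-50-39.csv")
ts_utc = ts.astimezone(timezone.utc)
```

For the baseline file above, `ts` is 15:50:39 EAT and `ts_utc` is 12:50:39 UTC.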
|
||||||
|
|
||||||
|
## How the Loader Consumes It
|
||||||
|
|
||||||
|
`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq
|
||||||
|
--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
|
||||||
|
|
||||||
|
1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp.
|
||||||
|
2. Skips files at/older than the **watermark**
|
||||||
|
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
|
||||||
|
applied); on a fresh stream it processes everything present.
|
||||||
|
3. For each pending file, oldest→newest: drop `is_alarm=true` + sentinel rows,
|
||||||
|
strip `DROP_FIELDS`, normalize `region`/`raw_status`, then upsert on
|
||||||
|
`ticket_id`. The row upsert and the watermark advance **commit in one
|
||||||
|
transaction per file**, after which the file is moved to
|
||||||
|
   `automations/<inc|crq>/processed/`.
|
||||||
|
4. A mid-run failure therefore leaves folder state consistent with the
|
||||||
|
watermark; the next run resumes cleanly from where it stopped.
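
Steps 1 and 2 can be sketched as a pure function over the listed keys. This is an assumption-laden illustration rather than the loader's actual code: `pending_keys` is a hypothetical name, and comparing by file name (instead of full key) is a guess made so that a watermark stored with a different prefix would still compare correctly.

```python
def _name(key):
    """File-name part of an S3 key, e.g. '2026-06-22T15-50-39.csv'."""
    return key.rsplit("/", 1)[-1]


def pending_keys(listed_keys, watermark_key):
    """Return keys strictly newer than the watermark, oldest first.

    With no watermark (a fresh stream), every listed key is pending.
    """
    ordered = sorted(listed_keys, key=_name)
    if watermark_key is None:
        return ordered
    mark = _name(watermark_key)
    return [k for k in ordered if _name(k) > mark]


listed = [
    "automations/inc/changes/2026-06-22T17-10-41.csv",
    "automations/inc/changes/2026-06-22T15-50-39.csv",
    "automations/inc/changes/2026-06-22T15-53-04.csv",
]

fresh = pending_keys(listed, None)
resumed = pending_keys(listed, "automations/inc/changes/2026-06-22T15-53-04.csv")
```

On a fresh stream all three files are returned in ascending order; after the `15-53-04` file has been applied, only the `17-10-41` delta remains pending.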
|
||||||
|
|
||||||
|
The first file applied onto an empty watermark is recorded as
|
||||||
|
`export_type="baseline"` in `tickets.import_meta`; every file after is `"delta"`.
|
||||||
|
|
||||||
|
## Workflow Context (carried over — verify before relying on)
|
||||||
|
|
||||||
|
The export originates from the FTTH Automation Ticket export workflow, calling
|
||||||
|
the authenticated Scoreboard export endpoint and uploading CSV(s) to the
|
||||||
|
`tickets` bucket; a sibling workflow exports `fuel_records/<ts>.csv` to the
|
||||||
|
`fuel` bucket. Source DB queries are read-only and the workflows do not delete
|
||||||
|
or update source rows. The previously documented workflow IDs and the claim of
|
||||||
|
"two files per hour, full snapshots, no `changes/`" predate the June 22 switch
|
||||||
|
to the incremental stream and should be re-confirmed against the live n8n
|
||||||
|
configuration before being treated as current.
|
||||||
|
|
@ -1,60 +1,51 @@
|
||||||
"""
|
"""
|
||||||
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
|
pipeline.py — Fireside Communications · generic ticket ingestion engine (raw-first)
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
|
The dataset-agnostic core shared by the per-type entrypoints:
|
||||||
source of the FleetOps "Tickets" map.
|
inc/import_inc.py -> tickets.inc (incidents / customer faults)
|
||||||
tickets.inc — incidents / customer faults
|
crq/import_crq.py -> tickets.crq (new-installation requests)
|
||||||
|
|
||||||
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
|
Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
|
||||||
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
|
so the only differences are the table, the S3 prefixes, the import_meta dataset
|
||||||
|
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
|
||||||
|
does not yet). Those are carried by the `Dataset` config; everything else here is
|
||||||
|
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
|
||||||
|
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
|
||||||
|
are driven from a single entrypoint (the INC one) — never duplicated per dataset.
|
||||||
|
|
||||||
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
||||||
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
||||||
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||||
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
||||||
|
|
||||||
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
|
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
|
||||||
writes a full current-state snapshot CSV per hour to the `tickets` bucket at
|
automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
|
||||||
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv)
|
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
||||||
There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a
|
baseline, and every later file holds only the rows that CHANGED since the prior
|
||||||
flat CSV (header + rows). We ingest the NEWEST file:
|
export (with periodic full-state re-emissions). Deletions are never emitted. Every
|
||||||
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
|
file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
|
||||||
skip the DB write (the export re-emits byte-identical content most hours);
|
in ASCENDING timestamp order (baseline first, then each delta) — taking only the
|
||||||
|
newest would silently drop the intermediate deltas:
|
||||||
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
||||||
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
||||||
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
||||||
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
||||||
history accumulates), and record snapshot freshness in tickets.import_meta;
|
history accumulates), and advance the watermark in tickets.import_meta
|
||||||
- on success, MOVE the file to automations/inc/processed/ (copy + delete).
|
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
|
||||||
|
- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
|
||||||
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
|
||||||
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
|
||||||
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
|
|
||||||
real place out of location_name (region+cluster+location_name,
|
|
||||||
network codes stripped), geocodes it, caches in
|
|
||||||
tickets.geo_locations, then re-resolves geoms.
|
|
||||||
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
|
|
||||||
|
|
||||||
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
|
||||||
python import_tickets.py --from-bucket --apply
|
|
||||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
|
||||||
python import_tickets.py --geocode-clusters --apply
|
|
||||||
python import_tickets.py --geocode-locations --apply
|
|
||||||
|
|
||||||
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
|
|
||||||
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
"""
|
"""
|
||||||
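The new docstring argues that every unprocessed change file must be applied in ascending order, because taking only the newest file would drop intermediate deltas. A minimal in-memory sketch of that reasoning follows. It assumes a dict stands in for the `ticket_id`-keyed table, and the ticket ids, statuses, and the `apply_files` helper are hypothetical illustrations rather than part of the pipeline.

```python
# Sketch only: a dict keyed by ticket_id stands in for the upsert target table.
# Ticket ids, statuses and the helper name are illustrative assumptions.

def apply_files(files, table=None):
    """Upsert every row of every file, oldest file first (last write wins)."""
    table = dict(table or {})
    # Names of the form YYYY-MM-DDTHH-MM-SS sort chronologically as plain strings.
    for _name, rows in sorted(files, key=lambda f: f[0]):
        for row in rows:
            table[row["ticket_id"]] = row  # upsert on ticket_id, never delete
    return table


baseline = ("2026-06-24T09-00-00", [
    {"ticket_id": "INC-1", "raw_status": "OPEN"},
    {"ticket_id": "INC-2", "raw_status": "OPEN"},
])
delta_1 = ("2026-06-24T10-00-00", [{"ticket_id": "INC-1", "raw_status": "CLOSED"}])
delta_2 = ("2026-06-24T11-00-00", [{"ticket_id": "INC-3", "raw_status": "OPEN"}])

# Draining the whole stream in order keeps the closure carried by delta_1.
drained = apply_files([delta_2, baseline, delta_1])

# Applying only the newest file on top of the baseline loses that closure.
newest_only = apply_files([delta_2], table=apply_files([baseline]))
```

In this sketch `drained` records INC-1 as closed, while `newest_only` still shows it open, which is the silent loss the docstring warns about.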
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
|
||||||
import csv
|
|
||||||
import io
|
import io
|
||||||
|
import csv
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
from collections.abc import Callable
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
import boto3
|
import boto3
|
||||||
|
|
@ -64,14 +55,10 @@ from botocore.config import Config as BotoConfig
|
||||||
|
|
||||||
from shared import clean, get_conn, get_logger
|
from shared import clean, get_conn, get_logger
|
||||||
|
|
||||||
log = get_logger("import_tickets")
|
log = get_logger("pipeline")
|
||||||
|
|
||||||
# ── INC ingestion config ──────────────────────────────────────────────────────
|
# ── shared ingestion config ─────────────────────────────────────────────────────
|
||||||
_TABLE = "tickets.inc"
|
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
|
||||||
_DATASET = "inc"
|
|
||||||
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
|
|
||||||
_INC_PREFIX = "automations/inc/"
|
|
||||||
_PROCESSED_PREFIX = "automations/inc/processed/"
|
|
||||||
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
||||||
|
|
||||||
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
||||||
|
|
@ -88,10 +75,6 @@ DROP_FIELDS = frozenset({
|
||||||
"department", "source_type",
|
"department", "source_type",
|
||||||
})
|
})
|
||||||
|
|
||||||
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
|
|
||||||
# leftover latest.csv/, latest.json/, full/ prefixes).
|
|
||||||
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
|
|
||||||
|
|
||||||
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
||||||
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
||||||
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
||||||
|
|
@ -99,14 +82,37 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
||||||
_last_geocode_at = 0.0
|
_last_geocode_at = 0.0
|
||||||
|
|
||||||
|
|
||||||
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
|
# ── dataset config (per ticket type) ────────────────────────────────────────────
|
||||||
# The n8n hourly export writes a full current-state CSV per hour to
|
@dataclass(frozen=True)
|
||||||
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
|
class Dataset:
|
||||||
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
|
"""All that distinguishes one ticket type from another in the generic engine."""
|
||||||
# we skip the DB write (the export re-emits byte-identical content most hours).
|
name: str # 'inc' | 'crq' (import_meta.dataset)
|
||||||
|
table: str # 'tickets.inc' | 'tickets.crq'
|
||||||
|
change_prefix: str # 'automations/<name>/changes/'
|
||||||
|
processed_prefix: str # 'automations/<name>/processed/'
|
||||||
|
key_regex: re.Pattern # matches a <prefix><EAT-ts>.csv key
|
||||||
|
post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only)
|
||||||
|
|
||||||
|
|
||||||
|
def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
|
||||||
|
"""Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
|
||||||
|
return Dataset(
|
||||||
|
name=name,
|
||||||
|
table=f"tickets.{name}",
|
||||||
|
change_prefix=f"automations/{name}/changes/",
|
||||||
|
processed_prefix=f"automations/{name}/processed/",
|
||||||
|
# only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
|
||||||
|
# (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
|
||||||
|
key_regex=re.compile(
|
||||||
|
rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
|
||||||
|
post_apply=post_apply,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
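The per-dataset key pattern built in `make_dataset` can be exercised in isolation. The sketch below is an approximation under stated assumptions: `key_regex_for` and `ts_from_key` are hypothetical stand-ins, and the `strptime` format is inferred from the filename shape, since the body of `_ts_from_key` is not visible in this hunk.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi, matching the module's _EAT


def key_regex_for(name: str) -> "re.Pattern[str]":
    """Hypothetical helper: same key shape as the regex built in make_dataset."""
    return re.compile(
        rf"^automations/{re.escape(name)}/changes/"
        r"(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$"
    )


def ts_from_key(pattern: "re.Pattern[str]", key: str) -> Optional[datetime]:
    """EAT timestamp embedded in a change-file key, or None if it does not apply."""
    m = pattern.match(key)
    if not m:
        return None
    try:
        # Assumed format; the regex can match impossible dates, so guard the parse.
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:
        return None


inc = key_regex_for("inc")
ok = ts_from_key(inc, "automations/inc/changes/2026-06-24T09-55-44.csv")
archived = ts_from_key(inc, "automations/inc/processed/2026-06-24T09-55-44.csv")
other_ds = ts_from_key(inc, "automations/crq/changes/2026-06-24T09-55-44.csv")
bad_date = ts_from_key(inc, "automations/inc/changes/9999-99-99T99-99-99.csv")
```

Keys under `processed/`, keys belonging to another dataset, and shape-valid but impossible dates all yield `None`, so they never enter the sort or the watermark comparison.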
|
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
|
||||||
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
||||||
def _s3_client():
|
def _s3_client():
|
||||||
"""boto3 S3 client for the rustfs endpoint (force path-style addressing)."""
|
"""boto3 S3 client for the S3 endpoint (force path-style addressing)."""
|
||||||
return boto3.client(
|
return boto3.client(
|
||||||
"s3",
|
"s3",
|
||||||
endpoint_url=os.environ["RUSTFS_ENDPOINT"],
|
endpoint_url=os.environ["RUSTFS_ENDPOINT"],
|
||||||
|
|
@ -118,9 +124,9 @@ def _s3_client():
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _ts_from_key(key: str) -> datetime | None:
|
def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
|
||||||
"""EAT timestamp embedded in an automations/inc/<ts>.csv key (or None)."""
|
"""EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
|
||||||
m = _CSV_KEY_RE.match(key)
|
m = ds.key_regex.match(key)
|
||||||
if not m:
|
if not m:
|
||||||
return None
|
return None
|
||||||
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
|
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
|
||||||
|
|
@ -129,12 +135,12 @@ def _ts_from_key(key: str) -> datetime | None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _list_inc_csvs(s3) -> list[tuple[str, str]]:
|
def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
|
||||||
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs)."""
|
"""[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
|
||||||
out: list[tuple[str, str]] = []
|
out: list[tuple[str, str]] = []
|
||||||
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
|
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
|
||||||
for it in page.get("Contents", []):
|
for it in page.get("Contents", []):
|
||||||
if _CSV_KEY_RE.match(it["Key"]):
|
if ds.key_regex.match(it["Key"]):
|
||||||
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
|
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
@ -144,16 +150,22 @@ def _get_text(s3, key: str) -> str:
|
||||||
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
|
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
def _last_processed_etag() -> str | None:
|
def _last_processed_ts(ds: Dataset) -> datetime | None:
|
||||||
"""ETag of the most recently ingested INC file (from tickets.import_meta)."""
|
"""Watermark: EAT timestamp of the newest change file already ingested for this dataset.
|
||||||
|
|
||||||
|
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
|
||||||
|
we drain changes/ oldest→newest). None when nothing has been ingested via the
|
||||||
|
changes stream yet (e.g. a brand-new dataset, or the first run after the source
|
||||||
|
switched buckets) — then every file currently in changes/ is processed.
|
||||||
|
"""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s",
|
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
|
||||||
(_DATASET,),
|
(ds.name,),
|
||||||
)
|
)
|
||||||
row = cur.fetchone()
|
row = cur.fetchone()
|
||||||
return row[0] if row else None
|
return _ts_from_key(ds, row[0]) if row and row[0] else None
|
||||||
|
|
||||||
|
|
||||||
def _parse_csv(text: str) -> list[dict]:
|
def _parse_csv(text: str) -> list[dict]:
|
||||||
|
|
@ -165,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
|
||||||
return list(csv.DictReader(f))
|
return list(csv.DictReader(f))
|
||||||
|
|
||||||
|
|
||||||
def _move_processed(s3, keys: list[str]) -> None:
|
def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
|
||||||
"""Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
|
"""Archive listed csv objects to automations/<ds>/processed/ (copy + delete)."""
|
||||||
for key in keys:
|
for key in keys:
|
||||||
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
|
dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
|
||||||
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
|
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
|
||||||
s3.delete_object(Bucket=_BUCKET, Key=key)
|
s3.delete_object(Bucket=_BUCKET, Key=key)
|
||||||
log.info("archived %s -> %s", key, dst)
|
log.info("archived %s -> %s", key, dst)
|
||||||
|
|
@ -194,8 +206,8 @@ def _prepare(row: dict) -> dict:
|
||||||
|
|
||||||
|
|
||||||
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
||||||
def _record_meta(cur, meta: dict, records_ingested: int) -> None:
|
def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
|
||||||
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
|
"""Upsert the snapshot metadata (powers map freshness + holds source_max_key).
|
||||||
|
|
||||||
Runs on the caller's cursor so the row upsert and the meta write commit
|
Runs on the caller's cursor so the row upsert and the meta write commit
|
||||||
together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
|
together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
|
||||||
|
|
@ -213,90 +225,113 @@ def _record_meta(cur, meta: dict, records_ingested: int) -> None:
|
||||||
records_ingested = EXCLUDED.records_ingested,
|
records_ingested = EXCLUDED.records_ingested,
|
||||||
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
||||||
ingested_at = now()""",
|
ingested_at = now()""",
|
||||||
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
(ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
||||||
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
||||||
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
||||||
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
||||||
meta = meta or {}
|
meta = meta or {}
|
||||||
kept = [r for r in rows if _keep_row(r)]
|
kept = [r for r in rows if _keep_row(r)]
|
||||||
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
||||||
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
||||||
_TABLE, len(rows), len(payload), len(rows) - len(payload))
|
ds.table, len(rows), len(payload), len(rows) - len(payload))
|
||||||
if not apply:
|
if not apply:
|
||||||
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
|
log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
|
||||||
return len(payload)
|
return len(payload)
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
psycopg2.extras.execute_values(
|
psycopg2.extras.execute_values(
|
||||||
cur,
|
cur,
|
||||||
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
|
f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
|
||||||
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
||||||
payload, page_size=500,
|
payload, page_size=500,
|
||||||
)
|
)
|
||||||
# same transaction as the upsert: rows + snapshot meta commit atomically
|
# same transaction as the upsert: rows + snapshot meta commit atomically
|
||||||
_record_meta(cur, meta, len(payload))
|
_record_meta(cur, ds, meta, len(payload))
|
||||||
log.info("upserted %d rows into %s", len(payload), _TABLE)
|
log.info("upserted %d rows into %s", len(payload), ds.table)
|
||||||
return len(payload)
|
return len(payload)
|
||||||
|
|
||||||
|
|
||||||
def _capture_history() -> None:
|
def capture_history() -> None:
|
||||||
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
|
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history).
|
||||||
|
|
||||||
|
INC-only today (CRQ install-lifecycle history is a future migration); wired as
|
||||||
|
the INC Dataset's post_apply hook.
|
||||||
|
"""
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("SELECT tickets.capture_history()")
|
cur.execute("SELECT tickets.capture_history()")
|
||||||
log.info("history: %s", cur.fetchone()[0])
|
log.info("history: %s", cur.fetchone()[0])
|
||||||
|
|
||||||
|
|
||||||
def ingest(args) -> None:
|
def ingest(ds: Dataset, args) -> None:
|
||||||
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
|
# Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
|
||||||
if args.inc_csv:
|
if args.local_csv:
|
||||||
rows = _load_csv_local(args.inc_csv)
|
rows = _load_csv_local(args.local_csv)
|
||||||
name = os.path.basename(args.inc_csv)
|
name = os.path.basename(args.local_csv)
|
||||||
ts = _ts_from_key(_INC_PREFIX + name)
|
ts = _ts_from_key(ds, ds.change_prefix + name)
|
||||||
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
||||||
if ts:
|
if ts:
|
||||||
meta["exported_at"] = ts.isoformat()
|
meta["exported_at"] = ts.isoformat()
|
||||||
upsert(rows, args.apply, meta=meta)
|
upsert(ds, rows, args.apply, meta=meta)
|
||||||
return
|
return
|
||||||
|
|
||||||
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
|
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
|
||||||
|
# (baseline first, then each delta), upserting each. The watermark advances and
|
||||||
|
# the file is archived PER file, so a mid-run failure leaves a consistent state
|
||||||
|
# (folder state matches the watermark) and the next run resumes cleanly.
|
||||||
s3 = _s3_client()
|
s3 = _s3_client()
|
||||||
listing = _list_inc_csvs(s3)
|
listing = _list_csvs(s3, ds)
|
||||||
if not listing:
|
if not listing:
|
||||||
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
|
log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
|
||||||
return
|
return
|
||||||
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
||||||
all_keys = [k for k, _ in listing]
|
|
||||||
newest_key, newest_etag = listing[-1]
|
|
||||||
log.info("newest INC file: %s (etag=%s; %d file(s) present)",
|
|
||||||
newest_key, newest_etag, len(listing))
|
|
||||||
|
|
||||||
last_etag = _last_processed_etag()
|
# watermark: skip anything at/older than the newest file already applied. Archiving
|
||||||
if newest_etag and newest_etag == last_etag:
|
# normally empties changes/, but this guards a failed archive from re-applying.
|
||||||
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
|
# --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
|
||||||
|
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
||||||
|
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
||||||
|
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
||||||
|
last_ts = None if args.reseed else _last_processed_ts(ds)
|
||||||
|
_floor = datetime.min.replace(tzinfo=_EAT)
|
||||||
|
pending = [(k, e) for k, e in listing
|
||||||
|
if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
|
||||||
|
if not pending:
|
||||||
|
log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
|
||||||
|
len(listing), ds.name, last_ts and last_ts.isoformat())
|
||||||
if args.apply:
|
if args.apply:
|
||||||
_move_processed(s3, all_keys)
|
_move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers
|
||||||
_capture_history() # still record today's snapshot even when unchanged
|
if ds.post_apply:
|
||||||
else:
|
ds.post_apply()
|
||||||
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
|
|
||||||
return
|
return
|
||||||
|
log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
|
||||||
|
len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
|
||||||
|
|
||||||
rows = _parse_csv(_get_text(s3, newest_key))
|
total = 0
|
||||||
ts = _ts_from_key(newest_key)
|
for i, (key, etag) in enumerate(pending):
|
||||||
meta = {"export_type": "full", "source_s3_key": newest_key,
|
rows = _parse_csv(_get_text(s3, key))
|
||||||
"source_etag": newest_etag, "row_count": len(rows)}
|
ts = _ts_from_key(ds, key)
|
||||||
|
# the first file applied onto an empty watermark is the full baseline; every
|
||||||
|
# file after is a delta. export_type is informational (recorded in import_meta).
|
||||||
|
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
|
||||||
|
"source_s3_key": key, "source_etag": etag,
|
||||||
|
"source_max_key": key, "row_count": len(rows)}
|
||||||
if ts:
|
if ts:
|
||||||
meta["exported_at"] = ts.isoformat()
|
meta["exported_at"] = ts.isoformat()
|
||||||
upsert(rows, args.apply, meta=meta)
|
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
|
||||||
|
# then archive, so the changes/ folder state always matches the watermark.
|
||||||
|
total += upsert(ds, rows, args.apply, meta=meta)
|
||||||
if args.apply:
|
if args.apply:
|
||||||
_move_processed(s3, all_keys)
|
_move_processed(s3, ds, [key])
|
||||||
_capture_history()
|
|
||||||
else:
|
else:
|
||||||
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
|
log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
|
||||||
|
log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
|
||||||
|
if args.apply and ds.post_apply:
|
||||||
|
ds.post_apply()
|
||||||
|
|
||||||
|
|
||||||
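The watermark selection in the new `ingest` (sort ascending, keep only files strictly newer than the last applied key, with `--reseed` ignoring the stored watermark) can be sketched without S3 or a database. The names `parse_ts` and `pending_keys` are hypothetical, and the timestamp format is an assumption inferred from the filenames.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi


def parse_ts(key: str) -> Optional[datetime]:
    """Timestamp from the basename of a change-file key (assumed format)."""
    stem = key.rsplit("/", 1)[-1].removesuffix(".csv")
    try:
        return datetime.strptime(stem, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:
        return None


def pending_keys(keys, watermark_key=None, reseed=False):
    """Keys still to ingest, oldest first; strictly newer than the watermark."""
    last_ts = None if reseed or not watermark_key else parse_ts(watermark_key)
    dated = [(parse_ts(k), k) for k in keys]
    ordered = sorted((ts, k) for ts, k in dated if ts is not None)
    return [k for ts, k in ordered if last_ts is None or ts > last_ts]


prefix = "automations/inc/changes/"
keys = [prefix + n for n in (
    "2026-06-24T11-00-00.csv",
    "2026-06-24T09-55-44.csv",
    "2026-06-24T10-15-00.csv",
)]

first_run = pending_keys(keys)                               # no watermark yet
resumed = pending_keys(keys, prefix + "2026-06-24T09-55-44.csv")
caught_up = pending_keys(keys, prefix + "2026-06-24T11-00-00.csv")
reseeded = pending_keys(keys, prefix + "2026-06-24T11-00-00.csv", reseed=True)
```

Because the comparison is strict, a file that was applied but failed to archive is not re-applied on the next run. It is left for the straggler archive step instead.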
# ── place extraction (strip network codes, keep the real place) ───────────────
|
# ── place extraction (strip network codes, keep the real place) ───────────────
|
||||||
|
|
@ -445,7 +480,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
|
||||||
def geocode_clusters(apply: bool) -> None:
|
def geocode_clusters(apply: bool) -> None:
|
||||||
with get_conn() as conn:
|
with get_conn() as conn:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
|
|
@ -492,7 +527,7 @@ def geocode_clusters(apply: bool) -> None:
|
||||||
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
||||||
|
|
||||||
|
|
||||||
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
# ── per-location geocoding (precise; actionable inc + crq) ────────────────────
|
||||||
# A location geocode is only trusted if it lands within this radius of the
|
# A location geocode is only trusted if it lands within this radius of the
|
||||||
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
||||||
# place and we fall back to the cluster centroid.
|
# place and we fall back to the cluster centroid.
|
||||||
|
|
@ -507,17 +542,25 @@ def geocode_locations(apply: bool) -> None:
|
||||||
"""
|
"""
|
||||||
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
||||||
FROM (
|
FROM (
|
||||||
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
|
||||||
(array_agg(raw->>'location_name'))[1] AS location_name,
|
(array_agg(src.raw->>'location_name'))[1] AS location_name,
|
||||||
(array_agg(raw->>'cluster'))[1] AS cluster,
|
(array_agg(src.raw->>'cluster'))[1] AS cluster,
|
||||||
(array_agg(raw->>'region'))[1] AS region,
|
(array_agg(src.raw->>'region'))[1] AS region,
|
||||||
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
|
||||||
FROM tickets.inc
|
FROM (
|
||||||
|
-- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
|
||||||
|
SELECT raw FROM tickets.inc
|
||||||
WHERE (raw->>'is_actionable')::boolean
|
WHERE (raw->>'is_actionable')::boolean
|
||||||
AND raw->>'location_name' IS NOT NULL
|
AND raw->>'location_name' IS NOT NULL
|
||||||
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||||||
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
UNION ALL
|
||||||
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
SELECT raw FROM tickets.crq
|
||||||
|
WHERE (raw->>'is_actionable')::boolean
|
||||||
|
AND raw->>'location_name' IS NOT NULL
|
||||||
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||||||
|
) src
|
||||||
|
WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
||||||
|
WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
|
||||||
AND gl.geom IS NOT NULL)
|
AND gl.geom IS NOT NULL)
|
||||||
GROUP BY 1
|
GROUP BY 1
|
||||||
) t
|
) t
|
||||||
|
|
@ -525,7 +568,7 @@ def geocode_locations(apply: bool) -> None:
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
todo = cur.fetchall()
|
todo = cur.fetchall()
|
||||||
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
||||||
if not apply:
|
if not apply:
|
||||||
for key, loc, cluster, region, clat, clng in todo[:50]:
|
for key, loc, cluster, region, clat, clng in todo[:50]:
|
||||||
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
|
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
|
||||||
|
|
@ -579,37 +622,3 @@ def _resolve() -> int:
|
||||||
with conn.cursor() as cur:
|
with conn.cursor() as cur:
|
||||||
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
||||||
return cur.fetchone()[0]
|
return cur.fetchone()[0]
|
||||||
|
|
||||||
|
|
||||||
# ── entrypoint ────────────────────────────────────────────────────────────────
|
|
||||||
def main() -> None:
|
|
||||||
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
|
||||||
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
|
||||||
ap.add_argument("--from-bucket", action="store_true",
|
|
||||||
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
|
|
||||||
"skips if unchanged (ETag) and archives processed files")
|
|
||||||
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
|
||||||
ap.add_argument("--geocode-clusters", action="store_true",
|
|
||||||
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
|
||||||
ap.add_argument("--geocode-locations", action="store_true",
|
|
||||||
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
|
|
||||||
ap.add_argument("--capture-history", action="store_true",
|
|
||||||
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
|
|
||||||
args = ap.parse_args()
|
|
||||||
|
|
||||||
if args.geocode_clusters:
|
|
||||||
geocode_clusters(apply=args.apply)
|
|
||||||
return
|
|
||||||
if args.geocode_locations:
|
|
||||||
geocode_locations(apply=args.apply)
|
|
||||||
return
|
|
||||||
if args.capture_history:
|
|
||||||
_capture_history()
|
|
||||||
return
|
|
||||||
if not (args.from_bucket or args.inc_csv):
|
|
||||||
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
|
|
||||||
ingest(args)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@ -18,10 +18,12 @@ dev = [
|
||||||
"ruff>=0.4",
|
"ruff>=0.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
# Flat-module project (no package dir) — list the top-level modules explicitly so
|
# Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type
|
||||||
# `pip install .` works (the Docker image installs the project to pull its deps).
|
# entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the
|
||||||
|
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
|
||||||
[tool.setuptools]
|
[tool.setuptools]
|
||||||
py-modules = ["import_tickets", "shared", "run_migrations"]
|
py-modules = ["pipeline", "shared", "run_migrations"]
|
||||||
|
packages = ["inc", "crq"]
|
||||||
|
|
||||||
[tool.uv]
|
[tool.uv]
|
||||||
managed = true
|
managed = true
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,17 @@
|
||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron.
|
# run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM).
|
||||||
#
|
#
|
||||||
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
|
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains
|
||||||
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
|
# both ticket change streams with --apply (watermark skip-if-unchanged + per-file
|
||||||
|
# archive are built in, so a run with no new files is a cheap no-op).
|
||||||
#
|
#
|
||||||
# Install on the instance (ingest at :15, 07:00–19:00 EAT):
|
# Install on the instance (every 20 min, 06:00–20:40 EAT):
|
||||||
# 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
|
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1
|
||||||
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
||||||
# the host/container TZ), since the export filenames and the schedule are EAT.
|
# the host/container TZ), since the export filenames and the schedule are EAT.
|
||||||
|
#
|
||||||
|
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
|
||||||
|
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
cd "$(dirname "$0")"
|
cd "$(dirname "$0")"
|
||||||
|
|
@ -24,4 +28,7 @@ fi
|
||||||
PY="python"
|
PY="python"
|
||||||
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"
|
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"
|
||||||
|
|
||||||
exec "$PY" import_tickets.py --from-bucket --apply
|
# Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq`
|
||||||
|
# resolve the packages alongside pipeline.py + shared.py.
|
||||||
|
"$PY" -m inc.import_inc --from-bucket --apply
|
||||||
|
"$PY" -m crq.import_crq --from-bucket --apply
|
||||||
|
|
|
||||||