Compare commits

..

No commits in common. "main" and "fix/inc-changes-stream" have entirely different histories.

18 changed files with 208 additions and 1126 deletions

View file

@ -3,7 +3,7 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv) # S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=isptickets RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>

View file

@ -1,9 +1,7 @@
# fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable). # fleettickets — INC ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container # A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingests via two Scheduled Tasks: # running (CMD below) and fires the ingest via a Scheduled Task:
# python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *) # python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *)
# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *)
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim FROM python:3.12-slim

View file

@ -1,22 +1,11 @@
# fleettickets # fleettickets
Field-ops **ticket** ingestion, geocoding, and read-schema that powers the Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module **Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 2123 + `tools/import_tickets.py`). (it previously lived there as migrations 2123 + `tools/import_tickets.py`).
Two ticket types, identical 32-column source schema and CDC change stream, served - **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each: - **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
layer + map** (typed columns, geocoding, appears on the Tickets map via
`fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
**own FleetOps tab**, same look & feel as INC.
Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
and is driven from the INC entrypoint.
## What this owns ## What this owns
@ -32,10 +21,7 @@ and is driven from the INC entrypoint.
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `migrations/15_crq_table.sql` | **Materializes `tickets.crq`** (table + geom trigger + indexes — `01`'s crq section never ran on the live DB) and unpacks `raw` into the same **typed STORED generated columns** as INC's `03` (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity | | `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -100,51 +86,40 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.
```bash ```bash
# drain the incremental change streams (every new file oldest→newest, then archive) # drain the incremental INC change stream (every new file oldest→newest, then archive)
python -m inc.import_inc --from-bucket --apply python import_tickets.py --from-bucket --apply
python -m crq.import_crq --from-bucket --apply
# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY # geocode (needs GEOCODER_API_KEY)
python -m inc.import_inc --geocode-clusters --apply # coarse, once python import_tickets.py --geocode-clusters --apply # coarse, once
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq python import_tickets.py --geocode-locations --apply # precise, actionable INC
# from a local CSV instead of the bucket (dev) # from a local CSV instead of the bucket (dev)
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
``` ```
Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
## Deploy (Coolify) ## Deploy (Coolify)
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server. The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
runs as its own **Scheduled Task**, not a system crontab: runs as a **Scheduled Task**, not a system crontab:
- **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply` - **Command:** `python import_tickets.py --from-bucket --apply`
- **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply` - **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
The watermark makes a run with no new change files a cheap no-op. The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs **both** ingests; schedule it with a crontab line and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`). (`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
Full operational runbook — container, env management (encrypted; via the UI or
`artisan tinker`), the **Forgejo → Coolify auto-deploy webhook**, manual deploys, and the
source-bucket cutover procedure — is in
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
### Bucket cutover (one-time reseed) ### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets``isptickets`), When the source provider moves the feed to a new bucket (e.g. `tickets``isptickets`),
@ -154,8 +129,8 @@ newer than the new bucket's first file — which would otherwise be skipped. Poi
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest: which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash ```bash
python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq) python import_tickets.py --from-bucket --reseed # dry-run first
python -m inc.import_inc --from-bucket --reseed --apply # commit + archive python import_tickets.py --from-bucket --reseed --apply # commit + archive
``` ```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
@ -239,16 +214,6 @@ Findings to keep in mind (see the PRD for detail):
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema + Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
**CRQ (this milestone):** data layer + map — `tickets.crq` fed from vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
`automations/crq/changes/` by `crq/import_crq.py`, the `tickets.crq` table + typed columns (migration 15), separate future project that will reuse this machinery against `automations/crq/`.
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
current state — then run the shared geocode once. See
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.

View file

View file

@ -1,61 +0,0 @@
"""
crq/import_crq.py Fireside Communications · CRQ (new-installation) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
tickets.crq new-installation requests (FleetOps "Tickets" CRQ tab)
CRQ mirrors INC at the data layer IDENTICAL 32-column CSV schema and the same
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
automations/crq/processed/. CRQ flows onto the existing Tickets map via
reporting.fn_tickets_for_map (which already unions tickets.crq).
Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
(installation-lifecycle SLA/backlog semantics differ from incidents a future
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
shared gazetteer, which covers both inc and crq.
Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
python -m crq.import_crq --from-bucket --apply
python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.crq + its typed
columns (15_crq_table.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
DATASET = pipeline.make_dataset("crq", post_apply=None)
def main() -> None:
ap = argparse.ArgumentParser(
description="Ingest CRQ (installation) tickets from CSV (raw-first)")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental CRQ change stream (automations/crq/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--crq-csv", dest="local_csv", default=None,
help="Local CRQ tickets CSV file (dev)")
args = ap.parse_args()
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket or --crq-csv")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -1,202 +0,0 @@
# Deployment & Operations — fleettickets
Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
procedure, and verification. Secrets are referenced by **where to retrieve them**,
never by value.
> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.
## What's deployed
| Thing | Detail |
|---|---|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets``python -m inc.import_inc --from-bucket --apply` |
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets``python -m crq.import_crq --from-bucket --apply` |
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
Resolve the live container name (Coolify appends a random suffix):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
'docker ps --filter name=g14mwzo73q20g70vc6fzumya --format "{{.Names}}" | head -1'
```
## Schedule (cron)
Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
20 min, **06:0020:40 EAT**. Coolify evaluates task cron in the server timezone
(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
(watermark guard, per dataset), so a dense schedule is safe.
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
```sql
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
WHERE name IN ('inc_tickets', 'crq_tickets');
```
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
`fleettickets`, cron `*/20 6-20 * * *`.
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
without a redeploy. Execution history: `scheduled_task_executions`.
> The repo's `Dockerfile`, `run_ingest.sh`, and `README.md` document this same cron for
> the plain-host/VM fallback (`CRON_TZ=Africa/Nairobi`).
## Environment variables
Set on the Coolify app (Environment Variables). Names only — values live in Coolify:
| Var | Purpose |
|---|---|
| `DATABASE_URL` | `tracksolid_db` (internal `timescale_db:5432`) |
| `RUSTFS_ENDPOINT` | `https://s3.rahamafresh.com` |
| `RUSTFS_ACCESS_KEY` / `RUSTFS_SECRET_KEY` | `isptickets` bucket credentials |
| `RUSTFS_REGION` | `us-east-1` |
| `TICKETS_BUCKET` | `isptickets` |
| `GEOCODER_PROVIDER` / `GEOCODER_API_KEY` | keyed geocoder (LocationIQ/OpenCage) |
**Env vars are Laravel-encrypted in `coolify-db` — never raw-`UPDATE` them.** Change them
in the Coolify UI, or via `artisan tinker` (which re-encrypts on save):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$e = \App\Models\EnvironmentVariable::where('resourceable_type','App\\Models\\Application')
->where('resourceable_id',15)->where('key','TICKETS_BUCKET')->first();
$e->value = 'isptickets'; $e->save(); echo $e->value.PHP_EOL;
PHP
```
An env change only takes effect after the container is **recreated** (a redeploy — see below),
since Coolify injects env at container create time.
## Deploys
### Auto-deploy (Forgejo → Coolify webhook)
A push to `main` should auto-deploy. This needs **both** the Coolify per-app Auto-Deploy
toggle (Configuration → Advanced) **and** a webhook on the Forgejo repo. The webhook was
missing originally (the toggle alone is not enough); it now exists as hook id `3` on
`kianiadee/fleettickets`:
| Field | Value |
|---|---|
| URL | `https://stage.rahamafresh.com/webhooks/source/gitea/events/manual` |
| Type / content-type | `gitea` / `json` |
| Events / branch filter | `push` / `main` |
| Secret | the app's `manual_webhook_secret_gitea` (Coolify HMAC-validates `X-Hub-Signature-256`) |
Recreate / inspect it via the Forgejo API (auth: `git credential fill`, host
`repo.rahamafresh.com`, basic auth to `/api/v1` — no `tea`/`gh` needed). Get the secret by
decrypting it in Coolify:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
"docker exec -i coolify php artisan tinker --execute=\"echo \\App\\Models\\Application::find(15)->manual_webhook_secret_gitea;\""
```
```bash
# list / test the webhook (USER:PASS from git credential fill)
curl -s -u "$USER:$PASS" https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks
curl -s -u "$USER:$PASS" -X POST https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks/3/tests
```
A successful test shows a webhook hit in `docker logs coolify` (no `invalid_signature`
audit) and a new row in `application_deployment_queues`.
### Manual deploy (no push)
Trigger the same action as Coolify's Deploy button via tinker:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$app = \App\Models\Application::where('uuid','g14mwzo73q20g70vc6fzumya')->first();
$uuid = new \Visus\Cuid2\Cuid2;
echo json_encode(queue_application_deployment(
application: $app, deployment_uuid: $uuid, force_rebuild: false, is_api: true)).PHP_EOL;
echo $uuid.PHP_EOL;
PHP
```
Watch it: `SELECT id, status, created_at FROM application_deployment_queues WHERE
application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id` is the
**numeric id stored as text**).
## Source-bucket cutover (when the provider moves buckets)
If the provider moves the INC feed to a new bucket (as happened `tickets``isptickets`,
2026-06-25):
1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
timestamp range, schema parity.
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
same bucket, so one env change serves both tasks.
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
upserting on `ticket_id`:
- If the watermark **predates** the new bucket's first file, a normal
`--from-bucket --apply` drains the whole new stream — no reseed needed.
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
`python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
The new stream's periodic full-state re-emissions make this converge even across the
cutover gap. Idempotent upserts + never-delete make it non-destructive.
- For a one-off, you can run it in the live container with the new creds inlined:
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
keys are looked up).
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
or manual deploy). Old bucket is left untouched for rollback.
## Bringing CRQ online (one-time seed)
CRQ was added 2026-06-25 (data layer + map). Migration `15_crq_table.sql` **creates**
`tickets.crq` (the live DB's `01` predated its crq section, so the table never existed)
plus the typed columns. To seed it from zero on the live DB — once the code + migration are
applied (`run_migrations.py`; on the live cutover it was applied out-of-band via the running
container, see below):
1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
filename='15_crq_table.sql';` and `\d tickets.crq` shows the table + typed columns.
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
files oldest→newest; the stream's periodic full-state snapshots converge to current
state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
```bash
python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
```
(Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
--from-bucket --apply"`.)
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
`python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.
## Verification
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec -i "$DB" psql -U postgres -d tracksolid_db <<'SQL'
-- watermark + freshness
SELECT export_type, records_ingested, ingested_at, metadata->>'source_max_key'
FROM tickets.import_meta WHERE dataset='inc';
-- counts
SELECT count(*) total_inc,
count(*) FILTER (WHERE (raw->>'is_actionable')::boolean) AS open
FROM tickets.inc;
-- map payload sanity
SELECT reporting.fn_tickets_for_map() -> 'summary' ->> 'ticket_count';
SQL
```
- New bucket `changes/` empties as files move to `automations/inc/processed/`.
- A plain `--from-bucket --apply` reports "nothing new" until the next change file lands.
- FleetOps Tickets map freshness reflects the new `ingested_at`.
## Rollback
- **Bucket:** revert the three env vars to the old bucket + creds and redeploy. The old
bucket and its `processed/` history are untouched; upserts are idempotent and rows are
never deleted, so re-running is safe.
- **Cron:** `UPDATE scheduled_tasks SET frequency = <old> WHERE name='inc_tickets';`

View file

@ -3,25 +3,14 @@
What is actually built and deployed, as of the Phase-1 completion. Companion to What is actually built and deployed, as of the Phase-1 completion. Companion to
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next). `docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints) ## Pipeline (`import_tickets.py`)
The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small - **Source:** newest `automations/inc/<EAT-timestamp>.csv` in the rustfs `tickets`
`Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key bucket (endpoint `https://s3.rahamafresh.com`, path-style, region `us-east-1`).
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
**identical 32-column source schema**, so the engine is fully shared; geocoding is
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
from the INC entrypoint.
- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator), - **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
`get_object`, `copy_object` + `delete_object` for archiving. `get_object`, `copy_object` + `delete_object` for archiving.
- **Watermark:** drains every `changes/` file newer than - **Skip-if-unchanged:** newest S3 **ETag** vs `tickets.import_meta.metadata.source_etag`;
`tickets.import_meta.metadata.source_max_key`, oldest→newest; reruns with no new file equal → skip the DB write (the export re-emits identical content most hours).
are a cheap no-op. `--reseed` ignores the watermark for a one-time bucket cutover.
- **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`, `week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`,
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE. `department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE.
@ -33,11 +22,8 @@ from the INC entrypoint.
- **History capture:** after each `--apply` run (ingest or skip), calls - **History capture:** after each `--apply` run (ingest or skip), calls
`tickets.capture_history()` → appends new closures + upserts today's backlog `tickets.capture_history()` → appends new closures + upserts today's backlog
snapshot. snapshot.
- CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the - CLI: `--from-bucket` (newest INC csv), `--inc-csv <file>` (local dev), `--apply`
watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else (else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
geocoding + history are not on the CRQ entrypoint).
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`) ## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
@ -53,8 +39,6 @@ from the INC entrypoint.
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) | | 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
| 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` | | 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
| 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time | | 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
| 15_crq_table | **built** — materializes `tickets.crq` (table + geom trigger + indexes; `01`'s crq section never ran on the live DB) + the typed STORED generated columns from `03` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth), `tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/ `normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
@ -69,16 +53,11 @@ from the INC entrypoint.
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`, - **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
web app (`fleet-ops-staging`). web app (`fleet-ops-staging`).
- **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket - **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
--apply` and `crq_tickets``python -m crq.import_crq --from-bucket --apply`, both cron
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*` - **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
(`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
deploys, bucket cutover, verification): **`docs/deployment-and-operations.md`**.
## State at hand-off ## State at hand-off
- `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows); - `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows);
@ -107,12 +86,5 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`, repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
**team closure attribution**. **team closure attribution**. **CRQ** = separate future project reusing this
machinery against `automations/crq/`.
**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
`automations/crq/changes/` (`crq/import_crq.py`), with the `tickets.crq` table + typed columns (migration 15) and
cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
currently has **no** `post_apply` hook).

View file

@ -82,11 +82,6 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron **Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`. `15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
> `implementation.md` and `deployment-and-operations.md`.
## Data-quality findings (carried into Phase 2) ## Data-quality findings (carried into Phase 2)
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the - Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the

View file

@ -1,17 +1,12 @@
""" """
pipeline.py Fireside Communications · generic ticket ingestion engine (raw-first) import_tickets.py Fireside Communications · INC ticket ingestion (raw-first)
The dataset-agnostic core shared by the per-type entrypoints: Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
inc/import_inc.py -> tickets.inc (incidents / customer faults) source of the FleetOps "Tickets" map.
crq/import_crq.py -> tickets.crq (new-installation requests) tickets.inc incidents / customer faults
Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream, STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
so the only differences are the table, the S3 prefixes, the import_meta dataset here. `tickets.crq` stays in the schema but is not fed by this pipeline.
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
does not yet). Those are carried by the `Dataset` config; everything else here is
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
are driven from a single entrypoint (the INC one) never duplicated per dataset.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb). RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB Everything downstream reads from `raw` (resilient to source schema drift). The DB
@ -19,33 +14,50 @@ derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv) automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior baseline, and every later file holds only the rows that CHANGED since the prior
export (with periodic full-state re-emissions). Deletions are never emitted. Every export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
in ASCENDING timestamp order (baseline first, then each delta) taking only the not-yet-processed file in ASCENDING timestamp order (baseline first, then each
newest would silently drop the intermediate deltas: delta) taking only the newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS); - drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE; - normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure - upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure
history accumulates), and advance the watermark in tickets.import_meta history accumulates), and advance the watermark in tickets.import_meta
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done; (metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete). - on success, MOVE each file to automations/inc/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
real place out of location_name (region+cluster+location_name,
network codes stripped), geocodes it, caches in
tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
Pre-requisite: migration applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
""" """
from __future__ import annotations from __future__ import annotations
import io import argparse
import csv import csv
import io
import math import math
import os import os
import re import re
import time import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import boto3 import boto3
@ -55,10 +67,14 @@ from botocore.config import Config as BotoConfig
from shared import clean, get_conn, get_logger from shared import clean, get_conn, get_logger
log = get_logger("pipeline") log = get_logger("import_tickets")
# ── shared ingestion config ───────────────────────────────────────────────────── # ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc"
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets") _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the # Garbage row the source leaks (commonly the first data line): its ticket_id is the
@ -75,6 +91,11 @@ DROP_FIELDS = frozenset({
"department", "source_type", "department", "source_type",
}) })
# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
_CHANGE_KEY_RE = re.compile(
r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.getenv("GEOCODER_API_KEY", "") _API_KEY = os.getenv("GEOCODER_API_KEY", "")
@ -82,34 +103,12 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── dataset config (per ticket type) ────────────────────────────────────────────
@dataclass(frozen=True)
class Dataset:
"""All that distinguishes one ticket type from another in the generic engine."""
name: str # 'inc' | 'crq' (import_meta.dataset)
table: str # 'tickets.inc' | 'tickets.crq'
change_prefix: str # 'automations/<name>/changes/'
processed_prefix: str # 'automations/<name>/processed/'
key_regex: re.Pattern # matches a <prefix><EAT-ts>.csv key
post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only)
def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
"""Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
return Dataset(
name=name,
table=f"tickets.{name}",
change_prefix=f"automations/{name}/changes/",
processed_prefix=f"automations/{name}/processed/",
# only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
# (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
key_regex=re.compile(
rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
post_apply=post_apply,
)
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ───── # ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the S3 endpoint (force path-style addressing).""" """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
@ -124,9 +123,9 @@ def _s3_client():
) )
def _ts_from_key(ds: Dataset, key: str) -> datetime | None: def _ts_from_key(key: str) -> datetime | None:
"""EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None).""" """EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
m = ds.key_regex.match(key) m = _CHANGE_KEY_RE.match(key)
if not m: if not m:
return None return None
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
@ -135,12 +134,12 @@ def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
return None return None
def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]: def _list_inc_csvs(s3) -> list[tuple[str, str]]:
"""[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs).""" """[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
out: list[tuple[str, str]] = [] out: list[tuple[str, str]] = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix): for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
for it in page.get("Contents", []): for it in page.get("Contents", []):
if ds.key_regex.match(it["Key"]): if _CHANGE_KEY_RE.match(it["Key"]):
out.append((it["Key"], (it.get("ETag") or "").strip('"'))) out.append((it["Key"], (it.get("ETag") or "").strip('"')))
return out return out
@ -150,22 +149,23 @@ def _get_text(s3, key: str) -> str:
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
def _last_processed_ts(ds: Dataset) -> datetime | None: def _last_processed_ts() -> datetime | None:
"""Watermark: EAT timestamp of the newest change file already ingested for this dataset. """Watermark: EAT timestamp of the newest change file already ingested.
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
we drain changes/ oldestnewest). None when nothing has been ingested via the we drain changes/ oldestnewest). None when nothing has been ingested via the
changes stream yet (e.g. a brand-new dataset, or the first run after the source changes stream yet (e.g. the first run after the source switched to incremental,
switched buckets) then every file currently in changes/ is processed. where the stored key is an old full-snapshot path) then every file currently in
changes/ is processed.
""" """
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s", "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
(ds.name,), (_DATASET,),
) )
row = cur.fetchone() row = cur.fetchone()
return _ts_from_key(ds, row[0]) if row and row[0] else None return _ts_from_key(row[0]) if row and row[0] else None
def _parse_csv(text: str) -> list[dict]: def _parse_csv(text: str) -> list[dict]:
@ -177,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
return list(csv.DictReader(f)) return list(csv.DictReader(f))
def _move_processed(s3, ds: Dataset, keys: list[str]) -> None: def _move_processed(s3, keys: list[str]) -> None:
"""Archive listed csv objects to automations/<ds>/processed/ (copy + delete).""" """Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
for key in keys: for key in keys:
dst = ds.processed_prefix + key.rsplit("/", 1)[-1] dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst) s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
s3.delete_object(Bucket=_BUCKET, Key=key) s3.delete_object(Bucket=_BUCKET, Key=key)
log.info("archived %s -> %s", key, dst) log.info("archived %s -> %s", key, dst)
@ -206,8 +206,8 @@ def _prepare(row: dict) -> dict:
# ── upsert (raw-first) ──────────────────────────────────────────────────────── # ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None: def _record_meta(cur, meta: dict, records_ingested: int) -> None:
"""Upsert the snapshot metadata (powers map freshness + holds source_max_key). """Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
Runs on the caller's cursor so the row upsert and the meta write commit Runs on the caller's cursor so the row upsert and the meta write commit
together a half-written state (rows in, meta stale) breaks skip-if-unchanged. together a half-written state (rows in, meta stale) breaks skip-if-unchanged.
@ -225,58 +225,54 @@ def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
records_ingested = EXCLUDED.records_ingested, records_ingested = EXCLUDED.records_ingested,
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
ingested_at = now()""", ingested_at = now()""",
(ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")), (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
clean(meta.get("source_table")), meta.get("row_count"), records_ingested, clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
) )
def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int: def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
meta = meta or {} meta = meta or {}
kept = [r for r in rows if _keep_row(r)] kept = [r for r in rows if _keep_row(r)]
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept] payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)", log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
ds.table, len(rows), len(payload), len(rows) - len(payload)) _TABLE, len(rows), len(payload), len(rows) - len(payload))
if not apply: if not apply:
log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table) log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
return len(payload) return len(payload)
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
psycopg2.extras.execute_values( psycopg2.extras.execute_values(
cur, cur,
f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s " f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
payload, page_size=500, payload, page_size=500,
) )
# same transaction as the upsert: rows + snapshot meta commit atomically # same transaction as the upsert: rows + snapshot meta commit atomically
_record_meta(cur, ds, meta, len(payload)) _record_meta(cur, meta, len(payload))
log.info("upserted %d rows into %s", len(payload), ds.table) log.info("upserted %d rows into %s", len(payload), _TABLE)
return len(payload) return len(payload)
def capture_history() -> None: def _capture_history() -> None:
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history). """Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
INC-only today (CRQ install-lifecycle history is a future migration); wired as
the INC Dataset's post_apply hook.
"""
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.capture_history()") cur.execute("SELECT tickets.capture_history()")
log.info("history: %s", cur.fetchone()[0]) log.info("history: %s", cur.fetchone()[0])
def ingest(ds: Dataset, args) -> None: def ingest(args) -> None:
# Local-file path (dev): ingest a single CSV, no bucket / no archive / no history. # Local-file path (dev): ingest a single CSV, no bucket / no archive.
if args.local_csv: if args.inc_csv:
rows = _load_csv_local(args.local_csv) rows = _load_csv_local(args.inc_csv)
name = os.path.basename(args.local_csv) name = os.path.basename(args.inc_csv)
ts = _ts_from_key(ds, ds.change_prefix + name) ts = _ts_from_key(_INC_PREFIX + name)
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)} meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
if ts: if ts:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
upsert(ds, rows, args.apply, meta=meta) upsert(rows, args.apply, meta=meta)
return return
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
@ -284,11 +280,11 @@ def ingest(ds: Dataset, args) -> None:
# the file is archived PER file, so a mid-run failure leaves a consistent state # the file is archived PER file, so a mid-run failure leaves a consistent state
# (folder state matches the watermark) and the next run resumes cleanly. # (folder state matches the watermark) and the next run resumes cleanly.
s3 = _s3_client() s3 = _s3_client()
listing = _list_csvs(s3, ds) listing = _list_inc_csvs(s3)
if not listing: if not listing:
log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix) log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
return return
listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT)) listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
# watermark: skip anything at/older than the newest file already applied. Archiving # watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying. # normally empties changes/, but this guards a failed archive from re-applying.
@ -296,25 +292,24 @@ def ingest(ds: Dataset, args) -> None:
# for a one-time bucket cutover, where the stored key points at the old bucket's stream # for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file # and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly. # still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts(ds) last_ts = None if args.reseed else _last_processed_ts()
_floor = datetime.min.replace(tzinfo=_EAT) _floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts] if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
if not pending: if not pending:
log.info("all %d %s change file(s) already processed (watermark %s) — nothing new", log.info("all %d change file(s) already processed (watermark %s) — nothing new",
len(listing), ds.name, last_ts and last_ts.isoformat()) len(listing), last_ts and last_ts.isoformat())
if args.apply: if args.apply:
_move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers _move_processed(s3, [k for k, _ in listing]) # archive any stragglers
if ds.post_apply: _capture_history()
ds.post_apply()
return return
log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s", log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0]) len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
total = 0 total = 0
for i, (key, etag) in enumerate(pending): for i, (key, etag) in enumerate(pending):
rows = _parse_csv(_get_text(s3, key)) rows = _parse_csv(_get_text(s3, key))
ts = _ts_from_key(ds, key) ts = _ts_from_key(key)
# the first file applied onto an empty watermark is the full baseline; every # the first file applied onto an empty watermark is the full baseline; every
# file after is a delta. export_type is informational (recorded in import_meta). # file after is a delta. export_type is informational (recorded in import_meta).
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta", meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
@ -324,14 +319,14 @@ def ingest(ds: Dataset, args) -> None:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
# rows + watermark (source_max_key) commit in one txn, advancing per file; only # rows + watermark (source_max_key) commit in one txn, advancing per file; only
# then archive, so the changes/ folder state always matches the watermark. # then archive, so the changes/ folder state always matches the watermark.
total += upsert(ds, rows, args.apply, meta=meta) total += upsert(rows, args.apply, meta=meta)
if args.apply: if args.apply:
_move_processed(s3, ds, [key]) _move_processed(s3, [key])
else: else:
log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix) log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total) log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
if args.apply and ds.post_apply: if args.apply:
ds.post_apply() _capture_history()
# ── place extraction (strip network codes, keep the real place) ─────────────── # ── place extraction (strip network codes, keep the real place) ───────────────
@ -480,7 +475,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
return None return None
# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ───────────── # ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
def geocode_clusters(apply: bool) -> None: def geocode_clusters(apply: bool) -> None:
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
@ -527,7 +522,7 @@ def geocode_clusters(apply: bool) -> None:
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
# ── per-location geocoding (precise; actionable inc + crq) ──────────────────── # ── per-location geocoding (precise; actionable INC) ──────────────────────────
# A location geocode is only trusted if it lands within this radius of the # A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong # cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid. # place and we fall back to the cluster centroid.
@ -542,25 +537,17 @@ def geocode_locations(apply: bool) -> None:
""" """
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
FROM ( FROM (
SELECT tickets.norm_cluster(src.raw->>'location_name') AS key, SELECT tickets.norm_cluster(raw->>'location_name') AS key,
(array_agg(src.raw->>'location_name'))[1] AS location_name, (array_agg(raw->>'location_name'))[1] AS location_name,
(array_agg(src.raw->>'cluster'))[1] AS cluster, (array_agg(raw->>'cluster'))[1] AS cluster,
(array_agg(src.raw->>'region'))[1] AS region, (array_agg(raw->>'region'))[1] AS region,
tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
FROM ( FROM tickets.inc
-- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
SELECT raw FROM tickets.inc
WHERE (raw->>'is_actionable')::boolean WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
UNION ALL AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
SELECT raw FROM tickets.crq WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
) src
WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
AND gl.geom IS NOT NULL) AND gl.geom IS NOT NULL)
GROUP BY 1 GROUP BY 1
) t ) t
@ -568,7 +555,7 @@ def geocode_locations(apply: bool) -> None:
""" """
) )
todo = cur.fetchall() todo = cur.fetchall()
log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER) log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
if not apply: if not apply:
for key, loc, cluster, region, clat, clng in todo[:50]: for key, loc, cluster, region, clat, clng in todo[:50]:
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region))) log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
@ -622,3 +609,41 @@ def _resolve() -> int:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.resolve_ticket_geoms()") cur.execute("SELECT tickets.resolve_ticket_geoms()")
return cur.fetchone()[0] return cur.fetchone()[0]
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
geocode_locations(apply=args.apply)
return
if args.capture_history:
_capture_history()
return
if not (args.from_bucket or args.inc_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
ingest(args)
if __name__ == "__main__":
main()

View file

View file

@ -1,74 +0,0 @@
"""
inc/import_inc.py Fireside Communications · INC (incident / fault) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
tickets.inc incidents / customer faults (FleetOps "Tickets" INC tab)
INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
watermark, archives each file to automations/inc/processed/, and uniquely to
INC runs tickets.capture_history() after each --apply run (closure_events +
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python -m inc.import_inc --from-bucket --apply
python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m inc.import_inc --geocode-clusters --apply
python -m inc.import_inc --geocode-locations --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# INC captures closure/backlog history after every --apply run (CRQ does not yet).
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", dest="local_csv", default=None,
help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable inc+crq location_names precisely (keyed provider), "
"then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone "
"(closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
pipeline.geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
pipeline.geocode_locations(apply=args.apply)
return
if args.capture_history:
pipeline.capture_history()
return
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
"--geocode-locations, or --capture-history")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -1,89 +0,0 @@
-- 13_inc_search_fn.sql — fleettickets · INC ticket explorer (search) function
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_search — ad-hoc ticket lookup by id / engineer / cluster /
-- status / state / time, for the FleetOps "Ticket explorer" card. Returns
-- { count, truncated, limit, state, rows }. Consumed by dashboard_api
-- GET /webhook/inc-search.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: this migration was applied to the live
-- DB on 2026-06-19 but the file was never committed. Recovered verbatim from the live
-- definition (pg_get_functiondef) so a fresh DB rebuilds faithfully; the live ledger
-- already lists it, so run_migrations skips it there. The crq mirror is in 16.
-- Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE OR REPLACE FUNCTION reporting.fn_inc_search(
p_ticket_id text DEFAULT NULL,
p_owner text DEFAULT NULL,
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_state text DEFAULT 'closed',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL,
p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
v_result jsonb;
BEGIN
p_ticket_id := NULLIF(trim(p_ticket_id), '');
p_owner := NULLIF(trim(p_owner), '');
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
WITH hits AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
sla_status, mttr, closed_at, created_at_service, is_actionable,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
FROM tickets.inc
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
AND CASE v_state
WHEN 'open' THEN COALESCE(is_actionable, false)
WHEN 'all' THEN COALESCE(is_actionable, false)
OR (closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to))
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
AND closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to)
END
),
total AS (SELECT count(*) AS n FROM hits),
page AS (
SELECT * FROM hits
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
LIMIT v_limit
)
SELECT jsonb_build_object(
'count', (SELECT n FROM total),
'truncated', (SELECT n FROM total) > v_limit,
'limit', v_limit,
'state', v_state,
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
ORDER BY page.closed_at DESC NULLS LAST,
page.created_at_service DESC NULLS LAST)
FROM page), '[]'::jsonb)
) INTO v_result;
RETURN v_result;
END $function$;
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
END IF;
END $grants$;

View file

@ -1,37 +0,0 @@
-- 14_inc_filter_options.sql — fleettickets · INC explorer dropdown options
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_filter_options — distinct engineers (owner), clusters, and the
-- ids of currently-open tickets, for the FleetOps "Ticket explorer" dropdowns.
-- Consumed by dashboard_api GET /webhook/inc-filter-options.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: applied to the live DB 2026-06-19 but
-- never committed. Recovered verbatim from the live definition so a fresh DB rebuilds
-- faithfully; the live ledger already lists it (run_migrations skips it there). The crq
-- mirror is in 16. Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE OR REPLACE FUNCTION reporting.fn_inc_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
SELECT jsonb_build_object(
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
FROM tickets.inc WHERE NULLIF(owner, '') IS NOT NULL) s),
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
FROM (SELECT DISTINCT cluster AS c
FROM tickets.inc WHERE NULLIF(cluster, '') IS NOT NULL) s),
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
FROM tickets.inc WHERE COALESCE(is_actionable, false))
);
$function$;
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO grafana_ro;
END IF;
END $grants$;

View file

@ -1,101 +0,0 @@
-- 15_crq_table.sql — fleettickets · materialize tickets.crq + typed columns
-- ─────────────────────────────────────────────────────────────────────────────
-- Why a NEW migration (not an edit to 01): `01_tickets_schema.sql` was applied to the
-- live DB on 2026-06-15 from a version that PREDATED its `tickets.crq` section, so the
-- IF-NOT-EXISTS ledger guard has kept crq from ever being created there — even though
-- the live `reporting.fn_tickets_for_map` and `tickets.resolve_ticket_geoms` already
-- reference it (they error if called until crq exists). This migration creates
-- `tickets.crq` self-containedly (table + geom trigger + indexes) and adds the same
-- typed STORED generated columns INC got in `03_inc_columns.sql`, bringing CRQ to
-- data-layer parity.
--
-- Deterministic + idempotent — converges to the same shape on BOTH:
-- • the live DB (crq missing) -> CREATE makes it, ALTER adds typed cols
-- • a fresh DB (crq minimal, from 01) -> CREATE skipped, ALTER adds typed cols
-- Reuses shared objects already present: tickets.tg_ticket_geom() (01),
-- tickets.norm_cluster() (01), tickets.eat_ts() (03).
--
-- NOTE: the live DB also carries un-versioned migrations 13_inc_search_fn.sql /
-- 14_inc_filter_options.sql (applied 2026-06-19, absent from this repo) — INC dashboard
-- functions, unrelated to CRQ. Numbered 15 here to sit cleanly after the live ledger.
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
-- ── table (base shape mirrors tickets.inc's original 01 base) ────────────────
CREATE TABLE IF NOT EXISTS tickets.crq (
ticket_id text PRIMARY KEY,
raw jsonb NOT NULL,
geom geometry(Point, 4326),
geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
ingested_at timestamptz NOT NULL DEFAULT now()
);
-- ── geom trigger — read from raw; shared tickets.tg_ticket_geom() (from 01) ───
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
-- ── raw-based indexes (mirror 01's inc/crq set) ──────────────────────────────
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
-- ── typed STORED generated columns (mirror of 03_inc_columns.sql) ────────────
-- Computed for ALL existing rows on creation + auto-recomputed on every insert/update;
-- `raw` stays the source of truth. tickets.eat_ts() (EAT->timestamptz, IMMUTABLE) is
-- reused from 03 — see that file's note on why IMMUTABLE is safe for Kenya (UTC+3, no DST).
ALTER TABLE tickets.crq
-- text
ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
-- numeric / float
ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
-- boolean
ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
-- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the EXPORT
-- pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;
-- ── typed-column indexes (serve cluster / team / closure queries) ────────────
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.crq TO tracksolid_owner;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.crq TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.crq TO grafana_ro;
END IF;
END $grants$;

View file

@ -1,297 +0,0 @@
-- 16_crq_dashboard.sql — fleettickets · CRQ dashboard parity (view + read functions)
-- ─────────────────────────────────────────────────────────────────────────────
-- Brings CRQ to FleetOps-dashboard parity with INC, so the Tickets tab's CRQ
-- sub-tab works "just like INC". Mirrors, against tickets.crq:
-- tickets.crq_open_sla ← mirror of tickets.inc_open_sla (08)
-- reporting.fn_crq_dashboard ← mirror of reporting.fn_inc_dashboard (09/12)
-- reporting.fn_crq_search ← mirror of reporting.fn_inc_search (13)
-- reporting.fn_crq_filter_options ← mirror of reporting.fn_inc_filter_options (14)
-- consumed by dashboard_api GET /webhook/crq-dashboard | crq-search | crq-filter-options.
--
-- Differences from the INC view: tickets.crq has no `geog` column (mig 05 is INC-only)
-- and its latitude/longitude come from `raw` (empty in the feed), so crq_open_sla omits
-- geog and derives latitude/longitude from `geom`. The 48h SLA rule is reused verbatim
-- for layout parity — installation-lifecycle SLA semantics may be refined later.
--
-- Idempotent (CREATE OR REPLACE / VIEW). Requires migration 15 (tickets.crq + typed cols).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
-- ── crq_open_sla — open CRQ tickets with derived SLA (mirror of inc_open_sla) ─
CREATE OR REPLACE VIEW tickets.crq_open_sla AS
SELECT
ticket_id,
normalized_status,
bucket,
cluster,
region,
location_name,
assigned_team,
owner,
sla_status AS source_sla_status,
mttr, -- minutes (null until closed)
COALESCE(created_at_service, first_seen_at) AS sla_clock,
CASE WHEN created_at_service IS NOT NULL THEN 'service' ELSE 'first_seen' END AS sla_clock_source,
round((EXTRACT(EPOCH FROM now() - COALESCE(created_at_service, first_seen_at)) / 3600)::numeric, 1) AS hours_open,
CASE
WHEN COALESCE(created_at_service, first_seen_at) IS NULL THEN 'unknown'
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '48h' THEN 'breached'
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '36h' THEN 'at_risk'
ELSE 'ok'
END AS sla_state,
created_at_service,
first_seen_at,
scheduled_at,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS latitude,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS longitude,
geo_source,
geom
FROM tickets.crq
WHERE is_actionable;
COMMENT ON VIEW tickets.crq_open_sla IS
'Open (is_actionable) CRQ tickets with derived SLA (48h rule; clock = created_at_service '
'or first_seen_at fallback). Mirror of inc_open_sla; no geog. fleettickets 16.';
-- ── fn_crq_dashboard — mirror of fn_inc_dashboard over tickets.crq ───────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_dashboard(
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_window text DEFAULT 'today',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_now_eat timestamp;
v_from timestamptz;
v_to timestamptz;
v_preset text;
v_days numeric;
v_result jsonb;
BEGIN
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';
-- ── resolve the window ──────────────────────────────────────────────────────
IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
v_preset := 'custom';
v_from := COALESCE(p_from, '-infinity'::timestamptz);
v_to := COALESCE(p_to, 'infinity'::timestamptz);
ELSE
v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
IF v_preset = 'week' THEN
v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
ELSIF v_preset = 'month' THEN
v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
ELSE
v_preset := 'today';
v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
END IF;
END IF;
IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
ELSE
v_days := NULL; -- open-ended custom window → per-day average not meaningful
END IF;
-- ── build payload ───────────────────────────────────────────────────────────
WITH open_t AS (
SELECT * FROM tickets.crq_open_sla
WHERE (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
),
closed_t AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
FROM tickets.crq
WHERE NOT COALESCE(is_actionable, false)
AND closed_at IS NOT NULL
AND closed_at >= v_from AND closed_at < v_to
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
)
SELECT jsonb_build_object(
'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),
'open', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'sla_state', sla_state, 'hours_open', hours_open),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'closed', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'metrics', jsonb_build_object(
'open_now', (SELECT count(*) FROM open_t),
'closed_in_window', (SELECT count(*) FROM closed_t),
'sla', jsonb_build_object(
'open', (SELECT jsonb_build_object(
'breached', count(*) FILTER (WHERE sla_state = 'breached'),
'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
'ok', count(*) FILTER (WHERE sla_state = 'ok'),
'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
'closed', (SELECT jsonb_build_object(
'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
),
'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
SELECT normalized_status FROM open_t
UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
SELECT cluster FROM open_t
UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
-- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
FROM (
SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
count(*) AS c,
count(*) FILTER (WHERE sla_status = 'Breached') AS b,
round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
FROM closed_t GROUP BY 1) z), '[]'::jsonb),
'closure_rate', jsonb_build_object(
'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
FROM closed_t GROUP BY 1) z), '[]'::jsonb)
),
'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
),
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
'export_type', export_type, 'exported_at', exported_at,
'records_ingested', records_ingested, 'ingested_at', ingested_at))
FROM tickets.import_meta)
) INTO v_result;
RETURN v_result;
END $function$;
-- ── fn_crq_search — mirror of fn_inc_search over tickets.crq ──────────────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_search(
p_ticket_id text DEFAULT NULL,
p_owner text DEFAULT NULL,
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_state text DEFAULT 'closed',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL,
p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
v_result jsonb;
BEGIN
p_ticket_id := NULLIF(trim(p_ticket_id), '');
p_owner := NULLIF(trim(p_owner), '');
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
WITH hits AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
sla_status, mttr, closed_at, created_at_service, is_actionable,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
FROM tickets.crq
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
AND CASE v_state
WHEN 'open' THEN COALESCE(is_actionable, false)
WHEN 'all' THEN COALESCE(is_actionable, false)
OR (closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to))
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
AND closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to)
END
),
total AS (SELECT count(*) AS n FROM hits),
page AS (
SELECT * FROM hits
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
LIMIT v_limit
)
SELECT jsonb_build_object(
'count', (SELECT n FROM total),
'truncated', (SELECT n FROM total) > v_limit,
'limit', v_limit,
'state', v_state,
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
ORDER BY page.closed_at DESC NULLS LAST,
page.created_at_service DESC NULLS LAST)
FROM page), '[]'::jsonb)
) INTO v_result;
RETURN v_result;
END $function$;
-- ── fn_crq_filter_options — mirror of fn_inc_filter_options over tickets.crq ──
CREATE OR REPLACE FUNCTION reporting.fn_crq_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
SELECT jsonb_build_object(
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
FROM tickets.crq WHERE NULLIF(owner, '') IS NOT NULL) s),
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
FROM (SELECT DISTINCT cluster AS c
FROM tickets.crq WHERE NULLIF(cluster, '') IS NOT NULL) s),
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
FROM tickets.crq WHERE COALESCE(is_actionable, false))
);
$function$;
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.crq_open_sla TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.crq_open_sla TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO grafana_ro;
END IF;
END $grants$;

View file

@ -61,12 +61,10 @@ Notes:
listings but contain **no objects** (leftover/legacy; ignore them). There is listings but contain **no objects** (leftover/legacy; ignore them). There is
no `latest` pointer and no JSON/metadata envelope. no `latest` pointer and no JSON/metadata envelope.
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental - **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
stream (with matching baseline timestamps and additional newer deltas) and the stream (with matching baseline timestamps and additional newer deltas). CRQ
identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by remains out of scope for `import_tickets.py` (INC-only), but the source-side
`crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same shape is the same. CRQ's old root snapshots (`automations/crq/<ts>.csv`) are
way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots still present because nothing archives them — they are not consumed.
(`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
archives them — they are not consumed (the `changes/` stream is the source of truth).
## CSV Schema ## CSV Schema
@ -85,7 +83,7 @@ Each row is a complete record (not a partial diff): a delta row carries the
ticket's full current state, so a plain upsert on `ticket_id` is correct. The ticket's full current state, so a plain upsert on `ticket_id` is correct. The
baseline still contains `is_alarm=true` rows and may include a leading baseline still contains `is_alarm=true` rows and may include a leading
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by `EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints). the consumer (see `import_tickets.py`).
## Timestamp Format ## Timestamp Format
@ -98,12 +96,11 @@ Generated once at the start of each execution, formatted in `Africa/Nairobi`
incremental files appear whenever a change batch is exported (multiple within incremental files appear whenever a change batch is exported (multiple within
the same hour are normal, e.g. `15-50-39` then `15-53-04`). the same hour are normal, e.g. `15-50-39` then `15-53-04`).
## How the loader Consumes It ## How `import_tickets.py` Consumes It
`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq `python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`):
--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp. 1. Lists `automations/inc/changes/<ts>.csv`, sorts ascending by timestamp.
2. Skips files at/older than the **watermark** 2. Skips files at/older than the **watermark**
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
applied); on a fresh stream it processes everything present. applied); on a fresh stream it processes everything present.

View file

@ -18,12 +18,10 @@ dev = [
"ruff>=0.4", "ruff>=0.4",
] ]
# Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type # Flat-module project (no package dir) — list the top-level modules explicitly so
# entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the # `pip install .` works (the Docker image installs the project to pull its deps).
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
[tool.setuptools] [tool.setuptools]
py-modules = ["pipeline", "shared", "run_migrations"] py-modules = ["import_tickets", "shared", "run_migrations"]
packages = ["inc", "crq"]
[tool.uv] [tool.uv]
managed = true managed = true

View file

@ -1,17 +1,13 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM). # run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron.
# #
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains # Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
# both ticket change streams with --apply (watermark skip-if-unchanged + per-file # newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
# archive are built in, so a run with no new files is a cheap no-op).
# #
# Install on the instance (every 20 min, 06:0020:40 EAT): # Install on the instance (ingest every 20 min, 06:0020:40 EAT):
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1 # */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT. # the host/container TZ), since the export filenames and the schedule are EAT.
#
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
set -euo pipefail set -euo pipefail
cd "$(dirname "$0")" cd "$(dirname "$0")"
@ -28,7 +24,4 @@ fi
PY="python" PY="python"
[ -x ".venv/bin/python" ] && PY=".venv/bin/python" [ -x ".venv/bin/python" ] && PY=".venv/bin/python"
# Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq` exec "$PY" import_tickets.py --from-bucket --apply
# resolve the packages alongside pipeline.py + shared.py.
"$PY" -m inc.import_inc --from-bucket --apply
"$PY" -m crq.import_crq --from-bucket --apply