feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints

Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.

- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
  keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
  now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
  tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
  columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
  packages inc/crq. Docs (README, implementation, deployment-and-operations,
  n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.

tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-25 23:16:38 +03:00
parent 0787d3a185
commit 5f5d71d500
15 changed files with 493 additions and 225 deletions

View file

@ -3,7 +3,7 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv) # S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=isptickets RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>

View file

@ -1,7 +1,9 @@
# fleettickets — INC ingestion image (Coolify-deployable). # fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container # A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingest via a Scheduled Task: # running (CMD below) and fires the ingests via two Scheduled Tasks:
# python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *) # python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *)
# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *)
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim FROM python:3.12-slim

View file

@ -1,11 +1,22 @@
# fleettickets # fleettickets
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the Field-ops **ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module **Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 2123 + `tools/import_tickets.py`). (it previously lived there as migrations 2123 + `tools/import_tickets.py`).
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)* Two ticket types, identical 32-column source schema and CDC change stream, served
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)* through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each:
- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
layer + map** (typed columns, geocoding, appears on the Tickets map via
`fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
**own FleetOps tab**, same look & feel as INC.
Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
and is driven from the INC entrypoint.
## What this owns ## What this owns
@ -21,7 +32,10 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations | | `migrations/13_crq_columns.sql` | CRQ mirror of `03`: unpacks `tickets.crq.raw` into the same **typed STORED generated columns** + indexes (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity with INC |
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -86,38 +100,44 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
```bash Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.
# drain the incremental INC change stream (every new file oldest→newest, then archive)
python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY) ```bash
python import_tickets.py --geocode-clusters --apply # coarse, once # drain the incremental change streams (every new file oldest→newest, then archive)
python import_tickets.py --geocode-locations --apply # precise, actionable INC python -m inc.import_inc --from-bucket --apply
python -m crq.import_crq --from-bucket --apply
# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY
python -m inc.import_inc --geocode-clusters --apply # coarse, once
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq
# from a local CSV instead of the bucket (dev) # from a local CSV instead of the bucket (dev)
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
``` ```
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3 Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
## Deploy (Coolify) ## Deploy (Coolify)
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server. The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest
runs as a **Scheduled Task**, not a system crontab: runs as its own **Scheduled Task**, not a system crontab:
- **Command:** `python import_tickets.py --from-bucket --apply` - **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply`
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This - **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply`
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`. `RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds
both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
The watermark makes a run with no new change files a cheap no-op. The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line and runs **both** ingests; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`). (`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
Full operational runbook — container, env management (encrypted; via the UI or Full operational runbook — container, env management (encrypted; via the UI or
@ -134,8 +154,8 @@ newer than the new bucket's first file — which would otherwise be skipped. Poi
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest: which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash ```bash
python import_tickets.py --from-bucket --reseed # dry-run first python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq)
python import_tickets.py --from-bucket --reseed --apply # commit + archive python -m inc.import_inc --from-bucket --reseed --apply # commit + archive
``` ```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
@ -219,6 +239,16 @@ Findings to keep in mind (see the PRD for detail):
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema + Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a **CRQ (this milestone):** data layer + map — `tickets.crq` fed from
separate future project that will reuse this machinery against `automations/crq/`. `automations/crq/changes/` by `crq/import_crq.py`, typed columns (migration 13),
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
current state — then run the shared geocode once. See
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.

0
crq/__init__.py Normal file
View file

61
crq/import_crq.py Normal file
View file

@ -0,0 +1,61 @@
"""
crq/import_crq.py Fireside Communications · CRQ (new-installation) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
tickets.crq new-installation requests (FleetOps "Tickets" CRQ tab)
CRQ mirrors INC at the data layer IDENTICAL 32-column CSV schema and the same
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
automations/crq/processed/. CRQ flows onto the existing Tickets map via
reporting.fn_tickets_for_map (which already unions tickets.crq).
Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
(installation-lifecycle SLA/backlog semantics differ from incidents a future
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
shared gazetteer, which covers both inc and crq.
Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
python -m crq.import_crq --from-bucket --apply
python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.crq + its typed
columns (13_crq_columns.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
DATASET = pipeline.make_dataset("crq", post_apply=None)
def main() -> None:
ap = argparse.ArgumentParser(
description="Ingest CRQ (installation) tickets from CSV (raw-first)")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental CRQ change stream (automations/crq/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--crq-csv", dest="local_csv", default=None,
help="Local CRQ tickets CSV file (dev)")
args = ap.parse_args()
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket or --crq-csv")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -1,20 +1,26 @@
# Deployment & Operations — fleettickets # Deployment & Operations — fleettickets
Operational runbook for the INC ingest pipeline as deployed on **Coolify** Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the (host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
container, environment, schedule, auto-deploy webhook, the source-bucket cutover container, environment, schedule, auto-deploy webhook, the source-bucket cutover
procedure, and verification. Secrets are referenced by **where to retrieve them**, procedure, and verification. Secrets are referenced by **where to retrieve them**,
never by value. never by value.
> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.
## What's deployed ## What's deployed
| Thing | Detail | | Thing | Detail |
|---|---| |---|---|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` | | Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) | | Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
| Ingest | a Coolify **Scheduled Task** `inc_tickets` running `python import_tickets.py --from-bucket --apply` | | Ingest (INC) | Coolify **Scheduled Task** `inc_tickets``python -m inc.import_inc --from-bucket --apply` |
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets``python -m crq.import_crq --from-bucket --apply` |
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) | | DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
| Source | **`isptickets`** S3 bucket, `automations/inc/changes/<EAT-ts>.csv` CDC stream (see `../n8n-s3-ticket-exports.md` and `../README.md`) | | Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
Resolve the live container name (Coolify appends a random suffix): Resolve the live container name (Coolify appends a random suffix):
```bash ```bash
@ -24,17 +30,20 @@ ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
## Schedule (cron) ## Schedule (cron)
The Scheduled Task runs **`*/20 6-20 * * *`** — every 20 min, **06:0020:40 EAT**. Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
Coolify evaluates task cron in the server timezone (`server_settings.server_timezone` 20 min, **06:0020:40 EAT**. Coolify evaluates task cron in the server timezone
= `Africa/Nairobi`), so **no UTC conversion** — write EAT directly. The `--from-bucket` (`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
run is a cheap no-op when no new change file has arrived (watermark guard), so a dense EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
schedule is safe. (watermark guard, per dataset), so a dense schedule is safe.
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`: To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
```sql ```sql
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now() UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
WHERE name = 'inc_tickets'; -- id 3 WHERE name IN ('inc_tickets', 'crq_tickets');
``` ```
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
`fleettickets`, cron `*/20 6-20 * * *`.
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
without a redeploy. Execution history: `scheduled_task_executions`. without a redeploy. Execution history: `scheduled_task_executions`.
@ -118,27 +127,51 @@ application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id`
If the provider moves the INC feed to a new bucket (as happened `tickets``isptickets`, If the provider moves the INC feed to a new bucket (as happened `tickets``isptickets`,
2026-06-25): 2026-06-25):
1. **Inspect** the new bucket (read-only) — confirm `automations/inc/changes/` layout, 1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
timestamp range, schema parity. CRQ (`automations/crq/`) stays out of scope. timestamp range, schema parity.
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`, 2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). `TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
same bucket, so one env change serves both tasks.
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the 3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
watermark (`tickets.import_meta.metadata.source_max_key`), oldest→newest, upserting on watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
`ticket_id`: upserting on `ticket_id`:
- If the watermark **predates** the new bucket's first file, a normal - If the watermark **predates** the new bucket's first file, a normal
`--from-bucket --apply` drains the whole new stream — no reseed needed. `--from-bucket --apply` drains the whole new stream — no reseed needed.
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once): - Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
`python import_tickets.py --from-bucket --reseed --apply` (see README "Bucket cutover"). `python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
The new stream's periodic full-state re-emissions make this converge even across the The new stream's periodic full-state re-emissions make this converge even across the
cutover gap. Idempotent upserts + never-delete make it non-destructive. cutover gap. Idempotent upserts + never-delete make it non-destructive.
- For a one-off, you can run it in the live container with the new creds inlined: - For a one-off, you can run it in the live container with the new creds inlined:
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container> `docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
sh -c "cd /app && python import_tickets.py --from-bucket --apply"`. sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
4. **Re-geocode** new clusters/locations: `--geocode-clusters --apply` then 4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
`--geocode-locations --apply` (existing gazetteer persists; only new keys are looked up). then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
keys are looked up).
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook, 5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
or manual deploy). Old bucket is left untouched for rollback. or manual deploy). Old bucket is left untouched for rollback.
## Bringing CRQ online (one-time seed)
CRQ was added 2026-06-25 (data layer + map). To seed `tickets.crq` from zero on the live
DB — once the code + migration `13_crq_columns.sql` are deployed (`run_migrations.py`
applies it on build):
1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
filename='13_crq_columns.sql';` and `\d tickets.crq` shows the typed columns.
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
files oldest→newest; the stream's periodic full-state snapshots converge to current
state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
```bash
python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
```
(Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
--from-bucket --apply"`.)
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
`python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.
## Verification ## Verification
```bash ```bash

View file

@ -3,9 +3,18 @@
What is actually built and deployed, as of the Phase-1 completion. Companion to What is actually built and deployed, as of the Phase-1 completion. Companion to
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next). `docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
## Pipeline (`import_tickets.py`) ## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints)
- **Source:** the incremental CDC stream `automations/inc/changes/<EAT-timestamp>.csv` The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small
`Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
**identical 32-column source schema**, so the engine is fully shared; geocoding is
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
from the INC entrypoint.
- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style, in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover). region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator), - **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
@ -24,9 +33,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- **History capture:** after each `--apply` run (ingest or skip), calls - **History capture:** after each `--apply` run (ingest or skip), calls
`tickets.capture_history()` → appends new closures + upserts today's backlog `tickets.capture_history()` → appends new closures + upserts today's backlog
snapshot. snapshot.
- CLI: `--from-bucket` (drain the INC change stream), `--reseed` (ignore the watermark; - CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the
one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else dry-run), watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else
`--geocode-clusters`, `--geocode-locations`, `--capture-history`. dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
geocoding + history are not on the CRQ entrypoint).
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`) ## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
@ -42,6 +53,8 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) | | 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
| 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` | | 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
| 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time | | 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
| 13_crq_columns | **built** — CRQ mirror of `03`: typed STORED generated columns + indexes on `tickets.crq` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth), `tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/ `normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
@ -56,10 +69,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`, - **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
web app (`fleet-ops-staging`). web app (`fleet-ops-staging`).
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron - **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket
--apply` and `crq_tickets``python -m crq.import_crq --from-bucket --apply`, both cron
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*` - **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
(`isptickets` bucket), `GEOCODER_*`. (`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
@ -93,5 +107,12 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`, repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
**team closure attribution**. **CRQ** = separate future project reusing this **team closure attribution**.
machinery against `automations/crq/`.
**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
`automations/crq/changes/` (`crq/import_crq.py`), with typed columns (migration 13) and
cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
currently has **no** `post_apply` hook).

View file

@ -82,6 +82,11 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron **Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`. `15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
> `implementation.md` and `deployment-and-operations.md`.
## Data-quality findings (carried into Phase 2) ## Data-quality findings (carried into Phase 2)
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the - Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the

0
inc/__init__.py Normal file
View file

74
inc/import_inc.py Normal file
View file

@ -0,0 +1,74 @@
"""
inc/import_inc.py Fireside Communications · INC (incident / fault) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
tickets.inc incidents / customer faults (FleetOps "Tickets" INC tab)
INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
watermark, archives each file to automations/inc/processed/, and uniquely to
INC runs tickets.capture_history() after each --apply run (closure_events +
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python -m inc.import_inc --from-bucket --apply
python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m inc.import_inc --geocode-clusters --apply
python -m inc.import_inc --geocode-locations --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# INC captures closure/backlog history after every --apply run (CRQ does not yet).
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", dest="local_csv", default=None,
help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable inc+crq location_names precisely (keyed provider), "
"then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone "
"(closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
pipeline.geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
pipeline.geocode_locations(apply=args.apply)
return
if args.capture_history:
pipeline.capture_history()
return
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
"--geocode-locations, or --capture-history")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,55 @@
-- 13_crq_columns.sql — fleettickets · unpack tickets.crq.raw into typed columns
-- ─────────────────────────────────────────────────────────────────────────────
-- CRQ (new-installation) mirror of 03_inc_columns.sql. CRQ shares the INC source's
-- IDENTICAL 32-column flat-CSV schema, so the same STORED generated columns apply:
-- the crq dataset gets real typed, indexable columns while `raw` stays the source
-- of truth (drift-safe). STORED generated columns are computed for ALL existing
-- rows on creation and auto-recomputed on every future insert/update — no loader
-- change needed.
--
-- tickets.eat_ts() (EAT wall-clock text -> timestamptz, IMMUTABLE) already exists
-- from 03_inc_columns.sql — reuse it, don't redefine. See that file's note on why
-- IMMUTABLE is safe for Kenya (fixed UTC+3, no DST).
--
-- Idempotent: safe on a fresh DB and re-appliable on the live DB.
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
ALTER TABLE tickets.crq
-- text
ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
-- numeric / float
ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
-- boolean
ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
-- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the
-- EXPORT pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;
-- indexes on the new typed columns (serve cluster / team / closure queries)
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;

View file

@ -61,10 +61,12 @@ Notes:
listings but contain **no objects** (leftover/legacy; ignore them). There is listings but contain **no objects** (leftover/legacy; ignore them). There is
no `latest` pointer and no JSON/metadata envelope. no `latest` pointer and no JSON/metadata envelope.
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental - **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
stream (with matching baseline timestamps and additional newer deltas). CRQ stream (with matching baseline timestamps and additional newer deltas) and the
remains out of scope for `import_tickets.py` (INC-only), but the source-side identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by
shape is the same. CRQ's old root snapshots (`automations/crq/<ts>.csv`) are `crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same
still present because nothing archives them — they are not consumed. way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
(`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
archives them — they are not consumed (the `changes/` stream is the source of truth).
## CSV Schema ## CSV Schema
@ -83,7 +85,7 @@ Each row is a complete record (not a partial diff): a delta row carries the
ticket's full current state, so a plain upsert on `ticket_id` is correct. The ticket's full current state, so a plain upsert on `ticket_id` is correct. The
baseline still contains `is_alarm=true` rows and may include a leading baseline still contains `is_alarm=true` rows and may include a leading
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by `EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
the consumer (see `import_tickets.py`). the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints).
## Timestamp Format ## Timestamp Format
@ -96,11 +98,12 @@ Generated once at the start of each execution, formatted in `Africa/Nairobi`
incremental files appear whenever a change batch is exported (multiple within incremental files appear whenever a change batch is exported (multiple within
the same hour are normal, e.g. `15-50-39` then `15-53-04`). the same hour are normal, e.g. `15-50-39` then `15-53-04`).
## How `import_tickets.py` Consumes It ## How the loader Consumes It
`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`): `python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq
--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
1. Lists `automations/inc/changes/<ts>.csv`, sorts ascending by timestamp. 1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp.
2. Skips files at/older than the **watermark** 2. Skips files at/older than the **watermark**
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
applied); on a fresh stream it processes everything present. applied); on a fresh stream it processes everything present.

View file

@ -1,12 +1,17 @@
""" """
import_tickets.py Fireside Communications · INC ticket ingestion (raw-first) pipeline.py Fireside Communications · generic ticket ingestion engine (raw-first)
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the The dataset-agnostic core shared by the per-type entrypoints:
source of the FleetOps "Tickets" map. inc/import_inc.py -> tickets.inc (incidents / customer faults)
tickets.inc incidents / customer faults crq/import_crq.py -> tickets.crq (new-installation requests)
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
here. `tickets.crq` stays in the schema but is not fed by this pipeline. so the only differences are the table, the S3 prefixes, the import_meta dataset
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
does not yet). Those are carried by the `Dataset` config; everything else here is
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
are driven from a single entrypoint (the INC one) never duplicated per dataset.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb). RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB Everything downstream reads from `raw` (resilient to source schema drift). The DB
@ -14,50 +19,33 @@ derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv) automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted). export (with periodic full-state re-emissions). Deletions are never emitted. Every
Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
not-yet-processed file in ASCENDING timestamp order (baseline first, then each in ASCENDING timestamp order (baseline first, then each delta) taking only the
delta) taking only the newest would silently drop the intermediate deltas: newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS); - drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE; - normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure - upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure
history accumulates), and advance the watermark in tickets.import_meta history accumulates), and advance the watermark in tickets.import_meta
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done; (metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/inc/processed/ (copy + delete). - on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
real place out of location_name (region+cluster+location_name,
network codes stripped), geocodes it, caches in
tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
Pre-requisite: migration applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
""" """
from __future__ import annotations from __future__ import annotations
import argparse
import csv
import io import io
import csv
import math import math
import os import os
import re import re
import time import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import boto3 import boto3
@ -67,14 +55,10 @@ from botocore.config import Config as BotoConfig
from shared import clean, get_conn, get_logger from shared import clean, get_conn, get_logger
log = get_logger("import_tickets") log = get_logger("pipeline")
# ── INC ingestion config ────────────────────────────────────────────────────── # ── shared ingestion config ─────────────────────────────────────────────────────
_TABLE = "tickets.inc"
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets") _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the # Garbage row the source leaks (commonly the first data line): its ticket_id is the
@ -91,11 +75,6 @@ DROP_FIELDS = frozenset({
"department", "source_type", "department", "source_type",
}) })
# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
_CHANGE_KEY_RE = re.compile(
r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.getenv("GEOCODER_API_KEY", "") _API_KEY = os.getenv("GEOCODER_API_KEY", "")
@ -103,12 +82,34 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── dataset config (per ticket type) ────────────────────────────────────────────
@dataclass(frozen=True)
class Dataset:
"""All that distinguishes one ticket type from another in the generic engine."""
name: str # 'inc' | 'crq' (import_meta.dataset)
table: str # 'tickets.inc' | 'tickets.crq'
change_prefix: str # 'automations/<name>/changes/'
processed_prefix: str # 'automations/<name>/processed/'
key_regex: re.Pattern # matches a <prefix><EAT-ts>.csv key
post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only)
def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
"""Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
return Dataset(
name=name,
table=f"tickets.{name}",
change_prefix=f"automations/{name}/changes/",
processed_prefix=f"automations/{name}/processed/",
# only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
# (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
key_regex=re.compile(
rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
post_apply=post_apply,
)
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ───── # ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the S3 endpoint (force path-style addressing).""" """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
@ -123,9 +124,9 @@ def _s3_client():
) )
def _ts_from_key(key: str) -> datetime | None: def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
"""EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None).""" """EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
m = _CHANGE_KEY_RE.match(key) m = ds.key_regex.match(key)
if not m: if not m:
return None return None
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
@ -134,12 +135,12 @@ def _ts_from_key(key: str) -> datetime | None:
return None return None
def _list_inc_csvs(s3) -> list[tuple[str, str]]: def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
"""[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs).""" """[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
out: list[tuple[str, str]] = [] out: list[tuple[str, str]] = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
for it in page.get("Contents", []): for it in page.get("Contents", []):
if _CHANGE_KEY_RE.match(it["Key"]): if ds.key_regex.match(it["Key"]):
out.append((it["Key"], (it.get("ETag") or "").strip('"'))) out.append((it["Key"], (it.get("ETag") or "").strip('"')))
return out return out
@ -149,23 +150,22 @@ def _get_text(s3, key: str) -> str:
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
def _last_processed_ts() -> datetime | None: def _last_processed_ts(ds: Dataset) -> datetime | None:
"""Watermark: EAT timestamp of the newest change file already ingested. """Watermark: EAT timestamp of the newest change file already ingested for this dataset.
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
we drain changes/ oldestnewest). None when nothing has been ingested via the we drain changes/ oldestnewest). None when nothing has been ingested via the
changes stream yet (e.g. the first run after the source switched to incremental, changes stream yet (e.g. a brand-new dataset, or the first run after the source
where the stored key is an old full-snapshot path) then every file currently in switched buckets) then every file currently in changes/ is processed.
changes/ is processed.
""" """
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s", "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
(_DATASET,), (ds.name,),
) )
row = cur.fetchone() row = cur.fetchone()
return _ts_from_key(row[0]) if row and row[0] else None return _ts_from_key(ds, row[0]) if row and row[0] else None
def _parse_csv(text: str) -> list[dict]: def _parse_csv(text: str) -> list[dict]:
@ -177,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
return list(csv.DictReader(f)) return list(csv.DictReader(f))
def _move_processed(s3, keys: list[str]) -> None: def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
"""Archive listed INC csv objects to automations/inc/processed/ (copy + delete).""" """Archive listed csv objects to automations/<ds>/processed/ (copy + delete)."""
for key in keys: for key in keys:
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1] dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst) s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
s3.delete_object(Bucket=_BUCKET, Key=key) s3.delete_object(Bucket=_BUCKET, Key=key)
log.info("archived %s -> %s", key, dst) log.info("archived %s -> %s", key, dst)
@ -206,8 +206,8 @@ def _prepare(row: dict) -> dict:
# ── upsert (raw-first) ──────────────────────────────────────────────────────── # ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, meta: dict, records_ingested: int) -> None: def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag). """Upsert the snapshot metadata (powers map freshness + holds source_max_key).
Runs on the caller's cursor so the row upsert and the meta write commit Runs on the caller's cursor so the row upsert and the meta write commit
together a half-written state (rows in, meta stale) breaks skip-if-unchanged. together a half-written state (rows in, meta stale) breaks skip-if-unchanged.
@ -225,54 +225,58 @@ def _record_meta(cur, meta: dict, records_ingested: int) -> None:
records_ingested = EXCLUDED.records_ingested, records_ingested = EXCLUDED.records_ingested,
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
ingested_at = now()""", ingested_at = now()""",
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")), (ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
clean(meta.get("source_table")), meta.get("row_count"), records_ingested, clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
) )
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
meta = meta or {} meta = meta or {}
kept = [r for r in rows if _keep_row(r)] kept = [r for r in rows if _keep_row(r)]
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept] payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)", log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
_TABLE, len(rows), len(payload), len(rows) - len(payload)) ds.table, len(rows), len(payload), len(rows) - len(payload))
if not apply: if not apply:
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE) log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
return len(payload) return len(payload)
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
psycopg2.extras.execute_values( psycopg2.extras.execute_values(
cur, cur,
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s " f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
payload, page_size=500, payload, page_size=500,
) )
# same transaction as the upsert: rows + snapshot meta commit atomically # same transaction as the upsert: rows + snapshot meta commit atomically
_record_meta(cur, meta, len(payload)) _record_meta(cur, ds, meta, len(payload))
log.info("upserted %d rows into %s", len(payload), _TABLE) log.info("upserted %d rows into %s", len(payload), ds.table)
return len(payload) return len(payload)
def _capture_history() -> None: def capture_history() -> None:
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history).""" """Append new closures + upsert today's backlog snapshot (tickets.capture_history).
INC-only today (CRQ install-lifecycle history is a future migration); wired as
the INC Dataset's post_apply hook.
"""
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.capture_history()") cur.execute("SELECT tickets.capture_history()")
log.info("history: %s", cur.fetchone()[0]) log.info("history: %s", cur.fetchone()[0])
def ingest(args) -> None: def ingest(ds: Dataset, args) -> None:
# Local-file path (dev): ingest a single CSV, no bucket / no archive. # Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
if args.inc_csv: if args.local_csv:
rows = _load_csv_local(args.inc_csv) rows = _load_csv_local(args.local_csv)
name = os.path.basename(args.inc_csv) name = os.path.basename(args.local_csv)
ts = _ts_from_key(_INC_PREFIX + name) ts = _ts_from_key(ds, ds.change_prefix + name)
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)} meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
if ts: if ts:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta) upsert(ds, rows, args.apply, meta=meta)
return return
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
@ -280,11 +284,11 @@ def ingest(args) -> None:
# the file is archived PER file, so a mid-run failure leaves a consistent state # the file is archived PER file, so a mid-run failure leaves a consistent state
# (folder state matches the watermark) and the next run resumes cleanly. # (folder state matches the watermark) and the next run resumes cleanly.
s3 = _s3_client() s3 = _s3_client()
listing = _list_inc_csvs(s3) listing = _list_csvs(s3, ds)
if not listing: if not listing:
log.info("no INC change files under %s — nothing to do", _INC_PREFIX) log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
return return
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT)) listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
# watermark: skip anything at/older than the newest file already applied. Archiving # watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying. # normally empties changes/, but this guards a failed archive from re-applying.
@ -292,24 +296,25 @@ def ingest(args) -> None:
# for a one-time bucket cutover, where the stored key points at the old bucket's stream # for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file # and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly. # still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts() last_ts = None if args.reseed else _last_processed_ts(ds)
_floor = datetime.min.replace(tzinfo=_EAT) _floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
if not pending: if not pending:
log.info("all %d change file(s) already processed (watermark %s) — nothing new", log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
len(listing), last_ts and last_ts.isoformat()) len(listing), ds.name, last_ts and last_ts.isoformat())
if args.apply: if args.apply:
_move_processed(s3, [k for k, _ in listing]) # archive any stragglers _move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers
_capture_history() if ds.post_apply:
ds.post_apply()
return return
log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s", log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0]) len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
total = 0 total = 0
for i, (key, etag) in enumerate(pending): for i, (key, etag) in enumerate(pending):
rows = _parse_csv(_get_text(s3, key)) rows = _parse_csv(_get_text(s3, key))
ts = _ts_from_key(key) ts = _ts_from_key(ds, key)
# the first file applied onto an empty watermark is the full baseline; every # the first file applied onto an empty watermark is the full baseline; every
# file after is a delta. export_type is informational (recorded in import_meta). # file after is a delta. export_type is informational (recorded in import_meta).
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta", meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
@ -319,14 +324,14 @@ def ingest(args) -> None:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
# rows + watermark (source_max_key) commit in one txn, advancing per file; only # rows + watermark (source_max_key) commit in one txn, advancing per file; only
# then archive, so the changes/ folder state always matches the watermark. # then archive, so the changes/ folder state always matches the watermark.
total += upsert(rows, args.apply, meta=meta) total += upsert(ds, rows, args.apply, meta=meta)
if args.apply: if args.apply:
_move_processed(s3, [key]) _move_processed(s3, ds, [key])
else: else:
log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX) log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
log.info("ingested %d change file(s); %d rows kept in total", len(pending), total) log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
if args.apply: if args.apply and ds.post_apply:
_capture_history() ds.post_apply()
# ── place extraction (strip network codes, keep the real place) ─────────────── # ── place extraction (strip network codes, keep the real place) ───────────────
@ -475,7 +480,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
return None return None
# ── cluster gazetteer (coarse fallback) ─────────────────────────────────────── # ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
def geocode_clusters(apply: bool) -> None: def geocode_clusters(apply: bool) -> None:
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
@ -522,7 +527,7 @@ def geocode_clusters(apply: bool) -> None:
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
# ── per-location geocoding (precise; actionable INC) ────────────────────────── # ── per-location geocoding (precise; actionable inc + crq) ────────────────────
# A location geocode is only trusted if it lands within this radius of the # A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong # cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid. # place and we fall back to the cluster centroid.
@ -537,17 +542,25 @@ def geocode_locations(apply: bool) -> None:
""" """
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
FROM ( FROM (
SELECT tickets.norm_cluster(raw->>'location_name') AS key, SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
(array_agg(raw->>'location_name'))[1] AS location_name, (array_agg(src.raw->>'location_name'))[1] AS location_name,
(array_agg(raw->>'cluster'))[1] AS cluster, (array_agg(src.raw->>'cluster'))[1] AS cluster,
(array_agg(raw->>'region'))[1] AS region, (array_agg(src.raw->>'region'))[1] AS region,
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
FROM tickets.inc FROM (
-- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
SELECT raw FROM tickets.inc
WHERE (raw->>'is_actionable')::boolean WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl UNION ALL
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name') SELECT raw FROM tickets.crq
WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
) src
WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
AND gl.geom IS NOT NULL) AND gl.geom IS NOT NULL)
GROUP BY 1 GROUP BY 1
) t ) t
@ -555,7 +568,7 @@ def geocode_locations(apply: bool) -> None:
""" """
) )
todo = cur.fetchall() todo = cur.fetchall()
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER) log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
if not apply: if not apply:
for key, loc, cluster, region, clat, clng in todo[:50]: for key, loc, cluster, region, clat, clng in todo[:50]:
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region))) log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
@ -609,41 +622,3 @@ def _resolve() -> int:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.resolve_ticket_geoms()") cur.execute("SELECT tickets.resolve_ticket_geoms()")
return cur.fetchone()[0] return cur.fetchone()[0]
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
geocode_locations(apply=args.apply)
return
if args.capture_history:
_capture_history()
return
if not (args.from_bucket or args.inc_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
ingest(args)
if __name__ == "__main__":
main()

View file

@ -18,10 +18,12 @@ dev = [
"ruff>=0.4", "ruff>=0.4",
] ]
# Flat-module project (no package dir) — list the top-level modules explicitly so # Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type
# `pip install .` works (the Docker image installs the project to pull its deps). # entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
[tool.setuptools] [tool.setuptools]
py-modules = ["import_tickets", "shared", "run_migrations"] py-modules = ["pipeline", "shared", "run_migrations"]
packages = ["inc", "crq"]
[tool.uv] [tool.uv]
managed = true managed = true

View file

@ -1,13 +1,17 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron. # run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM).
# #
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the # Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in). # both ticket change streams with --apply (watermark skip-if-unchanged + per-file
# archive are built in, so a run with no new files is a cheap no-op).
# #
# Install on the instance (ingest every 20 min, 06:0020:40 EAT): # Install on the instance (every 20 min, 06:0020:40 EAT):
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 # */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT. # the host/container TZ), since the export filenames and the schedule are EAT.
#
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
set -euo pipefail set -euo pipefail
cd "$(dirname "$0")" cd "$(dirname "$0")"
@ -24,4 +28,7 @@ fi
PY="python" PY="python"
[ -x ".venv/bin/python" ] && PY=".venv/bin/python" [ -x ".venv/bin/python" ] && PY=".venv/bin/python"
exec "$PY" import_tickets.py --from-bucket --apply # Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq`
# resolve the packages alongside pipeline.py + shared.py.
"$PY" -m inc.import_inc --from-bucket --apply
"$PY" -m crq.import_crq --from-bucket --apply