diff --git a/.env.example b/.env.example index b580ddd..5c00e7f 100644 --- a/.env.example +++ b/.env.example @@ -3,7 +3,7 @@ # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) DATABASE_URL=postgresql://tracksolid_owner:@timescale_db:5432/tracksolid_db -# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/.csv) +# S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/.csv) RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ACCESS_KEY=isptickets RUSTFS_SECRET_KEY= diff --git a/Dockerfile b/Dockerfile index 974e637..4660d77 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,9 @@ -# fleettickets — INC ingestion image (Coolify-deployable). +# fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable). # A small batch/cron worker: it has no web server. Coolify keeps the container -# running (CMD below) and fires the ingest via a Scheduled Task: -# python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *) +# running (CMD below) and fires the ingests via two Scheduled Tasks: +# python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *) +# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *) +# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.) # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. FROM python:3.12-slim diff --git a/README.md b/README.md index 5f89ec5..f539506 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,22 @@ # fleettickets -Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the +Field-ops **ticket** ingestion, geocoding, and read-schema that powers the **Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module (it previously lived there as migrations 21–23 + `tools/import_tickets.py`). 
-- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)* -- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)* +Two ticket types, identical 32-column source schema and CDC change stream, served +through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each: + +- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`). + Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture. +- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data + layer + map** (typed columns, geocoding, appears on the Tickets map via + `fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred — + installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its + **own FleetOps tab**, same look & feel as INC. + +Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq) +and is driven from the INC entrypoint. ## What this owns @@ -21,7 +32,10 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. 
Unlocks **backlog-over-time** | -| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations | +| `migrations/13_crq_columns.sql` | CRQ mirror of `03`: unpacks `tickets.crq.raw` into the same **typed STORED generated columns** + indexes (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity with INC | +| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations//changes/.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) | +| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands | +| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | @@ -86,38 +100,44 @@ python run_migrations.py # apply the schema (idempotent) ## Run -```bash -# drain the incremental INC change stream (every new file oldest→newest, then archive) -python import_tickets.py --from-bucket --apply +Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import. 
-# geocode (needs GEOCODER_API_KEY) -python import_tickets.py --geocode-clusters --apply # coarse, once -python import_tickets.py --geocode-locations --apply # precise, actionable INC +```bash +# drain the incremental change streams (every new file oldest→newest, then archive) +python -m inc.import_inc --from-bucket --apply +python -m crq.import_crq --from-bucket --apply + +# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY +python -m inc.import_inc --geocode-clusters --apply # coarse, once +python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq # from a local CSV instead of the bucket (dev) -python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply +python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply +python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply ``` -Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3 -via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). +Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using +the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). ## Deploy (Coolify) The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server. -Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest -runs as a **Scheduled Task**, not a system crontab: +Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest +runs as its own **Scheduled Task**, not a system crontab: -- **Command:** `python import_tickets.py --from-bucket --apply` -- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This +- **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply` +- **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply` +- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). 
This Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC conversion is needed. - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`. + `RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds + both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks. The watermark makes a run with no new change files a cheap no-op. For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` -and runs the ingest; schedule it with a crontab line +and runs **both** ingests; schedule it with a crontab line (`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`). Full operational runbook — container, env management (encrypted; via the UI or @@ -134,8 +154,8 @@ newer than the new bucket's first file — which would otherwise be skipped. Poi which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest: ```bash -python import_tickets.py --from-bucket --reseed # dry-run first -python import_tickets.py --from-bucket --reseed --apply # commit + archive +python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq) +python -m inc.import_inc --from-bucket --reseed --apply # commit + archive ``` Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic @@ -219,6 +239,16 @@ Findings to keep in mind (see the PRD for detail): Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema + generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. -Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow -vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a -separate future project that will reuse this machinery against `automations/crq/`. 
+ +**CRQ (this milestone):** data layer + map — `tickets.crq` fed from +`automations/crq/changes/` by `crq/import_crq.py`, typed columns (migration 13), +cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`. +One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket +--apply`) — empty watermark + the stream's periodic full-state snapshots converge to +current state — then run the shared geocode once. See +[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md). + +Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are +confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the CRQ +analogues of INC migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA +trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**. diff --git a/crq/__init__.py b/crq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crq/import_crq.py b/crq/import_crq.py new file mode 100644 index 0000000..6539bb9 --- /dev/null +++ b/crq/import_crq.py @@ -0,0 +1,61 @@ +""" +crq/import_crq.py — Fireside Communications · CRQ (new-installation) ingestion. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset: + tickets.crq — new-installation requests (FleetOps "Tickets" CRQ tab) + +CRQ mirrors INC at the data layer — IDENTICAL 32-column CSV schema and the same +incremental CDC change stream automations/crq/changes/.csv in the +`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset +watermark (tickets.import_meta dataset='crq'), and archives each consumed file to +automations/crq/processed/. CRQ flows onto the existing Tickets map via +reporting.fn_tickets_for_map (which already unions tickets.crq). + +Scope (current): data layer + map only. 
CRQ has NO post-apply history capture yet +(installation-lifecycle SLA/backlog semantics differ from incidents — a future +migration). Geocoding is CROSS-DATASET and run from the INC entrypoint +(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the +shared gazetteer, which covers both inc and crq. + +Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example): + python -m crq.import_crq --from-bucket --apply + python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover + python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply + +Pre-requisite: migrations applied (run_migrations.py) — tickets.crq + its typed +columns (13_crq_columns.sql) + geo_clusters/geo_locations + fn_tickets_for_map. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import argparse + +import pipeline + +# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring). +DATASET = pipeline.make_dataset("crq", post_apply=None) + + +def main() -> None: + ap = argparse.ArgumentParser( + description="Ingest CRQ (installation) tickets from CSV (raw-first)") + ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") + ap.add_argument("--from-bucket", action="store_true", + help="Drain the incremental CRQ change stream (automations/crq/changes/) " + "from the isptickets S3 bucket: every not-yet-processed file " + "oldest→newest, upsert on ticket_id, advance the watermark, archive") + ap.add_argument("--reseed", action="store_true", + help="Ignore the stored watermark and drain every file in changes/ once " + "(one-time bucket cutover / reseed). 
Use with --from-bucket --apply") + ap.add_argument("--crq-csv", dest="local_csv", default=None, + help="Local CRQ tickets CSV file (dev)") + args = ap.parse_args() + + if not (args.from_bucket or args.local_csv): + ap.error("provide --from-bucket or --crq-csv") + pipeline.ingest(DATASET, args) + + +if __name__ == "__main__": + main() diff --git a/docs/deployment-and-operations.md b/docs/deployment-and-operations.md index fcc790f..306452f 100644 --- a/docs/deployment-and-operations.md +++ b/docs/deployment-and-operations.md @@ -1,20 +1,26 @@ # Deployment & Operations — fleettickets -Operational runbook for the INC ingest pipeline as deployed on **Coolify** +Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify** (host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the container, environment, schedule, auto-deploy webhook, the source-bucket cutover procedure, and verification. Secrets are referenced by **where to retrieve them**, never by value. +> **One image, two datasets.** INC and CRQ share an identical 32-column source schema +> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one +> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq` +> over the shared `pipeline.py` engine. Everything below applies to both unless noted. 
+ ## What's deployed | Thing | Detail | |---|---| | Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` | | Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) | -| Ingest | a Coolify **Scheduled Task** `inc_tickets` running `python import_tickets.py --from-bucket --apply` | +| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets` → `python -m inc.import_inc --from-bucket --apply` | +| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets` → `python -m crq.import_crq --from-bucket --apply` | | DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) | -| Source | **`isptickets`** S3 bucket, `automations/inc/changes/.csv` CDC stream (see `../n8n-s3-ticket-exports.md` and `../README.md`) | +| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) | Resolve the live container name (Coolify appends a random suffix): ```bash @@ -24,17 +30,20 @@ ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \ ## Schedule (cron) -The Scheduled Task runs **`*/20 6-20 * * *`** — every 20 min, **06:00–20:40 EAT**. -Coolify evaluates task cron in the server timezone (`server_settings.server_timezone` -= `Africa/Nairobi`), so **no UTC conversion** — write EAT directly. The `--from-bucket` -run is a cheap no-op when no new change file has arrived (watermark guard), so a dense -schedule is safe. +Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every +20 min, **06:00–20:40 EAT**. Coolify evaluates task cron in the server timezone +(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write +EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived +(watermark guard, per dataset), so a dense schedule is safe. 
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`: ```sql UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now() -WHERE name = 'inc_tickets'; -- id 3 +WHERE name IN ('inc_tickets', 'crq_tickets'); ``` +The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks +→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container +`fleettickets`, cron `*/20 6-20 * * *`. Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up without a redeploy. Execution history: `scheduled_task_executions`. @@ -118,27 +127,51 @@ application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id` If the provider moves the INC feed to a new bucket (as happened `tickets` → `isptickets`, 2026-06-25): -1. **Inspect** the new bucket (read-only) — confirm `automations/inc/changes/` layout, - timestamp range, schema parity. CRQ (`automations/crq/`) stays out of scope. +1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout, + timestamp range, schema parity. 2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`, - `TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). + `TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the + same bucket, so one env change serves both tasks. 3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the - watermark (`tickets.import_meta.metadata.source_max_key`), oldest→newest, upserting on - `ticket_id`: + watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest, + upserting on `ticket_id`: - If the watermark **predates** the new bucket's first file, a normal `--from-bucket --apply` drains the whole new stream — no reseed needed. 
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once): - `python import_tickets.py --from-bucket --reseed --apply` (see README "Bucket cutover"). + `python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover"). The new stream's periodic full-state re-emissions make this converge even across the cutover gap. Idempotent upserts + never-delete make it non-destructive. - For a one-off, you can run it in the live container with the new creds inlined: `docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… - sh -c "cd /app && python import_tickets.py --from-bucket --apply"`. -4. **Re-geocode** new clusters/locations: `--geocode-clusters --apply` then - `--geocode-locations --apply` (existing gazetteer persists; only new keys are looked up). + sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`. +4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply` + then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new + keys are looked up). 5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook, or manual deploy). Old bucket is left untouched for rollback. +## Bringing CRQ online (one-time seed) + +CRQ was added 2026-06-25 (data layer + map). To seed `tickets.crq` from zero on the live +DB — once the code + migration `13_crq_columns.sql` are deployed (`run_migrations.py` +applies it on build): + +1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE + filename='13_crq_columns.sql';` and `\d tickets.crq` shows the typed columns. +2. 
**Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/` + files oldest→newest; the stream's periodic full-state snapshots converge to current + state — same convergence the INC cutover relied on, so **no `--reseed` needed**): + ```bash + python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…") + python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/ + ``` + (Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq + --from-bucket --apply"`.) +3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups): + `python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`. +4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a + non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current. + ## Verification ```bash diff --git a/docs/implementation.md b/docs/implementation.md index 54db84d..25563e5 100644 --- a/docs/implementation.md +++ b/docs/implementation.md @@ -3,9 +3,18 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to `docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next). -## Pipeline (`import_tickets.py`) +## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints) -- **Source:** the incremental CDC stream `automations/inc/changes/.csv` +The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small +`Dataset` config (name, table, `automations//changes|processed/` prefixes, key +regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI: +**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and +**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). 
INC and CRQ share an +**identical 32-column source schema**, so the engine is fully shared; geocoding is +**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run +from the INC entrypoint. + +- **Source:** the incremental CDC stream `automations//changes/.csv` in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style, region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover). - **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator), @@ -24,9 +33,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to - **History capture:** after each `--apply` run (ingest or skip), calls `tickets.capture_history()` → appends new closures + upserts today's backlog snapshot. -- CLI: `--from-bucket` (drain the INC change stream), `--reseed` (ignore the watermark; - one-time bucket cutover), `--inc-csv ` (local dev), `--apply` (else dry-run), - `--geocode-clusters`, `--geocode-locations`, `--capture-history`. +- CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the + watermark; one-time bucket cutover), `--inc-csv ` (local dev), `--apply` (else + dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`. +- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv `, `--apply` (ingest only; + geocoding + history are not on the CRQ entrypoint). ## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`) @@ -42,6 +53,8 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to | 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) | | 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. 
See `docs/phase-2-dashboard.md` | | 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time | +| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` | +| 13_crq_columns | **built** — CRQ mirror of `03`: typed STORED generated columns + indexes on `tickets.crq` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab | `tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth), `normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/ @@ -56,10 +69,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to - **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps web app (`fleet-ops-staging`). -- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron +- **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket + --apply` and `crq_tickets` → `python -m crq.import_crq --from-bucket --apply`, both cron `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). - **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*` - (`isptickets` bucket), `GEOCODER_*`. + (`isptickets` bucket — serves both inc + crq), `GEOCODER_*`. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual @@ -93,5 +107,12 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for backlog/closure trends. 
Remaining: `dashboard_api` endpoint + FleetOps SPA (other repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`, -**team closure attribution**. **CRQ** = separate future project reusing this -machinery against `automations/crq/`. +**team closure attribution**. + +**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from +`automations/crq/changes/` (`crq/import_crq.py`), with typed columns (migration 13) and +cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which +already unions it) and gets its own FleetOps tab. Deferred to a follow-up once +installation-lifecycle semantics are confirmed: the CRQ analogues of migrations +08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq` +currently has **no** `post_apply` hook). diff --git a/docs/phase-1-ingestion.md b/docs/phase-1-ingestion.md index 9cbbb0b..ee6a5d6 100644 --- a/docs/phase-1-ingestion.md +++ b/docs/phase-1-ingestion.md @@ -82,6 +82,11 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs **Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron `15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`. +> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared +> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m +> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See +> `implementation.md` and `deployment-and-operations.md`. 
+ ## Data-quality findings (carried into Phase 2) - Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the diff --git a/inc/__init__.py b/inc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/inc/import_inc.py b/inc/import_inc.py new file mode 100644 index 0000000..bedc2a7 --- /dev/null +++ b/inc/import_inc.py @@ -0,0 +1,74 @@ +""" +inc/import_inc.py — Fireside Communications · INC (incident / fault) ingestion. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset: + tickets.inc — incidents / customer faults (FleetOps "Tickets" INC tab) + +INC reads the incremental CDC change stream automations/inc/changes/.csv +from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset +watermark, archives each file to automations/inc/processed/, and — uniquely to +INC — runs tickets.capture_history() after each --apply run (closure_events + +daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is +CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq). + +Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): + python -m inc.import_inc --from-bucket --apply + python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover + python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply + python -m inc.import_inc --geocode-clusters --apply + python -m inc.import_inc --geocode-locations --apply + +Pre-requisite: migrations applied (run_migrations.py) — tickets.inc/crq + +geo_clusters + geo_locations + reporting.fn_tickets_for_map. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import argparse + +import pipeline + +# INC captures closure/backlog history after every --apply run (CRQ does not yet). 
+DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history) + + +def main() -> None: + ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") + ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") + ap.add_argument("--from-bucket", action="store_true", + help="Drain the incremental INC change stream (automations/inc/changes/) " + "from the isptickets S3 bucket: every not-yet-processed file " + "oldest→newest, upsert on ticket_id, advance the watermark, archive") + ap.add_argument("--reseed", action="store_true", + help="Ignore the stored watermark and drain every file in changes/ once " + "(one-time bucket cutover / reseed). Use with --from-bucket --apply") + ap.add_argument("--inc-csv", dest="local_csv", default=None, + help="Local INC tickets CSV file (dev)") + ap.add_argument("--geocode-clusters", action="store_true", + help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve") + ap.add_argument("--geocode-locations", action="store_true", + help="Geocode actionable inc+crq location_names precisely (keyed provider), " + "then re-resolve") + ap.add_argument("--capture-history", action="store_true", + help="Run tickets.capture_history() standalone " + "(closure_events + daily snapshot)") + args = ap.parse_args() + + if args.geocode_clusters: + pipeline.geocode_clusters(apply=args.apply) + return + if args.geocode_locations: + pipeline.geocode_locations(apply=args.apply) + return + if args.capture_history: + pipeline.capture_history() + return + if not (args.from_bucket or args.local_csv): + ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, " + "--geocode-locations, or --capture-history") + pipeline.ingest(DATASET, args) + + +if __name__ == "__main__": + main() diff --git a/migrations/13_crq_columns.sql b/migrations/13_crq_columns.sql new file mode 100644 index 0000000..197e6ca --- /dev/null +++ b/migrations/13_crq_columns.sql @@ -0,0 +1,55 @@ 
+-- 13_crq_columns.sql — fleettickets · unpack tickets.crq.raw into typed columns +-- ───────────────────────────────────────────────────────────────────────────── +-- CRQ (new-installation) mirror of 03_inc_columns.sql. CRQ shares the INC source's +-- IDENTICAL 32-column flat-CSV schema, so the same STORED generated columns apply: +-- the crq dataset gets real typed, indexable columns while `raw` stays the source +-- of truth (drift-safe). STORED generated columns are computed for ALL existing +-- rows on creation and auto-recomputed on every future insert/update — no loader +-- change needed. +-- +-- tickets.eat_ts() (EAT wall-clock text -> timestamptz, IMMUTABLE) already exists +-- from 03_inc_columns.sql — reuse it, don't redefine. See that file's note on why +-- IMMUTABLE is safe for Kenya (fixed UTC+3, no DST). +-- +-- Idempotent: safe on a fresh DB and re-appliable on the live DB. +-- ───────────────────────────────────────────────────────────────────────────── + +SET search_path = tickets, public; + +ALTER TABLE tickets.crq + -- text + ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED, + ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED, + ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED, + ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED, + ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED, + ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED, + ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED, + ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED, + ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED, + ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED, + -- numeric / float + 
ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED, + ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED, + ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED, + -- boolean + ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED, + ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED, + ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED, + ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED, + -- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the + -- EXPORT pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix. 
+ ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED, + ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED, + ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED, + ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED, + ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED, + ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED, + ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED; + +-- indexes on the new typed columns (serve cluster / team / closure queries) +CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status); +CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster); +CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team); +CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at); +CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable; diff --git a/n8n-s3-ticket-exports.md b/n8n-s3-ticket-exports.md index 4084bc6..4984491 100644 --- a/n8n-s3-ticket-exports.md +++ b/n8n-s3-ticket-exports.md @@ -61,10 +61,12 @@ Notes: listings but contain **no objects** (leftover/legacy; ignore them). There is no `latest` pointer and no JSON/metadata envelope. - **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental - stream (with matching baseline timestamps and additional newer deltas). CRQ - remains out of scope for `import_tickets.py` (INC-only), but the source-side - shape is the same. 
CRQ's old root snapshots (`automations/crq/.csv`) are - still present because nothing archives them — they are not consumed. + stream (with matching baseline timestamps and additional newer deltas) and the + identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by + `crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same + way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots + (`automations/crq/.csv`, old `tickets` bucket) are still present because nothing + archives them — they are not consumed (the `changes/` stream is the source of truth). ## CSV Schema @@ -83,7 +85,7 @@ Each row is a complete record (not a partial diff): a delta row carries the ticket's full current state, so a plain upsert on `ticket_id` is correct. The baseline still contains `is_alarm=true` rows and may include a leading `EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by -the consumer (see `import_tickets.py`). +the consumer (see `pipeline.py` + the `inc/`, `crq/` entrypoints). ## Timestamp Format @@ -96,11 +98,12 @@ Generated once at the start of each execution, formatted in `Africa/Nairobi` incremental files appear whenever a change batch is exported (multiple within the same hour are normal, e.g. `15-50-39` then `15-53-04`). -## How `import_tickets.py` Consumes It +## How the Loader Consumes It -`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`): +`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq +--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine: -1. Lists `automations/inc/changes/.csv`, sorts ascending by timestamp. +1. Lists `automations//changes/.csv`, sorts ascending by timestamp. 2. Skips files at/older than the **watermark** (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already applied); on a fresh stream it processes everything present. 
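Reviewer note: the list-sort-skip steps described above can be sketched in isolation. This is a minimal illustration, not the engine's code: `KEY_RE`, `ts_from_key` and `pending_keys` are hypothetical names, the pattern is hard-coded to the `inc` stream, and (as a simplification) keys with an impossible date are dropped rather than sorted to a floor value.

```python
import re
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi, fixed UTC+3

# Hypothetical pattern for the 'inc' change stream; the timestamp shape follows
# the example filenames in the docs (e.g. 2026-06-24T09-55-44.csv).
KEY_RE = re.compile(
    r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")


def ts_from_key(key):
    """Return the EAT timestamp embedded in a change-file key, or None."""
    m = KEY_RE.match(key)
    if not m:
        return None
    try:
        # The regex shape can match an impossible date, so parse defensively.
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:
        return None


def pending_keys(keys, watermark_key=None):
    """Keys strictly newer than the watermark, sorted oldest first.

    A missing or unparseable watermark means "fresh stream": every
    well-formed change file is returned.
    """
    last_ts = ts_from_key(watermark_key) if watermark_key else None
    stamped = [(ts_from_key(k), k) for k in keys]
    stamped = sorted((ts, k) for ts, k in stamped if ts is not None)
    return [k for ts, k in stamped if last_ts is None or ts > last_ts]
```

Comparing parsed timestamps rather than raw key strings keeps the skip rule correct even if the stored watermark points at a key from a different prefix that happens to parse.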
diff --git a/import_tickets.py b/pipeline.py similarity index 72% rename from import_tickets.py rename to pipeline.py index 5497bf9..4233038 100644 --- a/import_tickets.py +++ b/pipeline.py @@ -1,12 +1,17 @@ """ -import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first) +pipeline.py — Fireside Communications · generic ticket ingestion engine (raw-first) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ -Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the -source of the FleetOps "Tickets" map. - tickets.inc — incidents / customer faults +The dataset-agnostic core shared by the per-type entrypoints: + inc/import_inc.py -> tickets.inc (incidents / customer faults) + crq/import_crq.py -> tickets.crq (new-installation requests) -STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed -here. `tickets.crq` stays in the schema but is not fed by this pipeline. +Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream, +so the only differences are the table, the S3 prefixes, the import_meta dataset +key, and an optional post-apply hook (INC captures closure/backlog history; CRQ +does not yet). Those are carried by the `Dataset` config; everything else here is +generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder +budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and +are driven from a single entrypoint (the INC one) — never duplicated per dataset. RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb). Everything downstream reads from `raw` (resilient to source schema drift). The DB @@ -14,50 +19,33 @@ derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under - automations/inc/changes/.csv (e.g. 
2026-06-24T09-55-44.csv) + automations//changes/.csv (e.g. 2026-06-24T09-55-44.csv) This is an INCREMENTAL (CDC) stream: the first file is a full current-state baseline, and every later file holds only the rows that CHANGED since the prior -export (new + updated tickets, keyed by ticket_id; deletions are never emitted). -Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY -not-yet-processed file in ASCENDING timestamp order (baseline first, then each -delta) — taking only the newest would silently drop the intermediate deltas: +export (with periodic full-state re-emissions). Deletions are never emitted. Every +file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file +in ASCENDING timestamp order (baseline first, then each delta) — taking only the +newest would silently drop the intermediate deltas: - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop derivable / provenance / zero-info columns (see DROP_FIELDS); - normalize region -> lowercase, raw_status -> UPPERCASE; - upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure history accumulates), and advance the watermark in tickets.import_meta (metadata->>'source_max_key' = newest file applied) so reruns skip what's done; - - on success, MOVE each file to automations/inc/processed/ (copy + delete). - -Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits): - --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups) - --geocode-locations precise per-location for ACTIONABLE INC tickets: parses the - real place out of location_name (region+cluster+location_name, - network codes stripped), geocodes it, caches in - tickets.geo_locations, then re-resolves geoms. -Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY. 
- -Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): - python import_tickets.py --from-bucket --apply - python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover - python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply - python import_tickets.py --geocode-clusters --apply - python import_tickets.py --geocode-locations --apply - -Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq + -geo_clusters + geo_locations + reporting.fn_tickets_for_map. + - on success, MOVE each file to automations//processed/ (copy + delete). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ from __future__ import annotations -import argparse -import csv import io +import csv import math import os import re import time +from collections.abc import Callable +from dataclasses import dataclass from datetime import datetime, timezone, timedelta import boto3 @@ -67,14 +55,10 @@ from botocore.config import Config as BotoConfig from shared import clean, get_conn, get_logger -log = get_logger("import_tickets") +log = get_logger("pipeline") -# ── INC ingestion config ────────────────────────────────────────────────────── -_TABLE = "tickets.inc" -_DATASET = "inc" +# ── shared ingestion config ───────────────────────────────────────────────────── _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets") -_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream -_PROCESSED_PREFIX = "automations/inc/processed/" _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT # Garbage row the source leaks (commonly the first data line): its ticket_id is the @@ -91,11 +75,6 @@ DROP_FIELDS = frozenset({ "department", "source_type", }) -# Only files matching automations/inc/changes/.csv — the incremental -# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes). 
-_CHANGE_KEY_RE = re.compile( - r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$") - # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() _API_KEY = os.getenv("GEOCODER_API_KEY", "") @@ -103,12 +82,34 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1")) _last_geocode_at = 0.0 +# ── dataset config (per ticket type) ──────────────────────────────────────────── +@dataclass(frozen=True) +class Dataset: + """All that distinguishes one ticket type from another in the generic engine.""" + name: str # 'inc' | 'crq' (import_meta.dataset) + table: str # 'tickets.inc' | 'tickets.crq' + change_prefix: str # 'automations//changes/' + processed_prefix: str # 'automations//processed/' + key_regex: re.Pattern # matches a .csv key + post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only) + + +def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset: + """Build the standard Dataset for a ticket type (inc/crq) — only the name varies.""" + return Dataset( + name=name, + table=f"tickets.{name}", + change_prefix=f"automations/{name}/changes/", + processed_prefix=f"automations/{name}/processed/", + # only automations//changes/.csv — the incremental stream + # (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes). + key_regex=re.compile( + rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"), + post_apply=post_apply, + ) + + # ── data loading (CSV · incremental CDC change stream · per-file watermark) ───── -# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under -# automations/inc/changes/.csv: a first full-state baseline, then files -# holding only the rows that CHANGED (with periodic full-state re-emissions). 
We drain -# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark -# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/. # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). def _s3_client(): """boto3 S3 client for the S3 endpoint (force path-style addressing).""" @@ -123,9 +124,9 @@ def _s3_client(): ) -def _ts_from_key(key: str) -> datetime | None: - """EAT timestamp embedded in an automations/inc/changes/.csv key (or None).""" - m = _CHANGE_KEY_RE.match(key) +def _ts_from_key(ds: Dataset, key: str) -> datetime | None: + """EAT timestamp embedded in an automations//changes/.csv key (or None).""" + m = ds.key_regex.match(key) if not m: return None try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort @@ -134,12 +135,12 @@ def _ts_from_key(key: str) -> datetime | None: return None -def _list_inc_csvs(s3) -> list[tuple[str, str]]: - """[(key, etag)] for every automations/inc/changes/.csv (excludes processed/ + dirs).""" +def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]: + """[(key, etag)] for every changes/.csv of this dataset (excludes processed/ + dirs).""" out: list[tuple[str, str]] = [] - for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): + for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix): for it in page.get("Contents", []): - if _CHANGE_KEY_RE.match(it["Key"]): + if ds.key_regex.match(it["Key"]): out.append((it["Key"], (it.get("ETag") or "").strip('"'))) return out @@ -149,23 +150,22 @@ def _get_text(s3, key: str) -> str: return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") -def _last_processed_ts() -> datetime | None: - """Watermark: EAT timestamp of the newest change file already ingested. 
+def _last_processed_ts(ds: Dataset) -> datetime | None: + """Watermark: EAT timestamp of the newest change file already ingested for this dataset. Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as we drain changes/ oldest→newest). None when nothing has been ingested via the - changes stream yet (e.g. the first run after the source switched to incremental, - where the stored key is an old full-snapshot path) — then every file currently in - changes/ is processed. + changes stream yet (e.g. a brand-new dataset, or the first run after the source + switched buckets) — then every file currently in changes/ is processed. """ with get_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s", - (_DATASET,), + (ds.name,), ) row = cur.fetchone() - return _ts_from_key(row[0]) if row and row[0] else None + return _ts_from_key(ds, row[0]) if row and row[0] else None def _parse_csv(text: str) -> list[dict]: @@ -177,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]: return list(csv.DictReader(f)) -def _move_processed(s3, keys: list[str]) -> None: - """Archive listed INC csv objects to automations/inc/processed/ (copy + delete).""" +def _move_processed(s3, ds: Dataset, keys: list[str]) -> None: + """Archive listed csv objects to automations//processed/ (copy + delete).""" for key in keys: - dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1] + dst = ds.processed_prefix + key.rsplit("/", 1)[-1] s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst) s3.delete_object(Bucket=_BUCKET, Key=key) log.info("archived %s -> %s", key, dst) @@ -206,8 +206,8 @@ def _prepare(row: dict) -> dict: # ── upsert (raw-first) ──────────────────────────────────────────────────────── -def _record_meta(cur, meta: dict, records_ingested: int) -> None: - """Upsert the INC snapshot metadata (powers map freshness + holds source_etag). 
+def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None: + """Upsert the snapshot metadata (powers map freshness + holds source_max_key). Runs on the caller's cursor so the row upsert and the meta write commit together — a half-written state (rows in, meta stale) breaks skip-if-unchanged. @@ -225,54 +225,58 @@ def _record_meta(cur, meta: dict, records_ingested: int) -> None: records_ingested = EXCLUDED.records_ingested, n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, ingested_at = now()""", - (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")), + (ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")), clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), clean(meta.get("source_table")), meta.get("row_count"), records_ingested, clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), ) -def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: +def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int: meta = meta or {} kept = [r for r in rows if _keep_row(r)] payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept] log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)", - _TABLE, len(rows), len(payload), len(rows) - len(payload)) + ds.table, len(rows), len(payload), len(rows) - len(payload)) if not apply: - log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE) + log.info("DRY-RUN — nothing written to %s. 
Use --apply.", ds.table) return len(payload) with get_conn() as conn: with conn.cursor() as cur: psycopg2.extras.execute_values( cur, - f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s " + f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s " "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", payload, page_size=500, ) # same transaction as the upsert: rows + snapshot meta commit atomically - _record_meta(cur, meta, len(payload)) - log.info("upserted %d rows into %s", len(payload), _TABLE) + _record_meta(cur, ds, meta, len(payload)) + log.info("upserted %d rows into %s", len(payload), ds.table) return len(payload) -def _capture_history() -> None: - """Append new closures + upsert today's backlog snapshot (tickets.capture_history).""" +def capture_history() -> None: + """Append new closures + upsert today's backlog snapshot (tickets.capture_history). + + INC-only today (CRQ install-lifecycle history is a future migration); wired as + the INC Dataset's post_apply hook. + """ with get_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT tickets.capture_history()") log.info("history: %s", cur.fetchone()[0]) -def ingest(args) -> None: - # Local-file path (dev): ingest a single CSV, no bucket / no archive. - if args.inc_csv: - rows = _load_csv_local(args.inc_csv) - name = os.path.basename(args.inc_csv) - ts = _ts_from_key(_INC_PREFIX + name) +def ingest(ds: Dataset, args) -> None: + # Local-file path (dev): ingest a single CSV, no bucket / no archive / no history. 
+ if args.local_csv: + rows = _load_csv_local(args.local_csv) + name = os.path.basename(args.local_csv) + ts = _ts_from_key(ds, ds.change_prefix + name) meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)} if ts: meta["exported_at"] = ts.isoformat() - upsert(rows, args.apply, meta=meta) + upsert(ds, rows, args.apply, meta=meta) return # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest @@ -280,11 +284,11 @@ def ingest(args) -> None: # the file is archived PER file, so a mid-run failure leaves a consistent state # (folder state matches the watermark) and the next run resumes cleanly. s3 = _s3_client() - listing = _list_inc_csvs(s3) + listing = _list_csvs(s3, ds) if not listing: - log.info("no INC change files under %s — nothing to do", _INC_PREFIX) + log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix) return - listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT)) + listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT)) # watermark: skip anything at/older than the newest file already applied. Archiving # normally empties changes/, but this guards a failed archive from re-applying. @@ -292,24 +296,25 @@ def ingest(args) -> None: # for a one-time bucket cutover, where the stored key points at the old bucket's stream # and its timestamp may be newer than the new bucket's first file. Crash-safe: each file # still advances source_max_key + archives per file, so a plain rerun resumes cleanly. 
- last_ts = None if args.reseed else _last_processed_ts() + last_ts = None if args.reseed else _last_processed_ts(ds) _floor = datetime.min.replace(tzinfo=_EAT) pending = [(k, e) for k, e in listing - if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] + if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts] if not pending: - log.info("all %d change file(s) already processed (watermark %s) — nothing new", - len(listing), last_ts and last_ts.isoformat()) + log.info("all %d %s change file(s) already processed (watermark %s) — nothing new", + len(listing), ds.name, last_ts and last_ts.isoformat()) if args.apply: - _move_processed(s3, [k for k, _ in listing]) # archive any stragglers - _capture_history() + _move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers + if ds.post_apply: + ds.post_apply() return - log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s", - len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0]) + log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s", + len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0]) total = 0 for i, (key, etag) in enumerate(pending): rows = _parse_csv(_get_text(s3, key)) - ts = _ts_from_key(key) + ts = _ts_from_key(ds, key) # the first file applied onto an empty watermark is the full baseline; every # file after is a delta. export_type is informational (recorded in import_meta). meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta", @@ -319,14 +324,14 @@ def ingest(args) -> None: meta["exported_at"] = ts.isoformat() # rows + watermark (source_max_key) commit in one txn, advancing per file; only # then archive, so the changes/ folder state always matches the watermark. 
- total += upsert(rows, args.apply, meta=meta) + total += upsert(ds, rows, args.apply, meta=meta) if args.apply: - _move_processed(s3, [key]) + _move_processed(s3, ds, [key]) else: - log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX) - log.info("ingested %d change file(s); %d rows kept in total", len(pending), total) - if args.apply: - _capture_history() + log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix) + log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total) + if args.apply and ds.post_apply: + ds.post_apply() # ── place extraction (strip network codes, keep the real place) ─────────────── @@ -475,7 +480,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo return None -# ── cluster gazetteer (coarse fallback) ─────────────────────────────────────── +# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ───────────── def geocode_clusters(apply: bool) -> None: with get_conn() as conn: with conn.cursor() as cur: @@ -522,7 +527,7 @@ def geocode_clusters(apply: bool) -> None: log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) -# ── per-location geocoding (precise; actionable INC) ────────────────────────── +# ── per-location geocoding (precise; actionable inc + crq) ──────────────────── # A location geocode is only trusted if it lands within this radius of the # cluster centroid; otherwise the geocoder matched the landmark in the wrong # place and we fall back to the cluster centroid. 
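Reviewer note: the trust rule in the comment above (accept a location geocode only if it lands within a radius of the cluster centroid, otherwise fall back to the centroid) can be sketched as follows. This is an assumption-laden illustration: `haversine_km`, `trusted_point` and the `max_km` parameter are hypothetical names, and the actual radius used by `pipeline.py` is not shown in this excerpt.

```python
import math


def haversine_km(lat1, lng1, lat2, lng2):
    """Great-circle distance in kilometres between two WGS84 points."""
    r = 6371.0088  # mean Earth radius in km
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = p2 - p1
    dlmb = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlmb / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))


def trusted_point(hit, centroid, max_km):
    """Return the geocoder hit if it is within max_km of the centroid.

    hit and centroid are (lat, lng) tuples; hit may be None when the
    geocoder found nothing. In every rejected case the centroid is returned.
    """
    if hit is None:
        return centroid
    return hit if haversine_km(*hit, *centroid) <= max_km else centroid
```

A caller would pass the cluster centroid from `tickets.geo_clusters` and whatever radius the engine configures; the sketch deliberately leaves `max_km` without a default.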
@@ -537,17 +542,25 @@ def geocode_locations(apply: bool) -> None: """ SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng FROM ( - SELECT tickets.norm_cluster(raw->>'location_name') AS key, - (array_agg(raw->>'location_name'))[1] AS location_name, - (array_agg(raw->>'cluster'))[1] AS cluster, - (array_agg(raw->>'region'))[1] AS region, - tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey - FROM tickets.inc - WHERE (raw->>'is_actionable')::boolean - AND raw->>'location_name' IS NOT NULL - AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL - AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl - WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name') + SELECT tickets.norm_cluster(src.raw->>'location_name') AS key, + (array_agg(src.raw->>'location_name'))[1] AS location_name, + (array_agg(src.raw->>'cluster'))[1] AS cluster, + (array_agg(src.raw->>'region'))[1] AS region, + tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey + FROM ( + -- CROSS-DATASET: actionable INC + CRQ share one location gazetteer + SELECT raw FROM tickets.inc + WHERE (raw->>'is_actionable')::boolean + AND raw->>'location_name' IS NOT NULL + AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL + UNION ALL + SELECT raw FROM tickets.crq + WHERE (raw->>'is_actionable')::boolean + AND raw->>'location_name' IS NOT NULL + AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL + ) src + WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl + WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name') AND gl.geom IS NOT NULL) GROUP BY 1 ) t @@ -555,7 +568,7 @@ def geocode_locations(apply: bool) -> None: """ ) todo = cur.fetchall() - log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER) + log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER) if not apply: for key, loc, cluster, region, clat, clng in todo[:50]: log.info(" %s -> %s", key, " 
| ".join(compose_queries(loc, cluster, region))) @@ -609,41 +622,3 @@ def _resolve() -> int: with conn.cursor() as cur: cur.execute("SELECT tickets.resolve_ticket_geoms()") return cur.fetchone()[0] - - -# ── entrypoint ──────────────────────────────────────────────────────────────── -def main() -> None: - ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") - ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") - ap.add_argument("--from-bucket", action="store_true", - help="Drain the incremental INC change stream (automations/inc/changes/) " - "from the isptickets S3 bucket: every not-yet-processed file " - "oldest→newest, upsert on ticket_id, advance the watermark, archive") - ap.add_argument("--reseed", action="store_true", - help="Ignore the stored watermark and drain every file in changes/ once " - "(one-time bucket cutover / reseed). Use with --from-bucket --apply") - ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)") - ap.add_argument("--geocode-clusters", action="store_true", - help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") - ap.add_argument("--geocode-locations", action="store_true", - help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve") - ap.add_argument("--capture-history", action="store_true", - help="Run tickets.capture_history() standalone (closure_events + daily snapshot)") - args = ap.parse_args() - - if args.geocode_clusters: - geocode_clusters(apply=args.apply) - return - if args.geocode_locations: - geocode_locations(apply=args.apply) - return - if args.capture_history: - _capture_history() - return - if not (args.from_bucket or args.inc_csv): - ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history") - ingest(args) - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index 952f3c7..85ac2e5 
100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,10 +18,12 @@ dev = [ "ruff>=0.4", ] -# Flat-module project (no package dir) — list the top-level modules explicitly so -# `pip install .` works (the Docker image installs the project to pull its deps). +# Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type +# entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the +# Docker image installs the project to pull its deps; runtime runs from /app via -m). [tool.setuptools] -py-modules = ["import_tickets", "shared", "run_migrations"] +py-modules = ["pipeline", "shared", "run_migrations"] +packages = ["inc", "crq"] [tool.uv] managed = true diff --git a/run_ingest.sh b/run_ingest.sh index 5593219..06d19b3 100755 --- a/run_ingest.sh +++ b/run_ingest.sh @@ -1,13 +1,17 @@ #!/usr/bin/env bash -# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron. +# run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM). # -# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the -# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in). +# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains +# both ticket change streams with --apply (watermark skip-if-unchanged + per-file +# archive are built in, so a run with no new files is a cheap no-op). # -# Install on the instance (ingest every 20 min, 06:00–20:40 EAT): -# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 +# Install on the instance (every 20 min, 06:00–20:40 EAT): +# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1 # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # the host/container TZ), since the export filenames and the schedule are EAT. 
+# +# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile +# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback. set -euo pipefail cd "$(dirname "$0")" @@ -24,4 +28,7 @@ fi PY="python" [ -x ".venv/bin/python" ] && PY=".venv/bin/python" -exec "$PY" import_tickets.py --from-bucket --apply +# Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq` +# resolve the packages alongside pipeline.py + shared.py. +"$PY" -m inc.import_inc --from-bucket --apply +"$PY" -m crq.import_crq --from-bucket --apply
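Reviewer note: the per-type entrypoints (`inc/import_inc.py`, `crq/import_crq.py`) are referenced throughout this diff but not shown in this excerpt. A minimal sketch of the argument parsing they would need is given below, assuming the flag names implied by the engine's use of `args.apply`, `args.reseed` and `args.local_csv`. `build_parser` and its `with_geocoding` switch are hypothetical, and the hand-off to the engine appears only as a comment.

```python
import argparse


def build_parser(dataset, with_geocoding=False):
    """Build a CLI parser for one ticket type ('inc' or 'crq').

    Hypothetical helper: the flag names mirror the attributes the shared
    engine reads from `args`; the real entrypoints may differ.
    """
    ap = argparse.ArgumentParser(
        description=f"Ingest {dataset.upper()} tickets from CSV (raw-first)")
    ap.add_argument("--apply", action="store_true",
                    help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain every not-yet-processed change file, oldest first")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark (one-time bucket cutover)")
    ap.add_argument("--local-csv", default=None,
                    help="Local tickets CSV file (dev)")
    if with_geocoding:
        # Geocoding is cross-dataset, so only one entrypoint exposes it.
        ap.add_argument("--geocode-clusters", action="store_true")
        ap.add_argument("--geocode-locations", action="store_true")
    return ap


# Assumed hand-off inside crq/import_crq.py (not executed here):
#   args = build_parser("crq").parse_args()
#   pipeline.ingest(pipeline.make_dataset("crq"), args)
```

Keeping the parser per entrypoint while the engine takes a `Dataset` plus parsed `args` is one way to keep the two commands in `run_ingest.sh` symmetrical.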