Compare commits

...

8 commits

Author SHA1 Message Date
david kiania
c980f3edd0 feat(crq): dashboard parity functions (migration 16) + recover inc 13/14
Brings CRQ to FleetOps-dashboard parity with INC for the Tickets tab's CRQ sub-tab:
- 16_crq_dashboard.sql: tickets.crq_open_sla view (mirror of inc_open_sla, no geog;
  lat/lng from geom) + reporting.fn_crq_dashboard / fn_crq_search / fn_crq_filter_options
  (mirrors of the inc functions over tickets.crq) + grants to dashboard_ro/grafana_ro.
- Recover the previously un-versioned 13_inc_search_fn.sql + 14_inc_filter_options.sql
  into the repo (verbatim from the live defs) so a fresh DB rebuilds faithfully; the
  live ledger already lists them so run_migrations skips them there.

Consumed by dashboard_api GET /webhook/crq-dashboard|crq-search|crq-filter-options.
Not yet applied to prod (pending go-ahead).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 00:19:54 +03:00
david kiania
066d866b90 fix(crq): migration 15 creates tickets.crq (live DB never materialized it)
Live-DB reconciliation before seeding CRQ revealed two divergences:
- tickets.crq did NOT exist: 01_tickets_schema.sql was applied 2026-06-15 from a
  version predating its crq section, so the IF-NOT-EXISTS ledger guard has blocked
  it ever since (fn_tickets_for_map + resolve_ticket_geoms already reference crq, so
  they errored if called — masked because the live INC view uses fn_inc_dashboard).
- The live ledger carries un-versioned 13_inc_search_fn.sql / 14_inc_filter_options.sql
  (applied 2026-06-19, absent from this repo).

So 13_crq_columns.sql (ALTER-only, number 13) is replaced by 15_crq_table.sql, which
CREATEs tickets.crq self-containedly (table + geom trigger + raw/typed indexes) and
adds the typed STORED generated columns. Deterministic + idempotent on both the live DB
(crq missing) and a fresh DB (crq minimal from 01). Numbered 15 to sit after the live
ledger's max. Docs/CLI references updated 13->15.

Applied + seeded on the live DB out-of-band (running container, INC image untouched):
39,240 crq rows, 99.99% geocoded (cluster + shared location cache), watermark current,
crq now renders on fn_tickets_for_map.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 23:55:17 +03:00
david kiania
5f5d71d500 feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.

- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
  keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
  now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
  tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
  columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
  packages inc/crq. Docs (README, implementation, deployment-and-operations,
  n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.

tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 23:16:38 +03:00
david kiania
0787d3a185 docs: add deployment & operations runbook (Coolify, webhook, bucket cutover)
Capture the operational knowledge from the isptickets cutover: Coolify app/container,
env management (encrypted — UI or artisan tinker), cron, the Forgejo->Coolify auto-deploy
webhook (config + recreate/verify; it was missing), manual deploy trigger, the
source-bucket cutover procedure, and verification queries. Link it from README; refresh
stale tickets-bucket/ETag references in implementation.md to the isptickets CDC model.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 22:24:23 +03:00
david kiania
f06c11fd11 Merge fix/inc-changes-stream: isptickets bucket cutover + --reseed + 20-min cron 2026-06-25 18:40:57 +03:00
david kiania
7d3bba8d78 chore(schedule): INC ingest cron -> every 20 min, 06:00-20:40 EAT
Was hourly at :15 (15 7-19 * * *); now */20 6-20 * * * for fresher ticket
data through the working day. Updates the documented schedule in the Coolify
Scheduled Task command, run_ingest.sh, Dockerfile, README, and implementation
notes (the live schedule is set in the Coolify UI).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:23:17 +03:00
david kiania
509338c076 feat(import_tickets): migrate INC ingest to isptickets bucket + --reseed cutover
Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new
per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema).
This is config + a one-time reseed, not a rewrite — the loader already drains
automations/inc/changes/ oldest->newest with a source_max_key watermark.

- default _BUCKET -> isptickets (TICKETS_BUCKET still overrides)
- add --reseed: ignore the stored watermark and drain every changes/ file once
  (the old-bucket watermark may post-date the new bucket's first file). Crash-safe
  via the existing per-file watermark-advance + archive loop.
- refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the
  CDC reality; .env.example + README updated (new bucket + reseed runbook).

Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel
filter active, exit 0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:20:15 +03:00
david kiania
a4b90a33d8 fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).

The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.

Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
  (baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
  (import_meta.metadata->>'source_max_key'); rows + watermark commit in one
  txn per file, then archive to processed/ — so a mid-run failure leaves a
  consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
  and rewrite it for the incremental stream; fix the reference in
  docs/phase-1-ingestion.md

Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 14:37:17 +03:00
19 changed files with 1320 additions and 375 deletions

View file

@ -3,12 +3,12 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv) # S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=<key> RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>
RUSTFS_REGION=us-east-1 RUSTFS_REGION=us-east-1
TICKETS_BUCKET=tickets TICKETS_BUCKET=isptickets
# Geocoder (keyed — public Nominatim rate-limits bulk) # Geocoder (keyed — public Nominatim rate-limits bulk)
GEOCODER_PROVIDER=locationiq # locationiq | opencage GEOCODER_PROVIDER=locationiq # locationiq | opencage

View file

@ -1,7 +1,9 @@
# fleettickets — INC ingestion image (Coolify-deployable). # fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container # A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingest via a Scheduled Task: # running (CMD below) and fires the ingests via two Scheduled Tasks:
# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *) # python -m inc.import_inc --from-bucket --apply (cron: */20 6-20 * * *)
# python -m crq.import_crq --from-bucket --apply (cron: */20 6-20 * * *)
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim FROM python:3.12-slim

119
README.md
View file

@ -1,11 +1,22 @@
# fleettickets # fleettickets
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the Field-ops **ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module **Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 2123 + `tools/import_tickets.py`). (it previously lived there as migrations 2123 + `tools/import_tickets.py`).
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)* Two ticket types, identical 32-column source schema and CDC change stream, served
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)* through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each:
- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
layer + map** (typed columns, geocoding, appears on the Tickets map via
`fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
**own FleetOps tab**, same look & feel as INC.
Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
and is driven from the INC entrypoint.
## What this owns ## What this owns
@ -21,7 +32,10 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations | | `migrations/15_crq_table.sql` | **Materializes `tickets.crq`** (table + geom trigger + indexes — `01`'s crq section never ran on the live DB) and unpacks `raw` into the same **typed STORED generated columns** as INC's `03` (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity |
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -86,50 +100,81 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
```bash Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY) ```bash
python import_tickets.py --geocode-clusters --apply # coarse, once # drain the incremental change streams (every new file oldest→newest, then archive)
python import_tickets.py --geocode-locations --apply # precise, actionable INC python -m inc.import_inc --from-bucket --apply
python -m crq.import_crq --from-bucket --apply
# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY
python -m inc.import_inc --geocode-clusters --apply # coarse, once
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq
# from a local CSV instead of the bucket (dev) # from a local CSV instead of the bucket (dev)
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
``` ```
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3 Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
## Deploy (Coolify) ## Deploy (Coolify)
The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server. The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest
runs as a **Scheduled Task**, not a system crontab: runs as its own **Scheduled Task**, not a system crontab:
- **Command:** `python import_tickets.py --from-bucket --apply` - **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply`
- **Frequency:** `15 7-19 * * *` (`:15` past each hour, **07:1519:15 EAT**). This - **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply`
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*`, `GEOCODER_*`. `RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds
both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op. The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line and runs **both** ingests; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`). (`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
Full operational runbook — container, env management (encrypted; via the UI or
`artisan tinker`), the **Forgejo → Coolify auto-deploy webhook**, manual deploys, and the
source-bucket cutover procedure — is in
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets``isptickets`),
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
newer than the new bucket's first file — which would otherwise be skipped. Point the
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash
python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq)
python -m inc.import_inc --from-bucket --reseed --apply # commit + archive
```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
full-state re-emissions re-assert current state, so this is non-destructive and converges
even across the cutover gap. After it, the watermark is current — resume normal
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
## Notes ## Notes
- The n8n export writes a **full current-state CSV per hour** to - The n8n export writes an **incremental CDC change stream** to
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no `automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
deltas. The loader lists the prefix, takes the **newest** file, and ingests it. holding only the rows that changed (with periodic full-state re-emissions). No `latest`
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed pointer, no metadata envelope. The loader drains **every not-yet-processed file
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write oldest→newest** — taking only the newest would drop intermediate deltas.
is skipped (the export re-emits byte-identical content most hours). - **Watermark:** the newest file already applied is recorded in
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never - **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success the file is **moved** to deleted, so closed-ticket history accumulates. On success each file is **moved** to
`automations/inc/processed/`. `automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`; `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
@ -192,8 +237,18 @@ Findings to keep in mind (see the PRD for detail):
## Status / roadmap ## Status / roadmap
Live: INC ingestion deployed on Coolify (hourly `15 7-19 * * *` EAT), schema + Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a **CRQ (this milestone):** data layer + map — `tickets.crq` fed from
separate future project that will reuse this machinery against `automations/crq/`. `automations/crq/changes/` by `crq/import_crq.py`, the `tickets.crq` table + typed columns (migration 15),
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
current state — then run the shared geocode once. See
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).
Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.

0
crq/__init__.py Normal file
View file

61
crq/import_crq.py Normal file
View file

@ -0,0 +1,61 @@
"""
crq/import_crq.py Fireside Communications · CRQ (new-installation) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
tickets.crq new-installation requests (FleetOps "Tickets" CRQ tab)
CRQ mirrors INC at the data layer IDENTICAL 32-column CSV schema and the same
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
automations/crq/processed/. CRQ flows onto the existing Tickets map via
reporting.fn_tickets_for_map (which already unions tickets.crq).
Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
(installation-lifecycle SLA/backlog semantics differ from incidents a future
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
shared gazetteer, which covers both inc and crq.
Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
python -m crq.import_crq --from-bucket --apply
python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.crq + its typed
columns (15_crq_table.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
DATASET = pipeline.make_dataset("crq", post_apply=None)
def main() -> None:
ap = argparse.ArgumentParser(
description="Ingest CRQ (installation) tickets from CSV (raw-first)")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental CRQ change stream (automations/crq/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--crq-csv", dest="local_csv", default=None,
help="Local CRQ tickets CSV file (dev)")
args = ap.parse_args()
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket or --crq-csv")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,202 @@
# Deployment & Operations — fleettickets
Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
procedure, and verification. Secrets are referenced by **where to retrieve them**,
never by value.
> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.
## What's deployed
| Thing | Detail |
|---|---|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets``python -m inc.import_inc --from-bucket --apply` |
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets``python -m crq.import_crq --from-bucket --apply` |
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |
Resolve the live container name (Coolify appends a random suffix):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
'docker ps --filter name=g14mwzo73q20g70vc6fzumya --format "{{.Names}}" | head -1'
```
## Schedule (cron)
Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
20 min, **06:0020:40 EAT**. Coolify evaluates task cron in the server timezone
(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
(watermark guard, per dataset), so a dense schedule is safe.
To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
```sql
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
WHERE name IN ('inc_tickets', 'crq_tickets');
```
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
`fleettickets`, cron `*/20 6-20 * * *`.
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
without a redeploy. Execution history: `scheduled_task_executions`.
> The repo's `Dockerfile`, `run_ingest.sh`, and `README.md` document this same cron for
> the plain-host/VM fallback (`CRON_TZ=Africa/Nairobi`).
## Environment variables
Set on the Coolify app (Environment Variables). Names only — values live in Coolify:
| Var | Purpose |
|---|---|
| `DATABASE_URL` | `tracksolid_db` (internal `timescale_db:5432`) |
| `RUSTFS_ENDPOINT` | `https://s3.rahamafresh.com` |
| `RUSTFS_ACCESS_KEY` / `RUSTFS_SECRET_KEY` | `isptickets` bucket credentials |
| `RUSTFS_REGION` | `us-east-1` |
| `TICKETS_BUCKET` | `isptickets` |
| `GEOCODER_PROVIDER` / `GEOCODER_API_KEY` | keyed geocoder (LocationIQ/OpenCage) |
**Env vars are Laravel-encrypted in `coolify-db` — never raw-`UPDATE` them.** Change them
in the Coolify UI, or via `artisan tinker` (which re-encrypts on save):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$e = \App\Models\EnvironmentVariable::where('resourceable_type','App\\Models\\Application')
->where('resourceable_id',15)->where('key','TICKETS_BUCKET')->first();
$e->value = 'isptickets'; $e->save(); echo $e->value.PHP_EOL;
PHP
```
An env change only takes effect after the container is **recreated** (a redeploy — see below),
since Coolify injects env at container create time.
## Deploys
### Auto-deploy (Forgejo → Coolify webhook)
A push to `main` should auto-deploy. This needs **both** the Coolify per-app Auto-Deploy
toggle (Configuration → Advanced) **and** a webhook on the Forgejo repo. The webhook was
missing originally (the toggle alone is not enough); it now exists as hook id `3` on
`kianiadee/fleettickets`:
| Field | Value |
|---|---|
| URL | `https://stage.rahamafresh.com/webhooks/source/gitea/events/manual` |
| Type / content-type | `gitea` / `json` |
| Events / branch filter | `push` / `main` |
| Secret | the app's `manual_webhook_secret_gitea` (Coolify HMAC-validates `X-Hub-Signature-256`) |
Recreate / inspect it via the Forgejo API (auth: `git credential fill`, host
`repo.rahamafresh.com`, basic auth to `/api/v1` — no `tea`/`gh` needed). Get the secret by
decrypting it in Coolify:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
"docker exec -i coolify php artisan tinker --execute=\"echo \\App\\Models\\Application::find(15)->manual_webhook_secret_gitea;\""
```
```bash
# list / test the webhook (USER:PASS from git credential fill)
curl -s -u "$USER:$PASS" https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks
curl -s -u "$USER:$PASS" -X POST https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks/3/tests
```
A successful test shows a webhook hit in `docker logs coolify` (no `invalid_signature`
audit) and a new row in `application_deployment_queues`.
### Manual deploy (no push)
Trigger the same action as Coolify's Deploy button via tinker:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$app = \App\Models\Application::where('uuid','g14mwzo73q20g70vc6fzumya')->first();
$uuid = new \Visus\Cuid2\Cuid2;
echo json_encode(queue_application_deployment(
application: $app, deployment_uuid: $uuid, force_rebuild: false, is_api: true)).PHP_EOL;
echo $uuid.PHP_EOL;
PHP
```
Watch it: `SELECT id, status, created_at FROM application_deployment_queues WHERE
application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id` is the
**numeric id stored as text**).
## Source-bucket cutover (when the provider moves buckets)
If the provider moves the INC feed to a new bucket (as happened `tickets``isptickets`,
2026-06-25):
1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
timestamp range, schema parity.
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
`TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
same bucket, so one env change serves both tasks.
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
upserting on `ticket_id`:
- If the watermark **predates** the new bucket's first file, a normal
`--from-bucket --apply` drains the whole new stream — no reseed needed.
- Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
`python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
The new stream's periodic full-state re-emissions make this converge even across the
cutover gap. Idempotent upserts + never-delete make it non-destructive.
- For a one-off, you can run it in the live container with the new creds inlined:
`docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
keys are looked up).
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
or manual deploy). Old bucket is left untouched for rollback.
## Bringing CRQ online (one-time seed)
CRQ was added 2026-06-25 (data layer + map). Migration `15_crq_table.sql` **creates**
`tickets.crq` (the live DB's `01` predated its crq section, so the table never existed)
plus the typed columns. To seed it from zero on the live DB — once the code + migration are
applied (`run_migrations.py`; on the live cutover it was applied out-of-band via the running
container, see below):
1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
filename='15_crq_table.sql';` and `\d tickets.crq` shows the table + typed columns.
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
files oldest→newest; the stream's periodic full-state snapshots converge to current
state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
```bash
python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
```
(Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
--from-bucket --apply"`.)
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
`python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.
## Verification
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec -i "$DB" psql -U postgres -d tracksolid_db <<'SQL'
-- watermark + freshness
SELECT export_type, records_ingested, ingested_at, metadata->>'source_max_key'
FROM tickets.import_meta WHERE dataset='inc';
-- counts
SELECT count(*) total_inc,
count(*) FILTER (WHERE (raw->>'is_actionable')::boolean) AS open
FROM tickets.inc;
-- map payload sanity
SELECT reporting.fn_tickets_for_map() -> 'summary' ->> 'ticket_count';
SQL
```
- New bucket `changes/` empties as files move to `automations/inc/processed/`.
- A plain `--from-bucket --apply` reports "nothing new" until the next change file lands.
- FleetOps Tickets map freshness reflects the new `ingested_at`.
## Rollback
- **Bucket:** revert the three env vars to the old bucket + creds and redeploy. The old
bucket and its `processed/` history are untouched; upserts are idempotent and rows are
never deleted, so re-running is safe.
- **Cron:** `UPDATE scheduled_tasks SET frequency = <old> WHERE name='inc_tickets';`

View file

@ -3,14 +3,25 @@
What is actually built and deployed, as of the Phase-1 completion. Companion to What is actually built and deployed, as of the Phase-1 completion. Companion to
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next). `docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).
## Pipeline (`import_tickets.py`) ## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints)
- **Source:** newest `automations/inc/<EAT-timestamp>.csv` in the rustfs `tickets` The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small
bucket (endpoint `https://s3.rahamafresh.com`, path-style, region `us-east-1`). `Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
**identical 32-column source schema**, so the engine is fully shared; geocoding is
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
from the INC entrypoint.
- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator), - **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
`get_object`, `copy_object` + `delete_object` for archiving. `get_object`, `copy_object` + `delete_object` for archiving.
- **Skip-if-unchanged:** newest S3 **ETag** vs `tickets.import_meta.metadata.source_etag`; - **Watermark:** drains every `changes/` file newer than
equal → skip the DB write (the export re-emits identical content most hours). `tickets.import_meta.metadata.source_max_key`, oldest→newest; reruns with no new file
are a cheap no-op. `--reseed` ignores the watermark for a one-time bucket cutover.
- **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`, `week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`,
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE. `department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE.
@ -22,8 +33,11 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- **History capture:** after each `--apply` run (ingest or skip), calls - **History capture:** after each `--apply` run (ingest or skip), calls
`tickets.capture_history()` → appends new closures + upserts today's backlog `tickets.capture_history()` → appends new closures + upserts today's backlog
snapshot. snapshot.
- CLI: `--from-bucket` (newest INC csv), `--inc-csv <file>` (local dev), `--apply` - CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the
(else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`. watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else
dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
geocoding + history are not on the CRQ entrypoint).
## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`) ## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)
@ -39,6 +53,8 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) | | 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
| 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` | | 09_inc_dashboard_fn | **built**`reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
| 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time | | 10_inc_history_capture | **built**`tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
| 15_crq_table | **built** — materializes `tickets.crq` (table + geom trigger + indexes; `01`'s crq section never ran on the live DB) + the typed STORED generated columns from `03` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |
`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth), `tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/ `normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
@ -53,11 +69,16 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`, - **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
web app (`fleet-ops-staging`). web app (`fleet-ops-staging`).
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron - **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket
`15 7-19 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). --apply` and `crq_tickets``python -m crq.import_crq --from-bucket --apply`, both cron
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`. `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
(`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
deploys, bucket cutover, verification): **`docs/deployment-and-operations.md`**.
## State at hand-off ## State at hand-off
- `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows); - `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows);
@ -86,5 +107,12 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`, repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
**team closure attribution**. **CRQ** = separate future project reusing this **team closure attribution**.
machinery against `automations/crq/`.
**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
`automations/crq/changes/` (`crq/import_crq.py`), with the `tickets.crq` table + typed columns (migration 15) and
cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
currently has **no** `post_apply` hook).

View file

@ -19,7 +19,9 @@ tickets to our S3-compatible bucket **every hour**:
- `automations/inc/<EAT-timestamp>.csv`**incidents / customer faults** *(in scope)* - `automations/inc/<EAT-timestamp>.csv`**incidents / customer faults** *(in scope)*
- `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)* - `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)*
(See `n8n-hourly-s3-full-data-exports.md`. Sample: `2026-06-15T17-00-00.csv`.) (See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the
source later switched to an incremental `automations/inc/changes/` stream — that
doc has the current layout; this PRD records the original Phase-1 model.)
`fleettickets` owns the **downstream**: the `tickets` schema in the shared `fleettickets` owns the **downstream**: the `tickets` schema in the shared
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and `tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and
@ -80,6 +82,11 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron **Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`. `15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
> `implementation.md` and `deployment-and-operations.md`.
## Data-quality findings (carried into Phase 2) ## Data-quality findings (carried into Phase 2)
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the - Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the

0
inc/__init__.py Normal file
View file

74
inc/import_inc.py Normal file
View file

@ -0,0 +1,74 @@
"""
inc/import_inc.py Fireside Communications · INC (incident / fault) ingestion.
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
tickets.inc incidents / customer faults (FleetOps "Tickets" INC tab)
INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
watermark, archives each file to automations/inc/processed/, and uniquely to
INC runs tickets.capture_history() after each --apply run (closure_events +
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python -m inc.import_inc --from-bucket --apply
python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m inc.import_inc --geocode-clusters --apply
python -m inc.import_inc --geocode-locations --apply
Pre-requisite: migrations applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import pipeline
# INC captures closure/backlog history after every --apply run (CRQ does not yet).
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", dest="local_csv", default=None,
help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable inc+crq location_names precisely (keyed provider), "
"then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone "
"(closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
pipeline.geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
pipeline.geocode_locations(apply=args.apply)
return
if args.capture_history:
pipeline.capture_history()
return
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
"--geocode-locations, or --capture-history")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,89 @@
-- 13_inc_search_fn.sql — fleettickets · INC ticket explorer (search) function
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_search — ad-hoc ticket lookup by id / engineer / cluster /
-- status / state / time, for the FleetOps "Ticket explorer" card. Returns
-- { count, truncated, limit, state, rows }. Consumed by dashboard_api
-- GET /webhook/inc-search.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: this migration was applied to the live
-- DB on 2026-06-19 but the file was never committed. Recovered verbatim from the live
-- definition (pg_get_functiondef) so a fresh DB rebuilds faithfully; the live ledger
-- already lists it, so run_migrations skips it there. The crq mirror is in 16.
-- Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE OR REPLACE FUNCTION reporting.fn_inc_search(
p_ticket_id text DEFAULT NULL,
p_owner text DEFAULT NULL,
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_state text DEFAULT 'closed',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL,
p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
v_result jsonb;
BEGIN
p_ticket_id := NULLIF(trim(p_ticket_id), '');
p_owner := NULLIF(trim(p_owner), '');
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
WITH hits AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
sla_status, mttr, closed_at, created_at_service, is_actionable,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
FROM tickets.inc
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
AND CASE v_state
WHEN 'open' THEN COALESCE(is_actionable, false)
WHEN 'all' THEN COALESCE(is_actionable, false)
OR (closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to))
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
AND closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to)
END
),
total AS (SELECT count(*) AS n FROM hits),
page AS (
SELECT * FROM hits
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
LIMIT v_limit
)
SELECT jsonb_build_object(
'count', (SELECT n FROM total),
'truncated', (SELECT n FROM total) > v_limit,
'limit', v_limit,
'state', v_state,
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
ORDER BY page.closed_at DESC NULLS LAST,
page.created_at_service DESC NULLS LAST)
FROM page), '[]'::jsonb)
) INTO v_result;
RETURN v_result;
END $function$;
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
END IF;
END $grants$;

View file

@ -0,0 +1,37 @@
-- 14_inc_filter_options.sql — fleettickets · INC explorer dropdown options
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_filter_options — distinct engineers (owner), clusters, and the
-- ids of currently-open tickets, for the FleetOps "Ticket explorer" dropdowns.
-- Consumed by dashboard_api GET /webhook/inc-filter-options.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: applied to the live DB 2026-06-19 but
-- never committed. Recovered verbatim from the live definition so a fresh DB rebuilds
-- faithfully; the live ledger already lists it (run_migrations skips it there). The crq
-- mirror is in 16. Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE OR REPLACE FUNCTION reporting.fn_inc_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
SELECT jsonb_build_object(
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
FROM tickets.inc WHERE NULLIF(owner, '') IS NOT NULL) s),
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
FROM (SELECT DISTINCT cluster AS c
FROM tickets.inc WHERE NULLIF(cluster, '') IS NOT NULL) s),
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
FROM tickets.inc WHERE COALESCE(is_actionable, false))
);
$function$;
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO grafana_ro;
END IF;
END $grants$;

101
migrations/15_crq_table.sql Normal file
View file

@ -0,0 +1,101 @@
-- 15_crq_table.sql — fleettickets · materialize tickets.crq + typed columns
-- ─────────────────────────────────────────────────────────────────────────────
-- Why a NEW migration (not an edit to 01): `01_tickets_schema.sql` was applied to the
-- live DB on 2026-06-15 from a version that PREDATED its `tickets.crq` section, so the
-- IF-NOT-EXISTS ledger guard has kept crq from ever being created there — even though
-- the live `reporting.fn_tickets_for_map` and `tickets.resolve_ticket_geoms` already
-- reference it (they error if called until crq exists). This migration creates
-- `tickets.crq` self-containedly (table + geom trigger + indexes) and adds the same
-- typed STORED generated columns INC got in `03_inc_columns.sql`, bringing CRQ to
-- data-layer parity.
--
-- Deterministic + idempotent — converges to the same shape on BOTH:
-- • the live DB (crq missing) -> CREATE makes it, ALTER adds typed cols
-- • a fresh DB (crq minimal, from 01) -> CREATE skipped, ALTER adds typed cols
-- Reuses shared objects already present: tickets.tg_ticket_geom() (01),
-- tickets.norm_cluster() (01), tickets.eat_ts() (03).
--
-- NOTE: the live DB also carries un-versioned migrations 13_inc_search_fn.sql /
-- 14_inc_filter_options.sql (applied 2026-06-19, absent from this repo) — INC dashboard
-- functions, unrelated to CRQ. Numbered 15 here to sit cleanly after the live ledger.
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
-- ── table (base shape mirrors tickets.inc's original 01 base) ────────────────
CREATE TABLE IF NOT EXISTS tickets.crq (
ticket_id text PRIMARY KEY,
raw jsonb NOT NULL,
geom geometry(Point, 4326),
geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
ingested_at timestamptz NOT NULL DEFAULT now()
);
-- ── geom trigger — read from raw; shared tickets.tg_ticket_geom() (from 01) ───
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
-- ── raw-based indexes (mirror 01's inc/crq set) ──────────────────────────────
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
-- ── typed STORED generated columns (mirror of 03_inc_columns.sql) ────────────
-- Computed for ALL existing rows on creation + auto-recomputed on every insert/update;
-- `raw` stays the source of truth. tickets.eat_ts() (EAT->timestamptz, IMMUTABLE) is
-- reused from 03 — see that file's note on why IMMUTABLE is safe for Kenya (UTC+3, no DST).
ALTER TABLE tickets.crq
-- text
ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
-- numeric / float
ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
-- boolean
ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
-- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the EXPORT
-- pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;
-- ── typed-column indexes (serve cluster / team / closure queries) ────────────
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.crq TO tracksolid_owner;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.crq TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.crq TO grafana_ro;
END IF;
END $grants$;

View file

@ -0,0 +1,297 @@
-- 16_crq_dashboard.sql — fleettickets · CRQ dashboard parity (view + read functions)
-- ─────────────────────────────────────────────────────────────────────────────
-- Brings CRQ to FleetOps-dashboard parity with INC, so the Tickets tab's CRQ
-- sub-tab works "just like INC". Mirrors, against tickets.crq:
-- tickets.crq_open_sla ← mirror of tickets.inc_open_sla (08)
-- reporting.fn_crq_dashboard ← mirror of reporting.fn_inc_dashboard (09/12)
-- reporting.fn_crq_search ← mirror of reporting.fn_inc_search (13)
-- reporting.fn_crq_filter_options ← mirror of reporting.fn_inc_filter_options (14)
-- consumed by dashboard_api GET /webhook/crq-dashboard | crq-search | crq-filter-options.
--
-- Differences from the INC view: tickets.crq has no `geog` column (mig 05 is INC-only)
-- and its latitude/longitude come from `raw` (empty in the feed), so crq_open_sla omits
-- geog and derives latitude/longitude from `geom`. The 48h SLA rule is reused verbatim
-- for layout parity — installation-lifecycle SLA semantics may be refined later.
--
-- Idempotent (CREATE OR REPLACE / VIEW). Requires migration 15 (tickets.crq + typed cols).
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
-- ── crq_open_sla — open CRQ tickets with derived SLA (mirror of inc_open_sla) ─
CREATE OR REPLACE VIEW tickets.crq_open_sla AS
SELECT
ticket_id,
normalized_status,
bucket,
cluster,
region,
location_name,
assigned_team,
owner,
sla_status AS source_sla_status,
mttr, -- minutes (null until closed)
COALESCE(created_at_service, first_seen_at) AS sla_clock,
CASE WHEN created_at_service IS NOT NULL THEN 'service' ELSE 'first_seen' END AS sla_clock_source,
round((EXTRACT(EPOCH FROM now() - COALESCE(created_at_service, first_seen_at)) / 3600)::numeric, 1) AS hours_open,
CASE
WHEN COALESCE(created_at_service, first_seen_at) IS NULL THEN 'unknown'
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '48h' THEN 'breached'
WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '36h' THEN 'at_risk'
ELSE 'ok'
END AS sla_state,
created_at_service,
first_seen_at,
scheduled_at,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS latitude,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS longitude,
geo_source,
geom
FROM tickets.crq
WHERE is_actionable;
COMMENT ON VIEW tickets.crq_open_sla IS
'Open (is_actionable) CRQ tickets with derived SLA (48h rule; clock = created_at_service '
'or first_seen_at fallback). Mirror of inc_open_sla; no geog. fleettickets 16.';
-- ── fn_crq_dashboard — mirror of fn_inc_dashboard over tickets.crq ───────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_dashboard(
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_window text DEFAULT 'today',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_now_eat timestamp;
v_from timestamptz;
v_to timestamptz;
v_preset text;
v_days numeric;
v_result jsonb;
BEGIN
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';
-- ── resolve the window ──────────────────────────────────────────────────────
IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
v_preset := 'custom';
v_from := COALESCE(p_from, '-infinity'::timestamptz);
v_to := COALESCE(p_to, 'infinity'::timestamptz);
ELSE
v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
IF v_preset = 'week' THEN
v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
ELSIF v_preset = 'month' THEN
v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
ELSE
v_preset := 'today';
v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
END IF;
END IF;
IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
ELSE
v_days := NULL; -- open-ended custom window → per-day average not meaningful
END IF;
-- ── build payload ───────────────────────────────────────────────────────────
WITH open_t AS (
SELECT * FROM tickets.crq_open_sla
WHERE (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
),
closed_t AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
FROM tickets.crq
WHERE NOT COALESCE(is_actionable, false)
AND closed_at IS NOT NULL
AND closed_at >= v_from AND closed_at < v_to
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
)
SELECT jsonb_build_object(
'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),
'open', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'sla_state', sla_state, 'hours_open', hours_open),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'closed', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'metrics', jsonb_build_object(
'open_now', (SELECT count(*) FROM open_t),
'closed_in_window', (SELECT count(*) FROM closed_t),
'sla', jsonb_build_object(
'open', (SELECT jsonb_build_object(
'breached', count(*) FILTER (WHERE sla_state = 'breached'),
'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
'ok', count(*) FILTER (WHERE sla_state = 'ok'),
'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
'closed', (SELECT jsonb_build_object(
'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
),
'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
SELECT normalized_status FROM open_t
UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
SELECT cluster FROM open_t
UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
-- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
FROM (
SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
count(*) AS c,
count(*) FILTER (WHERE sla_status = 'Breached') AS b,
round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
FROM closed_t GROUP BY 1) z), '[]'::jsonb),
'closure_rate', jsonb_build_object(
'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
FROM closed_t GROUP BY 1) z), '[]'::jsonb)
),
'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
),
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
'export_type', export_type, 'exported_at', exported_at,
'records_ingested', records_ingested, 'ingested_at', ingested_at))
FROM tickets.import_meta)
) INTO v_result;
RETURN v_result;
END $function$;
-- ── fn_crq_search — mirror of fn_inc_search over tickets.crq ──────────────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_search(
p_ticket_id text DEFAULT NULL,
p_owner text DEFAULT NULL,
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_state text DEFAULT 'closed',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL,
p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
v_result jsonb;
BEGIN
p_ticket_id := NULLIF(trim(p_ticket_id), '');
p_owner := NULLIF(trim(p_owner), '');
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
WITH hits AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
sla_status, mttr, closed_at, created_at_service, is_actionable,
CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
FROM tickets.crq
WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
AND CASE v_state
WHEN 'open' THEN COALESCE(is_actionable, false)
WHEN 'all' THEN COALESCE(is_actionable, false)
OR (closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to))
ELSE NOT COALESCE(is_actionable, false) -- 'closed'
AND closed_at IS NOT NULL
AND (p_from IS NULL OR closed_at >= p_from)
AND (p_to IS NULL OR closed_at < p_to)
END
),
total AS (SELECT count(*) AS n FROM hits),
page AS (
SELECT * FROM hits
ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
LIMIT v_limit
)
SELECT jsonb_build_object(
'count', (SELECT n FROM total),
'truncated', (SELECT n FROM total) > v_limit,
'limit', v_limit,
'state', v_state,
'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
ORDER BY page.closed_at DESC NULLS LAST,
page.created_at_service DESC NULLS LAST)
FROM page), '[]'::jsonb)
) INTO v_result;
RETURN v_result;
END $function$;
-- ── fn_crq_filter_options — mirror of fn_inc_filter_options over tickets.crq ──
CREATE OR REPLACE FUNCTION reporting.fn_crq_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
SELECT jsonb_build_object(
'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
FROM tickets.crq WHERE NULLIF(owner, '') IS NOT NULL) s),
'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
FROM (SELECT DISTINCT cluster AS c
FROM tickets.crq WHERE NULLIF(cluster, '') IS NOT NULL) s),
'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
FROM tickets.crq WHERE COALESCE(is_actionable, false))
);
$function$;
-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.crq_open_sla TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.crq_open_sla TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO grafana_ro;
END IF;
END $grants$;

View file

@ -1,156 +0,0 @@
# n8n Hourly S3 Full-Data Exports
Updated on June 15, 2026.
## Overview
Two active n8n workflows export complete datasets to S3 every hour:
1. `FTTH Automation Ticket S3 Export`
2. `Fuel Records S3 Export`
Each execution creates CSV files only. Filenames use the actual execution time
in the `Africa/Nairobi` timezone.
No delta files, JSON files, `latest` files, `changes/` directories, `full/`
directories, or midnight-specific exports are created.
## Hourly Output
Together, the two workflows create exactly three files during their hourly
executions:
```text
automations/crq/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/YYYY-MM-DDTHH-mm-ss.csv
fuel_records/YYYY-MM-DDTHH-mm-ss.csv
```
The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
uploaded to the `fuel` bucket.
## FTTH Automation Ticket S3 Export
Workflow ID: `JI3QkcJeHk9eYRsY`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Calls the existing authenticated Scoreboard export endpoint with
`export_type: full`.
4. Reads all CRQ and INC rows returned by the endpoint.
5. Converts each complete dataset to CSV.
6. Uploads exactly two files:
- `automations/crq/<execution-timestamp>.csv`
- `automations/inc/<execution-timestamp>.csv`
7. Fails the execution if exactly two successful upload results are not
returned.
The workflow still has its existing manual webhook for operational testing.
## Fuel Records S3 Export
Workflow ID: `IP2KNAfFazAjTesh`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Reads the complete `logistics_department.fuel_records` table.
4. Converts all returned rows to one CSV.
5. Uploads exactly one file:
- `fuel_records/<execution-timestamp>.csv`
6. Fails the execution if the S3 upload reports an error.
The workflow still has its existing manual webhook for operational testing.
## Timestamp Format
The timestamp format is:
```text
YYYY-MM-DDTHH-mm-ss
```
Example:
```text
2026-06-15T14-39-53
```
The timestamp is generated once at the start of each workflow execution and is
formatted in `Africa/Nairobi`.
## Credentials and Safety
- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
reused.
- No S3 credentials or API secrets are hard-coded in workflow code.
- Secrets are not included in workflow result messages.
- Source database queries are read-only.
- The workflows do not delete or update source database rows.
- S3 upload nodes retain retry handling. A failed hourly execution can also be
recovered naturally by the next full-data run.
## Removed Behavior
The workflows no longer contain:
- Delta export logic or stored delta pointers
- Midnight full-export schedules
- `latest.json` or `latest.csv`
- JSON output
- `changes/` keys
- `full/` keys
- Multipart or additional export files
- FTTH mark-sent state handling
## Deployment Status
Both workflows were saved, published, and activated on June 15, 2026.
Active versions:
```text
Fuel Records S3 Export:
60cf5824-9345-45bb-a2eb-3b20b877fd32
FTTH Automation Ticket S3 Export:
68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
```
## Manual Test Evidence
### Fuel Records S3 Export
Execution ID: `404079`
Rows exported: `2001`
Exact S3 key:
```text
fuel_records/2026-06-15T14-39-50.csv
```
### FTTH Automation Ticket S3 Export
Execution ID: `404080`
Rows exported:
```text
CRQ: 12680
INC: 31434
```
Exact S3 keys:
```text
automations/crq/2026-06-15T14-39-53.csv
automations/inc/2026-06-15T14-39-53.csv
```
Both manual tests completed successfully. Their upload builders generated one
Fuel item and exactly two FTTH items, matching the required three output files.

130
n8n-s3-ticket-exports.md Normal file
View file

@ -0,0 +1,130 @@
# n8n S3 Ticket Exports — Incremental (CDC) Stream
Updated on June 23, 2026.
> **History.** This doc previously described a full-snapshot-per-hour model
> ("No delta files … No `changes/` directories"). That is no longer accurate.
> As of the June 22, 2026 re-seed the source switched to an **incremental
> change-data-capture (CDC) stream** under `automations/<dataset>/changes/`.
> The structure below was verified by direct S3 inspection of the `tickets`
> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are
> carried over from the prior version and may be stale — trust the bucket.
## Overview
The FTTH ticket export now writes an **incremental** CSV stream per dataset:
- The **first** file in a stream is a full current-state **baseline**.
- Every **later** file holds **only the rows that changed** since the prior
export — new and updated tickets, keyed by `ticket_id`.
- **Deletions are never emitted** (tickets are closed in place, not removed).
Consumers must ingest **every not-yet-processed file in ascending timestamp
order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking
only the newest file silently drops the intermediate deltas.
CSV files only. Filenames use the execution time in the `Africa/Nairobi`
timezone (format below). All files share one identical flat-CSV schema (header
+ rows) — the same column set the previous full snapshots used.
## Output Layout
The change stream lives under a `changes/` prefix per dataset in the `tickets`
bucket:
```text
automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv
```
Observed `tickets` bucket layout (June 23, 2026):
```text
automations/inc/
├── changes/ ← ACTIVE incremental stream (baseline + deltas)
│ ├── 2026-06-22T15-50-39.csv (baseline, ~15 MB, 34,849 rows)
│ ├── 2026-06-22T15-53-04.csv (delta, 1 row)
│ ├── 2026-06-22T17-10-41.csv (delta, 22 rows)
│ └── 2026-06-22T17-15-41.csv (delta, 131 rows)
├── processed/ ← our pipeline's archive of consumed files
├── full/ ← present but EMPTY (legacy prefix)
├── latest.csv/ ← present but EMPTY (legacy prefix)
└── latest.json/ ← present but EMPTY (legacy prefix)
```
Notes:
- There are **no longer any `automations/inc/<ts>.csv` files at the root** of
`inc/` — the last full snapshots (through `2026-06-18T17-00-05.csv`) were
archived to `processed/`. New data arrives **only** under `changes/`.
- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in
listings but contain **no objects** (leftover/legacy; ignore them). There is
no `latest` pointer and no JSON/metadata envelope.
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
stream (with matching baseline timestamps and additional newer deltas) and the
identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by
`crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same
way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
(`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
archives them — they are not consumed (the `changes/` stream is the source of truth).
## CSV Schema
Header (32 columns), identical across baseline and delta files:
```text
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
week_start, week_end, cluster, region, location_name, latitude, longitude,
department, assigned_team, owner, sla_status, mttr, is_auto_created,
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
source_snapshot_id, created_at, updated_at
```
Each row is a complete record (not a partial diff): a delta row carries the
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
baseline still contains `is_alarm=true` rows and may include a leading
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints).
## Timestamp Format
```text
YYYY-MM-DDTHH-mm-ss e.g. 2026-06-22T15-50-39
```
Generated once at the start of each execution, formatted in `Africa/Nairobi`
(EAT). Note this is the *execution* time, not a top-of-the-hour schedule — the
incremental files appear whenever a change batch is exported (multiple within
the same hour are normal, e.g. `15-50-39` then `15-53-04`).
## How the loader Consumes It
`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq
--from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp.
2. Skips files at/older than the **watermark**
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
applied); on a fresh stream it processes everything present.
3. For each pending file, oldest→newest: drop `is_alarm=true` + sentinel rows,
strip `DROP_FIELDS`, normalize `region`/`raw_status`, then upsert on
`ticket_id`. The row upsert and the watermark advance **commit in one
transaction per file**, after which the file is moved to
`automations/inc/processed/`.
4. A mid-run failure therefore leaves folder state consistent with the
watermark; the next run resumes cleanly from where it stopped.
The first file applied onto an empty watermark is recorded as
`export_type="baseline"` in `tickets.import_meta`; every file after is `"delta"`.
## Workflow Context (carried over — verify before relying on)
The export originates from the FTTH Automation Ticket export workflow, calling
the authenticated Scoreboard export endpoint and uploading CSV(s) to the
`tickets` bucket; a sibling workflow exports `fuel_records/<ts>.csv` to the
`fuel` bucket. Source DB queries are read-only and the workflows do not delete
or update source rows. The previously documented workflow IDs and the claim of
"two files per hour, full snapshots, no `changes/`" predate the June 22 switch
to the incremental stream and should be re-confirmed against the live n8n
configuration before being treated as current.

View file

@ -1,60 +1,51 @@
""" """
import_tickets.py Fireside Communications · INC ticket ingestion (raw-first) pipeline.py Fireside Communications · generic ticket ingestion engine (raw-first)
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the The dataset-agnostic core shared by the per-type entrypoints:
source of the FleetOps "Tickets" map. inc/import_inc.py -> tickets.inc (incidents / customer faults)
tickets.inc incidents / customer faults crq/import_crq.py -> tickets.crq (new-installation requests)
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
here. `tickets.crq` stays in the schema but is not fed by this pipeline. so the only differences are the table, the S3 prefixes, the import_meta dataset
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
does not yet). Those are carried by the `Dataset` config; everything else here is
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
are driven from a single entrypoint (the INC one) never duplicated per dataset.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb). RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB Everything downstream reads from `raw` (resilient to source schema drift). The DB
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md) Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
writes a full current-state snapshot CSV per hour to the `tickets` bucket at automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv) This is an INCREMENTAL (CDC) stream: the first file is a full current-state
There is NO latest pointer, NO metadata envelope, and NO deltas each file is a baseline, and every later file holds only the rows that CHANGED since the prior
flat CSV (header + rows). We ingest the NEWEST file: export (with periodic full-state re-emissions). Deletions are never emitted. Every
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
skip the DB write (the export re-emits byte-identical content most hours); in ASCENDING timestamp order (baseline first, then each delta) taking only the
newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS); - drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE; - normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure - upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure
history accumulates), and record snapshot freshness in tickets.import_meta; history accumulates), and advance the watermark in tickets.import_meta
- on success, MOVE the file to automations/inc/processed/ (copy + delete). (metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
real place out of location_name (region+cluster+location_name,
network codes stripped), geocodes it, caches in
tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
Pre-requisite: migration applied (run_migrations.py) tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
""" """
from __future__ import annotations from __future__ import annotations
import argparse
import csv
import io import io
import csv
import math import math
import os import os
import re import re
import time import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import boto3 import boto3
@ -64,14 +55,10 @@ from botocore.config import Config as BotoConfig
from shared import clean, get_conn, get_logger from shared import clean, get_conn, get_logger
log = get_logger("import_tickets") log = get_logger("pipeline")
# ── INC ingestion config ────────────────────────────────────────────────────── # ── shared ingestion config ─────────────────────────────────────────────────────
_TABLE = "tickets.inc" _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
_INC_PREFIX = "automations/inc/"
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the # Garbage row the source leaks (commonly the first data line): its ticket_id is the
@ -88,10 +75,6 @@ DROP_FIELDS = frozenset({
"department", "source_type", "department", "source_type",
}) })
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
# leftover latest.csv/, latest.json/, full/ prefixes).
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.getenv("GEOCODER_API_KEY", "") _API_KEY = os.getenv("GEOCODER_API_KEY", "")
@ -99,14 +82,37 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ─────────────────── # ── dataset config (per ticket type) ────────────────────────────────────────────
# The n8n hourly export writes a full current-state CSV per hour to @dataclass(frozen=True)
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas). class Dataset:
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag """All that distinguishes one ticket type from another in the generic engine."""
# we skip the DB write (the export re-emits byte-identical content most hours). name: str # 'inc' | 'crq' (import_meta.dataset)
table: str # 'tickets.inc' | 'tickets.crq'
change_prefix: str # 'automations/<name>/changes/'
processed_prefix: str # 'automations/<name>/processed/'
key_regex: re.Pattern # matches a <prefix><EAT-ts>.csv key
post_apply: Callable[[], None] | None = None # e.g. capture_history (INC only)
def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
"""Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
return Dataset(
name=name,
table=f"tickets.{name}",
change_prefix=f"automations/{name}/changes/",
processed_prefix=f"automations/{name}/processed/",
# only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
# (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
key_regex=re.compile(
rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
post_apply=post_apply,
)
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the rustfs endpoint (force path-style addressing).""" """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
return boto3.client( return boto3.client(
"s3", "s3",
endpoint_url=os.environ["RUSTFS_ENDPOINT"], endpoint_url=os.environ["RUSTFS_ENDPOINT"],
@ -118,9 +124,9 @@ def _s3_client():
) )
def _ts_from_key(key: str) -> datetime | None: def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
"""EAT timestamp embedded in an automations/inc/<ts>.csv key (or None).""" """EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
m = _CSV_KEY_RE.match(key) m = ds.key_regex.match(key)
if not m: if not m:
return None return None
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
@ -129,12 +135,12 @@ def _ts_from_key(key: str) -> datetime | None:
return None return None
def _list_inc_csvs(s3) -> list[tuple[str, str]]: def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs).""" """[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
out: list[tuple[str, str]] = [] out: list[tuple[str, str]] = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
for it in page.get("Contents", []): for it in page.get("Contents", []):
if _CSV_KEY_RE.match(it["Key"]): if ds.key_regex.match(it["Key"]):
out.append((it["Key"], (it.get("ETag") or "").strip('"'))) out.append((it["Key"], (it.get("ETag") or "").strip('"')))
return out return out
@ -144,16 +150,22 @@ def _get_text(s3, key: str) -> str:
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
def _last_processed_etag() -> str | None: def _last_processed_ts(ds: Dataset) -> datetime | None:
"""ETag of the most recently ingested INC file (from tickets.import_meta).""" """Watermark: EAT timestamp of the newest change file already ingested for this dataset.
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
we drain changes/ oldestnewest). None when nothing has been ingested via the
changes stream yet (e.g. a brand-new dataset, or the first run after the source
switched buckets) then every file currently in changes/ is processed.
"""
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
"SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s", "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
(_DATASET,), (ds.name,),
) )
row = cur.fetchone() row = cur.fetchone()
return row[0] if row else None return _ts_from_key(ds, row[0]) if row and row[0] else None
def _parse_csv(text: str) -> list[dict]: def _parse_csv(text: str) -> list[dict]:
@ -165,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
return list(csv.DictReader(f)) return list(csv.DictReader(f))
def _move_processed(s3, keys: list[str]) -> None: def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
"""Archive listed INC csv objects to automations/inc/processed/ (copy + delete).""" """Archive listed csv objects to automations/<ds>/processed/ (copy + delete)."""
for key in keys: for key in keys:
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1] dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst) s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
s3.delete_object(Bucket=_BUCKET, Key=key) s3.delete_object(Bucket=_BUCKET, Key=key)
log.info("archived %s -> %s", key, dst) log.info("archived %s -> %s", key, dst)
@ -194,8 +206,8 @@ def _prepare(row: dict) -> dict:
# ── upsert (raw-first) ──────────────────────────────────────────────────────── # ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, meta: dict, records_ingested: int) -> None: def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag). """Upsert the snapshot metadata (powers map freshness + holds source_max_key).
Runs on the caller's cursor so the row upsert and the meta write commit Runs on the caller's cursor so the row upsert and the meta write commit
together a half-written state (rows in, meta stale) breaks skip-if-unchanged. together a half-written state (rows in, meta stale) breaks skip-if-unchanged.
@ -213,90 +225,113 @@ def _record_meta(cur, meta: dict, records_ingested: int) -> None:
records_ingested = EXCLUDED.records_ingested, records_ingested = EXCLUDED.records_ingested,
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
ingested_at = now()""", ingested_at = now()""",
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")), (ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
clean(meta.get("source_table")), meta.get("row_count"), records_ingested, clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
) )
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
meta = meta or {} meta = meta or {}
kept = [r for r in rows if _keep_row(r)] kept = [r for r in rows if _keep_row(r)]
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept] payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)", log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
_TABLE, len(rows), len(payload), len(rows) - len(payload)) ds.table, len(rows), len(payload), len(rows) - len(payload))
if not apply: if not apply:
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE) log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
return len(payload) return len(payload)
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
psycopg2.extras.execute_values( psycopg2.extras.execute_values(
cur, cur,
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s " f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
payload, page_size=500, payload, page_size=500,
) )
# same transaction as the upsert: rows + snapshot meta commit atomically # same transaction as the upsert: rows + snapshot meta commit atomically
_record_meta(cur, meta, len(payload)) _record_meta(cur, ds, meta, len(payload))
log.info("upserted %d rows into %s", len(payload), _TABLE) log.info("upserted %d rows into %s", len(payload), ds.table)
return len(payload) return len(payload)
def _capture_history() -> None: def capture_history() -> None:
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history).""" """Append new closures + upsert today's backlog snapshot (tickets.capture_history).
INC-only today (CRQ install-lifecycle history is a future migration); wired as
the INC Dataset's post_apply hook.
"""
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.capture_history()") cur.execute("SELECT tickets.capture_history()")
log.info("history: %s", cur.fetchone()[0]) log.info("history: %s", cur.fetchone()[0])
def ingest(args) -> None: def ingest(ds: Dataset, args) -> None:
# Local-file path (dev): ingest a single CSV, no bucket / no archive. # Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
if args.inc_csv: if args.local_csv:
rows = _load_csv_local(args.inc_csv) rows = _load_csv_local(args.local_csv)
name = os.path.basename(args.inc_csv) name = os.path.basename(args.local_csv)
ts = _ts_from_key(_INC_PREFIX + name) ts = _ts_from_key(ds, ds.change_prefix + name)
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)} meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
if ts: if ts:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta) upsert(ds, rows, args.apply, meta=meta)
return return
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive. # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
# (baseline first, then each delta), upserting each. The watermark advances and
# the file is archived PER file, so a mid-run failure leaves a consistent state
# (folder state matches the watermark) and the next run resumes cleanly.
s3 = _s3_client() s3 = _s3_client()
listing = _list_inc_csvs(s3) listing = _list_csvs(s3, ds)
if not listing: if not listing:
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX) log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
return return
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT)) listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
all_keys = [k for k, _ in listing]
newest_key, newest_etag = listing[-1]
log.info("newest INC file: %s (etag=%s; %d file(s) present)",
newest_key, newest_etag, len(listing))
last_etag = _last_processed_etag() # watermark: skip anything at/older than the newest file already applied. Archiving
if newest_etag and newest_etag == last_etag: # normally empties changes/, but this guards a failed archive from re-applying.
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) # --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts(ds)
_floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
if not pending:
log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
len(listing), ds.name, last_ts and last_ts.isoformat())
if args.apply: if args.apply:
_move_processed(s3, all_keys) _move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers
_capture_history() # still record today's snapshot even when unchanged if ds.post_apply:
else: ds.post_apply()
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
return return
log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
rows = _parse_csv(_get_text(s3, newest_key)) total = 0
ts = _ts_from_key(newest_key) for i, (key, etag) in enumerate(pending):
meta = {"export_type": "full", "source_s3_key": newest_key, rows = _parse_csv(_get_text(s3, key))
"source_etag": newest_etag, "row_count": len(rows)} ts = _ts_from_key(ds, key)
if ts: # the first file applied onto an empty watermark is the full baseline; every
meta["exported_at"] = ts.isoformat() # file after is a delta. export_type is informational (recorded in import_meta).
upsert(rows, args.apply, meta=meta) meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
if args.apply: "source_s3_key": key, "source_etag": etag,
_move_processed(s3, all_keys) "source_max_key": key, "row_count": len(rows)}
_capture_history() if ts:
else: meta["exported_at"] = ts.isoformat()
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) # rows + watermark (source_max_key) commit in one txn, advancing per file; only
# then archive, so the changes/ folder state always matches the watermark.
total += upsert(ds, rows, args.apply, meta=meta)
if args.apply:
_move_processed(s3, ds, [key])
else:
log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
if args.apply and ds.post_apply:
ds.post_apply()
# ── place extraction (strip network codes, keep the real place) ─────────────── # ── place extraction (strip network codes, keep the real place) ───────────────
@ -445,7 +480,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
return None return None
# ── cluster gazetteer (coarse fallback) ─────────────────────────────────────── # ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
def geocode_clusters(apply: bool) -> None: def geocode_clusters(apply: bool) -> None:
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
@ -492,7 +527,7 @@ def geocode_clusters(apply: bool) -> None:
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
# ── per-location geocoding (precise; actionable INC) ────────────────────────── # ── per-location geocoding (precise; actionable inc + crq) ────────────────────
# A location geocode is only trusted if it lands within this radius of the # A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong # cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid. # place and we fall back to the cluster centroid.
@ -507,17 +542,25 @@ def geocode_locations(apply: bool) -> None:
""" """
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
FROM ( FROM (
SELECT tickets.norm_cluster(raw->>'location_name') AS key, SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
(array_agg(raw->>'location_name'))[1] AS location_name, (array_agg(src.raw->>'location_name'))[1] AS location_name,
(array_agg(raw->>'cluster'))[1] AS cluster, (array_agg(src.raw->>'cluster'))[1] AS cluster,
(array_agg(raw->>'region'))[1] AS region, (array_agg(src.raw->>'region'))[1] AS region,
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
FROM tickets.inc FROM (
WHERE (raw->>'is_actionable')::boolean -- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
AND raw->>'location_name' IS NOT NULL SELECT raw FROM tickets.inc
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL WHERE (raw->>'is_actionable')::boolean
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl AND raw->>'location_name' IS NOT NULL
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name') AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
UNION ALL
SELECT raw FROM tickets.crq
WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
) src
WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
AND gl.geom IS NOT NULL) AND gl.geom IS NOT NULL)
GROUP BY 1 GROUP BY 1
) t ) t
@ -525,7 +568,7 @@ def geocode_locations(apply: bool) -> None:
""" """
) )
todo = cur.fetchall() todo = cur.fetchall()
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER) log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
if not apply: if not apply:
for key, loc, cluster, region, clat, clng in todo[:50]: for key, loc, cluster, region, clat, clng in todo[:50]:
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region))) log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
@ -579,37 +622,3 @@ def _resolve() -> int:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("SELECT tickets.resolve_ticket_geoms()") cur.execute("SELECT tickets.resolve_ticket_geoms()")
return cur.fetchone()[0] return cur.fetchone()[0]
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
"skips if unchanged (ETag) and archives processed files")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
geocode_locations(apply=args.apply)
return
if args.capture_history:
_capture_history()
return
if not (args.from_bucket or args.inc_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
ingest(args)
if __name__ == "__main__":
main()

View file

@ -18,10 +18,12 @@ dev = [
"ruff>=0.4", "ruff>=0.4",
] ]
# Flat-module project (no package dir) — list the top-level modules explicitly so # Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type
# `pip install .` works (the Docker image installs the project to pull its deps). # entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
[tool.setuptools] [tool.setuptools]
py-modules = ["import_tickets", "shared", "run_migrations"] py-modules = ["pipeline", "shared", "run_migrations"]
packages = ["inc", "crq"]
[tool.uv] [tool.uv]
managed = true managed = true

View file

@ -1,13 +1,17 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron. # run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM).
# #
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the # Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in). # both ticket change streams with --apply (watermark skip-if-unchanged + per-file
# archive are built in, so a run with no new files is a cheap no-op).
# #
# Install on the instance (ingest at :15, 07:0019:00 EAT): # Install on the instance (every 20 min, 06:0020:40 EAT):
# 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 # */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT. # the host/container TZ), since the export filenames and the schedule are EAT.
#
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
set -euo pipefail set -euo pipefail
cd "$(dirname "$0")" cd "$(dirname "$0")"
@ -24,4 +28,7 @@ fi
PY="python" PY="python"
[ -x ".venv/bin/python" ] && PY=".venv/bin/python" [ -x ".venv/bin/python" ] && PY=".venv/bin/python"
exec "$PY" import_tickets.py --from-bucket --apply # Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq`
# resolve the packages alongside pipeline.py + shared.py.
"$PY" -m inc.import_inc --from-bucket --apply
"$PY" -m crq.import_crq --from-bucket --apply