Compare commits
No commits in common. "main" and "fix/inc-changes-stream" have entirely different histories.
main
...
fix/inc-ch
18 changed files with 208 additions and 1126 deletions
@@ -3,7 +3,7 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db

# S3 — source ticket CDC streams (isptickets bucket, automations/{inc,crq}/changes/<EAT-ts>.csv)
# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret>
@@ -1,9 +1,7 @@
# fleettickets — INC + CRQ ticket ingestion image (Coolify-deployable).
# fleettickets — INC ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingests via two Scheduled Tasks:
#   python -m inc.import_inc --from-bucket --apply   (cron: */20 6-20 * * *)
#   python -m crq.import_crq --from-bucket --apply   (cron: */20 6-20 * * *)
# (run from /app so the inc/ and crq/ packages + pipeline.py/shared.py import.)
# running (CMD below) and fires the ingest via a Scheduled Task:
#   python import_tickets.py --from-bucket --apply   (cron: */20 6-20 * * *)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim
81
README.md
@@ -1,22 +1,11 @@
# fleettickets

Field-ops **ticket** ingestion, geocoding, and read-schema that powers the
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).

Two ticket types, identical 32-column source schema and CDC change stream, served
through a **shared engine** (`pipeline.py`) with a thin per-type entrypoint each:

- **INC** — incident / customer-fault tickets → `tickets.inc` (`inc/import_inc.py`).
  Full feature set: typed columns, geocoding, SLA view, dashboard fn, history capture.
- **CRQ** — new-installation requests → `tickets.crq` (`crq/import_crq.py`). **Data
  layer + map** (typed columns, geocoding, appears on the Tickets map via
  `fn_tickets_for_map`). SLA view / dashboard fn / history capture are deferred —
  installation-lifecycle semantics differ from incidents (see roadmap). CRQ gets its
  **own FleetOps tab**, same look & feel as INC.

Geocoding is **cross-dataset** (one gazetteer, one geocoder budget, covers inc + crq)
and is driven from the INC entrypoint.
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*

## What this owns

@@ -32,10 +21,7 @@ and is driven from the INC entrypoint.
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `migrations/15_crq_table.sql` | **Materializes `tickets.crq`** (table + geom trigger + indexes — `01`'s crq section never ran on the live DB) and unpacks `raw` into the same **typed STORED generated columns** as INC's `03` (reuses `tickets.eat_ts()`). Brings CRQ to data-layer parity |
| `pipeline.py` | **Shared engine** — the dataset-agnostic CDC loader (drains `automations/<type>/changes/<EAT-ts>.csv` from the `isptickets` bucket, upserts on `ticket_id` oldest→newest, watermark + per-file archive) and the **cross-dataset** geocoder (clusters + actionable inc/crq locations) |
| `inc/import_inc.py` | INC entrypoint (`python -m inc.import_inc`) — INC `Dataset` config + CLI; runs `tickets.capture_history()` after each `--apply`; hosts the shared geocode commands |
| `crq/import_crq.py` | CRQ entrypoint (`python -m crq.import_crq`) — CRQ `Dataset` config + CLI (ingest only; no history hook yet) |
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |

@@ -100,51 +86,40 @@ python run_migrations.py # apply the schema (idempotent)

## Run

Run from the repo root so the `inc`/`crq` packages + `pipeline.py`/`shared.py` import.

```bash
# drain the incremental change streams (every new file oldest→newest, then archive)
python -m inc.import_inc --from-bucket --apply
python -m crq.import_crq --from-bucket --apply
# drain the incremental INC change stream (every new file oldest→newest, then archive)
python import_tickets.py --from-bucket --apply

# geocode — CROSS-DATASET (covers inc + crq); driven from the INC entrypoint, needs GEOCODER_API_KEY
python -m inc.import_inc --geocode-clusters --apply # coarse, once
python -m inc.import_inc --geocode-locations --apply # precise, actionable inc+crq
# geocode (needs GEOCODER_API_KEY)
python import_tickets.py --geocode-clusters --apply # coarse, once
python import_tickets.py --geocode-locations --apply # precise, actionable INC

# from a local CSV instead of the bucket (dev)
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
```

Dry-run is the default (omit `--apply`). `--from-bucket` talks to S3 via **boto3** using
the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
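
To make the dry-run default concrete, here is a minimal sketch of how a CLI might gate writes behind `--apply`. The two flags mirror the ones documented above; the `plan_ingest` helper and its return shape are hypothetical and are not taken from `import_tickets.py`.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Only the two documented flags are modelled; everything else is illustrative.
    ap = argparse.ArgumentParser(description="dry-run-by-default sketch")
    ap.add_argument("--apply", action="store_true",
                    help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Read change files from the bucket")
    return ap


def plan_ingest(argv: list[str]) -> dict:
    """Hypothetical helper: describe what a run would do, without doing it."""
    args = build_parser().parse_args(argv)
    return {
        "source": "bucket" if args.from_bucket else "none",
        # store_true defaults to False, so omitting --apply means dry-run.
        "mode": "apply" if args.apply else "dry-run",
    }


print(plan_ingest(["--from-bucket"]))
print(plan_ingest(["--from-bucket", "--apply"]))
```

Because `store_true` flags default to `False`, forgetting the flag can never cause a write, which is presumably why the documented commands put `--apply` last and explicit.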

## Deploy (Coolify)

The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); each ingest
runs as its own **Scheduled Task**, not a system crontab:
Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
runs as a **Scheduled Task**, not a system crontab:

- **`inc_tickets`:** `python -m inc.import_inc --from-bucket --apply`
- **`crq_tickets`:** `python -m crq.import_crq --from-bucket --apply`
- **Frequency:** both `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
- **Command:** `python import_tickets.py --from-bucket --apply`
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
  Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
  conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
  `RUSTFS_*` (the `isptickets` bucket credentials), `GEOCODER_*`. The same bucket holds
  both `automations/inc/` and `automations/crq/`, so one credential set serves both tasks.
  `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.

The watermark makes a run with no new change files a cheap no-op.

For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs **both** ingests; schedule it with a crontab line
and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).

Full operational runbook — container, env management (encrypted; via the UI or
`artisan tinker`), the **Forgejo → Coolify auto-deploy webhook**, manual deploys, and the
source-bucket cutover procedure — is in
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).

### Bucket cutover (one-time reseed)

When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
@@ -154,8 +129,8 @@ newer than the new bucket's first file — which would otherwise be skipped. Poi
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:

```bash
python -m inc.import_inc --from-bucket --reseed # dry-run first (or -m crq.import_crq)
python -m inc.import_inc --from-bucket --reseed --apply # commit + archive
python import_tickets.py --from-bucket --reseed # dry-run first
python import_tickets.py --from-bucket --reseed --apply # commit + archive
```

Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
@@ -239,16 +214,6 @@ Findings to keep in mind (see the PRD for detail):

Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.

**CRQ (this milestone):** data layer + map — `tickets.crq` fed from
`automations/crq/changes/` by `crq/import_crq.py`, the `tickets.crq` table + typed columns (migration 15),
cross-dataset geocoding, and visibility on the Tickets map via `fn_tickets_for_map`.
One-time seed: drain the isptickets CRQ stream (`python -m crq.import_crq --from-bucket
--apply`) — empty watermark + the stream's periodic full-state snapshots converge to
current state — then run the shared geocode once. See
[`docs/deployment-and-operations.md`](docs/deployment-and-operations.md).

Next (Phase 2): bring CRQ to full INC parity once installation-lifecycle semantics are
confirmed — a `crq_open_sla` view, `fn_crq_dashboard`, and CRQ history capture (the INC
analogues of migrations 08/09/10). Then time-series analytics (closure rate, MTTR/SLA
trends), FleetNow vehicle **dispatch** off `geog`, and **team closure attribution**.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
separate future project that will reuse this machinery against `automations/crq/`.
@@ -1,61 +0,0 @@
"""
crq/import_crq.py — Fireside Communications · CRQ (new-installation) ingestion.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Thin entrypoint over the shared engine (`pipeline.py`) for the CRQ dataset:
    tickets.crq — new-installation requests (FleetOps "Tickets" CRQ tab)

CRQ mirrors INC at the data layer — IDENTICAL 32-column CSV schema and the same
incremental CDC change stream automations/crq/changes/<EAT-ts>.csv in the
`isptickets` bucket. This loader upserts on ticket_id, advances the per-dataset
watermark (tickets.import_meta dataset='crq'), and archives each consumed file to
automations/crq/processed/. CRQ flows onto the existing Tickets map via
reporting.fn_tickets_for_map (which already unions tickets.crq).

Scope (current): data layer + map only. CRQ has NO post-apply history capture yet
(installation-lifecycle SLA/backlog semantics differ from incidents — a future
migration). Geocoding is CROSS-DATASET and run from the INC entrypoint
(python -m inc.import_inc --geocode-clusters / --geocode-locations) against the
shared gazetteer, which covers both inc and crq.

Usage (needs DATABASE_URL + RUSTFS_* env; see .env.example):
    python -m crq.import_crq --from-bucket --apply
    python -m crq.import_crq --from-bucket --reseed --apply # one-time bucket cutover
    python -m crq.import_crq --crq-csv 2026-06-24T12-55-44.csv --apply

Pre-requisite: migrations applied (run_migrations.py) — tickets.crq + its typed
columns (15_crq_table.sql) + geo_clusters/geo_locations + fn_tickets_for_map.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import argparse

import pipeline

# CRQ has no post-apply hook yet (history capture is INC-only — see module docstring).
DATASET = pipeline.make_dataset("crq", post_apply=None)


def main() -> None:
    ap = argparse.ArgumentParser(
        description="Ingest CRQ (installation) tickets from CSV (raw-first)")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain the incremental CRQ change stream (automations/crq/changes/) "
                         "from the isptickets S3 bucket: every not-yet-processed file "
                         "oldest→newest, upsert on ticket_id, advance the watermark, archive")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark and drain every file in changes/ once "
                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
    ap.add_argument("--crq-csv", dest="local_csv", default=None,
                    help="Local CRQ tickets CSV file (dev)")
    args = ap.parse_args()

    if not (args.from_bucket or args.local_csv):
        ap.error("provide --from-bucket or --crq-csv")
    pipeline.ingest(DATASET, args)


if __name__ == "__main__":
    main()
@@ -1,202 +0,0 @@
# Deployment & Operations — fleettickets

Operational runbook for the INC + CRQ ingest pipelines as deployed on **Coolify**
(host `kianiadee@twala.rahamafresh.com`, key `~/.ssh/id_ed25519`). Covers the
container, environment, schedule, auto-deploy webhook, the source-bucket cutover
procedure, and verification. Secrets are referenced by **where to retrieve them**,
never by value.

> **One image, two datasets.** INC and CRQ share an identical 32-column source schema
> and the same `isptickets` bucket; they run as **two Scheduled Tasks** off the one
> container, via thin entrypoints `python -m inc.import_inc` / `python -m crq.import_crq`
> over the shared `pipeline.py` engine. Everything below applies to both unless noted.

## What's deployed

| Thing | Detail |
|---|---|
| Coolify app | **`fleettickets`** — id `15`, uuid `g14mwzo73q20g70vc6fzumya`, build pack `dockerfile`, git `main` |
| Container | built from this repo's `Dockerfile` (`python:3.12-slim`, `TZ=Africa/Nairobi`); kept alive with `tail -f /dev/null` (no web server) |
| Ingest (INC) | Coolify **Scheduled Task** `inc_tickets` → `python -m inc.import_inc --from-bucket --apply` |
| Ingest (CRQ) | Coolify **Scheduled Task** `crq_tickets` → `python -m crq.import_crq --from-bucket --apply` |
| DB | `tickets` schema in the shared `tracksolid_db` (internal host `timescale_db:5432`) |
| Source | **`isptickets`** S3 bucket, `automations/{inc,crq}/changes/<EAT-ts>.csv` CDC streams (see `../n8n-s3-ticket-exports.md` and `../README.md`) |

Resolve the live container name (Coolify appends a random suffix):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
  'docker ps --filter name=g14mwzo73q20g70vc6fzumya --format "{{.Names}}" | head -1'
```

## Schedule (cron)

Both Scheduled Tasks (`inc_tickets`, `crq_tickets`) run **`*/20 6-20 * * *`** — every
20 min, **06:00–20:40 EAT**. Coolify evaluates task cron in the server timezone
(`server_settings.server_timezone` = `Africa/Nairobi`), so **no UTC conversion** — write
EAT directly. The `--from-bucket` run is a cheap no-op when no new change file has arrived
(watermark guard, per dataset), so a dense schedule is safe.
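
As a quick check of the window quoted above, the sketch below expands a `*/step first-last * * *` expression into its daily firing times. It is a hand-rolled illustration for this one cron shape, not a general cron parser, and `expand_cron_times` is a hypothetical name.

```python
def expand_cron_times(minute_step: int, first_hour: int, last_hour: int) -> list[str]:
    """Expand '*/<step> <first>-<last> * * *' into HH:MM firing times for one day."""
    return [
        f"{hour:02d}:{minute:02d}"
        for hour in range(first_hour, last_hour + 1)   # hour range is inclusive in cron
        for minute in range(0, 60, minute_step)        # */20 -> 0, 20, 40
    ]


times = expand_cron_times(20, 6, 20)
print(times[0], times[-1], len(times))  # 06:00 20:40 45
```

Fifteen hours at three runs per hour gives 45 runs a day, the last at 20:40, which matches the stated 06:00–20:40 window.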

To change the frequency, edit the task in the Coolify UI, or in `coolify-db`:
```sql
UPDATE scheduled_tasks SET frequency = '*/20 6-20 * * *', updated_at = now()
WHERE name IN ('inc_tickets', 'crq_tickets');
```
The `crq_tickets` task is added the same way INC was — in the Coolify UI (Scheduled Tasks
→ Add) with command `python -m crq.import_crq --from-bucket --apply`, container
`fleettickets`, cron `*/20 6-20 * * *`.
Coolify's scheduler re-reads `scheduled_tasks` each minute, so the change is picked up
without a redeploy. Execution history: `scheduled_task_executions`.

> The repo's `Dockerfile`, `run_ingest.sh`, and `README.md` document this same cron for
> the plain-host/VM fallback (`CRON_TZ=Africa/Nairobi`).

## Environment variables

Set on the Coolify app (Environment Variables). Names only — values live in Coolify:

| Var | Purpose |
|---|---|
| `DATABASE_URL` | `tracksolid_db` (internal `timescale_db:5432`) |
| `RUSTFS_ENDPOINT` | `https://s3.rahamafresh.com` |
| `RUSTFS_ACCESS_KEY` / `RUSTFS_SECRET_KEY` | `isptickets` bucket credentials |
| `RUSTFS_REGION` | `us-east-1` |
| `TICKETS_BUCKET` | `isptickets` |
| `GEOCODER_PROVIDER` / `GEOCODER_API_KEY` | keyed geocoder (LocationIQ/OpenCage) |

**Env vars are Laravel-encrypted in `coolify-db` — never raw-`UPDATE` them.** Change them
in the Coolify UI, or via `artisan tinker` (which re-encrypts on save):
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$e = \App\Models\EnvironmentVariable::where('resourceable_type','App\\Models\\Application')
    ->where('resourceable_id',15)->where('key','TICKETS_BUCKET')->first();
$e->value = 'isptickets'; $e->save(); echo $e->value.PHP_EOL;
PHP
```
An env change only takes effect after the container is **recreated** (a redeploy — see below),
since Coolify injects env at container create time.

## Deploys

### Auto-deploy (Forgejo → Coolify webhook)

A push to `main` should auto-deploy. This needs **both** the Coolify per-app Auto-Deploy
toggle (Configuration → Advanced) **and** a webhook on the Forgejo repo. The webhook was
missing originally (the toggle alone is not enough); it now exists as hook id `3` on
`kianiadee/fleettickets`:

| Field | Value |
|---|---|
| URL | `https://stage.rahamafresh.com/webhooks/source/gitea/events/manual` |
| Type / content-type | `gitea` / `json` |
| Events / branch filter | `push` / `main` |
| Secret | the app's `manual_webhook_secret_gitea` (Coolify HMAC-validates `X-Hub-Signature-256`) |

Recreate / inspect it via the Forgejo API (auth: `git credential fill`, host
`repo.rahamafresh.com`, basic auth to `/api/v1` — no `tea`/`gh` needed). Get the secret by
decrypting it in Coolify:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com \
  "docker exec -i coolify php artisan tinker --execute=\"echo \\App\\Models\\Application::find(15)->manual_webhook_secret_gitea;\""
```
```bash
# list / test the webhook (USER:PASS from git credential fill)
curl -s -u "$USER:$PASS" https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks
curl -s -u "$USER:$PASS" -X POST https://repo.rahamafresh.com/api/v1/repos/kianiadee/fleettickets/hooks/3/tests
```
A successful test shows a webhook hit in `docker logs coolify` (no `invalid_signature`
audit) and a new row in `application_deployment_queues`.
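
For readers debugging an `invalid_signature` audit, here is a minimal sketch of an HMAC-SHA256 check of the kind the webhook table refers to. It assumes the GitHub-style header format `sha256=<hex digest of the raw body>`; that format, the `signature_is_valid` name, and the sample secret and payload are assumptions for illustration, not Coolify's actual code.

```python
import hashlib
import hmac


def signature_is_valid(secret: str, body: bytes, header_value: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw body and compare it to the header."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = "sha256=" + digest  # assumed GitHub-style prefix
    # compare_digest avoids leaking the match position through timing.
    return hmac.compare_digest(expected, header_value)


# Made-up values, only to exercise the function.
secret = "example-secret"
body = b'{"ref": "refs/heads/main"}'
good_header = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

print(signature_is_valid(secret, body, good_header))
print(signature_is_valid(secret, body + b" ", good_header))
```

The signature covers the exact request bytes, so a secret mismatch between Forgejo and Coolify, or any re-serialization of the JSON body in between, would fail the check.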

### Manual deploy (no push)

Trigger the same action as Coolify's Deploy button via tinker:
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@twala.rahamafresh.com 'docker exec -i coolify php artisan tinker' <<'PHP'
$app = \App\Models\Application::where('uuid','g14mwzo73q20g70vc6fzumya')->first();
$uuid = new \Visus\Cuid2\Cuid2;
echo json_encode(queue_application_deployment(
    application: $app, deployment_uuid: $uuid, force_rebuild: false, is_api: true)).PHP_EOL;
echo $uuid.PHP_EOL;
PHP
```
Watch it: `SELECT id, status, created_at FROM application_deployment_queues WHERE
application_id = '15' ORDER BY created_at DESC LIMIT 3;` (note: `application_id` is the
**numeric id stored as text**).

## Source-bucket cutover (when the provider moves buckets)

If the provider moves the INC feed to a new bucket (as happened `tickets` → `isptickets`,
2026-06-25):

1. **Inspect** the new bucket (read-only) — confirm `automations/{inc,crq}/changes/` layout,
   timestamp range, schema parity.
2. **Update env** (UI or tinker): `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`,
   `TICKETS_BUCKET` → the new bucket (endpoint usually unchanged). Both datasets read the
   same bucket, so one env change serves both tasks.
3. **Reconcile the DB** to current. The loader drains every `changes/` file newer than the
   watermark (`tickets.import_meta.metadata.source_max_key`, **per dataset**), oldest→newest,
   upserting on `ticket_id`:
   - If the watermark **predates** the new bucket's first file, a normal
     `--from-bucket --apply` drains the whole new stream — no reseed needed.
   - Otherwise use **`--reseed`** (ignores the watermark, drains all `changes/` once):
     `python -m inc.import_inc --from-bucket --reseed --apply` (see README "Bucket cutover").
     The new stream's periodic full-state re-emissions make this converge even across the
     cutover gap. Idempotent upserts + never-delete make it non-destructive.
   - For a one-off, you can run it in the live container with the new creds inlined:
     `docker exec -e TICKETS_BUCKET=… -e RUSTFS_ACCESS_KEY=… -e RUSTFS_SECRET_KEY=… <container>
     sh -c "cd /app && python -m inc.import_inc --from-bucket --apply"`.
4. **Re-geocode** new clusters/locations: `python -m inc.import_inc --geocode-clusters --apply`
   then `--geocode-locations --apply` (cross-dataset; existing gazetteer persists; only new
   keys are looked up).
5. **Redeploy** so the Scheduled Task's container picks up the new env (push `main` → webhook,
or manual deploy). Old bucket is left untouched for rollback.
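
The watermark and `--reseed` behaviour in step 3 can be sketched as a small selection function. This is an illustration only: `files_to_drain` is a hypothetical name, and it assumes the `<EAT-ts>.csv` keys sort chronologically as plain strings (true for zero-padded timestamps such as `2026-06-15T17-00-00`), which is not confirmed against `pipeline.py`.

```python
from typing import Optional


def files_to_drain(keys: list[str], watermark: Optional[str],
                   reseed: bool = False) -> list[str]:
    """Return the change files a run would process, oldest to newest."""
    ordered = sorted(keys)  # assumption: lexicographic order == chronological order
    if reseed or watermark is None:
        # --reseed (or an empty watermark) drains every file once.
        return ordered
    # Normal run: only files strictly newer than the stored watermark.
    return [key for key in ordered if key > watermark]


keys = [
    "automations/inc/changes/2026-06-15T17-20-00.csv",
    "automations/inc/changes/2026-06-15T17-00-00.csv",
    "automations/inc/changes/2026-06-15T17-40-00.csv",
]
watermark = "automations/inc/changes/2026-06-15T17-00-00.csv"

print(files_to_drain(keys, watermark))               # the 17-20 and 17-40 files
print(files_to_drain(keys, watermark, reseed=True))  # all three, oldest first
print(files_to_drain(keys, keys[2]))                 # [] -> the cheap no-op
```

The third call shows why a dense schedule is safe: once the watermark equals the newest key, the selection is empty and nothing is read or written.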

## Bringing CRQ online (one-time seed)

CRQ was added 2026-06-25 (data layer + map). Migration `15_crq_table.sql` **creates**
`tickets.crq` (the live DB's `01` predated its crq section, so the table never existed)
plus the typed columns. To seed it from zero on the live DB — once the code + migration are
applied (`run_migrations.py`; on the live cutover it was applied out-of-band via the running
container, see below):

1. **Verify** the migration applied: `SELECT 1 FROM tickets.schema_migrations WHERE
   filename='15_crq_table.sql';` and `\d tickets.crq` shows the table + typed columns.
2. **Seed** from isptickets (empty `crq` watermark → drains all `automations/crq/changes/`
   files oldest→newest; the stream's periodic full-state snapshots converge to current
   state — same convergence the INC cutover relied on, so **no `--reseed` needed**):
   ```bash
   python -m crq.import_crq --from-bucket # dry-run first ("N of N change file(s)…")
   python -m crq.import_crq --from-bucket --apply # commit + archive to crq/processed/
   ```
   (Or in the live container with `docker exec … sh -c "cd /app && python -m crq.import_crq
   --from-bucket --apply"`.)
3. **Geocode** (cross-dataset; most clusters already resolved from INC, so few new lookups):
   `python -m inc.import_inc --geocode-clusters --apply` then `--geocode-locations --apply`.
4. **Confirm** CRQ on the map: `SELECT reporting.fn_tickets_for_map() -> 'summary';` shows a
   non-zero `crq` count. The `crq_tickets` Scheduled Task then keeps it current.

## Verification

```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec -i "$DB" psql -U postgres -d tracksolid_db <<'SQL'
-- watermark + freshness
SELECT export_type, records_ingested, ingested_at, metadata->>'source_max_key'
  FROM tickets.import_meta WHERE dataset='inc';
-- counts
SELECT count(*) total_inc,
       count(*) FILTER (WHERE (raw->>'is_actionable')::boolean) AS open
  FROM tickets.inc;
-- map payload sanity
SELECT reporting.fn_tickets_for_map() -> 'summary' ->> 'ticket_count';
SQL
```
- New bucket `changes/` empties as files move to `automations/inc/processed/`.
- A plain `--from-bucket --apply` reports "nothing new" until the next change file lands.
- FleetOps Tickets map freshness reflects the new `ingested_at`.

## Rollback

- **Bucket:** revert the three env vars to the old bucket + creds and redeploy. The old
  bucket and its `processed/` history are untouched; upserts are idempotent and rows are
  never deleted, so re-running is safe.
- **Cron:** `UPDATE scheduled_tasks SET frequency = <old> WHERE name='inc_tickets';`
@@ -3,25 +3,14 @@
What is actually built and deployed, as of the Phase-1 completion. Companion to
`docs/phase-1-ingestion.md` (plan) and `docs/phase-2-dashboard.md` (next).

## Pipeline (`pipeline.py` engine + `inc/`,`crq/` entrypoints)
## Pipeline (`import_tickets.py`)

The dataset-agnostic CDC engine lives in **`pipeline.py`**, parameterized by a small
`Dataset` config (name, table, `automations/<type>/changes|processed/` prefixes, key
regex, optional `post_apply` hook). Two thin entrypoints supply that config and the CLI:
**`inc/import_inc.py`** (`python -m inc.import_inc`, `post_apply=capture_history`) and
**`crq/import_crq.py`** (`python -m crq.import_crq`, no history hook). INC and CRQ share an
**identical 32-column source schema**, so the engine is fully shared; geocoding is
**cross-dataset** (one gazetteer/budget, unions `tickets.inc` + `tickets.crq`) and is run
from the INC entrypoint.

- **Source:** the incremental CDC stream `automations/<inc|crq>/changes/<EAT-timestamp>.csv`
  in the **`isptickets`** S3 bucket (endpoint `https://s3.rahamafresh.com`, path-style,
  region `us-east-1`; was the `tickets` bucket before the 2026-06-25 cutover).
- **Source:** newest `automations/inc/<EAT-timestamp>.csv` in the rustfs `tickets`
  bucket (endpoint `https://s3.rahamafresh.com`, path-style, region `us-east-1`).
- **S3 access via boto3** (no aws-CLI dependency): `list_objects_v2` (paginator),
  `get_object`, `copy_object` + `delete_object` for archiving.
- **Watermark:** drains every `changes/` file newer than
  `tickets.import_meta.metadata.source_max_key`, oldest→newest; reruns with no new file
  are a cheap no-op. `--reseed` ignores the watermark for a one-time bucket cutover.
- **Skip-if-unchanged:** newest S3 **ETag** vs `tickets.import_meta.metadata.source_etag`;
  equal → skip the DB write (the export re-emits identical content most hours).
- **Cleaning:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
  `week_start`/`week_end`, `source_s3_bucket`/`source_s3_key`/`source_snapshot_id`,
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE.
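
The cleaning rules in the last bullet can be sketched as a row filter over the CSV. This is a rough illustration, not the loader's code: the `clean_rows` name, the choice of `ticket_id` as the column that carries the `EXPORT STOPPED…` sentinel, and the sample ticket IDs are all assumptions.

```python
import csv
import io

# Columns the bullet above lists as dropped.
DROP_COLUMNS = {
    "week_start", "week_end",
    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
    "department", "source_type",
}
SENTINEL_PREFIX = "EXPORT STOPPED"


def clean_rows(csv_text: str) -> list[dict]:
    """Apply the documented drop and normalize rules to a CSV export."""
    cleaned = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Assumption: the sentinel shows up in the ticket_id column.
        if (row.get("ticket_id") or "").startswith(SENTINEL_PREFIX):
            continue
        if (row.get("is_alarm") or "").strip().lower() == "true":
            continue
        kept = {k: v for k, v in row.items() if k not in DROP_COLUMNS}
        if kept.get("region"):
            kept["region"] = kept["region"].lower()          # region -> lowercase
        if kept.get("raw_status"):
            kept["raw_status"] = kept["raw_status"].upper()  # raw_status -> UPPERCASE
        cleaned.append(kept)
    return cleaned


sample = (
    "ticket_id,is_alarm,region,raw_status,department\n"
    "INC-1,false,Nairobi,open,Field\n"
    "INC-2,true,Coast,open,Field\n"
    "EXPORT STOPPED,,,,\n"
)
print(clean_rows(sample))
```

With the made-up sample, only `INC-1` survives, with `region` lowercased, `raw_status` uppercased, and `department` removed.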
@@ -33,11 +22,8 @@ from the INC entrypoint.
- **History capture:** after each `--apply` run (ingest or skip), calls
  `tickets.capture_history()` → appends new closures + upserts today's backlog
  snapshot.
- CLI (`inc`): `--from-bucket` (drain the INC change stream), `--reseed` (ignore the
  watermark; one-time bucket cutover), `--inc-csv <file>` (local dev), `--apply` (else
  dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.
- CLI (`crq`): `--from-bucket`, `--reseed`, `--crq-csv <file>`, `--apply` (ingest only;
  geocoding + history are not on the CRQ entrypoint).
- CLI: `--from-bucket` (newest INC csv), `--inc-csv <file>` (local dev), `--apply`
  (else dry-run), `--geocode-clusters`, `--geocode-locations`, `--capture-history`.

## Schema / migrations (`tracksolid_db`, applied via `run_migrations.py`)

@@ -53,8 +39,6 @@ from the INC entrypoint.
| 08_inc_open_sla_view | `tickets.inc_open_sla` view (open tickets + derived SLA) |
| 09_inc_dashboard_fn | **built** — `reporting.fn_inc_dashboard(cluster, status, window, from, to)`: one JSON payload (open GeoJSON + windowed closed GeoJSON + metrics + freshness) for the FleetOps live INC map. See `docs/phase-2-dashboard.md` |
| 10_inc_history_capture | **built** — `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow) + `tickets.capture_history()`; the ingest calls it each `--apply` run. Unlocks backlog-over-time |
| 12_inc_dashboard_by_owner | **built** — owner/team breakdown extension to `fn_inc_dashboard` |
| 15_crq_table | **built** — materializes `tickets.crq` (table + geom trigger + indexes; `01`'s crq section never ran on the live DB) + the typed STORED generated columns from `03` (reuses `tickets.eat_ts()`). Data-layer parity for the CRQ tab |

`tickets.inc` columns: `ticket_id` (PK), `raw` (jsonb, source of truth),
`normalized_status`/`raw_status`, `bucket`, `is_actionable`, `cluster`/`region`/
@@ -69,16 +53,11 @@ from the INC entrypoint.
- **Coolify** app built from this repo's `Dockerfile` (`python:3.12-slim`,
  `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
  web app (`fleet-ops-staging`).
- **Scheduled Tasks (two):** `inc_tickets` → `python -m inc.import_inc --from-bucket
  --apply` and `crq_tickets` → `python -m crq.import_crq --from-bucket --apply`, both cron
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
  `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`
  (`isptickets` bucket — serves both inc + crq), `GEOCODER_*`.
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.

Full ops runbook (env management, the Forgejo → Coolify auto-deploy webhook, manual
deploys, bucket cutover, verification): **`docs/deployment-and-operations.md`**.

## State at hand-off

- `tickets.inc` ≈ 21,312 rows (current non-alarm INC + a few aged-out history rows);
@@ -107,12 +86,5 @@ Phase 2 (built): `fn_inc_dashboard` read-API → FleetOps live map (open + close
 overlay + metrics); history capture (`closure_events` + `inc_daily_snapshot`) for
 backlog/closure trends. Remaining: `dashboard_api` endpoint + FleetOps SPA (other
 repos; see `docs/dashboard-api-contract.md`), FleetNow **dispatch** off `geog`,
-**team closure attribution**.
-
-**CRQ** (this milestone): the shared engine now feeds `tickets.crq` from
-`automations/crq/changes/` (`crq/import_crq.py`), with the `tickets.crq` table + typed columns (migration 15) and
-cross-dataset geocoding — CRQ shows on the Tickets map via `fn_tickets_for_map` (which
-already unions it) and gets its own FleetOps tab. Deferred to a follow-up once
-installation-lifecycle semantics are confirmed: the CRQ analogues of migrations
-08/09/10 — `crq_open_sla`, `fn_crq_dashboard`, and CRQ history capture (`tickets.crq`
-currently has **no** `post_apply` hook).
+**team closure attribution**. **CRQ** = separate future project reusing this
+machinery against `automations/crq/`.

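The README hunks above keep the Scheduled Task cron at `*/20 6-20 * * *`, evaluated in EAT. As a quick sanity check of what that expression fires, here is a minimal sketch that expands only the minute and hour fields by hand. The helper name `cron_slots` is hypothetical and not part of the repo, and the sketch deliberately ignores the day, month, and weekday fields, which are all `*` here.

```python
from datetime import time


def cron_slots(minute_step: int = 20, first_hour: int = 6, last_hour: int = 20) -> list[time]:
    """Wall-clock times matched by a `*/<step> <first>-<last> * * *` schedule.

    `*/20` in the minute field means minutes 0, 20 and 40; the hour range
    `6-20` is inclusive at both ends.
    """
    return [
        time(hour, minute)
        for hour in range(first_hour, last_hour + 1)
        for minute in range(0, 60, minute_step)
    ]


if __name__ == "__main__":
    slots = cron_slots()
    # Number of ingest runs per day, plus the first and last run of the day.
    print(len(slots), slots[0].isoformat(), slots[-1].isoformat())
```

Under these assumptions the ingest runs three times an hour across fifteen hours of the EAT day, with the last run at 20:40.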
@@ -82,11 +82,6 @@ Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs
 **Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
 `15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
 
-> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
-> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
-> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
-> `implementation.md` and `deployment-and-operations.md`.
-
 ## Data-quality findings (carried into Phase 2)
 
 - Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the

@@ -1,17 +1,12 @@
 """
-pipeline.py — Fireside Communications · generic ticket ingestion engine (raw-first)
+import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-The dataset-agnostic core shared by the per-type entrypoints:
-inc/import_inc.py -> tickets.inc (incidents / customer faults)
-crq/import_crq.py -> tickets.crq (new-installation requests)
+Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
+source of the FleetOps "Tickets" map.
+tickets.inc — incidents / customer faults
 
-Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
-so the only differences are the table, the S3 prefixes, the import_meta dataset
-key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
-does not yet). Those are carried by the `Dataset` config; everything else here is
-generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
-budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
-are driven from a single entrypoint (the INC one) — never duplicated per dataset.
+STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
+here. `tickets.crq` stays in the schema but is not fed by this pipeline.
 
 RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
 Everything downstream reads from `raw` (resilient to source schema drift). The DB
@@ -19,33 +14,50 @@ derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
 (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
 
 Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
-automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
+automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
 This is an INCREMENTAL (CDC) stream: the first file is a full current-state
 baseline, and every later file holds only the rows that CHANGED since the prior
-export (with periodic full-state re-emissions). Deletions are never emitted. Every
-file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
-in ASCENDING timestamp order (baseline first, then each delta) — taking only the
-newest would silently drop the intermediate deltas:
+export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
+Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
+not-yet-processed file in ASCENDING timestamp order (baseline first, then each
+delta) — taking only the newest would silently drop the intermediate deltas:
 - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
 - drop derivable / provenance / zero-info columns (see DROP_FIELDS);
 - normalize region -> lowercase, raw_status -> UPPERCASE;
 - upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
 history accumulates), and advance the watermark in tickets.import_meta
 (metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
-- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
+- on success, MOVE each file to automations/inc/processed/ (copy + delete).
+
+Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
+--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
+--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
+real place out of location_name (region+cluster+location_name,
+network codes stripped), geocodes it, caches in
+tickets.geo_locations, then re-resolves geoms.
+Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
+
+Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
+python import_tickets.py --from-bucket --apply
+python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
+python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
+python import_tickets.py --geocode-clusters --apply
+python import_tickets.py --geocode-locations --apply
+
+Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
+geo_clusters + geo_locations + reporting.fn_tickets_for_map.
 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
 """
 
 from __future__ import annotations
 
-import io
+import argparse
 import csv
+import io
 import math
 import os
 import re
 import time
-from collections.abc import Callable
-from dataclasses import dataclass
 from datetime import datetime, timezone, timedelta
 
 import boto3
@@ -55,10 +67,14 @@ from botocore.config import Config as BotoConfig
 
 from shared import clean, get_conn, get_logger
 
-log = get_logger("pipeline")
+log = get_logger("import_tickets")
 
-# ── shared ingestion config ─────────────────────────────────────────────────────
+# ── INC ingestion config ──────────────────────────────────────────────────────
+_TABLE = "tickets.inc"
+_DATASET = "inc"
 _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
+_INC_PREFIX = "automations/inc/changes/"  # the incremental (CDC) change stream
+_PROCESSED_PREFIX = "automations/inc/processed/"
 _EAT = timezone(timedelta(hours=3))  # Africa/Nairobi — filenames + data are EAT
 
 # Garbage row the source leaks (commonly the first data line): its ticket_id is the
@@ -75,6 +91,11 @@ DROP_FIELDS = frozenset({
     "department", "source_type",
 })
 
+# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
+# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
+_CHANGE_KEY_RE = re.compile(
+    r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
+
 # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
 _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
 _API_KEY = os.getenv("GEOCODER_API_KEY", "")
@@ -82,34 +103,12 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
 _last_geocode_at = 0.0
 
 
-# ── dataset config (per ticket type) ────────────────────────────────────────────
-@dataclass(frozen=True)
-class Dataset:
-    """All that distinguishes one ticket type from another in the generic engine."""
-    name: str  # 'inc' | 'crq' (import_meta.dataset)
-    table: str  # 'tickets.inc' | 'tickets.crq'
-    change_prefix: str  # 'automations/<name>/changes/'
-    processed_prefix: str  # 'automations/<name>/processed/'
-    key_regex: re.Pattern  # matches a <prefix><EAT-ts>.csv key
-    post_apply: Callable[[], None] | None = None  # e.g. capture_history (INC only)
-
-
-def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
-    """Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
-    return Dataset(
-        name=name,
-        table=f"tickets.{name}",
-        change_prefix=f"automations/{name}/changes/",
-        processed_prefix=f"automations/{name}/processed/",
-        # only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
-        # (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
-        key_regex=re.compile(
-            rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
-        post_apply=post_apply,
-    )
-
-
 # ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
+# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
+# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
+# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
+# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
+# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
 # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
 def _s3_client():
     """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
@@ -124,9 +123,9 @@ def _s3_client():
     )
 
 
-def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
-    """EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
-    m = ds.key_regex.match(key)
+def _ts_from_key(key: str) -> datetime | None:
+    """EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
+    m = _CHANGE_KEY_RE.match(key)
     if not m:
         return None
     try:  # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
@@ -135,12 +134,12 @@ def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
         return None
 
 
-def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
-    """[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
+def _list_inc_csvs(s3) -> list[tuple[str, str]]:
+    """[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
     out: list[tuple[str, str]] = []
-    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
+    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
         for it in page.get("Contents", []):
-            if ds.key_regex.match(it["Key"]):
+            if _CHANGE_KEY_RE.match(it["Key"]):
                 out.append((it["Key"], (it.get("ETag") or "").strip('"')))
     return out
 
@@ -150,22 +149,23 @@ def _get_text(s3, key: str) -> str:
     return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
 
 
-def _last_processed_ts(ds: Dataset) -> datetime | None:
-    """Watermark: EAT timestamp of the newest change file already ingested for this dataset.
+def _last_processed_ts() -> datetime | None:
+    """Watermark: EAT timestamp of the newest change file already ingested.
 
     Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
     we drain changes/ oldest→newest). None when nothing has been ingested via the
-    changes stream yet (e.g. a brand-new dataset, or the first run after the source
-    switched buckets) — then every file currently in changes/ is processed.
+    changes stream yet (e.g. the first run after the source switched to incremental,
+    where the stored key is an old full-snapshot path) — then every file currently in
+    changes/ is processed.
     """
     with get_conn() as conn:
         with conn.cursor() as cur:
             cur.execute(
                 "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
-                (ds.name,),
+                (_DATASET,),
             )
             row = cur.fetchone()
-    return _ts_from_key(ds, row[0]) if row and row[0] else None
+    return _ts_from_key(row[0]) if row and row[0] else None
 
 
 def _parse_csv(text: str) -> list[dict]:
@@ -177,10 +177,10 @@ def _load_csv_local(path: str) -> list[dict]:
         return list(csv.DictReader(f))
 
 
-def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
-    """Archive listed csv objects to automations/<ds>/processed/ (copy + delete)."""
+def _move_processed(s3, keys: list[str]) -> None:
+    """Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
     for key in keys:
-        dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
+        dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
         s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
         s3.delete_object(Bucket=_BUCKET, Key=key)
         log.info("archived %s -> %s", key, dst)
@@ -206,8 +206,8 @@ def _prepare(row: dict) -> dict:
 
 
 # ── upsert (raw-first) ────────────────────────────────────────────────────────
-def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
-    """Upsert the snapshot metadata (powers map freshness + holds source_max_key).
+def _record_meta(cur, meta: dict, records_ingested: int) -> None:
+    """Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
 
     Runs on the caller's cursor so the row upsert and the meta write commit
     together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
@@ -225,58 +225,54 @@ def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
              records_ingested = EXCLUDED.records_ingested,
              n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
              ingested_at = now()""",
-        (ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
+        (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
          clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
          clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
          clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
     )
 
 
-def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
+def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
     meta = meta or {}
     kept = [r for r in rows if _keep_row(r)]
     payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
     log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
-             ds.table, len(rows), len(payload), len(rows) - len(payload))
+             _TABLE, len(rows), len(payload), len(rows) - len(payload))
     if not apply:
-        log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
+        log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
         return len(payload)
     with get_conn() as conn:
         with conn.cursor() as cur:
             psycopg2.extras.execute_values(
                 cur,
-                f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
+                f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
                 "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                 payload, page_size=500,
             )
             # same transaction as the upsert: rows + snapshot meta commit atomically
-            _record_meta(cur, ds, meta, len(payload))
-    log.info("upserted %d rows into %s", len(payload), ds.table)
+            _record_meta(cur, meta, len(payload))
+    log.info("upserted %d rows into %s", len(payload), _TABLE)
     return len(payload)
 
 
-def capture_history() -> None:
-    """Append new closures + upsert today's backlog snapshot (tickets.capture_history).
-
-    INC-only today (CRQ install-lifecycle history is a future migration); wired as
-    the INC Dataset's post_apply hook.
-    """
+def _capture_history() -> None:
+    """Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
     with get_conn() as conn:
         with conn.cursor() as cur:
             cur.execute("SELECT tickets.capture_history()")
             log.info("history: %s", cur.fetchone()[0])
 
 
-def ingest(ds: Dataset, args) -> None:
-    # Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
-    if args.local_csv:
-        rows = _load_csv_local(args.local_csv)
-        name = os.path.basename(args.local_csv)
-        ts = _ts_from_key(ds, ds.change_prefix + name)
+def ingest(args) -> None:
+    # Local-file path (dev): ingest a single CSV, no bucket / no archive.
+    if args.inc_csv:
+        rows = _load_csv_local(args.inc_csv)
+        name = os.path.basename(args.inc_csv)
+        ts = _ts_from_key(_INC_PREFIX + name)
         meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
         if ts:
             meta["exported_at"] = ts.isoformat()
-        upsert(ds, rows, args.apply, meta=meta)
+        upsert(rows, args.apply, meta=meta)
         return
 
     # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
@@ -284,11 +280,11 @@ def ingest(ds: Dataset, args) -> None:
     # the file is archived PER file, so a mid-run failure leaves a consistent state
     # (folder state matches the watermark) and the next run resumes cleanly.
     s3 = _s3_client()
-    listing = _list_csvs(s3, ds)
+    listing = _list_inc_csvs(s3)
     if not listing:
-        log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
+        log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
         return
-    listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
+    listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
 
     # watermark: skip anything at/older than the newest file already applied. Archiving
     # normally empties changes/, but this guards a failed archive from re-applying.
@@ -296,25 +292,24 @@ def ingest(ds: Dataset, args) -> None:
     # for a one-time bucket cutover, where the stored key points at the old bucket's stream
     # and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
     # still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
-    last_ts = None if args.reseed else _last_processed_ts(ds)
+    last_ts = None if args.reseed else _last_processed_ts()
     _floor = datetime.min.replace(tzinfo=_EAT)
     pending = [(k, e) for k, e in listing
-               if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
+               if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
     if not pending:
-        log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
-                 len(listing), ds.name, last_ts and last_ts.isoformat())
+        log.info("all %d change file(s) already processed (watermark %s) — nothing new",
+                 len(listing), last_ts and last_ts.isoformat())
         if args.apply:
-            _move_processed(s3, ds, [k for k, _ in listing])  # archive any stragglers
-            if ds.post_apply:
-                ds.post_apply()
+            _move_processed(s3, [k for k, _ in listing])  # archive any stragglers
+            _capture_history()
         return
-    log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
-             len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
+    log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
+             len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
 
     total = 0
     for i, (key, etag) in enumerate(pending):
         rows = _parse_csv(_get_text(s3, key))
-        ts = _ts_from_key(ds, key)
+        ts = _ts_from_key(key)
         # the first file applied onto an empty watermark is the full baseline; every
         # file after is a delta. export_type is informational (recorded in import_meta).
         meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
@@ -324,14 +319,14 @@ def ingest(ds: Dataset, args) -> None:
             meta["exported_at"] = ts.isoformat()
         # rows + watermark (source_max_key) commit in one txn, advancing per file; only
         # then archive, so the changes/ folder state always matches the watermark.
-        total += upsert(ds, rows, args.apply, meta=meta)
+        total += upsert(rows, args.apply, meta=meta)
         if args.apply:
-            _move_processed(s3, ds, [key])
+            _move_processed(s3, [key])
         else:
-            log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
-    log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
-    if args.apply and ds.post_apply:
-        ds.post_apply()
+            log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
+    log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
+    if args.apply:
+        _capture_history()
 
 
 # ── place extraction (strip network codes, keep the real place) ───────────────
@@ -480,7 +475,7 @@ def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, flo
     return None
 
 
-# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
+# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
 def geocode_clusters(apply: bool) -> None:
     with get_conn() as conn:
         with conn.cursor() as cur:
@@ -527,7 +522,7 @@ def geocode_clusters(apply: bool) -> None:
     log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
 
 
-# ── per-location geocoding (precise; actionable inc + crq) ────────────────────
+# ── per-location geocoding (precise; actionable INC) ──────────────────────────
 # A location geocode is only trusted if it lands within this radius of the
 # cluster centroid; otherwise the geocoder matched the landmark in the wrong
 # place and we fall back to the cluster centroid.
@@ -542,25 +537,17 @@ def geocode_locations(apply: bool) -> None:
                 """
                 SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
                 FROM (
-                  SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
-                         (array_agg(src.raw->>'location_name'))[1] AS location_name,
-                         (array_agg(src.raw->>'cluster'))[1] AS cluster,
-                         (array_agg(src.raw->>'region'))[1] AS region,
-                         tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
-                  FROM (
-                    -- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
-                    SELECT raw FROM tickets.inc
+                  SELECT tickets.norm_cluster(raw->>'location_name') AS key,
+                         (array_agg(raw->>'location_name'))[1] AS location_name,
+                         (array_agg(raw->>'cluster'))[1] AS cluster,
+                         (array_agg(raw->>'region'))[1] AS region,
+                         tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
+                  FROM tickets.inc
                   WHERE (raw->>'is_actionable')::boolean
                     AND raw->>'location_name' IS NOT NULL
                     AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
-                    UNION ALL
-                    SELECT raw FROM tickets.crq
-                    WHERE (raw->>'is_actionable')::boolean
-                      AND raw->>'location_name' IS NOT NULL
-                      AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
-                  ) src
-                  WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
-                                    WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
+                    AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
+                                    WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
                                       AND gl.geom IS NOT NULL)
                   GROUP BY 1
                 ) t
@@ -568,7 +555,7 @@ def geocode_locations(apply: bool) -> None:
                 """
             )
             todo = cur.fetchall()
-    log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
+    log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
     if not apply:
         for key, loc, cluster, region, clat, clng in todo[:50]:
             log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
@@ -622,3 +609,41 @@ def _resolve() -> int:
         with conn.cursor() as cur:
             cur.execute("SELECT tickets.resolve_ticket_geoms()")
             return cur.fetchone()[0]
+
+
+# ── entrypoint ────────────────────────────────────────────────────────────────
+def main() -> None:
+    ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
+    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
+    ap.add_argument("--from-bucket", action="store_true",
+                    help="Drain the incremental INC change stream (automations/inc/changes/) "
+                         "from the isptickets S3 bucket: every not-yet-processed file "
+                         "oldest→newest, upsert on ticket_id, advance the watermark, archive")
+    ap.add_argument("--reseed", action="store_true",
+                    help="Ignore the stored watermark and drain every file in changes/ once "
+                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
+    ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
+    ap.add_argument("--geocode-clusters", action="store_true",
+                    help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
+    ap.add_argument("--geocode-locations", action="store_true",
+                    help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
+    ap.add_argument("--capture-history", action="store_true",
+                    help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
+    args = ap.parse_args()
+
+    if args.geocode_clusters:
+        geocode_clusters(apply=args.apply)
+        return
+    if args.geocode_locations:
+        geocode_locations(apply=args.apply)
+        return
+    if args.capture_history:
+        _capture_history()
+        return
+    if not (args.from_bucket or args.inc_csv):
+        ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
+    ingest(args)
+
+
+if __name__ == "__main__":
+    main()

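The drain order used by `ingest` (sort change files by the EAT timestamp embedded in the key, then keep only those strictly newer than the stored watermark) can be tried in isolation. What follows is a minimal, database-free and S3-free sketch of that idea. `KEY_RE`, `ts_from_key` and `pending_keys` are illustrative stand-ins, not the module's own functions, and unlike the code in the diff (which sorts unparseable keys to a floor value) this sketch simply drops them.

```python
from __future__ import annotations

import re
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi, as in the diff above

# Same shape as the change-key pattern in the diff: changes/<EAT-timestamp>.csv only.
KEY_RE = re.compile(
    r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")


def ts_from_key(key: str) -> datetime | None:
    """Timestamp embedded in a change-file key, or None if absent or impossible."""
    m = KEY_RE.match(key)
    if not m:
        return None
    try:
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:  # the regex shape also matches e.g. 9999-99-99T00-00-00
        return None


def pending_keys(keys: list[str], watermark: datetime | None) -> list[str]:
    """Keys still to ingest, oldest first; everything when there is no watermark."""
    stamped = sorted((ts, key) for key in keys if (ts := ts_from_key(key)) is not None)
    return [key for ts, key in stamped if watermark is None or ts > watermark]


if __name__ == "__main__":
    listing = [
        "automations/inc/changes/2026-06-24T09-55-44.csv",
        "automations/inc/changes/2026-06-23T08-00-00.csv",
        "automations/inc/processed/2026-06-22T08-00-00.csv",  # ignored: not under changes/
    ]
    print(pending_keys(listing, ts_from_key(listing[1])))
```

The strict `>` comparison is what makes a rerun skip a file that was applied but whose archive step failed, which is the case the watermark comment in the diff describes.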
@@ -1,74 +0,0 @@
-"""
-inc/import_inc.py — Fireside Communications · INC (incident / fault) ingestion.
-━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
-tickets.inc — incidents / customer faults (FleetOps "Tickets" INC tab)
-
-INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
-from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
-watermark, archives each file to automations/inc/processed/, and — uniquely to
-INC — runs tickets.capture_history() after each --apply run (closure_events +
-daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
-CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
-
-Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
-python -m inc.import_inc --from-bucket --apply
-python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
-python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
-python -m inc.import_inc --geocode-clusters --apply
-python -m inc.import_inc --geocode-locations --apply
-
-Pre-requisite: migrations applied (run_migrations.py) — tickets.inc/crq +
-geo_clusters + geo_locations + reporting.fn_tickets_for_map.
-━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-"""
-
-from __future__ import annotations
-
-import argparse
-
-import pipeline
-
-# INC captures closure/backlog history after every --apply run (CRQ does not yet).
-DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
-
-
-def main() -> None:
-    ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
-    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
-    ap.add_argument("--from-bucket", action="store_true",
-                    help="Drain the incremental INC change stream (automations/inc/changes/) "
-                         "from the isptickets S3 bucket: every not-yet-processed file "
-                         "oldest→newest, upsert on ticket_id, advance the watermark, archive")
-    ap.add_argument("--reseed", action="store_true",
-                    help="Ignore the stored watermark and drain every file in changes/ once "
-                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
-    ap.add_argument("--inc-csv", dest="local_csv", default=None,
-                    help="Local INC tickets CSV file (dev)")
-    ap.add_argument("--geocode-clusters", action="store_true",
-                    help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
-    ap.add_argument("--geocode-locations", action="store_true",
-                    help="Geocode actionable inc+crq location_names precisely (keyed provider), "
-                         "then re-resolve")
-    ap.add_argument("--capture-history", action="store_true",
-                    help="Run tickets.capture_history() standalone "
-                         "(closure_events + daily snapshot)")
-    args = ap.parse_args()
-
-    if args.geocode_clusters:
-        pipeline.geocode_clusters(apply=args.apply)
-        return
-    if args.geocode_locations:
-        pipeline.geocode_locations(apply=args.apply)
-        return
-    if args.capture_history:
-        pipeline.capture_history()
-        return
-    if not (args.from_bucket or args.local_csv):
-        ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
-                 "--geocode-locations, or --capture-history")
-    pipeline.ingest(DATASET, args)
-
-
-if __name__ == "__main__":
-    main()

@ -1,89 +0,0 @@
-- 13_inc_search_fn.sql — fleettickets · INC ticket explorer (search) function
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_search — ad-hoc ticket lookup by id / engineer / cluster /
-- status / state / time, for the FleetOps "Ticket explorer" card. Returns
-- { count, truncated, limit, state, rows }. Consumed by dashboard_api
-- GET /webhook/inc-search.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: this migration was applied to the live
-- DB on 2026-06-19 but the file was never committed. Recovered verbatim from the live
-- definition (pg_get_functiondef) so a fresh DB rebuilds faithfully; the live ledger
-- already lists it, so run_migrations skips it there. The crq mirror is in 16.
-- Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

CREATE OR REPLACE FUNCTION reporting.fn_inc_search(
    p_ticket_id text DEFAULT NULL,
    p_owner text DEFAULT NULL,
    p_cluster text DEFAULT NULL,
    p_status text DEFAULT NULL,
    p_state text DEFAULT 'closed',
    p_from timestamptz DEFAULT NULL,
    p_to timestamptz DEFAULT NULL,
    p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
    v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
    v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
    v_result jsonb;
BEGIN
    p_ticket_id := NULLIF(trim(p_ticket_id), '');
    p_owner := NULLIF(trim(p_owner), '');
    p_cluster := NULLIF(p_cluster, '');
    p_status := NULLIF(p_status, '');

    WITH hits AS (
        SELECT ticket_id, normalized_status, cluster, region, location_name,
               initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
               sla_status, mttr, closed_at, created_at_service, is_actionable,
               CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
               CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
        FROM tickets.inc
        WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
          AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
          AND (p_cluster IS NULL OR cluster = p_cluster)
          AND (p_status IS NULL OR normalized_status = p_status)
          AND CASE v_state
                WHEN 'open' THEN COALESCE(is_actionable, false)
                WHEN 'all' THEN COALESCE(is_actionable, false)
                    OR (closed_at IS NOT NULL
                        AND (p_from IS NULL OR closed_at >= p_from)
                        AND (p_to IS NULL OR closed_at < p_to))
                ELSE NOT COALESCE(is_actionable, false) -- 'closed'
                    AND closed_at IS NOT NULL
                    AND (p_from IS NULL OR closed_at >= p_from)
                    AND (p_to IS NULL OR closed_at < p_to)
              END
    ),
    total AS (SELECT count(*) AS n FROM hits),
    page AS (
        SELECT * FROM hits
        ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
        LIMIT v_limit
    )
    SELECT jsonb_build_object(
        'count', (SELECT n FROM total),
        'truncated', (SELECT n FROM total) > v_limit,
        'limit', v_limit,
        'state', v_state,
        'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
                                           ORDER BY page.closed_at DESC NULLS LAST,
                                                    page.created_at_service DESC NULLS LAST)
                          FROM page), '[]'::jsonb)
    ) INTO v_result;

    RETURN v_result;
END $function$;

DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
    END IF;
END $grants$;
@ -1,37 +0,0 @@
-- 14_inc_filter_options.sql — fleettickets · INC explorer dropdown options
-- ─────────────────────────────────────────────────────────────────────────────
-- reporting.fn_inc_filter_options — distinct engineers (owner), clusters, and the
-- ids of currently-open tickets, for the FleetOps "Ticket explorer" dropdowns.
-- Consumed by dashboard_api GET /webhook/inc-filter-options.
--
-- RECOVERED INTO VERSION CONTROL 2026-06-26: applied to the live DB 2026-06-19 but
-- never committed. Recovered verbatim from the live definition so a fresh DB rebuilds
-- faithfully; the live ledger already lists it (run_migrations skips it there). The crq
-- mirror is in 16. Idempotent (CREATE OR REPLACE).
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

CREATE OR REPLACE FUNCTION reporting.fn_inc_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
    SELECT jsonb_build_object(
        'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
                   FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
                         FROM tickets.inc WHERE NULLIF(owner, '') IS NOT NULL) s),
        'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
                     FROM (SELECT DISTINCT cluster AS c
                           FROM tickets.inc WHERE NULLIF(cluster, '') IS NOT NULL) s),
        'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
                            FROM tickets.inc WHERE COALESCE(is_actionable, false))
    );
$function$;

DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT EXECUTE ON FUNCTION reporting.fn_inc_filter_options() TO grafana_ro;
    END IF;
END $grants$;
@ -1,101 +0,0 @@
-- 15_crq_table.sql — fleettickets · materialize tickets.crq + typed columns
-- ─────────────────────────────────────────────────────────────────────────────
-- Why a NEW migration (not an edit to 01): `01_tickets_schema.sql` was applied to the
-- live DB on 2026-06-15 from a version that PREDATED its `tickets.crq` section, so the
-- IF-NOT-EXISTS ledger guard has kept crq from ever being created there — even though
-- the live `reporting.fn_tickets_for_map` and `tickets.resolve_ticket_geoms` already
-- reference it (they error if called until crq exists). This migration creates
-- `tickets.crq` self-containedly (table + geom trigger + indexes) and adds the same
-- typed STORED generated columns INC got in `03_inc_columns.sql`, bringing CRQ to
-- data-layer parity.
--
-- Deterministic + idempotent — converges to the same shape on BOTH:
-- • the live DB (crq missing) -> CREATE makes it, ALTER adds typed cols
-- • a fresh DB (crq minimal, from 01) -> CREATE skipped, ALTER adds typed cols
-- Reuses shared objects already present: tickets.tg_ticket_geom() (01),
-- tickets.norm_cluster() (01), tickets.eat_ts() (03).
--
-- NOTE: the live DB also carries un-versioned migrations 13_inc_search_fn.sql /
-- 14_inc_filter_options.sql (applied 2026-06-19, absent from this repo) — INC dashboard
-- functions, unrelated to CRQ. Numbered 15 here to sit cleanly after the live ledger.
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

-- ── table (base shape mirrors tickets.inc's original 01 base) ────────────────
CREATE TABLE IF NOT EXISTS tickets.crq (
    ticket_id text PRIMARY KEY,
    raw jsonb NOT NULL,
    geom geometry(Point, 4326),
    geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
    ingested_at timestamptz NOT NULL DEFAULT now()
);

-- ── geom trigger — read from raw; shared tickets.tg_ticket_geom() (from 01) ───
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();

-- ── raw-based indexes (mirror 01's inc/crq set) ──────────────────────────────
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
    WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);

-- ── typed STORED generated columns (mirror of 03_inc_columns.sql) ────────────
-- Computed for ALL existing rows on creation + auto-recomputed on every insert/update;
-- `raw` stays the source of truth. tickets.eat_ts() (EAT->timestamptz, IMMUTABLE) is
-- reused from 03 — see that file's note on why IMMUTABLE is safe for Kenya (UTC+3, no DST).
ALTER TABLE tickets.crq
    -- text
    ADD COLUMN IF NOT EXISTS service_type text GENERATED ALWAYS AS (raw->>'service_type') STORED,
    ADD COLUMN IF NOT EXISTS bucket text GENERATED ALWAYS AS (raw->>'bucket') STORED,
    ADD COLUMN IF NOT EXISTS raw_status text GENERATED ALWAYS AS (raw->>'raw_status') STORED,
    ADD COLUMN IF NOT EXISTS normalized_status text GENERATED ALWAYS AS (raw->>'normalized_status') STORED,
    ADD COLUMN IF NOT EXISTS cluster text GENERATED ALWAYS AS (raw->>'cluster') STORED,
    ADD COLUMN IF NOT EXISTS region text GENERATED ALWAYS AS (raw->>'region') STORED,
    ADD COLUMN IF NOT EXISTS location_name text GENERATED ALWAYS AS (raw->>'location_name') STORED,
    ADD COLUMN IF NOT EXISTS assigned_team text GENERATED ALWAYS AS (raw->>'assigned_team') STORED,
    ADD COLUMN IF NOT EXISTS owner text GENERATED ALWAYS AS (raw->>'owner') STORED,
    ADD COLUMN IF NOT EXISTS sla_status text GENERATED ALWAYS AS (raw->>'sla_status') STORED,
    -- numeric / float
    ADD COLUMN IF NOT EXISTS mttr numeric GENERATED ALWAYS AS (NULLIF(raw->>'mttr','')::numeric) STORED,
    ADD COLUMN IF NOT EXISTS latitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'latitude','')::double precision) STORED,
    ADD COLUMN IF NOT EXISTS longitude double precision GENERATED ALWAYS AS (NULLIF(raw->>'longitude','')::double precision) STORED,
    -- boolean
    ADD COLUMN IF NOT EXISTS is_actionable boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_actionable','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_auto_created boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_created','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_auto_closed boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_auto_closed','')::boolean) STORED,
    ADD COLUMN IF NOT EXISTS is_alarm boolean GENERATED ALWAYS AS (NULLIF(raw->>'is_alarm','')::boolean) STORED,
    -- timestamps (EAT wall-clock -> timestamptz). created_at/updated_at are the EXPORT
    -- pipeline's bookkeeping (not ticket lifecycle), hence the source_ prefix.
    ADD COLUMN IF NOT EXISTS created_at_service timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at_service')) STORED,
    ADD COLUMN IF NOT EXISTS scheduled_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'scheduled_at')) STORED,
    ADD COLUMN IF NOT EXISTS closed_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'closed_at')) STORED,
    ADD COLUMN IF NOT EXISTS last_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'last_seen_at')) STORED,
    ADD COLUMN IF NOT EXISTS first_seen_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'first_seen_at')) STORED,
    ADD COLUMN IF NOT EXISTS source_created_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'created_at')) STORED,
    ADD COLUMN IF NOT EXISTS source_updated_at timestamptz GENERATED ALWAYS AS (tickets.eat_ts(raw->>'updated_at')) STORED;

-- ── typed-column indexes (serve cluster / team / closure queries) ────────────
CREATE INDEX IF NOT EXISTS ix_crq_norm_status_col ON tickets.crq (normalized_status);
CREATE INDEX IF NOT EXISTS ix_crq_cluster_col ON tickets.crq (cluster);
CREATE INDEX IF NOT EXISTS ix_crq_assigned_team ON tickets.crq (assigned_team);
CREATE INDEX IF NOT EXISTS ix_crq_closed_at ON tickets.crq (closed_at);
CREATE INDEX IF NOT EXISTS ix_crq_actionable_col ON tickets.crq (is_actionable) WHERE is_actionable;

-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
        GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.crq TO tracksolid_owner;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT SELECT ON tickets.crq TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT SELECT ON tickets.crq TO grafana_ro;
    END IF;
END $grants$;
@ -1,297 +0,0 @@
-- 16_crq_dashboard.sql — fleettickets · CRQ dashboard parity (view + read functions)
-- ─────────────────────────────────────────────────────────────────────────────
-- Brings CRQ to FleetOps-dashboard parity with INC, so the Tickets tab's CRQ
-- sub-tab works "just like INC". Mirrors, against tickets.crq:
-- tickets.crq_open_sla ← mirror of tickets.inc_open_sla (08)
-- reporting.fn_crq_dashboard ← mirror of reporting.fn_inc_dashboard (09/12)
-- reporting.fn_crq_search ← mirror of reporting.fn_inc_search (13)
-- reporting.fn_crq_filter_options ← mirror of reporting.fn_inc_filter_options (14)
-- consumed by dashboard_api GET /webhook/crq-dashboard | crq-search | crq-filter-options.
--
-- Differences from the INC view: tickets.crq has no `geog` column (mig 05 is INC-only)
-- and its latitude/longitude come from `raw` (empty in the feed), so crq_open_sla omits
-- geog and derives latitude/longitude from `geom`. The 48h SLA rule is reused verbatim
-- for layout parity — installation-lifecycle SLA semantics may be refined later.
--
-- Idempotent (CREATE OR REPLACE / VIEW). Requires migration 15 (tickets.crq + typed cols).
-- ─────────────────────────────────────────────────────────────────────────────

SET search_path = tickets, public;

-- ── crq_open_sla — open CRQ tickets with derived SLA (mirror of inc_open_sla) ─
CREATE OR REPLACE VIEW tickets.crq_open_sla AS
SELECT
    ticket_id,
    normalized_status,
    bucket,
    cluster,
    region,
    location_name,
    assigned_team,
    owner,
    sla_status AS source_sla_status,
    mttr, -- minutes (null until closed)
    COALESCE(created_at_service, first_seen_at) AS sla_clock,
    CASE WHEN created_at_service IS NOT NULL THEN 'service' ELSE 'first_seen' END AS sla_clock_source,
    round((EXTRACT(EPOCH FROM now() - COALESCE(created_at_service, first_seen_at)) / 3600)::numeric, 1) AS hours_open,
    CASE
        WHEN COALESCE(created_at_service, first_seen_at) IS NULL THEN 'unknown'
        WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '48h' THEN 'breached'
        WHEN now() - COALESCE(created_at_service, first_seen_at) >= interval '36h' THEN 'at_risk'
        ELSE 'ok'
    END AS sla_state,
    created_at_service,
    first_seen_at,
    scheduled_at,
    CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS latitude,
    CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS longitude,
    geo_source,
    geom
FROM tickets.crq
WHERE is_actionable;

COMMENT ON VIEW tickets.crq_open_sla IS
    'Open (is_actionable) CRQ tickets with derived SLA (48h rule; clock = created_at_service '
    'or first_seen_at fallback). Mirror of inc_open_sla; no geog. fleettickets 16.';

-- ── fn_crq_dashboard — mirror of fn_inc_dashboard over tickets.crq ───────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_dashboard(
    p_cluster text DEFAULT NULL,
    p_status text DEFAULT NULL,
    p_window text DEFAULT 'today',
    p_from timestamptz DEFAULT NULL,
    p_to timestamptz DEFAULT NULL
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
    v_now_eat timestamp;
    v_from timestamptz;
    v_to timestamptz;
    v_preset text;
    v_days numeric;
    v_result jsonb;
BEGIN
    p_cluster := NULLIF(p_cluster, '');
    p_status := NULLIF(p_status, '');
    v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';

    -- ── resolve the window ──────────────────────────────────────────────────────
    IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
        v_preset := 'custom';
        v_from := COALESCE(p_from, '-infinity'::timestamptz);
        v_to := COALESCE(p_to, 'infinity'::timestamptz);
    ELSE
        v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
        IF v_preset = 'week' THEN
            v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
            v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
        ELSIF v_preset = 'month' THEN
            v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
            v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
        ELSE
            v_preset := 'today';
            v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
            v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
        END IF;
    END IF;

    IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
        v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
    ELSE
        v_days := NULL; -- open-ended custom window → per-day average not meaningful
    END IF;

    -- ── build payload ───────────────────────────────────────────────────────────
    WITH open_t AS (
        SELECT * FROM tickets.crq_open_sla
        WHERE (p_cluster IS NULL OR cluster = p_cluster)
          AND (p_status IS NULL OR normalized_status = p_status)
    ),
    closed_t AS (
        SELECT ticket_id, normalized_status, cluster, region, location_name,
               assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
        FROM tickets.crq
        WHERE NOT COALESCE(is_actionable, false)
          AND closed_at IS NOT NULL
          AND closed_at >= v_from AND closed_at < v_to
          AND (p_cluster IS NULL OR cluster = p_cluster)
          AND (p_status IS NULL OR normalized_status = p_status)
    )
    SELECT jsonb_build_object(
        'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),

        'open', jsonb_build_object(
            'type', 'FeatureCollection',
            'features', COALESCE((
                SELECT jsonb_agg(jsonb_build_object(
                    'type', 'Feature',
                    'properties', jsonb_build_object(
                        'ticket_id', ticket_id, 'normalized_status', normalized_status,
                        'cluster', cluster, 'region', region, 'location_name', location_name,
                        'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
                        'geo_source', geo_source,
                        'sla_state', sla_state, 'hours_open', hours_open),
                    'geometry', ST_AsGeoJSON(geom)::jsonb))
                FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
        ),

        'closed', jsonb_build_object(
            'type', 'FeatureCollection',
            'features', COALESCE((
                SELECT jsonb_agg(jsonb_build_object(
                    'type', 'Feature',
                    'properties', jsonb_build_object(
                        'ticket_id', ticket_id, 'normalized_status', normalized_status,
                        'cluster', cluster, 'region', region, 'location_name', location_name,
                        'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
                        'geo_source', geo_source,
                        'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
                    'geometry', ST_AsGeoJSON(geom)::jsonb))
                FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
        ),

        'metrics', jsonb_build_object(
            'open_now', (SELECT count(*) FROM open_t),
            'closed_in_window', (SELECT count(*) FROM closed_t),
            'sla', jsonb_build_object(
                'open', (SELECT jsonb_build_object(
                    'breached', count(*) FILTER (WHERE sla_state = 'breached'),
                    'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
                    'ok', count(*) FILTER (WHERE sla_state = 'ok'),
                    'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
                'closed', (SELECT jsonb_build_object(
                    'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
                    'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
            ),
            'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
                SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
                    SELECT normalized_status FROM open_t
                    UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
            'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
                SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
                    SELECT cluster FROM open_t
                    UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
            -- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
            'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
                    'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
                FROM (
                    SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
                           count(*) AS c,
                           count(*) FILTER (WHERE sla_status = 'Breached') AS b,
                           round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
                    FROM closed_t GROUP BY 1) z), '[]'::jsonb),
            'closure_rate', jsonb_build_object(
                'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
                                    ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
                'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
                    SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
                    FROM closed_t GROUP BY 1) z), '[]'::jsonb)
            ),
            'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
        ),

        'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
                'export_type', export_type, 'exported_at', exported_at,
                'records_ingested', records_ingested, 'ingested_at', ingested_at))
            FROM tickets.import_meta)
    ) INTO v_result;

    RETURN v_result;
END $function$;

-- ── fn_crq_search — mirror of fn_inc_search over tickets.crq ──────────────────
CREATE OR REPLACE FUNCTION reporting.fn_crq_search(
    p_ticket_id text DEFAULT NULL,
    p_owner text DEFAULT NULL,
    p_cluster text DEFAULT NULL,
    p_status text DEFAULT NULL,
    p_state text DEFAULT 'closed',
    p_from timestamptz DEFAULT NULL,
    p_to timestamptz DEFAULT NULL,
    p_limit integer DEFAULT 500
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $function$
DECLARE
    v_state text := lower(COALESCE(NULLIF(p_state, ''), 'closed'));
    v_limit integer := LEAST(GREATEST(COALESCE(p_limit, 500), 1), 5000);
    v_result jsonb;
BEGIN
    p_ticket_id := NULLIF(trim(p_ticket_id), '');
    p_owner := NULLIF(trim(p_owner), '');
    p_cluster := NULLIF(p_cluster, '');
    p_status := NULLIF(p_status, '');

    WITH hits AS (
        SELECT ticket_id, normalized_status, cluster, region, location_name,
               initcap(lower(NULLIF(owner, ''))) AS owner, assigned_team,
               sla_status, mttr, closed_at, created_at_service, is_actionable,
               CASE WHEN geom IS NOT NULL THEN ST_Y(geom) END AS lat,
               CASE WHEN geom IS NOT NULL THEN ST_X(geom) END AS lng
        FROM tickets.crq
        WHERE (p_ticket_id IS NULL OR ticket_id ILIKE '%' || p_ticket_id || '%')
          AND (p_owner IS NULL OR lower(owner) LIKE '%' || lower(p_owner) || '%')
          AND (p_cluster IS NULL OR cluster = p_cluster)
          AND (p_status IS NULL OR normalized_status = p_status)
          AND CASE v_state
                WHEN 'open' THEN COALESCE(is_actionable, false)
                WHEN 'all' THEN COALESCE(is_actionable, false)
                    OR (closed_at IS NOT NULL
                        AND (p_from IS NULL OR closed_at >= p_from)
                        AND (p_to IS NULL OR closed_at < p_to))
                ELSE NOT COALESCE(is_actionable, false) -- 'closed'
                    AND closed_at IS NOT NULL
                    AND (p_from IS NULL OR closed_at >= p_from)
                    AND (p_to IS NULL OR closed_at < p_to)
              END
    ),
    total AS (SELECT count(*) AS n FROM hits),
    page AS (
        SELECT * FROM hits
        ORDER BY closed_at DESC NULLS LAST, created_at_service DESC NULLS LAST
        LIMIT v_limit
    )
    SELECT jsonb_build_object(
        'count', (SELECT n FROM total),
        'truncated', (SELECT n FROM total) > v_limit,
        'limit', v_limit,
        'state', v_state,
        'rows', COALESCE((SELECT jsonb_agg(to_jsonb(page)
                                           ORDER BY page.closed_at DESC NULLS LAST,
                                                    page.created_at_service DESC NULLS LAST)
                          FROM page), '[]'::jsonb)
    ) INTO v_result;

    RETURN v_result;
END $function$;

-- ── fn_crq_filter_options — mirror of fn_inc_filter_options over tickets.crq ──
CREATE OR REPLACE FUNCTION reporting.fn_crq_filter_options()
RETURNS jsonb LANGUAGE sql STABLE AS $function$
    SELECT jsonb_build_object(
        'owners', (SELECT COALESCE(jsonb_agg(o ORDER BY o), '[]'::jsonb)
                   FROM (SELECT DISTINCT initcap(lower(NULLIF(owner, ''))) AS o
                         FROM tickets.crq WHERE NULLIF(owner, '') IS NOT NULL) s),
        'clusters', (SELECT COALESCE(jsonb_agg(c ORDER BY c), '[]'::jsonb)
                     FROM (SELECT DISTINCT cluster AS c
                           FROM tickets.crq WHERE NULLIF(cluster, '') IS NOT NULL) s),
        'open_ticket_ids', (SELECT COALESCE(jsonb_agg(ticket_id ORDER BY ticket_id), '[]'::jsonb)
                            FROM tickets.crq WHERE COALESCE(is_actionable, false))
    );
$function$;

-- ── grants (guarded: roles may not exist on a fresh DB) ──────────────────────
DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT SELECT ON tickets.crq_open_sla TO dashboard_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO dashboard_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT SELECT ON tickets.crq_open_sla TO grafana_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_search(text, text, text, text, text, timestamptz, timestamptz, integer) TO grafana_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_crq_filter_options() TO grafana_ro;
    END IF;
END $grants$;
@ -61,12 +61,10 @@ Notes:
   listings but contain **no objects** (leftover/legacy; ignore them). There is
   no `latest` pointer and no JSON/metadata envelope.
 - **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
-  stream (with matching baseline timestamps and additional newer deltas) and the
-  identical 32-column schema. As of 2026-06-25 CRQ **is consumed** — by
-  `crq/import_crq.py` over the shared `pipeline.py` engine (`tickets.crq`), the same
-  way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
-  (`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because nothing
-  archives them — they are not consumed (the `changes/` stream is the source of truth).
+  stream (with matching baseline timestamps and additional newer deltas). CRQ
+  remains out of scope for `import_tickets.py` (INC-only), but the source-side
+  shape is the same. CRQ's old root snapshots (`automations/crq/<ts>.csv`) are
+  still present because nothing archives them — they are not consumed.

 ## CSV Schema

@ -85,7 +83,7 @@ Each row is a complete record (not a partial diff): a delta row carries the
 ticket's full current state, so a plain upsert on `ticket_id` is correct. The
 baseline still contains `is_alarm=true` rows and may include a leading
 `EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
-the consumer (see `pipeline.py` + the `inc/`,`crq/` entrypoints).
+the consumer (see `import_tickets.py`).
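
The filtering and upsert semantics described above can be sketched in Python. This is only an illustration, not the consumer's actual code: the helper name `filter_rows`, the sample CSV, the sentinel wording after `EXPORT STOPPED`, and the assumption that `is_alarm` arrives as the string `true` are all hypothetical.

```python
import csv
import io

# Assumed prefix of the truncation-sentinel row; the full wording is not known here.
SENTINEL_PREFIX = "EXPORT STOPPED"


def filter_rows(csv_text: str) -> dict[str, dict[str, str]]:
    """Return {ticket_id: row}, dropping alarm rows and the sentinel row.

    Each row is a complete record, so a later row for the same ticket_id
    simply replaces the earlier one (a plain upsert).
    """
    kept: dict[str, dict[str, str]] = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        ticket_id = (row.get("ticket_id") or "").strip()
        if not ticket_id or ticket_id.startswith(SENTINEL_PREFIX):
            continue  # sentinel or blank id: not a real ticket
        if (row.get("is_alarm") or "").strip().lower() == "true":
            continue  # alarm rows are filtered out
        kept[ticket_id] = row
    return kept


# Made-up sample: one sentinel row, one alarm row, and two versions of INC-1.
SAMPLE = """ticket_id,normalized_status,is_alarm
EXPORT STOPPED (hypothetical wording),,
INC-1,open,false
INC-2,open,true
INC-1,closed,false
"""

result = filter_rows(SAMPLE)
print(sorted(result))                        # ['INC-1']
print(result["INC-1"]["normalized_status"])  # closed
```

Keying the result on `ticket_id` keeps the sketch idempotent: replaying the same file yields the same final state.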

 ## Timestamp Format

@ -98,12 +96,11 @@ Generated once at the start of each execution, formatted in `Africa/Nairobi`
 incremental files appear whenever a change batch is exported (multiple within
 the same hour are normal, e.g. `15-50-39` then `15-53-04`).

-## How the loader Consumes It
+## How `import_tickets.py` Consumes It

-`python -m inc.import_inc --from-bucket --apply` (and `python -m crq.import_crq
---from-bucket --apply`; see `run_ingest.sh`) — both over the shared `pipeline.py` engine:
+`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`):

-1. Lists `automations/<inc|crq>/changes/<ts>.csv`, sorts ascending by timestamp.
+1. Lists `automations/inc/changes/<ts>.csv`, sorts ascending by timestamp.
 2. Skips files at/older than the **watermark**
    (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
    applied); on a fresh stream it processes everything present.
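
Steps 1 and 2 amount to a sort plus a watermark comparison. A minimal sketch, assuming the `<ts>` part of each key sorts lexicographically in chronological order; the helper name `pending_keys` and the example key layout are hypothetical and not taken from the loader:

```python
from typing import Optional


def pending_keys(keys: list[str], watermark: Optional[str]) -> list[str]:
    """Return the change-file keys still to apply, oldest first.

    `watermark` is the newest key already applied (None on a fresh stream).
    """
    ordered = sorted(k for k in keys if k.endswith(".csv"))
    if watermark is None:
        return ordered  # fresh stream: process everything present
    return [k for k in ordered if k > watermark]  # skip files at/older than the watermark


# Hypothetical keys; the real <ts> layout may differ.
listing = [
    "automations/inc/changes/2026-06-25_15-53-04.csv",
    "automations/inc/changes/2026-06-25_15-50-39.csv",
    "automations/inc/changes/2026-06-24_09-00-00.csv",
]
todo = pending_keys(listing, "automations/inc/changes/2026-06-25_15-50-39.csv")
print(todo)
```

Comparing whole keys works only because every key shares the same prefix; if the timestamp format were not sortable as text, it would have to be parsed first.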
@ -18,12 +18,10 @@ dev = [
|
|||
"ruff>=0.4",
|
||||
]
|
||||
|
||||
# Shared engine (pipeline) + helpers as top-level modules, plus the thin per-type
|
||||
# entrypoint packages (inc/, crq/). Listed explicitly so `pip install .` works (the
|
||||
# Docker image installs the project to pull its deps; runtime runs from /app via -m).
|
||||
# Flat-module project (no package dir) — list the top-level modules explicitly so
|
||||
# `pip install .` works (the Docker image installs the project to pull its deps).
|
||||
[tool.setuptools]
|
||||
py-modules = ["pipeline", "shared", "run_migrations"]
|
||||
packages = ["inc", "crq"]
|
||||
py-modules = ["import_tickets", "shared", "run_migrations"]
|
||||
|
||||
[tool.uv]
|
||||
managed = true
|
||||

@@ -1,17 +1,13 @@
#!/usr/bin/env bash
# run_ingest.sh — fleettickets · INC + CRQ ingest wrapper for cron (plain host/VM).
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron.
#
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and drains
# both ticket change streams with --apply (watermark skip-if-unchanged + per-file
# archive are built in, so a run with no new files is a cheap no-op).
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
#
# Install on the instance (every 20 min, 06:00–20:40 EAT):
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets.log 2>&1
# Install on the instance (ingest every 20 min, 06:00–20:40 EAT):
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT.
#
# On Coolify the two ingests run as separate Scheduled Tasks instead (see Dockerfile
# + docs/deployment-and-operations.md); this wrapper is the plain-host fallback.
set -euo pipefail

cd "$(dirname "$0")"
@@ -28,7 +24,4 @@ fi
PY="python"
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"

# Run from the repo root (cwd above) so `-m inc.import_inc` / `-m crq.import_crq`
# resolve the packages alongside pipeline.py + shared.py.
"$PY" -m inc.import_inc --from-bucket --apply
"$PY" -m crq.import_crq --from-bucket --apply
exec "$PY" import_tickets.py --from-bucket --apply