feat(import_tickets): migrate INC ingest to isptickets bucket + --reseed cutover

Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new
per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema).
This is config + a one-time reseed, not a rewrite — the loader already drains
automations/inc/changes/ oldest->newest with a source_max_key watermark.

- default _BUCKET -> isptickets (TICKETS_BUCKET still overrides)
- add --reseed: ignore the stored watermark and drain every changes/ file once
  (the old-bucket watermark may post-date the new bucket's first file). Crash-safe
  via the existing per-file watermark-advance + archive loop.
- refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the
  CDC reality; .env.example + README updated (new bucket + reseed runbook).

Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel
filter active, exit 0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
david kiania 2026-06-25 18:20:04 +03:00
parent a4b90a33d8
commit 509338c076
3 changed files with 56 additions and 26 deletions


@ -3,12 +3,12 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv) # S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=<key> RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>
RUSTFS_REGION=us-east-1 RUSTFS_REGION=us-east-1
TICKETS_BUCKET=tickets TICKETS_BUCKET=isptickets
# Geocoder (keyed — public Nominatim rate-limits bulk) # Geocoder (keyed — public Nominatim rate-limits bulk)
GEOCODER_PROVIDER=locationiq # locationiq | opencage GEOCODER_PROVIDER=locationiq # locationiq | opencage


@ -21,7 +21,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations | | `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -87,7 +87,7 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
```bash ```bash
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive) # drain the incremental INC change stream (every new file oldest→newest, then archive)
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY) # geocode (needs GEOCODER_API_KEY)
@ -112,24 +112,44 @@ runs as a **Scheduled Task**, not a system crontab:
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*`, `GEOCODER_*`. `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op. The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`). (`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`).
### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
newer than the new bucket's first file — which would otherwise be skipped. Point the
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash
python import_tickets.py --from-bucket --reseed # dry-run first
python import_tickets.py --from-bucket --reseed --apply # commit + archive
```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
full-state re-emissions re-assert current state, so this is non-destructive and converges
even across the cutover gap. After it, the watermark is current — resume normal
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
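Why the stale watermark matters can be sketched in a few lines. This is an illustration only, not the loader's code: `ts_from_key` and `pending_keys` are hypothetical stand-ins, the watermark value and all but the first key are made-up sample data, and the filter simply mirrors the "skip anything at/older than the watermark" rule described above.

```python
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # filenames are EAT timestamps


def ts_from_key(key):
    """Parse '.../2026-06-24T09-55-44.csv' into an aware datetime, or None."""
    name = key.rsplit("/", 1)[-1]
    stem = name[:-4] if name.endswith(".csv") else name
    try:
        return datetime.strptime(stem, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:
        return None


def pending_keys(keys, watermark, reseed=False):
    """Keys still to drain, oldest first; reseed=True ignores the watermark."""
    last_ts = None if reseed else watermark
    floor = datetime.min.replace(tzinfo=EAT)  # unparseable names sort first
    picked = [k for k in keys
              if last_ts is None or (ts_from_key(k) or floor) > last_ts]
    return sorted(picked, key=lambda k: ts_from_key(k) or floor)


# Sample data (assumed): the old bucket's watermark is newer than the
# first two files of the new bucket.
old_watermark = datetime(2026, 6, 25, 10, 0, 0, tzinfo=EAT)
new_bucket = [
    "automations/inc/changes/2026-06-24T09-55-44.csv",
    "automations/inc/changes/2026-06-24T10-55-44.csv",
    "automations/inc/changes/2026-06-25T11-00-00.csv",
]

normal = pending_keys(new_bucket, old_watermark)               # skips two files
reseed = pending_keys(new_bucket, old_watermark, reseed=True)  # drains all three
```

With the stale watermark a normal run would pick up only the last file and silently skip the baseline; the reseed run drains all three in order.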
## Notes ## Notes
- The n8n export writes a **full current-state CSV per hour** to - The n8n export writes an **incremental CDC change stream** to
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no `automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
deltas. The loader lists the prefix, takes the **newest** file, and ingests it. holding only the rows that changed (with periodic full-state re-emissions). No `latest`
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed pointer, no metadata envelope. The loader drains **every not-yet-processed file
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write oldest→newest** — taking only the newest would drop intermediate deltas.
is skipped (the export re-emits byte-identical content most hours). - **Watermark:** the newest file already applied is recorded in
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never - **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success the file is **moved** to deleted, so closed-ticket history accumulates. On success each file is **moved** to
`automations/inc/processed/`. `automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`; `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;


@ -13,8 +13,8 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `tickets` bucket under Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-22T15-50-39.csv) automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted). export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
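The effect of draining the stream in order, rather than taking only the newest file, can be sketched with an in-memory table. This is a toy model under stated assumptions: `apply_stream` is a hypothetical helper, the rows carry only two illustrative columns instead of the real 32, and the `status` values are invented.

```python
def apply_stream(files):
    """Upsert rows keyed by ticket_id, file by file, oldest key first."""
    table = {}
    for _key, rows in sorted(files.items()):
        for row in rows:
            table[row["ticket_id"]] = row  # later files overwrite earlier state
    return table


# Assumed sample stream: a full-state baseline followed by two delta files.
stream = {
    "2026-06-24T09-55-44.csv": [
        {"ticket_id": "INC-1", "status": "open"},
        {"ticket_id": "INC-2", "status": "open"},
    ],
    "2026-06-24T10-55-44.csv": [{"ticket_id": "INC-2", "status": "closed"}],
    "2026-06-24T11-55-44.csv": [{"ticket_id": "INC-3", "status": "open"}],
}

full = apply_stream(stream)            # every file, oldest to newest
newest_key = max(stream)
newest_only = apply_stream({newest_key: stream[newest_key]})
```

Draining everything yields three tickets with `INC-2` closed, while ingesting only the newest file would see `INC-3` alone and miss both the baseline and the intermediate closure.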
@ -39,6 +39,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply python import_tickets.py --geocode-locations --apply
@ -71,7 +72,7 @@ log = get_logger("import_tickets")
# ── INC ingestion config ────────────────────────────────────────────────────── # ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc" _TABLE = "tickets.inc"
_DATASET = "inc" _DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets") _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/" _PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
@ -102,14 +103,15 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ─────────────────── # ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# The n8n hourly export writes a full current-state CSV per hour to # The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas). # automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag # holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
# we skip the DB write (the export re-emits byte-identical content most hours). # EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the rustfs endpoint (force path-style addressing).""" """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
return boto3.client( return boto3.client(
"s3", "s3",
endpoint_url=os.environ["RUSTFS_ENDPOINT"], endpoint_url=os.environ["RUSTFS_ENDPOINT"],
@ -286,7 +288,11 @@ def ingest(args) -> None:
# watermark: skip anything at/older than the newest file already applied. Archiving # watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying. # normally empties changes/, but this guards a failed archive from re-applying.
last_ts = _last_processed_ts() # --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts()
_floor = datetime.min.replace(tzinfo=_EAT) _floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
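The crash-safety claim in the comment above can be illustrated with a small simulation. It is a sketch, not the loader: `drain`, `archive`, and `pending` are hypothetical names, the state lives in a plain dict instead of `tickets.import_meta`, and keys are compared as strings (the fixed-width timestamp names sort chronologically), whereas the real code compares parsed timestamps.

```python
def drain(keys, state, apply_file, archive, fail_on=None):
    """Apply files oldest to newest; advance the watermark and archive per file."""
    for key in sorted(keys):
        if key == fail_on:
            raise RuntimeError(f"simulated crash before {key}")
        apply_file(key)                 # idempotent upsert on ticket_id
        state["source_max_key"] = key   # watermark advances after each file
        archive(key)                    # move changes/ -> processed/


changes = {
    "changes/2026-06-24T09-55-44.csv",
    "changes/2026-06-24T10-55-44.csv",
    "changes/2026-06-24T11-55-44.csv",
}
processed, applied = [], []
state = {"source_max_key": None}


def archive(key):
    changes.discard(key)
    processed.append(key)


def pending(reseed=False):
    mark = None if reseed else state["source_max_key"]
    return [k for k in changes if mark is None or k > mark]


# Reseed run that dies before the second file ...
try:
    drain(pending(reseed=True), state, applied.append, archive,
          fail_on="changes/2026-06-24T10-55-44.csv")
except RuntimeError:
    pass

# ... followed by a plain rerun (no reseed), which resumes from the watermark.
drain(pending(), state, applied.append, archive)
```

Because the watermark moved forward after the first file, the plain rerun picks up exactly the two remaining files and nothing is applied twice.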
@ -610,8 +616,12 @@ def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true", ap.add_argument("--from-bucket", action="store_true",
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); " help="Drain the incremental INC change stream (automations/inc/changes/) "
"skips if unchanged (ETag) and archives processed files") "from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)") ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true", ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
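How the three ingest flags combine can be sketched in isolation. This is a reduced stand-in for the parser above, not a copy of it: `build_parser` and `parse` are hypothetical names, and the guard that rejects `--reseed` without `--from-bucket` is an assumption added for illustration (the diff only documents the pairing in the help text).

```python
import argparse


def build_parser():
    ap = argparse.ArgumentParser(description="Sketch of the ingest flags")
    ap.add_argument("--apply", action="store_true",
                    help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain the change stream from the bucket")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark for a one-time cutover")
    return ap


def parse(argv):
    ap = build_parser()
    args = ap.parse_args(argv)
    # Hypothetical guard, not confirmed by the source.
    if args.reseed and not args.from_bucket:
        ap.error("--reseed only makes sense together with --from-bucket")
    return args


dry_run = parse(["--from-bucket", "--reseed"])            # dry-run reseed
commit = parse(["--from-bucket", "--reseed", "--apply"])  # commit + archive
```

Keeping `--apply` opt-in means the reseed can be rehearsed as a dry-run first, which matches the two-step runbook in the README.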