From 509338c076f48821a4f3d7345cc956d22b83dea5 Mon Sep 17 00:00:00 2001 From: david kiania Date: Thu, 25 Jun 2026 18:20:04 +0300 Subject: [PATCH] feat(import_tickets): migrate INC ingest to isptickets bucket + --reseed cutover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema). This is config + a one-time reseed, not a rewrite — the loader already drains automations/inc/changes/ oldest->newest with a source_max_key watermark. - default _BUCKET -> isptickets (TICKETS_BUCKET still overrides) - add --reseed: ignore the stored watermark and drain every changes/ file once (the old-bucket watermark may post-date the new bucket's first file). Crash-safe via the existing per-file watermark-advance + archive loop. - refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the CDC reality; .env.example + README updated (new bucket + reseed runbook). Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel filter active, exit 0. Co-Authored-By: Claude Opus 4.8 --- .env.example | 6 +++--- README.md | 42 +++++++++++++++++++++++++++++++----------- import_tickets.py | 34 ++++++++++++++++++++++------------ 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/.env.example b/.env.example index 5de8686..b580ddd 100644 --- a/.env.example +++ b/.env.example @@ -3,12 +3,12 @@ # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) DATABASE_URL=postgresql://tracksolid_owner:@timescale_db:5432/tracksolid_db -# rustfs / S3 — source ticket snapshots (automations/inc/.csv) +# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/.csv) RUSTFS_ENDPOINT=https://s3.rahamafresh.com -RUSTFS_ACCESS_KEY= +RUSTFS_ACCESS_KEY=isptickets RUSTFS_SECRET_KEY= RUSTFS_REGION=us-east-1 -TICKETS_BUCKET=tickets +TICKETS_BUCKET=isptickets # Geocoder (keyed — public Nominatim rate-limits bulk) GEOCODER_PROVIDER=locationiq # locationiq | opencage diff --git a/README.md b/README.md index dcbb81d..7faa516 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | -| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations | +| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | @@ -87,7 +87,7 @@ python run_migrations.py # apply the schema (idempotent) ## Run ```bash -# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive) +# drain the incremental INC change stream (every new file oldest→newest, then archive) python import_tickets.py --from-bucket --apply # geocode (needs GEOCODER_API_KEY) @@ -112,24 +112,44 @@ runs as a **Scheduled Task**, not a system crontab: Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC conversion is needed. - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - `RUSTFS_*`, `GEOCODER_*`. + `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`. -Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op. +The watermark makes a run with no new change files a cheap no-op. For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` and runs the ingest; schedule it with a crontab line (`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`). +### Bucket cutover (one-time reseed) + +When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`), +the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be +newer than the new bucket's first file — which would otherwise be skipped. Point the +`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`, +which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest: + +```bash +python import_tickets.py --from-bucket --reseed # dry-run first +python import_tickets.py --from-bucket --reseed --apply # commit + archive +``` + +Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic +full-state re-emissions re-assert current state, so this is non-destructive and converges +even across the cutover gap. After it, the watermark is current — resume normal +`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched. + ## Notes -- The n8n export writes a **full current-state CSV per hour** to - `automations/inc/.csv` — no `latest` pointer, no metadata envelope, no - deltas. The loader lists the prefix, takes the **newest** file, and ingests it. -- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed - file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write - is skipped (the export re-emits byte-identical content most hours). +- The n8n export writes an **incremental CDC change stream** to + `automations/inc/changes/.csv`: a full-state baseline followed by files + holding only the rows that changed (with periodic full-state re-emissions). No `latest` + pointer, no metadata envelope. The loader drains **every not-yet-processed file + oldest→newest** — taking only the newest would drop intermediate deltas. +- **Watermark:** the newest file already applied is recorded in + `tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so + reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover. - **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never - deleted, so closed-ticket history accumulates. On success the file is **moved** to + deleted, so closed-ticket history accumulates. On success each file is **moved** to `automations/inc/processed/`. - **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`; diff --git a/import_tickets.py b/import_tickets.py index 2c4ae3c..5497bf9 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -13,8 +13,8 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. -Source data: the n8n S3 export writes CSV files to the `tickets` bucket under - automations/inc/changes/.csv (e.g. 2026-06-22T15-50-39.csv) +Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under + automations/inc/changes/.csv (e.g. 2026-06-24T09-55-44.csv) This is an INCREMENTAL (CDC) stream: the first file is a full current-state baseline, and every later file holds only the rows that CHANGED since the prior export (new + updated tickets, keyed by ticket_id; deletions are never emitted). @@ -39,6 +39,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): python import_tickets.py --from-bucket --apply + python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --geocode-clusters --apply python import_tickets.py --geocode-locations --apply @@ -71,7 +72,7 @@ log = get_logger("import_tickets") # ── INC ingestion config ────────────────────────────────────────────────────── _TABLE = "tickets.inc" _DATASET = "inc" -_BUCKET = os.getenv("TICKETS_BUCKET", "tickets") +_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets") _INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _PROCESSED_PREFIX = "automations/inc/processed/" _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT @@ -102,14 +103,15 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1")) _last_geocode_at = 0.0 -# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ─────────────────── -# The n8n hourly export writes a full current-state CSV per hour to -# automations/inc/.csv (no latest pointer, no envelope, no deltas). -# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag -# we skip the DB write (the export re-emits byte-identical content most hours). +# ── data loading (CSV · incremental CDC change stream · per-file watermark) ───── +# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under +# automations/inc/changes/.csv: a first full-state baseline, then files +# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain +# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark +# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/. # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). def _s3_client(): - """boto3 S3 client for the rustfs endpoint (force path-style addressing).""" + """boto3 S3 client for the S3 endpoint (force path-style addressing).""" return boto3.client( "s3", endpoint_url=os.environ["RUSTFS_ENDPOINT"], @@ -286,7 +288,11 @@ def ingest(args) -> None: # watermark: skip anything at/older than the newest file already applied. Archiving # normally empties changes/, but this guards a failed archive from re-applying. - last_ts = _last_processed_ts() + # --reseed ignores the stored watermark and drains EVERY file in changes/ once — used + # for a one-time bucket cutover, where the stored key points at the old bucket's stream + # and its timestamp may be newer than the new bucket's first file. Crash-safe: each file + # still advances source_max_key + archives per file, so a plain rerun resumes cleanly. + last_ts = None if args.reseed else _last_processed_ts() _floor = datetime.min.replace(tzinfo=_EAT) pending = [(k, e) for k, e in listing if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] @@ -610,8 +616,12 @@ def main() -> None: ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") ap.add_argument("--from-bucket", action="store_true", - help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); " - "skips if unchanged (ETag) and archives processed files") + help="Drain the incremental INC change stream (automations/inc/changes/) " + "from the isptickets S3 bucket: every not-yet-processed file " + "oldest→newest, upsert on ticket_id, advance the watermark, archive") + ap.add_argument("--reseed", action="store_true", + help="Ignore the stored watermark and drain every file in changes/ once " + "(one-time bucket cutover / reseed). Use with --from-bucket --apply") ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)") ap.add_argument("--geocode-clusters", action="store_true", help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")