Compare commits

..

3 commits

Author SHA1 Message Date
david kiania
f06c11fd11 Merge fix/inc-changes-stream: isptickets bucket cutover + --reseed + 20-min cron 2026-06-25 18:40:57 +03:00
david kiania
7d3bba8d78 chore(schedule): INC ingest cron -> every 20 min, 06:00-20:40 EAT
Was hourly at :15 (15 7-19 * * *); now */20 6-20 * * * for fresher ticket
data through the working day. Updates the documented schedule in the Coolify
Scheduled Task command, run_ingest.sh, Dockerfile, README, and implementation
notes (the live schedule is set in the Coolify UI).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:23:17 +03:00
david kiania
509338c076 feat(import_tickets): migrate INC ingest to isptickets bucket + --reseed cutover
Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new
per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema).
This is config + a one-time reseed, not a rewrite — the loader already drains
automations/inc/changes/ oldest->newest with a source_max_key watermark.

- default _BUCKET -> isptickets (TICKETS_BUCKET still overrides)
- add --reseed: ignore the stored watermark and drain every changes/ file once
  (the old-bucket watermark may post-date the new bucket's first file). Crash-safe
  via the existing per-file watermark-advance + archive loop.
- refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the
  CDC reality; .env.example + README updated (new bucket + reseed runbook).

Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel
filter active, exit 0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:20:15 +03:00
6 changed files with 63 additions and 33 deletions

View file

@ -3,12 +3,12 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv) # S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=<key> RUSTFS_ACCESS_KEY=isptickets
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>
RUSTFS_REGION=us-east-1 RUSTFS_REGION=us-east-1
TICKETS_BUCKET=tickets TICKETS_BUCKET=isptickets
# Geocoder (keyed — public Nominatim rate-limits bulk) # Geocoder (keyed — public Nominatim rate-limits bulk)
GEOCODER_PROVIDER=locationiq # locationiq | opencage GEOCODER_PROVIDER=locationiq # locationiq | opencage

View file

@ -1,7 +1,7 @@
# fleettickets — INC ingestion image (Coolify-deployable). # fleettickets — INC ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container # A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingest via a Scheduled Task: # running (CMD below) and fires the ingest via a Scheduled Task:
# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *) # python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim FROM python:3.12-slim

View file

@ -21,7 +21,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service``first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations | | `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -87,7 +87,7 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
```bash ```bash
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive) # drain the incremental INC change stream (every new file oldest→newest, then archive)
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY) # geocode (needs GEOCODER_API_KEY)
@ -108,28 +108,48 @@ Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the i
runs as a **Scheduled Task**, not a system crontab: runs as a **Scheduled Task**, not a system crontab:
- **Command:** `python import_tickets.py --from-bucket --apply` - **Command:** `python import_tickets.py --from-bucket --apply`
- **Frequency:** `15 7-19 * * *` (`:15` past each hour, **07:1519:15 EAT**). This - **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:0020:40 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*`, `GEOCODER_*`. `RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op. The watermark makes a run with no new change files a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`). (`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets``isptickets`),
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
newer than the new bucket's first file — which would otherwise be skipped. Point the
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash
python import_tickets.py --from-bucket --reseed # dry-run first
python import_tickets.py --from-bucket --reseed --apply # commit + archive
```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
full-state re-emissions re-assert current state, so this is non-destructive and converges
even across the cutover gap. After it, the watermark is current — resume normal
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
## Notes ## Notes
- The n8n export writes a **full current-state CSV per hour** to - The n8n export writes an **incremental CDC change stream** to
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no `automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
deltas. The loader lists the prefix, takes the **newest** file, and ingests it. holding only the rows that changed (with periodic full-state re-emissions). No `latest`
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed pointer, no metadata envelope. The loader drains **every not-yet-processed file
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write oldest→newest** — taking only the newest would drop intermediate deltas.
is skipped (the export re-emits byte-identical content most hours). - **Watermark:** the newest file already applied is recorded in
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never - **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success the file is **moved** to deleted, so closed-ticket history accumulates. On success each file is **moved** to
`automations/inc/processed/`. `automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`; `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
@ -192,7 +212,7 @@ Findings to keep in mind (see the PRD for detail):
## Status / roadmap ## Status / roadmap
Live: INC ingestion deployed on Coolify (hourly `15 7-19 * * *` EAT), schema + Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a

View file

@ -54,7 +54,7 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
web app (`fleet-ops-staging`). web app (`fleet-ops-staging`).
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron - **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). `*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`. - **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.

View file

@ -13,8 +13,8 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `tickets` bucket under Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-22T15-50-39.csv) automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted). export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
@ -39,6 +39,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply python import_tickets.py --geocode-locations --apply
@ -71,7 +72,7 @@ log = get_logger("import_tickets")
# ── INC ingestion config ────────────────────────────────────────────────────── # ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc" _TABLE = "tickets.inc"
_DATASET = "inc" _DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets") _BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/" _PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
@ -102,14 +103,15 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ─────────────────── # ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# The n8n hourly export writes a full current-state CSV per hour to # The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas). # automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag # holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
# we skip the DB write (the export re-emits byte-identical content most hours). # EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the rustfs endpoint (force path-style addressing).""" """boto3 S3 client for the S3 endpoint (force path-style addressing)."""
return boto3.client( return boto3.client(
"s3", "s3",
endpoint_url=os.environ["RUSTFS_ENDPOINT"], endpoint_url=os.environ["RUSTFS_ENDPOINT"],
@ -286,7 +288,11 @@ def ingest(args) -> None:
# watermark: skip anything at/older than the newest file already applied. Archiving # watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying. # normally empties changes/, but this guards a failed archive from re-applying.
last_ts = _last_processed_ts() # --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts()
_floor = datetime.min.replace(tzinfo=_EAT) _floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
@ -610,8 +616,12 @@ def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true", ap.add_argument("--from-bucket", action="store_true",
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); " help="Drain the incremental INC change stream (automations/inc/changes/) "
"skips if unchanged (ETag) and archives processed files") "from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)") ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true", ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")

View file

@ -4,8 +4,8 @@
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the # Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in). # newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
# #
# Install on the instance (ingest at :15, 07:0019:00 EAT): # Install on the instance (ingest every 20 min, 06:0020:40 EAT):
# 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 # */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT. # the host/container TZ), since the export filenames and the schedule are EAT.
set -euo pipefail set -euo pipefail