Compare commits

No commits in common. "f06c11fd118b3198d40a479e311913e576d1f55c" and "a4b90a33d8b30d2ebda5a009f7e1420ed35a4d42" have entirely different histories.

6 changed files with 33 additions and 63 deletions

@@ -3,12 +3,12 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv) # rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=isptickets RUSTFS_ACCESS_KEY=<key>
RUSTFS_SECRET_KEY=<secret> RUSTFS_SECRET_KEY=<secret>
RUSTFS_REGION=us-east-1 RUSTFS_REGION=us-east-1
TICKETS_BUCKET=isptickets TICKETS_BUCKET=tickets
# Geocoder (keyed — public Nominatim rate-limits bulk) # Geocoder (keyed — public Nominatim rate-limits bulk)
GEOCODER_PROVIDER=locationiq # locationiq | opencage GEOCODER_PROVIDER=locationiq # locationiq | opencage

@@ -1,7 +1,7 @@
# fleettickets — INC ingestion image (Coolify-deployable). # fleettickets — INC ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container # A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingest via a Scheduled Task: # running (CMD below) and fires the ingest via a Scheduled Task:
# python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *) # python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *)
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no # Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. # aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim FROM python:3.12-slim

@@ -21,7 +21,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` → `first_seen_at`), plus team/cluster/`geog` for dispatch | | `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` → `first_seen_at`), plus team/cluster/`geog` for dispatch |
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND | | `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** | | `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations | | `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@@ -87,7 +87,7 @@ python run_migrations.py # apply the schema (idempotent)
## Run ## Run
```bash ```bash
# drain the incremental INC change stream (every new file oldest→newest, then archive) # ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY) # geocode (needs GEOCODER_API_KEY)
@@ -108,48 +108,28 @@ Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the i
runs as a **Scheduled Task**, not a system crontab: runs as a **Scheduled Task**, not a system crontab:
- **Command:** `python import_tickets.py --from-bucket --apply` - **Command:** `python import_tickets.py --from-bucket --apply`
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This - **Frequency:** `15 7-19 * * *` (`:15` past each hour, **07:15–19:15 EAT**). This
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
conversion is needed. conversion is needed.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), - **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`. `RUSTFS_*`, `GEOCODER_*`.
The watermark makes a run with no new change files a cheap no-op. Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`). (`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`).
### Bucket cutover (one-time reseed)
When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
newer than the new bucket's first file — which would otherwise be skipped. Point the
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
```bash
python import_tickets.py --from-bucket --reseed # dry-run first
python import_tickets.py --from-bucket --reseed --apply # commit + archive
```
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
full-state re-emissions re-assert current state, so this is non-destructive and converges
even across the cutover gap. After it, the watermark is current — resume normal
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
## Notes ## Notes
- The n8n export writes an **incremental CDC change stream** to - The n8n export writes a **full current-state CSV per hour** to
`automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files `automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no
holding only the rows that changed (with periodic full-state re-emissions). No `latest` deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
pointer, no metadata envelope. The loader drains **every not-yet-processed file - **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
oldest→newest** — taking only the newest would drop intermediate deltas. file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
- **Watermark:** the newest file already applied is recorded in is skipped (the export re-emits byte-identical content most hours).
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never - **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success each file is **moved** to deleted, so closed-ticket history accumulates. On success the file is **moved** to
`automations/inc/processed/`. `automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop - **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`; `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
@@ -212,7 +192,7 @@ Findings to keep in mind (see the PRD for detail):
## Status / roadmap ## Status / roadmap
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema + Live: INC ingestion deployed on Coolify (hourly `15 7-19 * * *` EAT), schema +
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`. generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
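The skip-if-unchanged rule on the snapshot side of this README diff (compare the newest file's S3 ETag with the stored `source_etag`) can be sketched roughly as follows. This is a minimal illustration, not the loader's code: `newest_object` and `should_skip` are hypothetical helpers, the listing is assumed to look like the `Contents` entries of a boto3 `list_objects_v2` response, and "newest" is assumed to mean the greatest key, which holds only because the zero-padded EAT timestamps sort lexicographically.

```python
from typing import Optional


def newest_object(contents: list[dict]) -> Optional[dict]:
    """Return the listing entry with the greatest .csv key, or None if there is none.

    Assumption: keys end in a zero-padded timestamp, so string order is time order.
    """
    csvs = [obj for obj in contents if obj["Key"].endswith(".csv")]
    return max(csvs, key=lambda obj: obj["Key"], default=None)


def should_skip(newest: Optional[dict], last_etag: Optional[str]) -> bool:
    """True when there is nothing to ingest or the newest file matches the last ETag."""
    if newest is None:
        return True
    if last_etag is None:
        return False  # nothing processed yet, so always ingest
    # S3 returns ETags wrapped in double quotes; normalise both sides before comparing.
    return newest["ETag"].strip('"') == last_etag.strip('"')


# Hypothetical listing; the ETag values are placeholders.
listing = [
    {"Key": "automations/inc/2026-06-22T14-50-12.csv", "ETag": '"aaa"'},
    {"Key": "automations/inc/2026-06-22T15-50-39.csv", "ETag": '"bbb"'},
]
newest = newest_object(listing)
print(newest["Key"])
print(should_skip(newest, "bbb"))  # True
print(should_skip(newest, "aaa"))  # False
```

Comparing ETags avoids downloading and parsing a file whose bytes were already ingested, which is what makes a repeated run cheap.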

@@ -54,7 +54,7 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps `TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
web app (`fleet-ops-staging`). web app (`fleet-ops-staging`).
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron - **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion). `15 7-19 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`. - **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative. - For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
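To check which wall-clock times the two cron expressions in this diff cover, the minute and hour fields can be expanded by hand. The sketch below is a small illustrative expander, not a full cron parser: `expand_field` and `daily_fire_times` are hypothetical names, only `*`, `a-b`, `*/n`, `a-b/n`, single values and comma lists are handled, and the day, month and weekday fields are ignored.

```python
def expand_field(field: str, lo: int, hi: int) -> list[int]:
    """Expand one cron field into the sorted list of values it matches."""
    values: set[int] = set()
    for part in field.split(","):
        rng, _, step = part.partition("/")
        step_n = int(step) if step else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step_n))
    return sorted(values)


def daily_fire_times(expr: str) -> list[str]:
    """Return HH:MM fire times for one day, using only the minute and hour fields."""
    minute, hour, *_ = expr.split()
    return [
        f"{h:02d}:{m:02d}"
        for h in expand_field(hour, 0, 23)
        for m in expand_field(minute, 0, 59)
    ]


every_20 = daily_fire_times("*/20 6-20 * * *")
hourly = daily_fire_times("15 7-19 * * *")
print(every_20[0], every_20[-1], len(every_20))  # 06:00 20:40 45
print(hourly[0], hourly[-1], len(hourly))        # 07:15 19:15 13
```

Because the Coolify instance runs scheduled tasks in EAT, these times are local Nairobi times with no UTC offset applied.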

@@ -13,8 +13,8 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under Source data: the n8n S3 export writes CSV files to the `tickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv) automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-22T15-50-39.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted). export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
@@ -39,7 +39,6 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply python import_tickets.py --geocode-locations --apply
@@ -72,7 +71,7 @@ log = get_logger("import_tickets")
# ── INC ingestion config ────────────────────────────────────────────────────── # ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc" _TABLE = "tickets.inc"
_DATASET = "inc" _DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets") _BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/" _PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
@@ -103,15 +102,14 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0 _last_geocode_at = 0.0
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ───── # ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under # The n8n hourly export writes a full current-state CSV per hour to
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files # automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain # We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark # we skip the DB write (the export re-emits byte-identical content most hours).
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client(): def _s3_client():
"""boto3 S3 client for the S3 endpoint (force path-style addressing).""" """boto3 S3 client for the rustfs endpoint (force path-style addressing)."""
return boto3.client( return boto3.client(
"s3", "s3",
endpoint_url=os.environ["RUSTFS_ENDPOINT"], endpoint_url=os.environ["RUSTFS_ENDPOINT"],
@@ -288,11 +286,7 @@ def ingest(args) -> None:
# watermark: skip anything at/older than the newest file already applied. Archiving # watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying. # normally empties changes/, but this guards a failed archive from re-applying.
# --reseed ignores the stored watermark and drains EVERY file in changes/ once — used last_ts = _last_processed_ts()
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
last_ts = None if args.reseed else _last_processed_ts()
_floor = datetime.min.replace(tzinfo=_EAT) _floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
@@ -616,12 +610,8 @@ def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode") ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true", ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) " help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
"from the isptickets S3 bucket: every not-yet-processed file " "skips if unchanged (ETag) and archives processed files")
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)") ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true", ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
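The watermark filter and the `--reseed` override that this file's diff touches can be illustrated in isolation. The sketch below is a simplified stand-in under stated assumptions, not the module's code: `ts_from_key`, `pending_keys` and `drain` are hypothetical names, files are modelled as an in-memory dict of rows instead of S3 objects, and the upsert is a plain dict assignment keyed on `ticket_id`. Only the first file name comes from the docstring example; the other keys are made up.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi; filenames carry EAT timestamps


def ts_from_key(key: str) -> Optional[datetime]:
    """Parse '<prefix>/2026-06-24T09-55-44.csv' into an EAT datetime, or None."""
    name = key.rsplit("/", 1)[-1].removesuffix(".csv")
    try:
        return datetime.strptime(name, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:
        return None


def pending_keys(keys, last_ts: Optional[datetime], reseed: bool = False) -> list[str]:
    """Keys strictly newer than the watermark, oldest first; reseed ignores the watermark."""
    floor = datetime.min.replace(tzinfo=EAT)  # unparseable keys sort first
    if reseed:
        last_ts = None
    todo = [k for k in keys if last_ts is None or (ts_from_key(k) or floor) > last_ts]
    return sorted(todo, key=lambda k: ts_from_key(k) or floor)


def drain(files: dict, table: dict, last_ts: Optional[datetime], reseed: bool = False):
    """Apply pending files oldest to newest and return the advanced watermark."""
    for key in pending_keys(files, last_ts, reseed):
        for row in files[key]:
            table[row["ticket_id"]] = row  # upsert: the later file wins, nothing is deleted
        last_ts = ts_from_key(key) or last_ts
    return last_ts


files = {
    "automations/inc/changes/2026-06-24T10-15-02.csv": [{"ticket_id": "INC1", "status": "closed"}],
    "automations/inc/changes/2026-06-24T09-55-44.csv": [{"ticket_id": "INC1", "status": "open"}],
}
# A watermark left over from the old bucket that is newer than every file in the new one.
stale = ts_from_key("automations/inc/changes/2026-06-25T08-00-00.csv")

table: dict = {}
drain(files, table, stale)
print(table)  # {}
new_mark = drain(files, table, stale, reseed=True)
print(table["INC1"]["status"])  # closed
```

With the stale watermark, the normal run skips both files, which is the cutover problem the README describes; the reseed run applies them in timestamp order, so the later `closed` row overwrites the earlier `open` one and the watermark ends at the newest applied file.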

@@ -4,8 +4,8 @@
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the # Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in). # newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
# #
# Install on the instance (ingest every 20 min, 06:00–20:40 EAT): # Install on the instance (ingest at :15, 07:00–19:00 EAT):
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 # 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or # Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT. # the host/container TZ), since the export filenames and the schedule are EAT.
set -euo pipefail set -euo pipefail