2026-06-15 22:05:18 +00:00
# PRD (Phase 1) — INC hourly CSV ingestion → tracksolid_db → FleetOps Tickets map
> Status: **complete and deployed** (migrations 01–08, boto3 loader, geocoding,
> Coolify hourly `15 7-19 * * *` EAT). This document is the record of the Phase-1
> plan; see `README.md` and `docs/implementation.md` for the as-built state.
## Scope: INC only
**This workflow is strictly for INC** (incident / customer-fault tickets). It
ingests **only** `automations/inc/<EAT-timestamp>.csv`. CRQ (new-installation)
exports at `automations/crq/` are **out of scope** and are not processed here; the
field transforms below are likewise INC-only.
## Context
The client (Rahamafresh / Fireside) runs an n8n workflow that exports field-ops
tickets to our S3-compatible bucket **every hour**:

- `automations/inc/<EAT-timestamp>.csv`: **incidents / customer faults** *(in scope)*
- `automations/crq/<EAT-timestamp>.csv`: new-installation requests *(out of scope)*
> fix(inc): ingest the incremental changes/ stream (baseline + deltas)
>
> The S3 source switched from full hourly snapshots at
> `automations/inc/<ts>.csv` to an incremental CDC stream at
> `automations/inc/changes/<ts>.csv` (first file = full baseline, each later
> file = only the rows that changed, keyed by ticket_id; no deletions).
> The loader still pointed at the old root path and only ingested the single
> newest file, so after the switch it found nothing (no new tickets ingested)
> and, even with the path fixed, would silently drop intermediate deltas.
>
> Changes:
>
> - point ingestion at `automations/inc/changes/` (`_CHANGE_KEY_RE`)
> - ingest EVERY not-yet-processed file in ascending timestamp order
>   (baseline first, then each delta), upserting each
> - replace the single-ETag skip with a per-file timestamp watermark
>   (`import_meta.metadata->>'source_max_key'`); rows + watermark commit in one
>   txn per file, then archive to `processed/`, so a mid-run failure leaves a
>   consistent, resumable state
> - docs: rename `n8n-hourly-s3-full-data-exports.md` -> `n8n-s3-ticket-exports.md`
>   and rewrite it for the incremental stream; fix the reference in
>   `docs/phase-1-ingestion.md`
>
> Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
> files archived to `processed/`, watermark advanced, re-run is a no-op.
>
> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
> 2026-06-23 11:37:17 +00:00

(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the
source later switched to an incremental `automations/inc/changes/` stream; that
doc has the current layout, while this PRD records the original Phase-1 model.)

`fleettickets` owns the **downstream**: the `tickets` schema in the shared
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and
`reporting.fn_tickets_for_map`, which `dashboard_api` serves to the FleetOps
"Tickets" tab). `tickets.crq` still exists but is not fed by this pipeline.

**The problem:** the loader was written for the *old* export model: JSON
`{metadata, records}` envelopes at a stable `automations/inc/latest.json`. That
model is gone; the new exports are **flat CSV, timestamped per hour, with no
`latest` pointer, no envelope, and no deltas**. Every hourly file is a **full
current-state snapshot**.

**Two driving objectives this pipeline feeds:**

1. **SLA tracking**: the contract requires tickets to be closed within **48h of
   `created_at_service`**. Closed tickets carry the source `sla_status` + `mttr`;
   open tickets need a derived state (`now − created_at_service` ≥48h breached /
   ≥36h at-risk).
2. **Vehicle routing (most important)**: accurately geocoded open tickets so
   FleetNow can route the nearest vehicles; a follow-on goal is team closure
   attribution.
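The derived SLA state for open tickets can be sketched as a small pure function. This is a minimal illustration, not the deployed logic: the function name and the state labels (`breached`, `at_risk`, `on_track`, `unknown`) are assumptions, and only the 48h and 36h thresholds come from the objective above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

EAT = timezone(timedelta(hours=3))   # Africa/Nairobi, UTC+3
SLA_LIMIT = timedelta(hours=48)      # contract window from created_at_service
AT_RISK_AFTER = timedelta(hours=36)  # early-warning threshold


def derive_sla_state(created_at_service: Optional[datetime], now: datetime) -> str:
    """Classify an open ticket by its age (state labels are hypothetical)."""
    if created_at_service is None:
        # No service-creation clock available; the caller must choose a fallback.
        return "unknown"
    age = now - created_at_service
    if age >= SLA_LIMIT:
        return "breached"
    if age >= AT_RISK_AFTER:
        return "at_risk"
    return "on_track"


now = datetime(2026, 6, 15, 17, 0, tzinfo=EAT)
print(derive_sla_state(now - timedelta(hours=40), now))  # at_risk
```

Both datetimes should be timezone-aware so the subtraction is well defined across EAT and UTC values.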
## Data contract (verified against live snapshots)
- 32 columns; header + double-quoted values. INC sample = 31,434 rows.
- `ticket_id` is the **primary key**; the same ticket recurs across snapshots as it
  moves `open → closed`. Verified: 31,434 distinct ids per file, **0 in-file dups**,
  same id set every hour (0 added/dropped) → **upsert is the dedup mechanism, no
  TRUNCATE**. Consecutive files are often byte-identical → skip-if-unchanged.
- `is_alarm=true` (~10,132 rows, all `is_actionable=false`) → **dropped**.
- `latitude`/`longitude` are **empty** in the feed → geocoding required.
- A garbage **sentinel row** (`ticket_id = "EXPORT STOPPED DUE TO EXCESSIVE SIZE…"`)
  is commonly the first data line → filtered by `ticket_id` prefix.
- Timestamps (filenames + data) are **EAT (Africa/Nairobi, UTC+3)**.
- `bucket` is meaningful (`closed`/`pending`), distinct from `source_s3_bucket`.
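The two row-level drops in the contract above (sentinel row and alarm rows) might look like the following sketch. The function name, the exact prefix constant, and the sample ticket ids are assumptions for illustration; the real loader's matching rule may differ.

```python
import csv
import io
from typing import Dict, Iterator

# Assumed prefix; the contract only says the sentinel is filtered by ticket_id prefix.
SENTINEL_PREFIX = "EXPORT STOPPED"


def iter_valid_rows(csv_text: str) -> Iterator[Dict[str, str]]:
    """Yield data rows, skipping the sentinel row and is_alarm=true rows."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        ticket_id = (row.get("ticket_id") or "").strip()
        if ticket_id.upper().startswith(SENTINEL_PREFIX):
            continue  # garbage sentinel line, commonly the first data row
        if (row.get("is_alarm") or "").strip().lower() == "true":
            continue  # alarm rows are never actionable
        yield row


sample = (
    "ticket_id,is_alarm,is_actionable\n"
    '"EXPORT STOPPED DUE TO EXCESSIVE SIZE","",""\n'
    '"INC-1","false","true"\n'
    '"INC-2","true","false"\n'
)
print([r["ticket_id"] for r in iter_valid_rows(sample)])  # ['INC-1']
```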
## Approach
Keep the **raw-jsonb-first** model and everything downstream; only the loader's
input path changes: JSON-`latest` → **newest timestamped CSV**, plus move-on-success.
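Picking the newest timestamped CSV could be sketched as below, assuming the object keys have already been listed from the bucket (for example with boto3). The regex, helper names, and sample keys are illustrative assumptions; only the `YYYY-MM-DDTHH-mm-ss.csv` filename shape and the EAT timezone come from this document.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

EAT = timezone(timedelta(hours=3))  # filenames are stamped in Africa/Nairobi time

# Matches only files directly under automations/inc/, not processed/ or other subfolders.
KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")


def parse_key_timestamp(key: str) -> Optional[datetime]:
    """Return the EAT timestamp encoded in an export key, or None if it does not match."""
    match = KEY_RE.match(key)
    if match is None:
        return None
    return datetime.strptime(match.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)


def newest_key(keys: Iterable[str]) -> Optional[str]:
    """Return the key with the latest filename timestamp, ignoring non-matching keys."""
    dated = [(ts, k) for k in keys if (ts := parse_key_timestamp(k)) is not None]
    return max(dated)[1] if dated else None


keys = [
    "automations/inc/2026-06-15T16-00-00.csv",
    "automations/inc/2026-06-15T17-00-00.csv",
    "automations/inc/processed/2026-06-15T18-00-00.csv",
]
print(newest_key(keys))  # automations/inc/2026-06-15T17-00-00.csv
```

Sorting on the parsed timestamp rather than the raw key string keeps the choice correct even if the prefix or naming ever varies.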
- **Newest file** per `automations/inc/` (parse `YYYY-MM-DDTHH-mm-ss.csv`), via
  **boto3** (path-style; no aws-CLI dependency).
- **Skip-if-unchanged**: compare newest S3 **ETag** to the last processed ETag
  (`tickets.import_meta.metadata.source_etag`); equal → skip DB write.
- **Cleaning at ingest**: drop `is_alarm=true` + sentinel; drop `week_start`,
  `week_end`, `source_s3_bucket`, `source_s3_key`, `source_snapshot_id`,
  `department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE;
  keep `service_type` and `bucket` (`service_type` was later dropped as constant).
- **Upsert** on `ticket_id` (`ON CONFLICT DO UPDATE`); never delete → closure
  history accumulates. On success **move** the file(s) to
  `automations/inc/processed/`.
- Record snapshot freshness in `tickets.import_meta`.
- Geocoding unchanged: `--geocode-clusters` (coarse) + `--geocode-locations`
  (precise, actionable INC; keyed LocationIQ; 25 km wrong-city guard).
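The column cleaning and the skip-if-unchanged check from the list above can be illustrated with two small helpers. This is a sketch under assumptions: the function names and sample row are hypothetical, and stripping the surrounding double quotes from ETags is an added normalization (S3 typically returns ETag values quoted), not something this PRD specifies.

```python
from typing import Dict, Optional

# Columns dropped at ingest, as listed in the cleaning step.
DROPPED_COLUMNS = frozenset({
    "week_start", "week_end", "source_s3_bucket", "source_s3_key",
    "source_snapshot_id", "department", "source_type",
})


def clean_row(row: Dict[str, str]) -> Dict[str, str]:
    """Drop unwanted columns and normalize region/raw_status casing."""
    cleaned = {k: v for k, v in row.items() if k not in DROPPED_COLUMNS}
    if cleaned.get("region"):
        cleaned["region"] = cleaned["region"].lower()
    if cleaned.get("raw_status"):
        cleaned["raw_status"] = cleaned["raw_status"].upper()
    return cleaned


def should_skip(newest_etag: str, last_processed_etag: Optional[str]) -> bool:
    """True when the newest file's ETag equals the one recorded after the last run."""
    if last_processed_etag is None:
        return False  # nothing processed yet, so always ingest
    return newest_etag.strip('"') == last_processed_etag.strip('"')


row = {"ticket_id": "INC-1", "region": "Nairobi", "raw_status": "closed",
       "bucket": "closed", "week_start": "2026-06-08", "department": "ops"}
print(clean_row(row))
```

Keeping both helpers free of I/O makes them easy to unit-test separately from the S3 and database calls.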
## Orchestration
Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs as a
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
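As a quick sanity check on the schedule, the cron expression `15 7-19 * * *` fires at minute 15 of every hour from 07 through 19. The helper below is a hypothetical illustration that simply enumerates those wall-clock times; it is not a general cron parser.

```python
from typing import List


def cron_run_times(minute: int, first_hour: int, last_hour: int) -> List[str]:
    """Expand a 'minute first-last * * *' style schedule into HH:MM run times."""
    return [f"{hour:02d}:{minute:02d}" for hour in range(first_hour, last_hour + 1)]


runs = cron_run_times(15, 7, 19)
print(len(runs), runs[0], runs[-1])  # 13 07:15 19:15
```

That gives 13 ingest runs per day, interpreted in EAT.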
> feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
>
> Split the INC-only loader into a dataset-agnostic engine (`pipeline.py`, renamed
> from `import_tickets.py`) parameterized by a Dataset config, with thin per-type
> entrypoints `inc/import_inc.py` and `crq/import_crq.py`. CRQ shares INC's identical
> 32-column source schema and CDC change stream, so the engine is fully shared.
>
> - `pipeline.py`: Dataset config (name/table/prefixes/key_regex/post_apply); INC
>   keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
>   now unions `tickets.crq` (geocoding is cross-dataset: one gazetteer/budget).
> - `crq/import_crq.py`: drains `automations/crq/changes/` from isptickets into
>   `tickets.crq` (data layer + map; SLA/dashboard/history deferred).
> - `migrations/13_crq_columns.sql`: CRQ mirror of 03: typed STORED generated
>   columns + indexes on `tickets.crq` (reuses `tickets.eat_ts()`).
> - Deployment: `Dockerfile`/`run_ingest.sh` run both via `python -m`; pyproject
>   packages inc/crq. Docs (README, implementation, deployment-and-operations,
>   n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
>
> `tickets.crq` already exists (mig 01, LIKE `tickets.inc`) and is unioned into
> `reporting.fn_tickets_for_map` + resolve_ticket_geoms, so CRQ appears on the
> existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
> lists/parses both streams against live S3 (crq=52 files, inc unaffected).
>
> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
> 2026-06-25 20:16:38 +00:00

> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` /
> `-m crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
> `implementation.md` and `deployment-and-operations.md`.
## Data-quality findings (carried into Phase 2)
- Source `sla_status` does not follow a plain 48h rule, and `mttr` is not wall-clock
  time; pin down the contract's SLA definition before trusting cross-field SLA math.
- `created_at_service` is null on ~30% of rows (incl. most open tickets) → needs a
  fallback clock (`first_seen_at`).
- Split timestamp semantics: lifecycle = `created_at_service`→`closed_at`; export
  bookkeeping = `created_at`/`updated_at`/`first_seen_at`/`last_seen_at`.
- `assigned_team` is missing on ~34% of rows (`owner` is the better field).
- Content lags by ~2 days (underlying `…wm_task.xlsx` source date).
## Outcome (as built)
Live in `tracksolid_db`: `tickets.inc` (raw + typed generated columns), geocoded to
~99.99%, alarm/sentinel removed, hourly refresh with ETag skip + archive. See
`docs/implementation.md`.