fleettickets/docs/phase-1-ingestion.md

106 lines
5.6 KiB
Markdown
Raw Permalink Normal View History

# PRD (Phase 1) — INC hourly CSV ingestion → tracksolid_db → FleetOps Tickets map
> Status: **complete and deployed** (migrations 0108, boto3 loader, geocoding,
> Coolify hourly `15 7-19 * * *` EAT). This document is the record of the Phase-1
> plan; see `README.md` and `docs/implementation.md` for the as-built state.
## Scope: INC only
**This workflow is strictly for INC** (incident / customer-fault tickets). It
ingests **only** `automations/inc/<EAT-timestamp>.csv`. CRQ (new-installation)
exports at `automations/crq/` are **out of scope** and are not processed here; the
field transforms below are likewise INC-only.
## Context
The client (Rahamafresh / Fireside) runs an n8n workflow that exports field-ops
tickets to our S3-compatible bucket **every hour**:
- `automations/inc/<EAT-timestamp>.csv`**incidents / customer faults** *(in scope)*
- `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)*
(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the
source later switched to an incremental `automations/inc/changes/` stream — that
doc has the current layout; this PRD records the original Phase-1 model.)
`fleettickets` owns the **downstream**: the `tickets` schema in the shared
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and
`reporting.fn_tickets_for_map`, which `dashboard_api` serves to the FleetOps
"Tickets" tab). `tickets.crq` keeps existing but is not fed by this pipeline.
**The problem:** the loader was written for the *old* export model — JSON
`{metadata, records}` envelopes at a stable `automations/inc/latest.json`. That
model is gone; the new exports are **flat CSV, timestamped per hour, with no
`latest` pointer, no envelope, and no deltas** — every hourly file is a **full
current-state snapshot**.
**Two driving objectives this pipeline feeds:**
1. **SLA tracking** — contract requires tickets closed within **48h of
`created_at_service`**; closed carry source `sla_status` + `mttr`, open need a
derived state (`now created_at_service` ≥48h breached / ≥36h at-risk).
2. **Vehicle routing (most important)** — accurately geocoded open tickets so
FleetNow can route nearest vehicles; subsequent: team closure attribution.
## Data contract (verified against live snapshots)
- 32 columns; header + double-quoted values. INC sample = 31,434 rows.
- `ticket_id` is the **primary key**; the same ticket recurs across snapshots as it
moves `open → closed`. Verified: 31,434 distinct ids per file, **0 in-file dups**,
same id set every hour (0 added/dropped) → **upsert is the dedup mechanism, no
TRUNCATE**. Consecutive files are often byte-identical → skip-if-unchanged.
- `is_alarm=true` (~10,132 rows, all `is_actionable=false`) → **dropped**.
- `latitude`/`longitude` are **empty** in the feed → geocoding required.
- A garbage **sentinel row** (`ticket_id = "EXPORT STOPPED DUE TO EXCESSIVE SIZE…"`)
is commonly the first data line → filtered by `ticket_id` prefix.
- Timestamps (filenames + data) are **EAT (Africa/Nairobi, UTC+3)**.
- `bucket` is meaningful (`closed`/`pending`), distinct from `source_s3_bucket`.
## Approach
Keep the **raw-jsonb-first** model and everything downstream; only the loader's
input path changes: JSON-`latest` → **newest timestamped CSV**, plus move-on-success.
- **Newest file** per `automations/inc/` (parse `YYYY-MM-DDTHH-mm-ss.csv`), via
**boto3** (path-style; no aws-CLI dependency).
- **Skip-if-unchanged**: compare newest S3 **ETag** to the last processed ETag
(`tickets.import_meta.metadata.source_etag`); equal → skip DB write.
- **Cleaning at ingest**: drop `is_alarm=true` + sentinel; drop `week_start`,
`week_end`, `source_s3_bucket`, `source_s3_key`, `source_snapshot_id`,
`department`, `source_type`; normalize `region`→lowercase, `raw_status`→UPPERCASE;
keep `service_type`* and `bucket`. (*`service_type` later dropped as constant.)
- **Upsert** on `ticket_id` (`ON CONFLICT DO UPDATE`); never delete → closure
history accumulates. On success **move** the file(s) to
`automations/inc/processed/`.
- Record snapshot freshness in `tickets.import_meta`.
- Geocoding unchanged: `--geocode-clusters` (coarse) + `--geocode-locations`
(precise, actionable INC; keyed LocationIQ; 25 km wrong-city guard).
## Orchestration
Deployed on **Coolify** (own app, `Dockerfile`, keep-alive worker). Ingest runs as a
**Scheduled Task**: `python import_tickets.py --from-bucket --apply`, cron
`15 7-19 * * *` in **EAT**. Env: `DATABASE_URL`, `RUSTFS_*`, `GEOCODER_*`.
2026-06-25 20:16:38 +00:00
> **Superseded** (historical Phase-1 plan). As built: the loader is now the shared
> `pipeline.py` engine with thin entrypoints (`python -m inc.import_inc` / `-m
> crq.import_crq`), running as **two** Scheduled Tasks at cron `*/20 6-20 * * *`. See
> `implementation.md` and `deployment-and-operations.md`.
## Data-quality findings (carried into Phase 2)
- Source `sla_status` ≠ a plain 48h rule, and `mttr` is not wall-clock — pin the
contract's SLA definition before trusting cross-field SLA math.
- `created_at_service` is null on ~30% of rows (incl. most open) → needs a fallback
clock (`first_seen_at`).
- Split timestamp semantics: lifecycle = `created_at_service`→`closed_at`; export
bookkeeping = `created_at`/`updated_at`/`first_seen_at`/`last_seen_at`.
- `assigned_team` missing ~34% (`owner` better).
- Content lag ~2 days (underlying `…wm_task.xlsx` source date).
## Outcome (as built)
Live in `tracksolid_db`: `tickets.inc` (raw + typed generated columns), geocoded to
~99.99%, alarm/sentinel removed, hourly refresh with ETag skip + archive. See
`docs/implementation.md`.