fleettickets/docs/phase-1-ingestion.md
david kiania a4b90a33d8 fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).

The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.

Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
  (baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
  (import_meta.metadata->>'source_max_key'); rows + watermark commit in one
  txn per file, then archive to processed/ — so a mid-run failure leaves a
  consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
  and rewrite it for the incremental stream; fix the reference in
  docs/phase-1-ingestion.md

Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 14:37:17 +03:00

5.3 KiB
Raw Blame History

PRD (Phase 1) — INC hourly CSV ingestion → tracksolid_db → FleetOps Tickets map

Status: complete and deployed (migrations 0108, boto3 loader, geocoding, Coolify hourly 15 7-19 * * * EAT). This document is the record of the Phase-1 plan; see README.md and docs/implementation.md for the as-built state.

Scope: INC only

This workflow is strictly for INC (incident / customer-fault tickets). It ingests only automations/inc/<EAT-timestamp>.csv. CRQ (new-installation) exports at automations/crq/ are out of scope and are not processed here; the field transforms below are likewise INC-only.

Context

The client (Rahamafresh / Fireside) runs an n8n workflow that exports field-ops tickets to our S3-compatible bucket every hour:

  • automations/inc/<EAT-timestamp>.csvincidents / customer faults (in scope)
  • automations/crq/<EAT-timestamp>.csv — new-installation requests (out of scope)

(See n8n-s3-ticket-exports.md. Sample: 2026-06-15T17-00-00.csv. Note: the source later switched to an incremental automations/inc/changes/ stream — that doc has the current layout; this PRD records the original Phase-1 model.)

fleettickets owns the downstream: the tickets schema in the shared tracksolid_db (raw-jsonb-first tickets.inc, geocoding gazetteers, and reporting.fn_tickets_for_map, which dashboard_api serves to the FleetOps "Tickets" tab). tickets.crq keeps existing but is not fed by this pipeline.

The problem: the loader was written for the old export model — JSON {metadata, records} envelopes at a stable automations/inc/latest.json. That model is gone; the new exports are flat CSV, timestamped per hour, with no latest pointer, no envelope, and no deltas — every hourly file is a full current-state snapshot.

Two driving objectives this pipeline feeds:

  1. SLA tracking — contract requires tickets closed within 48h of created_at_service; closed carry source sla_status + mttr, open need a derived state (now created_at_service ≥48h breached / ≥36h at-risk).
  2. Vehicle routing (most important) — accurately geocoded open tickets so FleetNow can route nearest vehicles; subsequent: team closure attribution.

Data contract (verified against live snapshots)

  • 32 columns; header + double-quoted values. INC sample = 31,434 rows.
  • ticket_id is the primary key; the same ticket recurs across snapshots as it moves open → closed. Verified: 31,434 distinct ids per file, 0 in-file dups, same id set every hour (0 added/dropped) → upsert is the dedup mechanism, no TRUNCATE. Consecutive files are often byte-identical → skip-if-unchanged.
  • is_alarm=true (~10,132 rows, all is_actionable=false) → dropped.
  • latitude/longitude are empty in the feed → geocoding required.
  • A garbage sentinel row (ticket_id = "EXPORT STOPPED DUE TO EXCESSIVE SIZE…") is commonly the first data line → filtered by ticket_id prefix.
  • Timestamps (filenames + data) are EAT (Africa/Nairobi, UTC+3).
  • bucket is meaningful (closed/pending), distinct from source_s3_bucket.

Approach

Keep the raw-jsonb-first model and everything downstream; only the loader's input path changes: JSON-latestnewest timestamped CSV, plus move-on-success.

  • Newest file per automations/inc/ (parse YYYY-MM-DDTHH-mm-ss.csv), via boto3 (path-style; no aws-CLI dependency).
  • Skip-if-unchanged: compare newest S3 ETag to the last processed ETag (tickets.import_meta.metadata.source_etag); equal → skip DB write.
  • Cleaning at ingest: drop is_alarm=true + sentinel; drop week_start, week_end, source_s3_bucket, source_s3_key, source_snapshot_id, department, source_type; normalize region→lowercase, raw_status→UPPERCASE; keep service_type* and bucket. (*service_type later dropped as constant.)
  • Upsert on ticket_id (ON CONFLICT DO UPDATE); never delete → closure history accumulates. On success move the file(s) to automations/inc/processed/.
  • Record snapshot freshness in tickets.import_meta.
  • Geocoding unchanged: --geocode-clusters (coarse) + --geocode-locations (precise, actionable INC; keyed LocationIQ; 25 km wrong-city guard).

Orchestration

Deployed on Coolify (own app, Dockerfile, keep-alive worker). Ingest runs as a Scheduled Task: python import_tickets.py --from-bucket --apply, cron 15 7-19 * * * in EAT. Env: DATABASE_URL, RUSTFS_*, GEOCODER_*.

Data-quality findings (carried into Phase 2)

  • Source sla_status ≠ a plain 48h rule, and mttr is not wall-clock — pin the contract's SLA definition before trusting cross-field SLA math.
  • created_at_service is null on ~30% of rows (incl. most open) → needs a fallback clock (first_seen_at).
  • Split timestamp semantics: lifecycle = created_at_serviceclosed_at; export bookkeeping = created_at/updated_at/first_seen_at/last_seen_at.
  • assigned_team missing ~34% (owner better).
  • Content lag ~2 days (underlying …wm_task.xlsx source date).

Outcome (as built)

Live in tracksolid_db: tickets.inc (raw + typed generated columns), geocoded to ~99.99%, alarm/sentinel removed, hourly refresh with ETag skip + archive. See docs/implementation.md.