Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed from import_tickets.py) parameterized by a Dataset config, with thin per-type entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical 32-column source schema and CDC change stream, so the engine is fully shared.

- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03: typed STORED generated columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject packages inc/crq.

Docs (README, implementation, deployment-and-operations, n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.

tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the existing Tickets map once seeded.

Verified locally: ruff-clean new files, engine lists/parses both streams against live S3 (crq=52 files, inc unaffected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PRD (Phase 1) — INC hourly CSV ingestion → tracksolid_db → FleetOps Tickets map
Status: complete and deployed (migrations 01–08, boto3 loader, geocoding, Coolify hourly
15 7-19 * * * EAT). This document is the record of the Phase-1 plan; see README.md and docs/implementation.md for the as-built state.
Scope: INC only
This workflow is strictly for INC (incident / customer-fault tickets). It
ingests only automations/inc/<EAT-timestamp>.csv. CRQ (new-installation)
exports at automations/crq/ are out of scope and are not processed here; the
field transforms below are likewise INC-only.
Context
The client (Rahamafresh / Fireside) runs an n8n workflow that exports field-ops tickets to our S3-compatible bucket every hour:
- automations/inc/<EAT-timestamp>.csv: incidents / customer faults (in scope)
- automations/crq/<EAT-timestamp>.csv: new-installation requests (out of scope)
(See n8n-s3-ticket-exports.md. Sample: 2026-06-15T17-00-00.csv. Note: the
source later switched to an incremental automations/inc/changes/ stream — that
doc has the current layout; this PRD records the original Phase-1 model.)
fleettickets owns the downstream: the tickets schema in the shared
tracksolid_db (raw-jsonb-first tickets.inc, geocoding gazetteers, and
reporting.fn_tickets_for_map, which dashboard_api serves to the FleetOps
"Tickets" tab). tickets.crq still exists but is not fed by this pipeline.
The problem: the loader was written for the old export model — JSON
{metadata, records} envelopes at a stable automations/inc/latest.json. That
model is gone; the new exports are flat CSV, timestamped per hour, with no
latest pointer, no envelope, and no deltas — every hourly file is a full
current-state snapshot.
Two driving objectives this pipeline feeds:
- SLA tracking: the contract requires tickets closed within 48h of created_at_service; closed tickets carry source sla_status + mttr, open tickets need a derived state (now − created_at_service ≥ 48h breached / ≥ 36h at-risk).
- Vehicle routing (most important): accurately geocoded open tickets so FleetNow can route nearest vehicles; subsequent: team closure attribution.
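The derived state for open tickets could look roughly like the sketch below. It only encodes the 48h / 36h thresholds stated above; the function name and the state labels ("breached", "at_risk", "on_track", "unknown") are hypothetical and not taken from the codebase.

```python
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi is a fixed UTC+3 offset
BREACH_AFTER = timedelta(hours=48)   # contract window
AT_RISK_AFTER = timedelta(hours=36)  # early-warning threshold


def derive_open_sla_state(created_at_service, now):
    """Classify an open ticket by its age since created_at_service.

    Both arguments are timezone-aware datetimes. The labels are
    illustrative assumptions, not the pipeline's real values.
    """
    if created_at_service is None:
        return "unknown"  # no clock to measure against
    age = now - created_at_service
    if age >= BREACH_AFTER:
        return "breached"
    if age >= AT_RISK_AFTER:
        return "at_risk"
    return "on_track"


now = datetime(2026, 6, 15, 17, 0, tzinfo=EAT)
state = derive_open_sla_state(now - timedelta(hours=40), now)  # "at_risk"
```

Closed tickets would keep the source sla_status and mttr rather than going through this function.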
Data contract (verified against live snapshots)
- 32 columns; header + double-quoted values. INC sample = 31,434 rows.
- ticket_id is the primary key; the same ticket recurs across snapshots as it moves open → closed. Verified: 31,434 distinct ids per file, 0 in-file dups, same id set every hour (0 added/dropped) → upsert is the dedup mechanism, no TRUNCATE. Consecutive files are often byte-identical → skip-if-unchanged.
- is_alarm=true (~10,132 rows, all is_actionable=false) → dropped.
- latitude/longitude are empty in the feed → geocoding required.
- A garbage sentinel row (ticket_id = "EXPORT STOPPED DUE TO EXCESSIVE SIZE…") is commonly the first data line → filtered by ticket_id prefix.
- Timestamps (filenames + data) are EAT (Africa/Nairobi, UTC+3).
- bucket is meaningful (closed/pending), distinct from source_s3_bucket.
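As an illustration of the sentinel and alarm rules in this contract, the sketch below reads a quoted CSV and yields only the rows that survive. The function name, the exact prefix constant, and the sample ticket ids are assumptions for the example; only the ticket_id and is_alarm column names come from the contract above.

```python
import csv
import io

# Assumed prefix of the sentinel ticket_id; the full text is truncated in the source.
SENTINEL_PREFIX = "EXPORT STOPPED"


def iter_clean_rows(csv_text):
    """Yield data rows, skipping the sentinel row and is_alarm=true rows."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for row in reader:
        ticket_id = (row.get("ticket_id") or "").strip()
        if not ticket_id or ticket_id.startswith(SENTINEL_PREFIX):
            continue  # garbage sentinel or blank key
        if (row.get("is_alarm") or "").strip().lower() == "true":
            continue  # alarm rows are dropped at ingest
        yield row


# Made-up three-column sample; real files have 32 columns.
SAMPLE = (
    '"ticket_id","is_alarm","region"\n'
    '"EXPORT STOPPED DUE TO EXCESSIVE SIZE…","",""\n'
    '"T-1","false","Nairobi"\n'
    '"T-2","true","Coast"\n'
)
clean = list(iter_clean_rows(SAMPLE))  # only the T-1 row remains
```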
Approach
Keep the raw-jsonb-first model and everything downstream; only the loader's
input path changes: JSON-latest → newest timestamped CSV, plus move-on-success.
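Picking the newest timestamped CSV can be sketched as below, assuming the object keys have already been listed (for example from a boto3 listing). The regex and function names are hypothetical; the filename pattern and the EAT offset come from this document.

```python
import re
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # filenames carry EAT wall-clock time

# Matches only top-level snapshots, so processed/ and changes/ keys are ignored.
KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")


def parse_snapshot_time(key):
    """Return the EAT-aware timestamp encoded in a snapshot key, or None."""
    match = KEY_RE.match(key)
    if match is None:
        return None
    try:
        stamp = datetime.strptime(match.group(1), "%Y-%m-%dT%H-%M-%S")
    except ValueError:
        return None  # digits present but not a valid date/time
    return stamp.replace(tzinfo=EAT)


def newest_snapshot_key(keys):
    """Return the key with the latest filename timestamp, or None if none parse."""
    dated = []
    for key in keys:
        stamp = parse_snapshot_time(key)
        if stamp is not None:
            dated.append((stamp, key))
    return max(dated)[1] if dated else None


keys = [
    "automations/inc/2026-06-15T16-00-00.csv",
    "automations/inc/2026-06-15T17-00-00.csv",
    "automations/inc/processed/2026-06-15T18-00-00.csv",
]
newest = newest_snapshot_key(keys)
```

Ordering by the parsed filename rather than by object modification time keeps the choice stable if files are re-uploaded out of order.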
- Newest file per automations/inc/ (parse YYYY-MM-DDTHH-mm-ss.csv), via boto3 (path-style; no aws-CLI dependency).
- Skip-if-unchanged: compare newest S3 ETag to the last processed ETag (tickets.import_meta.metadata.source_etag); equal → skip DB write.
- Cleaning at ingest: drop is_alarm=true + sentinel; drop week_start, week_end, source_s3_bucket, source_s3_key, source_snapshot_id, department, source_type; normalize region → lowercase, raw_status → UPPERCASE; keep service_type* and bucket. (*service_type later dropped as constant.)
- Upsert on ticket_id (ON CONFLICT DO UPDATE); never delete → closure history accumulates. On success move the file(s) to automations/inc/processed/.
- Record snapshot freshness in tickets.import_meta.
- Geocoding unchanged: --geocode-clusters (coarse) + --geocode-locations (precise, actionable INC; keyed LocationIQ; 25 km wrong-city guard).
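The skip, clean, and upsert steps above can be sketched as follows. This is a minimal illustration, not the loader's code: an in-memory SQLite table stands in for tickets.inc (the real target is PostgreSQL with a jsonb column), and the function names, table layout, and sample ids are assumptions.

```python
import json
import sqlite3

DROP_COLUMNS = {
    "week_start", "week_end", "source_s3_bucket", "source_s3_key",
    "source_snapshot_id", "department", "source_type",
}


def should_skip(newest_etag, last_etag):
    """True when the newest object matches the last processed ETag.

    S3 clients often return ETags wrapped in double quotes, so both
    sides are compared with quotes stripped (an assumption about storage).
    """
    return last_etag is not None and newest_etag.strip('"') == last_etag.strip('"')


def clean_record(row):
    """Drop bookkeeping columns and normalize region / raw_status casing."""
    out = {k: v for k, v in row.items() if k not in DROP_COLUMNS}
    if out.get("region"):
        out["region"] = out["region"].lower()
    if out.get("raw_status"):
        out["raw_status"] = out["raw_status"].upper()
    return out


# Stand-in for the PostgreSQL upsert; SQLite accepts the same ON CONFLICT form.
UPSERT_SQL = """
INSERT INTO inc (ticket_id, raw) VALUES (?, ?)
ON CONFLICT (ticket_id) DO UPDATE SET raw = excluded.raw
"""


def upsert_rows(conn, rows):
    """Insert or update each row by ticket_id; rows absent from a batch are kept."""
    payload = [(r["ticket_id"], json.dumps(clean_record(r))) for r in rows]
    conn.executemany(UPSERT_SQL, payload)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inc (ticket_id TEXT PRIMARY KEY, raw TEXT NOT NULL)")
upsert_rows(conn, [
    {"ticket_id": "T-1", "raw_status": "open", "region": "Nairobi", "department": "x"},
    {"ticket_id": "T-2", "raw_status": "open", "region": "Coast"},
])
# A later snapshot closes T-1; T-2 is untouched because nothing is ever deleted.
upsert_rows(conn, [{"ticket_id": "T-1", "raw_status": "closed", "region": "Nairobi"}])
```

Because the write is keyed on ticket_id, replaying the same snapshot is idempotent, which is why the ETag check is an optimization rather than a correctness requirement.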
Orchestration
Deployed on Coolify (own app, Dockerfile, keep-alive worker). Ingest runs as a
Scheduled Task: python import_tickets.py --from-bucket --apply, cron
15 7-19 * * * in EAT. Env: DATABASE_URL, RUSTFS_*, GEOCODER_*.
Superseded (historical Phase-1 plan). As built: the loader is now the shared
pipeline.py engine with thin entrypoints (python -m inc.import_inc / -m crq.import_crq), running as two Scheduled Tasks at cron */20 6-20 * * *. See implementation.md and deployment-and-operations.md.
Data-quality findings (carried into Phase 2)
- Source sla_status ≠ a plain 48h rule, and mttr is not wall-clock: pin the contract's SLA definition before trusting cross-field SLA math.
- created_at_service is null on ~30% of rows (incl. most open) → needs a fallback clock (first_seen_at).
- Split timestamp semantics: lifecycle = created_at_service → closed_at; export bookkeeping = created_at / updated_at / first_seen_at / last_seen_at.
- assigned_team missing ~34% (owner better).
- Content lag ~2 days (underlying …wm_task.xlsx source date).
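The fallback clock noted above might be resolved along these lines. This is a sketch under assumptions: the function name is hypothetical, the values are treated as raw strings as they would arrive from the CSV, and the timestamps in the usage lines are made up. It returns which field supplied the clock so that fallback-based SLA figures can be told apart from contract-clock figures.

```python
def sla_clock_start(row):
    """Return (value, source_field) for the SLA clock.

    Prefers created_at_service and falls back to first_seen_at;
    returns (None, None) when neither field is populated.
    """
    for field in ("created_at_service", "first_seen_at"):
        value = (row.get(field) or "").strip()
        if value:
            return value, field
    return None, None


# Illustrative rows; the timestamp format is not specified in this document.
with_service = sla_clock_start(
    {"created_at_service": "2026-06-14 09:00:00", "first_seen_at": "2026-06-14 10:00:00"}
)
fallback = sla_clock_start(
    {"created_at_service": "", "first_seen_at": "2026-06-15 17:00:00"}
)
```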
Outcome (as built)
Live in tracksolid_db: tickets.inc (raw + typed generated columns), geocoded to
~99.99%, alarm/sentinel removed, hourly refresh with ETag skip + archive. See
docs/implementation.md.