# n8n S3 Ticket Exports — Incremental (CDC) Stream

Updated on June 23, 2026.

> **History.** This doc previously described a full-snapshot-per-hour model
> ("No delta files … No `changes/` directories"). That is no longer accurate.
> As of the June 22, 2026 re-seed, the source switched to an **incremental
> change-data-capture (CDC) stream** under `automations/<dataset>/changes/`.
> The structure below was verified by direct S3 inspection of the `tickets`
> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are
> carried over from the prior version and may be stale; trust the bucket.

## Overview

The FTTH ticket export now writes an **incremental** CSV stream per dataset:

- The **first** file in a stream is a full current-state **baseline**.
- Every **later** file holds **only the rows that changed** since the prior
  export: new and updated tickets, keyed by `ticket_id`.
- **Deletions are never emitted** (tickets are closed in place, not removed).

Consumers must ingest **every not-yet-processed file in ascending timestamp
order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking
only the newest file silently drops the intermediate deltas.

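
A minimal sketch of the replay rule, assuming each file is already downloaded as CSV text and using a plain dict as a stand-in for the `ticket_id`-keyed table. `apply_stream` is a hypothetical helper, and the two-column CSVs are simplified samples, not real exports. Replaying every file oldest-first keeps the delta that closed `T1`; applying only the newest file loses it:

```python
import csv
import io


def apply_stream(files: list[tuple[str, str]]) -> dict[str, dict[str, str]]:
    """Replay (key, csv_text) pairs oldest-first, upserting each row on ticket_id."""
    state: dict[str, dict[str, str]] = {}
    # Timestamped keys are zero-padded, so lexical order is chronological order.
    for _key, text in sorted(files):
        for row in csv.DictReader(io.StringIO(text)):
            # Full-row upsert: a later file's row replaces the earlier one.
            # No delete branch is needed because deletions are never emitted.
            state[row["ticket_id"]] = row
    return state


baseline = "ticket_id,raw_status\nT1,open\nT2,open\n"
delta_1 = "ticket_id,raw_status\nT1,closed\n"
delta_2 = "ticket_id,raw_status\nT3,open\n"
files = [
    ("automations/inc/changes/2026-06-22T17-10-41.csv", delta_2),
    ("automations/inc/changes/2026-06-22T15-50-39.csv", baseline),
    ("automations/inc/changes/2026-06-22T15-53-04.csv", delta_1),
]

full = apply_stream(files)
newest_only = apply_stream([max(files)])
print(sorted(full), full["T1"]["raw_status"])  # ['T1', 'T2', 'T3'] closed
print(sorted(newest_only))  # ['T3']
```

Because each delta row is a complete record, the upsert is a plain overwrite; no field-level merge is needed.
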
CSV files only. Filenames use the execution time in the `Africa/Nairobi`
timezone (format below). All files share one identical flat-CSV schema (a
header row followed by data rows): the same column set the previous full
snapshots used.

## Output Layout

The change stream lives under a `changes/` prefix per dataset in the `tickets`
bucket:

```text
automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv
```
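
One way to enumerate a dataset's stream with boto3, sketched under stated assumptions: `list_change_keys` is a hypothetical helper, the S3 client is passed in (for example `boto3.client("s3")`), and only objects directly under `changes/` whose names end in `.csv` are counted. The paginator matters because a single `list_objects_v2` call returns at most 1,000 keys.

```python
from typing import Any


def list_change_keys(s3: Any, dataset: str, bucket: str = "tickets") -> list[str]:
    """Return CSV keys directly under automations/<dataset>/changes/, oldest first."""
    prefix = f"automations/{dataset}/changes/"
    keys: list[str] = []
    # list_objects_v2 returns at most 1,000 keys per call; the paginator follows
    # continuation tokens until the listing is exhausted.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):  # empty pages have no "Contents"
            name = obj["Key"][len(prefix):]
            # Ignore nested "folders" and non-CSV objects.
            if "/" not in name and name.endswith(".csv"):
                keys.append(obj["Key"])
    # Zero-padded timestamps: lexical order is chronological order.
    return sorted(keys)


# Usage (requires boto3 and credentials):
#   import boto3
#   for key in list_change_keys(boto3.client("s3"), "crq"):
#       print(key)
```
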

Observed `tickets` bucket layout (June 23, 2026):

```text
automations/inc/
├── changes/                      ← ACTIVE incremental stream (baseline + deltas)
│   ├── 2026-06-22T15-50-39.csv   (baseline, ~15 MB, 34,849 rows)
│   ├── 2026-06-22T15-53-04.csv   (delta, 1 row)
│   ├── 2026-06-22T17-10-41.csv   (delta, 22 rows)
│   └── 2026-06-22T17-15-41.csv   (delta, 131 rows)
├── processed/                    ← our pipeline's archive of consumed files
├── full/                         ← present but EMPTY (legacy prefix)
├── latest.csv/                   ← present but EMPTY (legacy prefix)
└── latest.json/                  ← present but EMPTY (legacy prefix)
```

Notes:

- There are **no longer any `automations/inc/<ts>.csv` files at the root** of
  `inc/`: the last full snapshots (through `2026-06-18T17-00-05.csv`) were
  archived to `processed/`. New data arrives **only** under `changes/`.
- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in
  listings but contain **no objects** (leftover/legacy; ignore them). There is
  no `latest` pointer and no JSON/metadata envelope.
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
  stream (with matching baseline timestamps and additional newer deltas) and the
  identical 32-column schema. As of 2026-06-25 CRQ **is consumed** by
  `crq/import_crq.py` (into `tickets.crq`) over the shared `pipeline.py` engine,
  the same way INC is consumed by `inc/import_inc.py`. CRQ's old root snapshots
  (`automations/crq/<ts>.csv`, old `tickets` bucket) are still present because
  nothing archives them; they are not consumed, since the `changes/` stream is
  the source of truth.
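
Because INC and CRQ share one schema and one stream layout, a shared engine only needs a small per-dataset config. The sketch below is hypothetical: the `Dataset` class, its fields, and the key pattern are illustrative assumptions, not the actual `pipeline.py` definitions. It shows how such a config can restrict consumption to timestamped files directly under `changes/`, so the legacy prefixes and old root snapshots are never picked up:

```python
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Callable, Optional

# Timestamped filename, e.g. 2026-06-22T15-50-39.csv
_TS_CSV = r"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.csv"


@dataclass(frozen=True)
class Dataset:
    """Hypothetical per-dataset settings for a shared import engine."""

    name: str  # stream name under automations/, e.g. "inc" or "crq"
    table: str  # target table, e.g. "tickets.inc"
    # Optional hook run after each applied file (hypothetical; may be None).
    post_apply: Optional[Callable[[str], None]] = None

    @property
    def changes_prefix(self) -> str:
        return f"automations/{self.name}/changes/"

    def is_stream_key(self, key: str) -> bool:
        """True only for timestamped CSVs directly under changes/."""
        pattern = re.escape(self.changes_prefix) + _TS_CSV
        return re.fullmatch(pattern, key) is not None


INC = Dataset(name="inc", table="tickets.inc")
CRQ = Dataset(name="crq", table="tickets.crq")

print(INC.is_stream_key("automations/inc/changes/2026-06-22T15-50-39.csv"))  # True
print(CRQ.is_stream_key("automations/crq/2026-06-18T17-00-05.csv"))  # False (root path)
```
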

## CSV Schema

Header (32 columns), identical across baseline and delta files:

```text
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
week_start, week_end, cluster, region, location_name, latitude, longitude,
department, assigned_team, owner, sla_status, mttr, is_auto_created,
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
source_snapshot_id, created_at, updated_at
```
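
A header guard catches schema drift before any row is upserted. This is a sketch, assuming the on-disk header lists the 32 names in the order shown above; surrounding whitespace around each name is tolerated. `check_header` and `EXPECTED_COLUMNS` are hypothetical names:

```python
import csv
import io

EXPECTED_COLUMNS = [
    "ticket_id", "source_type", "service_type", "bucket", "raw_status",
    "normalized_status", "created_at_service", "scheduled_at", "closed_at",
    "last_seen_at", "first_seen_at", "week_start", "week_end", "cluster",
    "region", "location_name", "latitude", "longitude", "department",
    "assigned_team", "owner", "sla_status", "mttr", "is_auto_created",
    "is_auto_closed", "is_alarm", "is_actionable", "source_s3_bucket",
    "source_s3_key", "source_snapshot_id", "created_at", "updated_at",
]


def check_header(csv_text: str) -> list[str]:
    """Return the header if it matches the documented 32 columns, else raise ValueError."""
    first = next(csv.reader(io.StringIO(csv_text)), [])
    header = [name.strip() for name in first]
    if header != EXPECTED_COLUMNS:
        missing = [c for c in EXPECTED_COLUMNS if c not in header]
        extra = [c for c in header if c not in EXPECTED_COLUMNS]
        # Same names in a different order leave both lists empty.
        raise ValueError(f"unexpected CSV header: missing={missing} extra={extra}")
    return header
```
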

Each row is a complete record (not a partial diff): a delta row carries the
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
baseline still contains `is_alarm=true` rows and may include a leading
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
the consumer (see `pipeline.py` and the `inc/` and `crq/` entrypoints).
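
The two filters can be sketched as a row predicate. Assumptions, labeled in the code too: booleans serialize as text such as `true`/`false`, and the sentinel is recognized only by its leading `EXPORT STOPPED` text (the full wording is elided here, so the sketch does not guess it). `keep_row` and `filtered_rows` are hypothetical names:

```python
import csv
import io

# Assumption: only the leading text of the sentinel is known ("EXPORT STOPPED…").
SENTINEL_PREFIX = "EXPORT STOPPED"


def keep_row(row: dict[str, str]) -> bool:
    """False for the truncation sentinel and for alarm rows."""
    ticket_id = (row.get("ticket_id") or "").strip()
    if ticket_id.startswith(SENTINEL_PREFIX):
        return False
    # Assumption: is_alarm is serialized as text such as "true"/"false".
    return (row.get("is_alarm") or "").strip().lower() != "true"


def filtered_rows(csv_text: str) -> list[dict[str, str]]:
    return [row for row in csv.DictReader(io.StringIO(csv_text)) if keep_row(row)]


sample = "ticket_id,is_alarm\nEXPORT STOPPED (sample),\nT1,true\nT2,false\n"
print([r["ticket_id"] for r in filtered_rows(sample)])  # ['T2']
```
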

## Timestamp Format

```text
YYYY-MM-DDTHH-mm-ss      e.g. 2026-06-22T15-50-39
```

Generated once at the start of each execution, formatted in `Africa/Nairobi`
(EAT). Note that this is the *execution* time, not a top-of-the-hour schedule:
incremental files appear whenever a change batch is exported, so several
within the same hour are normal (e.g. `15-50-39` then `15-53-04`).
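
A sketch of parsing a key's timestamp into an aware datetime with the standard-library `zoneinfo` module (Python 3.9+; it needs the IANA tz database, which on some platforms comes from the `tzdata` package). `parse_export_ts` is a hypothetical helper. Because every field is zero-padded and `Africa/Nairobi` has a fixed UTC+03:00 offset with no daylight-saving shifts, sorting keys as strings gives the same order as sorting by parsed time:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

EAT = ZoneInfo("Africa/Nairobi")


def parse_export_ts(key: str) -> datetime:
    """Parse a key ending in e.g. 2026-06-22T15-50-39.csv into an aware EAT datetime."""
    stem = key.rsplit("/", 1)[-1].removesuffix(".csv")
    # Dashes replace colons in the time part, so the format string mirrors that.
    return datetime.strptime(stem, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)


keys = [
    "automations/inc/changes/2026-06-22T17-10-41.csv",
    "automations/inc/changes/2026-06-22T15-53-04.csv",
    "automations/inc/changes/2026-06-22T15-50-39.csv",
]
print(sorted(keys) == sorted(keys, key=parse_export_ts))  # True
print(parse_export_ts(keys[2]).isoformat())  # 2026-06-22T15:50:39+03:00
```
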

## How the Loader Consumes It

Both entrypoints, `python -m inc.import_inc --from-bucket --apply` and
`python -m crq.import_crq --from-bucket --apply` (see `run_ingest.sh`), run over
the shared `pipeline.py` engine. Each run:

1. Lists `automations/<inc|crq>/changes/<ts>.csv` and sorts the files
   ascending by timestamp.
2. Skips files at or older than the **watermark**
   (`tickets.import_meta.metadata->>'source_max_key'`, i.e. the newest file
   already applied); on a fresh stream it processes everything present.
3. For each pending file, oldest→newest: drops `is_alarm=true` and sentinel
   rows, strips `DROP_FIELDS`, normalizes `region`/`raw_status`, then upserts
   on `ticket_id`. The row upsert and the watermark advance **commit in one
   transaction per file**, after which the file is moved to the dataset's
   `processed/` prefix (e.g. `automations/inc/processed/`).
4. A mid-run failure therefore leaves folder state consistent with the
   watermark; the next run resumes cleanly from where it stopped.
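
The watermark rule in step 2 reduces to a string comparison. A minimal sketch, assuming the watermark's last path segment is the timestamped filename (whether `source_max_key` stores a full key or a bare filename is not stated here, so the sketch compares basenames). `pending_files` is a hypothetical name:

```python
from typing import Optional


def _basename(key: str) -> str:
    return key.rsplit("/", 1)[-1]


def pending_files(keys: list[str], watermark: Optional[str]) -> list[str]:
    """Keys strictly newer than the watermark, oldest first; all keys if none is set."""
    ordered = sorted(keys, key=_basename)
    if watermark is None:
        return ordered  # fresh stream: process everything present
    done = _basename(watermark)
    # Files at or older than the watermark are skipped; keep strictly newer names.
    return [k for k in ordered if _basename(k) > done]


changes = [
    "automations/inc/changes/2026-06-22T17-15-41.csv",
    "automations/inc/changes/2026-06-22T15-50-39.csv",
    "automations/inc/changes/2026-06-22T17-10-41.csv",
    "automations/inc/changes/2026-06-22T15-53-04.csv",
]
print(pending_files(changes, "automations/inc/changes/2026-06-22T15-53-04.csv"))
# ['automations/inc/changes/2026-06-22T17-10-41.csv', 'automations/inc/changes/2026-06-22T17-15-41.csv']
```
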

The first file applied onto an empty watermark is recorded as
`export_type="baseline"` in `tickets.import_meta`; every subsequent file is
recorded as `"delta"`.
## Workflow Context (carried over — verify before relying on)

The export originates from the FTTH Automation Ticket export workflow, which
calls the authenticated Scoreboard export endpoint and uploads CSV(s) to the
`tickets` bucket; a sibling workflow exports `fuel_records/<ts>.csv` to the
`fuel` bucket. Source DB queries are read-only, and the workflows do not delete
or update source rows. The previously documented workflow IDs and the claim of
"two files per hour, full snapshots, no `changes/`" predate the June 22 switch
to the incremental stream and should be re-confirmed against the live n8n
configuration before being treated as current.