fix(inc): ingest the incremental changes/ stream (baseline + deltas)

The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).

The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.

Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
  (baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
  (import_meta.metadata->>'source_max_key'); rows + watermark commit in one
  txn per file, then archive to processed/ — so a mid-run failure leaves a
  consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
  and rewrite it for the incremental stream; fix the reference in
  docs/phase-1-ingestion.md

Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-23 14:37:17 +03:00
parent e71c8914f1
commit a4b90a33d8
4 changed files with 198 additions and 201 deletions

View file

@ -19,7 +19,9 @@ tickets to our S3-compatible bucket **every hour**:
- `automations/inc/<EAT-timestamp>.csv`**incidents / customer faults** *(in scope)*
- `automations/crq/<EAT-timestamp>.csv` — new-installation requests *(out of scope)*
(See `n8n-hourly-s3-full-data-exports.md`. Sample: `2026-06-15T17-00-00.csv`.)
(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the
source later switched to an incremental `automations/inc/changes/` stream — that
doc has the current layout; this PRD records the original Phase-1 model.)
`fleettickets` owns the **downstream**: the `tickets` schema in the shared
`tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and

View file

@ -13,19 +13,21 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
writes a full current-state snapshot CSV per hour to the `tickets` bucket at
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv)
There is NO latest pointer, NO metadata envelope, and NO deltas each file is a
flat CSV (header + rows). We ingest the NEWEST file:
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
skip the DB write (the export re-emits byte-identical content most hours);
Source data: the n8n S3 export writes CSV files to the `tickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-22T15-50-39.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
not-yet-processed file in ASCENDING timestamp order (baseline first, then each
delta) taking only the newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY no duplication; never delete, so closure
history accumulates), and record snapshot freshness in tickets.import_meta;
- on success, MOVE the file to automations/inc/processed/ (copy + delete).
history accumulates), and advance the watermark in tickets.import_meta
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/inc/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
@ -70,7 +72,7 @@ log = get_logger("import_tickets")
_TABLE = "tickets.inc"
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
_INC_PREFIX = "automations/inc/"
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
@ -88,9 +90,10 @@ DROP_FIELDS = frozenset({
"department", "source_type",
})
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
# leftover latest.csv/, latest.json/, full/ prefixes).
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
_CHANGE_KEY_RE = re.compile(
r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
@ -119,8 +122,8 @@ def _s3_client():
def _ts_from_key(key: str) -> datetime | None:
"""EAT timestamp embedded in an automations/inc/<ts>.csv key (or None)."""
m = _CSV_KEY_RE.match(key)
"""EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
m = _CHANGE_KEY_RE.match(key)
if not m:
return None
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
@ -130,11 +133,11 @@ def _ts_from_key(key: str) -> datetime | None:
def _list_inc_csvs(s3) -> list[tuple[str, str]]:
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs)."""
"""[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
out: list[tuple[str, str]] = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
for it in page.get("Contents", []):
if _CSV_KEY_RE.match(it["Key"]):
if _CHANGE_KEY_RE.match(it["Key"]):
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
return out
@ -144,16 +147,23 @@ def _get_text(s3, key: str) -> str:
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
def _last_processed_etag() -> str | None:
"""ETag of the most recently ingested INC file (from tickets.import_meta)."""
def _last_processed_ts() -> datetime | None:
"""Watermark: EAT timestamp of the newest change file already ingested.
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
we drain changes/ oldestnewest). None when nothing has been ingested via the
changes stream yet (e.g. the first run after the source switched to incremental,
where the stored key is an old full-snapshot path) then every file currently in
changes/ is processed.
"""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s",
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
(_DATASET,),
)
row = cur.fetchone()
return row[0] if row else None
return _ts_from_key(row[0]) if row and row[0] else None
def _parse_csv(text: str) -> list[dict]:
@ -263,40 +273,54 @@ def ingest(args) -> None:
upsert(rows, args.apply, meta=meta)
return
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
# (baseline first, then each delta), upserting each. The watermark advances and
# the file is archived PER file, so a mid-run failure leaves a consistent state
# (folder state matches the watermark) and the next run resumes cleanly.
s3 = _s3_client()
listing = _list_inc_csvs(s3)
if not listing:
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
return
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
all_keys = [k for k, _ in listing]
newest_key, newest_etag = listing[-1]
log.info("newest INC file: %s (etag=%s; %d file(s) present)",
newest_key, newest_etag, len(listing))
last_etag = _last_processed_etag()
if newest_etag and newest_etag == last_etag:
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
# watermark: skip anything at/older than the newest file already applied. Archiving
# normally empties changes/, but this guards a failed archive from re-applying.
last_ts = _last_processed_ts()
_floor = datetime.min.replace(tzinfo=_EAT)
pending = [(k, e) for k, e in listing
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
if not pending:
log.info("all %d change file(s) already processed (watermark %s) — nothing new",
len(listing), last_ts and last_ts.isoformat())
if args.apply:
_move_processed(s3, all_keys)
_capture_history() # still record today's snapshot even when unchanged
else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
_move_processed(s3, [k for k, _ in listing]) # archive any stragglers
_capture_history()
return
log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
rows = _parse_csv(_get_text(s3, newest_key))
ts = _ts_from_key(newest_key)
meta = {"export_type": "full", "source_s3_key": newest_key,
"source_etag": newest_etag, "row_count": len(rows)}
total = 0
for i, (key, etag) in enumerate(pending):
rows = _parse_csv(_get_text(s3, key))
ts = _ts_from_key(key)
# the first file applied onto an empty watermark is the full baseline; every
# file after is a delta. export_type is informational (recorded in import_meta).
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
"source_s3_key": key, "source_etag": etag,
"source_max_key": key, "row_count": len(rows)}
if ts:
meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta)
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
# then archive, so the changes/ folder state always matches the watermark.
total += upsert(rows, args.apply, meta=meta)
if args.apply:
_move_processed(s3, all_keys)
_capture_history()
_move_processed(s3, [key])
else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
if args.apply:
_capture_history()
# ── place extraction (strip network codes, keep the real place) ───────────────

View file

@ -1,156 +0,0 @@
# n8n Hourly S3 Full-Data Exports
Updated on June 15, 2026.
## Overview
Two active n8n workflows export complete datasets to S3 every hour:
1. `FTTH Automation Ticket S3 Export`
2. `Fuel Records S3 Export`
Each execution creates CSV files only. Filenames use the actual execution time
in the `Africa/Nairobi` timezone.
No delta files, JSON files, `latest` files, `changes/` directories, `full/`
directories, or midnight-specific exports are created.
## Hourly Output
Together, the two workflows create exactly three files during their hourly
executions:
```text
automations/crq/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/YYYY-MM-DDTHH-mm-ss.csv
fuel_records/YYYY-MM-DDTHH-mm-ss.csv
```
The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
uploaded to the `fuel` bucket.
## FTTH Automation Ticket S3 Export
Workflow ID: `JI3QkcJeHk9eYRsY`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Calls the existing authenticated Scoreboard export endpoint with
`export_type: full`.
4. Reads all CRQ and INC rows returned by the endpoint.
5. Converts each complete dataset to CSV.
6. Uploads exactly two files:
- `automations/crq/<execution-timestamp>.csv`
- `automations/inc/<execution-timestamp>.csv`
7. Fails the execution if exactly two successful upload results are not
returned.
The workflow still has its existing manual webhook for operational testing.
## Fuel Records S3 Export
Workflow ID: `IP2KNAfFazAjTesh`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Reads the complete `logistics_department.fuel_records` table.
4. Converts all returned rows to one CSV.
5. Uploads exactly one file:
- `fuel_records/<execution-timestamp>.csv`
6. Fails the execution if the S3 upload reports an error.
The workflow still has its existing manual webhook for operational testing.
## Timestamp Format
The timestamp format is:
```text
YYYY-MM-DDTHH-mm-ss
```
Example:
```text
2026-06-15T14-39-53
```
The timestamp is generated once at the start of each workflow execution and is
formatted in `Africa/Nairobi`.
## Credentials and Safety
- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
reused.
- No S3 credentials or API secrets are hard-coded in workflow code.
- Secrets are not included in workflow result messages.
- Source database queries are read-only.
- The workflows do not delete or update source database rows.
- S3 upload nodes retain retry handling. A failed hourly execution can also be
recovered naturally by the next full-data run.
## Removed Behavior
The workflows no longer contain:
- Delta export logic or stored delta pointers
- Midnight full-export schedules
- `latest.json` or `latest.csv`
- JSON output
- `changes/` keys
- `full/` keys
- Multipart or additional export files
- FTTH mark-sent state handling
## Deployment Status
Both workflows were saved, published, and activated on June 15, 2026.
Active versions:
```text
Fuel Records S3 Export:
60cf5824-9345-45bb-a2eb-3b20b877fd32
FTTH Automation Ticket S3 Export:
68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
```
## Manual Test Evidence
### Fuel Records S3 Export
Execution ID: `404079`
Rows exported: `2001`
Exact S3 key:
```text
fuel_records/2026-06-15T14-39-50.csv
```
### FTTH Automation Ticket S3 Export
Execution ID: `404080`
Rows exported:
```text
CRQ: 12680
INC: 31434
```
Exact S3 keys:
```text
automations/crq/2026-06-15T14-39-53.csv
automations/inc/2026-06-15T14-39-53.csv
```
Both manual tests completed successfully. Their upload builders generated one
Fuel item and exactly two FTTH items, matching the required three output files.

127
n8n-s3-ticket-exports.md Normal file
View file

@ -0,0 +1,127 @@
# n8n S3 Ticket Exports — Incremental (CDC) Stream
Updated on June 23, 2026.
> **History.** This doc previously described a full-snapshot-per-hour model
> ("No delta files … No `changes/` directories"). That is no longer accurate.
> As of the June 22, 2026 re-seed the source switched to an **incremental
> change-data-capture (CDC) stream** under `automations/<dataset>/changes/`.
> The structure below was verified by direct S3 inspection of the `tickets`
> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are
> carried over from the prior version and may be stale — trust the bucket.
## Overview
The FTTH ticket export now writes an **incremental** CSV stream per dataset:
- The **first** file in a stream is a full current-state **baseline**.
- Every **later** file holds **only the rows that changed** since the prior
export — new and updated tickets, keyed by `ticket_id`.
- **Deletions are never emitted** (tickets are closed in place, not removed).
Consumers must ingest **every not-yet-processed file in ascending timestamp
order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking
only the newest file silently drops the intermediate deltas.
CSV files only. Filenames use the execution time in the `Africa/Nairobi`
timezone (format below). All files share one identical flat-CSV schema (header
+ rows) — the same column set the previous full snapshots used.
## Output Layout
The change stream lives under a `changes/` prefix per dataset in the `tickets`
bucket:
```text
automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv
```
Observed `tickets` bucket layout (June 23, 2026):
```text
automations/inc/
├── changes/ ← ACTIVE incremental stream (baseline + deltas)
│ ├── 2026-06-22T15-50-39.csv (baseline, ~15 MB, 34,849 rows)
│ ├── 2026-06-22T15-53-04.csv (delta, 1 row)
│ ├── 2026-06-22T17-10-41.csv (delta, 22 rows)
│ └── 2026-06-22T17-15-41.csv (delta, 131 rows)
├── processed/ ← our pipeline's archive of consumed files
├── full/ ← present but EMPTY (legacy prefix)
├── latest.csv/ ← present but EMPTY (legacy prefix)
└── latest.json/ ← present but EMPTY (legacy prefix)
```
Notes:
- There are **no longer any `automations/inc/<ts>.csv` files at the root** of
`inc/` — the last full snapshots (through `2026-06-18T17-00-05.csv`) were
archived to `processed/`. New data arrives **only** under `changes/`.
- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in
listings but contain **no objects** (leftover/legacy; ignore them). There is
no `latest` pointer and no JSON/metadata envelope.
- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental
stream (with matching baseline timestamps and additional newer deltas). CRQ
remains out of scope for `import_tickets.py` (INC-only), but the source-side
shape is the same. CRQ's old root snapshots (`automations/crq/<ts>.csv`) are
still present because nothing archives them — they are not consumed.
## CSV Schema
Header (32 columns), identical across baseline and delta files:
```text
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
week_start, week_end, cluster, region, location_name, latitude, longitude,
department, assigned_team, owner, sla_status, mttr, is_auto_created,
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
source_snapshot_id, created_at, updated_at
```
Each row is a complete record (not a partial diff): a delta row carries the
ticket's full current state, so a plain upsert on `ticket_id` is correct. The
baseline still contains `is_alarm=true` rows and may include a leading
`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by
the consumer (see `import_tickets.py`).
## Timestamp Format
```text
YYYY-MM-DDTHH-mm-ss e.g. 2026-06-22T15-50-39
```
Generated once at the start of each execution, formatted in `Africa/Nairobi`
(EAT). Note this is the *execution* time, not a top-of-the-hour schedule — the
incremental files appear whenever a change batch is exported (multiple within
the same hour are normal, e.g. `15-50-39` then `15-53-04`).
## How `import_tickets.py` Consumes It
`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`):
1. Lists `automations/inc/changes/<ts>.csv`, sorts ascending by timestamp.
2. Skips files at/older than the **watermark**
(`tickets.import_meta.metadata->>'source_max_key'` — the newest file already
applied); on a fresh stream it processes everything present.
3. For each pending file, oldest→newest: drop `is_alarm=true` + sentinel rows,
strip `DROP_FIELDS`, normalize `region`/`raw_status`, then upsert on
`ticket_id`. The row upsert and the watermark advance **commit in one
transaction per file**, after which the file is moved to
`automations/inc/processed/`.
4. A mid-run failure therefore leaves folder state consistent with the
watermark; the next run resumes cleanly from where it stopped.
The first file applied onto an empty watermark is recorded as
`export_type="baseline"` in `tickets.import_meta`; every file after is `"delta"`.
## Workflow Context (carried over — verify before relying on)
The export originates from the FTTH Automation Ticket export workflow, calling
the authenticated Scoreboard export endpoint and uploading CSV(s) to the
`tickets` bucket; a sibling workflow exports `fuel_records/<ts>.csv` to the
`fuel` bucket. Source DB queries are read-only and the workflows do not delete
or update source rows. The previously documented workflow IDs and the claim of
"two files per hour, full snapshots, no `changes/`" predate the June 22 switch
to the incremental stream and should be re-confirmed against the live n8n
configuration before being treated as current.