From a4b90a33d8b30d2ebda5a009f7e1420ed35a4d42 Mon Sep 17 00:00:00 2001 From: david kiania Date: Tue, 23 Jun 2026 14:37:17 +0300 Subject: [PATCH] fix(inc): ingest the incremental changes/ stream (baseline + deltas) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The S3 source switched from full hourly snapshots at automations/inc/.csv to an incremental CDC stream at automations/inc/changes/.csv (first file = full baseline, each later file = only the rows that changed, keyed by ticket_id; no deletions). The loader still pointed at the old root path and only ingested the single newest file, so after the switch it found nothing (no new tickets ingested) and, even with the path fixed, would silently drop intermediate deltas. Changes: - point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE) - ingest EVERY not-yet-processed file in ascending timestamp order (baseline first, then each delta), upserting each - replace the single-ETag skip with a per-file timestamp watermark (import_meta.metadata->>'source_max_key'); rows + watermark commit in one txn per file, then archive to processed/ — so a mid-run failure leaves a consistent, resumable state - docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md and rewrite it for the incremental stream; fix the reference in docs/phase-1-ingestion.md Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows), files archived to processed/, watermark advanced, re-run is a no-op. Co-Authored-By: Claude Opus 4.8 --- docs/phase-1-ingestion.md | 4 +- import_tickets.py | 112 +++++++++++++-------- n8n-hourly-s3-full-data-exports.md | 156 ----------------------------- n8n-s3-ticket-exports.md | 127 +++++++++++++++++++++++ 4 files changed, 198 insertions(+), 201 deletions(-) delete mode 100644 n8n-hourly-s3-full-data-exports.md create mode 100644 n8n-s3-ticket-exports.md diff --git a/docs/phase-1-ingestion.md b/docs/phase-1-ingestion.md index 03f6de9..9cbbb0b 100644 --- a/docs/phase-1-ingestion.md +++ b/docs/phase-1-ingestion.md @@ -19,7 +19,9 @@ tickets to our S3-compatible bucket **every hour**: - `automations/inc/.csv` — **incidents / customer faults** *(in scope)* - `automations/crq/.csv` — new-installation requests *(out of scope)* -(See `n8n-hourly-s3-full-data-exports.md`. Sample: `2026-06-15T17-00-00.csv`.) +(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the +source later switched to an incremental `automations/inc/changes/` stream — that +doc has the current layout; this PRD records the original Phase-1 model.) `fleettickets` owns the **downstream**: the `tickets` schema in the shared `tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and diff --git a/import_tickets.py b/import_tickets.py index 97bbcca..2c4ae3c 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -13,19 +13,21 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. -Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md) -writes a full current-state snapshot CSV per hour to the `tickets` bucket at - automations/inc/.csv (e.g. 2026-06-15T17-00-00.csv) -There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a -flat CSV (header + rows). We ingest the NEWEST file: - - skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we - skip the DB write (the export re-emits byte-identical content most hours); +Source data: the n8n S3 export writes CSV files to the `tickets` bucket under + automations/inc/changes/.csv (e.g. 2026-06-22T15-50-39.csv) +This is an INCREMENTAL (CDC) stream: the first file is a full current-state +baseline, and every later file holds only the rows that CHANGED since the prior +export (new + updated tickets, keyed by ticket_id; deletions are never emitted). +Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY +not-yet-processed file in ASCENDING timestamp order (baseline first, then each +delta) — taking only the newest would silently drop the intermediate deltas: - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop derivable / provenance / zero-info columns (see DROP_FIELDS); - normalize region -> lowercase, raw_status -> UPPERCASE; - upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure - history accumulates), and record snapshot freshness in tickets.import_meta; - - on success, MOVE the file to automations/inc/processed/ (copy + delete). + history accumulates), and advance the watermark in tickets.import_meta + (metadata->>'source_max_key' = newest file applied) so reruns skip what's done; + - on success, MOVE each file to automations/inc/processed/ (copy + delete). Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits): --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups) @@ -70,7 +72,7 @@ log = get_logger("import_tickets") _TABLE = "tickets.inc" _DATASET = "inc" _BUCKET = os.getenv("TICKETS_BUCKET", "tickets") -_INC_PREFIX = "automations/inc/" +_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _PROCESSED_PREFIX = "automations/inc/processed/" _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT @@ -88,9 +90,10 @@ DROP_FIELDS = frozenset({ "department", "source_type", }) -# Only files matching automations/inc/.csv (NOT processed/, NOT the -# leftover latest.csv/, latest.json/, full/ prefixes). -_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$") +# Only files matching automations/inc/changes/.csv — the incremental +# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes). +_CHANGE_KEY_RE = re.compile( + r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$") # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() @@ -119,8 +122,8 @@ def _s3_client(): def _ts_from_key(key: str) -> datetime | None: - """EAT timestamp embedded in an automations/inc/.csv key (or None).""" - m = _CSV_KEY_RE.match(key) + """EAT timestamp embedded in an automations/inc/changes/.csv key (or None).""" + m = _CHANGE_KEY_RE.match(key) if not m: return None try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort @@ -130,11 +133,11 @@ def _ts_from_key(key: str) -> datetime | None: def _list_inc_csvs(s3) -> list[tuple[str, str]]: - """[(key, etag)] for every automations/inc/.csv (excludes processed/ + dirs).""" + """[(key, etag)] for every automations/inc/changes/.csv (excludes processed/ + dirs).""" out: list[tuple[str, str]] = [] for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): for it in page.get("Contents", []): - if _CSV_KEY_RE.match(it["Key"]): + if _CHANGE_KEY_RE.match(it["Key"]): out.append((it["Key"], (it.get("ETag") or "").strip('"'))) return out @@ -144,16 +147,23 @@ def _get_text(s3, key: str) -> str: return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") -def _last_processed_etag() -> str | None: - """ETag of the most recently ingested INC file (from tickets.import_meta).""" +def _last_processed_ts() -> datetime | None: + """Watermark: EAT timestamp of the newest change file already ingested. + + Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as + we drain changes/ oldest→newest). None when nothing has been ingested via the + changes stream yet (e.g. the first run after the source switched to incremental, + where the stored key is an old full-snapshot path) — then every file currently in + changes/ is processed. + """ with get_conn() as conn: with conn.cursor() as cur: cur.execute( - "SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s", + "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s", (_DATASET,), ) row = cur.fetchone() - return row[0] if row else None + return _ts_from_key(row[0]) if row and row[0] else None def _parse_csv(text: str) -> list[dict]: @@ -263,40 +273,54 @@ def ingest(args) -> None: upsert(rows, args.apply, meta=meta) return - # --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive. + # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest + # (baseline first, then each delta), upserting each. The watermark advances and + # the file is archived PER file, so a mid-run failure leaves a consistent state + # (folder state matches the watermark) and the next run resumes cleanly. s3 = _s3_client() listing = _list_inc_csvs(s3) if not listing: - log.info("no INC csv files under %s — nothing to do", _INC_PREFIX) + log.info("no INC change files under %s — nothing to do", _INC_PREFIX) return listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT)) - all_keys = [k for k, _ in listing] - newest_key, newest_etag = listing[-1] - log.info("newest INC file: %s (etag=%s; %d file(s) present)", - newest_key, newest_etag, len(listing)) - last_etag = _last_processed_etag() - if newest_etag and newest_etag == last_etag: - log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) + # watermark: skip anything at/older than the newest file already applied. Archiving + # normally empties changes/, but this guards a failed archive from re-applying. + last_ts = _last_processed_ts() + _floor = datetime.min.replace(tzinfo=_EAT) + pending = [(k, e) for k, e in listing + if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] + if not pending: + log.info("all %d change file(s) already processed (watermark %s) — nothing new", + len(listing), last_ts and last_ts.isoformat()) if args.apply: - _move_processed(s3, all_keys) - _capture_history() # still record today's snapshot even when unchanged - else: - log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) + _move_processed(s3, [k for k, _ in listing]) # archive any stragglers + _capture_history() return + log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s", + len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0]) - rows = _parse_csv(_get_text(s3, newest_key)) - ts = _ts_from_key(newest_key) - meta = {"export_type": "full", "source_s3_key": newest_key, - "source_etag": newest_etag, "row_count": len(rows)} - if ts: - meta["exported_at"] = ts.isoformat() - upsert(rows, args.apply, meta=meta) + total = 0 + for i, (key, etag) in enumerate(pending): + rows = _parse_csv(_get_text(s3, key)) + ts = _ts_from_key(key) + # the first file applied onto an empty watermark is the full baseline; every + # file after is a delta. export_type is informational (recorded in import_meta). + meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta", + "source_s3_key": key, "source_etag": etag, + "source_max_key": key, "row_count": len(rows)} + if ts: + meta["exported_at"] = ts.isoformat() + # rows + watermark (source_max_key) commit in one txn, advancing per file; only + # then archive, so the changes/ folder state always matches the watermark. + total += upsert(rows, args.apply, meta=meta) + if args.apply: + _move_processed(s3, [key]) + else: + log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX) + log.info("ingested %d change file(s); %d rows kept in total", len(pending), total) if args.apply: - _move_processed(s3, all_keys) _capture_history() - else: - log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) # ── place extraction (strip network codes, keep the real place) ─────────────── diff --git a/n8n-hourly-s3-full-data-exports.md b/n8n-hourly-s3-full-data-exports.md deleted file mode 100644 index 1563162..0000000 --- a/n8n-hourly-s3-full-data-exports.md +++ /dev/null @@ -1,156 +0,0 @@ -# n8n Hourly S3 Full-Data Exports - -Updated on June 15, 2026. - -## Overview - -Two active n8n workflows export complete datasets to S3 every hour: - -1. `FTTH Automation Ticket S3 Export` -2. `Fuel Records S3 Export` - -Each execution creates CSV files only. Filenames use the actual execution time -in the `Africa/Nairobi` timezone. - -No delta files, JSON files, `latest` files, `changes/` directories, `full/` -directories, or midnight-specific exports are created. - -## Hourly Output - -Together, the two workflows create exactly three files during their hourly -executions: - -```text -automations/crq/YYYY-MM-DDTHH-mm-ss.csv -automations/inc/YYYY-MM-DDTHH-mm-ss.csv -fuel_records/YYYY-MM-DDTHH-mm-ss.csv -``` - -The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is -uploaded to the `fuel` bucket. - -## FTTH Automation Ticket S3 Export - -Workflow ID: `JI3QkcJeHk9eYRsY` - -The workflow: - -1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone. -2. Creates one execution timestamp. -3. Calls the existing authenticated Scoreboard export endpoint with - `export_type: full`. -4. Reads all CRQ and INC rows returned by the endpoint. -5. Converts each complete dataset to CSV. -6. Uploads exactly two files: - - `automations/crq/.csv` - - `automations/inc/.csv` -7. Fails the execution if exactly two successful upload results are not - returned. - -The workflow still has its existing manual webhook for operational testing. - -## Fuel Records S3 Export - -Workflow ID: `IP2KNAfFazAjTesh` - -The workflow: - -1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone. -2. Creates one execution timestamp. -3. Reads the complete `logistics_department.fuel_records` table. -4. Converts all returned rows to one CSV. -5. Uploads exactly one file: - - `fuel_records/.csv` -6. Fails the execution if the S3 upload reports an error. - -The workflow still has its existing manual webhook for operational testing. - -## Timestamp Format - -The timestamp format is: - -```text -YYYY-MM-DDTHH-mm-ss -``` - -Example: - -```text -2026-06-15T14-39-53 -``` - -The timestamp is generated once at the start of each workflow execution and is -formatted in `Africa/Nairobi`. - -## Credentials and Safety - -- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is - reused. -- No S3 credentials or API secrets are hard-coded in workflow code. -- Secrets are not included in workflow result messages. -- Source database queries are read-only. -- The workflows do not delete or update source database rows. -- S3 upload nodes retain retry handling. A failed hourly execution can also be - recovered naturally by the next full-data run. - -## Removed Behavior - -The workflows no longer contain: - -- Delta export logic or stored delta pointers -- Midnight full-export schedules -- `latest.json` or `latest.csv` -- JSON output -- `changes/` keys -- `full/` keys -- Multipart or additional export files -- FTTH mark-sent state handling - -## Deployment Status - -Both workflows were saved, published, and activated on June 15, 2026. - -Active versions: - -```text -Fuel Records S3 Export: -60cf5824-9345-45bb-a2eb-3b20b877fd32 - -FTTH Automation Ticket S3 Export: -68b7be10-ac3a-43d8-8c17-b46a2cbb48d2 -``` - -## Manual Test Evidence - -### Fuel Records S3 Export - -Execution ID: `404079` - -Rows exported: `2001` - -Exact S3 key: - -```text -fuel_records/2026-06-15T14-39-50.csv -``` - -### FTTH Automation Ticket S3 Export - -Execution ID: `404080` - -Rows exported: - -```text -CRQ: 12680 -INC: 31434 -``` - -Exact S3 keys: - -```text -automations/crq/2026-06-15T14-39-53.csv -automations/inc/2026-06-15T14-39-53.csv -``` - -Both manual tests completed successfully. Their upload builders generated one -Fuel item and exactly two FTTH items, matching the required three output files. diff --git a/n8n-s3-ticket-exports.md b/n8n-s3-ticket-exports.md new file mode 100644 index 0000000..4084bc6 --- /dev/null +++ b/n8n-s3-ticket-exports.md @@ -0,0 +1,127 @@ +# n8n S3 Ticket Exports — Incremental (CDC) Stream + +Updated on June 23, 2026. + +> **History.** This doc previously described a full-snapshot-per-hour model +> ("No delta files … No `changes/` directories"). That is no longer accurate. +> As of the June 22, 2026 re-seed the source switched to an **incremental +> change-data-capture (CDC) stream** under `automations//changes/`. +> The structure below was verified by direct S3 inspection of the `tickets` +> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are +> carried over from the prior version and may be stale — trust the bucket. + +## Overview + +The FTTH ticket export now writes an **incremental** CSV stream per dataset: + +- The **first** file in a stream is a full current-state **baseline**. +- Every **later** file holds **only the rows that changed** since the prior + export — new and updated tickets, keyed by `ticket_id`. +- **Deletions are never emitted** (tickets are closed in place, not removed). + +Consumers must ingest **every not-yet-processed file in ascending timestamp +order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking +only the newest file silently drops the intermediate deltas. + +CSV files only. Filenames use the execution time in the `Africa/Nairobi` +timezone (format below). All files share one identical flat-CSV schema (header ++ rows) — the same column set the previous full snapshots used. + +## Output Layout + +The change stream lives under a `changes/` prefix per dataset in the `tickets` +bucket: + +```text +automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv +automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv +``` + +Observed `tickets` bucket layout (June 23, 2026): + +```text +automations/inc/ +├── changes/ ← ACTIVE incremental stream (baseline + deltas) +│ ├── 2026-06-22T15-50-39.csv (baseline, ~15 MB, 34,849 rows) +│ ├── 2026-06-22T15-53-04.csv (delta, 1 row) +│ ├── 2026-06-22T17-10-41.csv (delta, 22 rows) +│ └── 2026-06-22T17-15-41.csv (delta, 131 rows) +├── processed/ ← our pipeline's archive of consumed files +├── full/ ← present but EMPTY (legacy prefix) +├── latest.csv/ ← present but EMPTY (legacy prefix) +└── latest.json/ ← present but EMPTY (legacy prefix) +``` + +Notes: + +- There are **no longer any `automations/inc/.csv` files at the root** of + `inc/` — the last full snapshots (through `2026-06-18T17-00-05.csv`) were + archived to `processed/`. New data arrives **only** under `changes/`. +- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in + listings but contain **no objects** (leftover/legacy; ignore them). There is + no `latest` pointer and no JSON/metadata envelope. +- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental + stream (with matching baseline timestamps and additional newer deltas). CRQ + remains out of scope for `import_tickets.py` (INC-only), but the source-side + shape is the same. CRQ's old root snapshots (`automations/crq/.csv`) are + still present because nothing archives them — they are not consumed. + +## CSV Schema + +Header (32 columns), identical across baseline and delta files: + +```text +ticket_id, source_type, service_type, bucket, raw_status, normalized_status, +created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, +week_start, week_end, cluster, region, location_name, latitude, longitude, +department, assigned_team, owner, sla_status, mttr, is_auto_created, +is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, +source_snapshot_id, created_at, updated_at +``` + +Each row is a complete record (not a partial diff): a delta row carries the +ticket's full current state, so a plain upsert on `ticket_id` is correct. The +baseline still contains `is_alarm=true` rows and may include a leading +`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by +the consumer (see `import_tickets.py`). + +## Timestamp Format + +```text +YYYY-MM-DDTHH-mm-ss e.g. 2026-06-22T15-50-39 +``` + +Generated once at the start of each execution, formatted in `Africa/Nairobi` +(EAT). Note this is the *execution* time, not a top-of-the-hour schedule — the +incremental files appear whenever a change batch is exported (multiple within +the same hour are normal, e.g. `15-50-39` then `15-53-04`). + +## How `import_tickets.py` Consumes It + +`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`): + +1. Lists `automations/inc/changes/.csv`, sorts ascending by timestamp. +2. Skips files at/older than the **watermark** + (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already + applied); on a fresh stream it processes everything present. +3. For each pending file, oldest→newest: drop `is_alarm=true` + sentinel rows, + strip `DROP_FIELDS`, normalize `region`/`raw_status`, then upsert on + `ticket_id`. The row upsert and the watermark advance **commit in one + transaction per file**, after which the file is moved to + `automations/inc/processed/`. +4. A mid-run failure therefore leaves folder state consistent with the + watermark; the next run resumes cleanly from where it stopped. + +The first file applied onto an empty watermark is recorded as +`export_type="baseline"` in `tickets.import_meta`; every file after is `"delta"`. + +## Workflow Context (carried over — verify before relying on) + +The export originates from the FTTH Automation Ticket export workflow, calling +the authenticated Scoreboard export endpoint and uploading CSV(s) to the +`tickets` bucket; a sibling workflow exports `fuel_records/.csv` to the +`fuel` bucket. Source DB queries are read-only and the workflows do not delete +or update source rows. The previously documented workflow IDs and the claim of +"two files per hour, full snapshots, no `changes/`" predate the June 22 switch +to the incremental stream and should be re-confirmed against the live n8n +configuration before being treated as current.