diff --git a/docs/phase-1-ingestion.md b/docs/phase-1-ingestion.md index 03f6de9..9cbbb0b 100644 --- a/docs/phase-1-ingestion.md +++ b/docs/phase-1-ingestion.md @@ -19,7 +19,9 @@ tickets to our S3-compatible bucket **every hour**: - `automations/inc/.csv` — **incidents / customer faults** *(in scope)* - `automations/crq/.csv` — new-installation requests *(out of scope)* -(See `n8n-hourly-s3-full-data-exports.md`. Sample: `2026-06-15T17-00-00.csv`.) +(See `n8n-s3-ticket-exports.md`. Sample: `2026-06-15T17-00-00.csv`. Note: the +source later switched to an incremental `automations/inc/changes/` stream — that +doc has the current layout; this PRD records the original Phase-1 model.) `fleettickets` owns the **downstream**: the `tickets` schema in the shared `tracksolid_db` (raw-jsonb-first `tickets.inc`, geocoding gazetteers, and diff --git a/import_tickets.py b/import_tickets.py index 97bbcca..2c4ae3c 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -13,19 +13,21 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. -Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md) -writes a full current-state snapshot CSV per hour to the `tickets` bucket at - automations/inc/.csv (e.g. 2026-06-15T17-00-00.csv) -There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a -flat CSV (header + rows). We ingest the NEWEST file: - - skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we - skip the DB write (the export re-emits byte-identical content most hours); +Source data: the n8n S3 export writes CSV files to the `tickets` bucket under + automations/inc/changes/.csv (e.g. 2026-06-22T15-50-39.csv) +This is an INCREMENTAL (CDC) stream: the first file is a full current-state +baseline, and every later file holds only the rows that CHANGED since the prior +export (new + updated tickets, keyed by ticket_id; deletions are never emitted). +Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY +not-yet-processed file in ASCENDING timestamp order (baseline first, then each +delta) — taking only the newest would silently drop the intermediate deltas: - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row; - drop derivable / provenance / zero-info columns (see DROP_FIELDS); - normalize region -> lowercase, raw_status -> UPPERCASE; - upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure - history accumulates), and record snapshot freshness in tickets.import_meta; - - on success, MOVE the file to automations/inc/processed/ (copy + delete). + history accumulates), and advance the watermark in tickets.import_meta + (metadata->>'source_max_key' = newest file applied) so reruns skip what's done; + - on success, MOVE each file to automations/inc/processed/ (copy + delete). Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits): --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups) @@ -70,7 +72,7 @@ log = get_logger("import_tickets") _TABLE = "tickets.inc" _DATASET = "inc" _BUCKET = os.getenv("TICKETS_BUCKET", "tickets") -_INC_PREFIX = "automations/inc/" +_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream _PROCESSED_PREFIX = "automations/inc/processed/" _EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT @@ -88,9 +90,10 @@ DROP_FIELDS = frozenset({ "department", "source_type", }) -# Only files matching automations/inc/.csv (NOT processed/, NOT the -# leftover latest.csv/, latest.json/, full/ prefixes). -_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$") +# Only files matching automations/inc/changes/.csv — the incremental +# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes). +_CHANGE_KEY_RE = re.compile( + r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$") # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() @@ -119,8 +122,8 @@ def _s3_client(): def _ts_from_key(key: str) -> datetime | None: - """EAT timestamp embedded in an automations/inc/.csv key (or None).""" - m = _CSV_KEY_RE.match(key) + """EAT timestamp embedded in an automations/inc/changes/.csv key (or None).""" + m = _CHANGE_KEY_RE.match(key) if not m: return None try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort @@ -130,11 +133,11 @@ def _ts_from_key(key: str) -> datetime | None: def _list_inc_csvs(s3) -> list[tuple[str, str]]: - """[(key, etag)] for every automations/inc/.csv (excludes processed/ + dirs).""" + """[(key, etag)] for every automations/inc/changes/.csv (excludes processed/ + dirs).""" out: list[tuple[str, str]] = [] for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): for it in page.get("Contents", []): - if _CSV_KEY_RE.match(it["Key"]): + if _CHANGE_KEY_RE.match(it["Key"]): out.append((it["Key"], (it.get("ETag") or "").strip('"'))) return out @@ -144,16 +147,23 @@ def _get_text(s3, key: str) -> str: return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") -def _last_processed_etag() -> str | None: - """ETag of the most recently ingested INC file (from tickets.import_meta).""" +def _last_processed_ts() -> datetime | None: + """Watermark: EAT timestamp of the newest change file already ingested. + + Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as + we drain changes/ oldest→newest). None when nothing has been ingested via the + changes stream yet (e.g. the first run after the source switched to incremental, + where the stored key is an old full-snapshot path) — then every file currently in + changes/ is processed. + """ with get_conn() as conn: with conn.cursor() as cur: cur.execute( - "SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s", + "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s", (_DATASET,), ) row = cur.fetchone() - return row[0] if row else None + return _ts_from_key(row[0]) if row and row[0] else None def _parse_csv(text: str) -> list[dict]: @@ -263,40 +273,54 @@ def ingest(args) -> None: upsert(rows, args.apply, meta=meta) return - # --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive. + # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest + # (baseline first, then each delta), upserting each. The watermark advances and + # the file is archived PER file, so a mid-run failure leaves a consistent state + # (folder state matches the watermark) and the next run resumes cleanly. s3 = _s3_client() listing = _list_inc_csvs(s3) if not listing: - log.info("no INC csv files under %s — nothing to do", _INC_PREFIX) + log.info("no INC change files under %s — nothing to do", _INC_PREFIX) return listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT)) - all_keys = [k for k, _ in listing] - newest_key, newest_etag = listing[-1] - log.info("newest INC file: %s (etag=%s; %d file(s) present)", - newest_key, newest_etag, len(listing)) - last_etag = _last_processed_etag() - if newest_etag and newest_etag == last_etag: - log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) + # watermark: skip anything at/older than the newest file already applied. Archiving + # normally empties changes/, but this guards a failed archive from re-applying. + last_ts = _last_processed_ts() + _floor = datetime.min.replace(tzinfo=_EAT) + pending = [(k, e) for k, e in listing + if last_ts is None or (_ts_from_key(k) or _floor) > last_ts] + if not pending: + log.info("all %d change file(s) already processed (watermark %s) — nothing new", + len(listing), last_ts and last_ts.isoformat()) if args.apply: - _move_processed(s3, all_keys) - _capture_history() # still record today's snapshot even when unchanged - else: - log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) + _move_processed(s3, [k for k, _ in listing]) # archive any stragglers + _capture_history() return + log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s", + len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0]) - rows = _parse_csv(_get_text(s3, newest_key)) - ts = _ts_from_key(newest_key) - meta = {"export_type": "full", "source_s3_key": newest_key, - "source_etag": newest_etag, "row_count": len(rows)} - if ts: - meta["exported_at"] = ts.isoformat() - upsert(rows, args.apply, meta=meta) + total = 0 + for i, (key, etag) in enumerate(pending): + rows = _parse_csv(_get_text(s3, key)) + ts = _ts_from_key(key) + # the first file applied onto an empty watermark is the full baseline; every + # file after is a delta. export_type is informational (recorded in import_meta). + meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta", + "source_s3_key": key, "source_etag": etag, + "source_max_key": key, "row_count": len(rows)} + if ts: + meta["exported_at"] = ts.isoformat() + # rows + watermark (source_max_key) commit in one txn, advancing per file; only + # then archive, so the changes/ folder state always matches the watermark. + total += upsert(rows, args.apply, meta=meta) + if args.apply: + _move_processed(s3, [key]) + else: + log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX) + log.info("ingested %d change file(s); %d rows kept in total", len(pending), total) if args.apply: - _move_processed(s3, all_keys) _capture_history() - else: - log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) # ── place extraction (strip network codes, keep the real place) ─────────────── diff --git a/n8n-hourly-s3-full-data-exports.md b/n8n-hourly-s3-full-data-exports.md deleted file mode 100644 index 1563162..0000000 --- a/n8n-hourly-s3-full-data-exports.md +++ /dev/null @@ -1,156 +0,0 @@ -# n8n Hourly S3 Full-Data Exports - -Updated on June 15, 2026. - -## Overview - -Two active n8n workflows export complete datasets to S3 every hour: - -1. `FTTH Automation Ticket S3 Export` -2. `Fuel Records S3 Export` - -Each execution creates CSV files only. Filenames use the actual execution time -in the `Africa/Nairobi` timezone. - -No delta files, JSON files, `latest` files, `changes/` directories, `full/` -directories, or midnight-specific exports are created. - -## Hourly Output - -Together, the two workflows create exactly three files during their hourly -executions: - -```text -automations/crq/YYYY-MM-DDTHH-mm-ss.csv -automations/inc/YYYY-MM-DDTHH-mm-ss.csv -fuel_records/YYYY-MM-DDTHH-mm-ss.csv -``` - -The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is -uploaded to the `fuel` bucket. - -## FTTH Automation Ticket S3 Export - -Workflow ID: `JI3QkcJeHk9eYRsY` - -The workflow: - -1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone. -2. Creates one execution timestamp. -3. Calls the existing authenticated Scoreboard export endpoint with - `export_type: full`. -4. Reads all CRQ and INC rows returned by the endpoint. -5. Converts each complete dataset to CSV. -6. Uploads exactly two files: - - `automations/crq/.csv` - - `automations/inc/.csv` -7. Fails the execution if exactly two successful upload results are not - returned. - -The workflow still has its existing manual webhook for operational testing. - -## Fuel Records S3 Export - -Workflow ID: `IP2KNAfFazAjTesh` - -The workflow: - -1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone. -2. Creates one execution timestamp. -3. Reads the complete `logistics_department.fuel_records` table. -4. Converts all returned rows to one CSV. -5. Uploads exactly one file: - - `fuel_records/.csv` -6. Fails the execution if the S3 upload reports an error. - -The workflow still has its existing manual webhook for operational testing. - -## Timestamp Format - -The timestamp format is: - -```text -YYYY-MM-DDTHH-mm-ss -``` - -Example: - -```text -2026-06-15T14-39-53 -``` - -The timestamp is generated once at the start of each workflow execution and is -formatted in `Africa/Nairobi`. - -## Credentials and Safety - -- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is - reused. -- No S3 credentials or API secrets are hard-coded in workflow code. -- Secrets are not included in workflow result messages. -- Source database queries are read-only. -- The workflows do not delete or update source database rows. -- S3 upload nodes retain retry handling. A failed hourly execution can also be - recovered naturally by the next full-data run. - -## Removed Behavior - -The workflows no longer contain: - -- Delta export logic or stored delta pointers -- Midnight full-export schedules -- `latest.json` or `latest.csv` -- JSON output -- `changes/` keys -- `full/` keys -- Multipart or additional export files -- FTTH mark-sent state handling - -## Deployment Status - -Both workflows were saved, published, and activated on June 15, 2026. - -Active versions: - -```text -Fuel Records S3 Export: -60cf5824-9345-45bb-a2eb-3b20b877fd32 - -FTTH Automation Ticket S3 Export: -68b7be10-ac3a-43d8-8c17-b46a2cbb48d2 -``` - -## Manual Test Evidence - -### Fuel Records S3 Export - -Execution ID: `404079` - -Rows exported: `2001` - -Exact S3 key: - -```text -fuel_records/2026-06-15T14-39-50.csv -``` - -### FTTH Automation Ticket S3 Export - -Execution ID: `404080` - -Rows exported: - -```text -CRQ: 12680 -INC: 31434 -``` - -Exact S3 keys: - -```text -automations/crq/2026-06-15T14-39-53.csv -automations/inc/2026-06-15T14-39-53.csv -``` - -Both manual tests completed successfully. Their upload builders generated one -Fuel item and exactly two FTTH items, matching the required three output files. diff --git a/n8n-s3-ticket-exports.md b/n8n-s3-ticket-exports.md new file mode 100644 index 0000000..4084bc6 --- /dev/null +++ b/n8n-s3-ticket-exports.md @@ -0,0 +1,127 @@ +# n8n S3 Ticket Exports — Incremental (CDC) Stream + +Updated on June 23, 2026. + +> **History.** This doc previously described a full-snapshot-per-hour model +> ("No delta files … No `changes/` directories"). That is no longer accurate. +> As of the June 22, 2026 re-seed the source switched to an **incremental +> change-data-capture (CDC) stream** under `automations//changes/`. +> The structure below was verified by direct S3 inspection of the `tickets` +> bucket on June 23, 2026. Workflow-internal details (IDs, node behaviour) are +> carried over from the prior version and may be stale — trust the bucket. + +## Overview + +The FTTH ticket export now writes an **incremental** CSV stream per dataset: + +- The **first** file in a stream is a full current-state **baseline**. +- Every **later** file holds **only the rows that changed** since the prior + export — new and updated tickets, keyed by `ticket_id`. +- **Deletions are never emitted** (tickets are closed in place, not removed). + +Consumers must ingest **every not-yet-processed file in ascending timestamp +order** (baseline first, then each delta) and **upsert on `ticket_id`**. Taking +only the newest file silently drops the intermediate deltas. + +CSV files only. Filenames use the execution time in the `Africa/Nairobi` +timezone (format below). All files share one identical flat-CSV schema (header ++ rows) — the same column set the previous full snapshots used. + +## Output Layout + +The change stream lives under a `changes/` prefix per dataset in the `tickets` +bucket: + +```text +automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv +automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv +``` + +Observed `tickets` bucket layout (June 23, 2026): + +```text +automations/inc/ +├── changes/ ← ACTIVE incremental stream (baseline + deltas) +│ ├── 2026-06-22T15-50-39.csv (baseline, ~15 MB, 34,849 rows) +│ ├── 2026-06-22T15-53-04.csv (delta, 1 row) +│ ├── 2026-06-22T17-10-41.csv (delta, 22 rows) +│ └── 2026-06-22T17-15-41.csv (delta, 131 rows) +├── processed/ ← our pipeline's archive of consumed files +├── full/ ← present but EMPTY (legacy prefix) +├── latest.csv/ ← present but EMPTY (legacy prefix) +└── latest.json/ ← present but EMPTY (legacy prefix) +``` + +Notes: + +- There are **no longer any `automations/inc/.csv` files at the root** of + `inc/` — the last full snapshots (through `2026-06-18T17-00-05.csv`) were + archived to `processed/`. New data arrives **only** under `changes/`. +- The `full/`, `latest.csv/`, and `latest.json/` prefixes still appear in + listings but contain **no objects** (leftover/legacy; ignore them). There is + no `latest` pointer and no JSON/metadata envelope. +- **CRQ mirrors INC**: `automations/crq/changes/` carries the same incremental + stream (with matching baseline timestamps and additional newer deltas). CRQ + remains out of scope for `import_tickets.py` (INC-only), but the source-side + shape is the same. CRQ's old root snapshots (`automations/crq/.csv`) are + still present because nothing archives them — they are not consumed. + +## CSV Schema + +Header (32 columns), identical across baseline and delta files: + +```text +ticket_id, source_type, service_type, bucket, raw_status, normalized_status, +created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, +week_start, week_end, cluster, region, location_name, latitude, longitude, +department, assigned_team, owner, sla_status, mttr, is_auto_created, +is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, +source_snapshot_id, created_at, updated_at +``` + +Each row is a complete record (not a partial diff): a delta row carries the +ticket's full current state, so a plain upsert on `ticket_id` is correct. The +baseline still contains `is_alarm=true` rows and may include a leading +`EXPORT STOPPED…` truncation-sentinel row in `ticket_id`; both are filtered by +the consumer (see `import_tickets.py`). + +## Timestamp Format + +```text +YYYY-MM-DDTHH-mm-ss e.g. 2026-06-22T15-50-39 +``` + +Generated once at the start of each execution, formatted in `Africa/Nairobi` +(EAT). Note this is the *execution* time, not a top-of-the-hour schedule — the +incremental files appear whenever a change batch is exported (multiple within +the same hour are normal, e.g. `15-50-39` then `15-53-04`). + +## How `import_tickets.py` Consumes It + +`python import_tickets.py --from-bucket --apply` (see `run_ingest.sh`): + +1. Lists `automations/inc/changes/.csv`, sorts ascending by timestamp. +2. Skips files at/older than the **watermark** + (`tickets.import_meta.metadata->>'source_max_key'` — the newest file already + applied); on a fresh stream it processes everything present. +3. For each pending file, oldest→newest: drop `is_alarm=true` + sentinel rows, + strip `DROP_FIELDS`, normalize `region`/`raw_status`, then upsert on + `ticket_id`. The row upsert and the watermark advance **commit in one + transaction per file**, after which the file is moved to + `automations/inc/processed/`. +4. A mid-run failure therefore leaves folder state consistent with the + watermark; the next run resumes cleanly from where it stopped. + +The first file applied onto an empty watermark is recorded as +`export_type="baseline"` in `tickets.import_meta`; every file after is `"delta"`. + +## Workflow Context (carried over — verify before relying on) + +The export originates from the FTTH Automation Ticket export workflow, calling +the authenticated Scoreboard export endpoint and uploading CSV(s) to the +`tickets` bucket; a sibling workflow exports `fuel_records/.csv` to the +`fuel` bucket. Source DB queries are read-only and the workflows do not delete +or update source rows. The previously documented workflow IDs and the claim of +"two files per hour, full snapshots, no `changes/`" predate the June 22 switch +to the incremental stream and should be re-confirmed against the live n8n +configuration before being treated as current.