Commit graph

2 commits

Author SHA1 Message Date
david kiania
5f5d71d500 feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.

- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
  keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
  now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
  tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
  columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
  packages inc/crq. Docs (README, implementation, deployment-and-operations,
  n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.

tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 23:16:38 +03:00
david kiania
a4b90a33d8 fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).

The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.

Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
  (baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
  (import_meta.metadata->>'source_max_key'); rows + watermark commit in one
  txn per file, then archive to processed/ — so a mid-run failure leaves a
  consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
  and rewrite it for the incremental stream; fix the reference in
  docs/phase-1-ingestion.md

Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 14:37:17 +03:00