"""
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00

pipeline.py — Fireside Communications · generic ticket ingestion engine (raw-first)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

The dataset-agnostic core shared by the per-type entrypoints:
  inc/import_inc.py -> tickets.inc (incidents / customer faults)
  crq/import_crq.py -> tickets.crq (new-installation requests)

Both datasets share an IDENTICAL flat-CSV schema and the same CDC change stream,
so the only differences are the table, the S3 prefixes, the import_meta dataset
key, and an optional post-apply hook (INC captures closure/backlog history; CRQ
does not yet). Those are carried by the `Dataset` config; everything else here is
generic. Geocoding is inherently CROSS-DATASET (one gazetteer, one geocoder
budget): geocode_clusters / geocode_locations / resolve operate on BOTH tables and
are driven from a single entrypoint (the INC one) — never duplicated per dataset.

RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.

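A plain-Python sketch of that fallback order, for illustration only (the real derivation is SQL in the migrations). The `lat`/`lng` key names, the helper name `resolve_geom`, and passing the already-looked-up location and cluster points in as arguments are all assumptions made to keep the example self-contained:

```python
from typing import Optional, Tuple

Point = Tuple[float, float]  # (lng, lat)


def resolve_geom(raw: dict, location_point: Optional[Point],
                 cluster_point: Optional[Point]) -> Optional[Point]:
    # Hypothetical helper; the real logic lives in SQL, not in this module.
    # 1. feed coords straight from the raw record ('lat'/'lng' keys assumed)
    try:
        lat, lng = float(raw["lat"]), float(raw["lng"])
        return (lng, lat)
    except (KeyError, TypeError, ValueError):
        pass  # missing or blank coords: fall through to the geocoded sources
    # 2. location geocode, 3. cluster centroid, 4. none
    return location_point or cluster_point


print(resolve_geom({"lat": "-1.29", "lng": "36.82"}, None, None))  # → (36.82, -1.29)
print(resolve_geom({"lat": None}, None, (36.8, -1.3)))             # → (36.8, -1.3)
```
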
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/<dataset>/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv).

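Because the timestamp in every key is fixed-width and zero-padded, sorting the keys as plain strings gives the same order as sorting them by time; the EAT offset (UTC+3) only matters when comparing against UTC instants. A small illustrative sketch with made-up keys (the helper name `key_to_utc` is hypothetical, not part of this module):

```python
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi, fixed UTC+3


def key_to_utc(key: str) -> datetime:
    # 'automations/inc/changes/2026-06-24T09-55-44.csv' -> aware UTC datetime
    stem = key.rsplit("/", 1)[-1].removesuffix(".csv")
    local = datetime.strptime(stem, "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    return local.astimezone(timezone.utc)


keys = [
    "automations/inc/changes/2026-06-24T09-55-44.csv",
    "automations/inc/changes/2026-06-23T23-10-00.csv",
    "automations/inc/changes/2026-06-24T00-05-12.csv",
]
# String order and chronological order agree for this key format.
print(sorted(keys) == sorted(keys, key=key_to_utc))  # → True
print(key_to_utc(keys[0]).isoformat())               # → 2026-06-24T06:55:44+00:00
```
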
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00

This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior
export (with periodic full-state re-emissions). Deletions are never emitted. Every
file shares the identical flat-CSV schema. We ingest EVERY not-yet-processed file
in ASCENDING timestamp order (baseline first, then each delta) — taking only the
newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
  history accumulates), and advance the watermark in tickets.import_meta
  (metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/<dataset>/processed/ (copy + delete).
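
The steps above can be sketched end to end with in-memory stand-ins: a dict plays the table (upsert on `ticket_id`), a plain string plays the watermark, and rows are dicts. The names `clean_rows` and `ingest`, the `is_alarm` value format and the sample rows are hypothetical illustrations of the ordering/watermark logic, not this engine's code; in the real pipeline the rows and the watermark commit in one transaction per file before the file is archived.

```python
SENTINEL_PREFIX = "EXPORT STOPPED"
DROP_FIELDS = frozenset({"week_start", "week_end", "department", "source_type"})  # subset


def clean_rows(rows):
    # Yield cleaned `raw` dicts: skip alarm + sentinel rows, drop columns, normalize case.
    for row in rows:
        if str(row.get("is_alarm")).lower() == "true":  # value format assumed
            continue
        if str(row.get("ticket_id")).startswith(SENTINEL_PREFIX):
            continue
        raw = {k: v for k, v in row.items() if k not in DROP_FIELDS}
        if raw.get("region"):
            raw["region"] = raw["region"].lower()
        if raw.get("raw_status"):
            raw["raw_status"] = raw["raw_status"].upper()
        yield raw


def ingest(files: dict, table: dict, watermark):
    # files maps a timestamp key to its rows; returns the advanced watermark.
    for key in sorted(files):              # ascending: baseline first, then deltas
        if watermark is not None and key <= watermark:
            continue                       # applied on an earlier run
        for raw in clean_rows(files[key]):
            table[raw["ticket_id"]] = raw  # upsert on ticket_id, never delete
        watermark = key                    # real engine: same txn as the rows
    return watermark


files = {
    "2026-06-24T09-00-00": [               # baseline
        {"ticket_id": "EXPORT STOPPED (illustrative)"},
        {"ticket_id": "INC1", "raw_status": "open", "region": "Nairobi", "week_start": "x"},
        {"ticket_id": "INC2", "raw_status": "open", "is_alarm": "true"},
        {"ticket_id": "INC3", "raw_status": "open", "region": "Kisumu"},
    ],
    "2026-06-24T10-00-00": [               # delta: only INC1 changed
        {"ticket_id": "INC1", "raw_status": "closed", "region": "Nairobi"},
    ],
}
table: dict = {}
wm = ingest(files, table, None)
print(wm, table["INC1"]["raw_status"], sorted(table))  # → 2026-06-24T10-00-00 CLOSED ['INC1', 'INC3']
print(ingest(files, table, wm) == wm)                  # → True (rerun is a no-op)

newest_only: dict = {}
ingest({max(files): files[max(files)]}, newest_only, None)
print(sorted(newest_only))                             # → ['INC1'] (INC3 silently lost)
```
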
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import io
import csv
import math
import os
import re
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta

import boto3
import requests
import psycopg2.extras
from botocore.config import Config as BotoConfig

from shared import clean, get_conn, get_logger

log = get_logger("pipeline")

# ── shared ingestion config ─────────────────────────────────────────────────────
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_EAT = timezone(timedelta(hours=3))  # Africa/Nairobi — filenames + data are EAT

# Garbage row the source leaks (commonly the first data line): its ticket_id is the
# message itself. Matched by prefix so position/exact-tail don't matter.
_SENTINEL_PREFIX = "EXPORT STOPPED"

# Columns dropped before building `raw`: derivable (week_*), the client's row-level
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
DROP_FIELDS = frozenset({
    "week_start", "week_end",
    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
    "department", "source_type",
})

# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0

# ── dataset config (per ticket type) ────────────────────────────────────────────
@dataclass(frozen=True)
class Dataset:
    """All that distinguishes one ticket type from another in the generic engine."""
    name: str  # 'inc' | 'crq' (import_meta.dataset)
    table: str  # 'tickets.inc' | 'tickets.crq'
    change_prefix: str  # 'automations/<name>/changes/'
    processed_prefix: str  # 'automations/<name>/processed/'
    key_regex: re.Pattern  # matches a <prefix><EAT-ts>.csv key
    post_apply: Callable[[], None] | None = None  # e.g. capture_history (INC only)


def make_dataset(name: str, post_apply: Callable[[], None] | None = None) -> Dataset:
    """Build the standard Dataset for a ticket type (inc/crq) — only the name varies."""
    return Dataset(
        name=name,
        table=f"tickets.{name}",
        change_prefix=f"automations/{name}/changes/",
        processed_prefix=f"automations/{name}/processed/",
        # only automations/<name>/changes/<EAT-timestamp>.csv — the incremental stream
        # (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
        key_regex=re.compile(
            rf"^automations/{name}/changes/(\d{{4}}-\d{{2}}-\d{{2}}T\d{{2}}-\d{{2}}-\d{{2}})\.csv$"),
        post_apply=post_apply,
    )

# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
def _s3_client():
    """boto3 client for the S3-compatible RustFS endpoint (path-style addressing forced)."""
    return boto3.client(
        "s3",
        endpoint_url=os.environ["RUSTFS_ENDPOINT"],
        aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"],
        aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"],
        region_name=os.getenv("RUSTFS_REGION", "us-east-1"),
        config=BotoConfig(s3={"addressing_style": "path"}, signature_version="s3v4",
                          retries={"max_attempts": 3, "mode": "standard"}),
    )

def _ts_from_key(ds: Dataset, key: str) -> datetime | None:
    """EAT timestamp embedded in an automations/<ds>/changes/<ts>.csv key (or None)."""
    m = ds.key_regex.match(key)
    if not m:
        return None
    try:  # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
    except ValueError:
        return None

def _list_csvs(s3, ds: Dataset) -> list[tuple[str, str]]:
|
|
|
|
|
"""[(key, etag)] for every changes/<ts>.csv of this dataset (excludes processed/ + dirs)."""
|
2026-06-15 17:08:05 +00:00
|
|
|
out: list[tuple[str, str]] = []
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=ds.change_prefix):
        for it in page.get("Contents", []):
            if ds.key_regex.match(it["Key"]):
                out.append((it["Key"], (it.get("ETag") or "").strip('"')))
    return out


def _get_text(s3, key: str) -> str:
    """Download an object's body as UTF-8 text."""
    return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")


def _last_processed_ts(ds: Dataset) -> datetime | None:
    """Watermark: EAT timestamp of the newest change file already ingested for this dataset.

    Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
    we drain changes/ oldest→newest). None when nothing has been ingested via the
    changes stream yet (e.g. a brand-new dataset, or the first run after the source
    switched buckets) — then every file currently in changes/ is processed.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
                (ds.name,),
            )
            row = cur.fetchone()
    return _ts_from_key(ds, row[0]) if row and row[0] else None
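

# Illustrative sketch only (hypothetical helper, not called anywhere in this module):
# how a per-file timestamp watermark drives "drain changes/ oldest to newest".
# Assumption: change keys look like "automations/<ds>/changes/<YYYYmmddTHHMMSS>.csv";
# the real key format is whatever ds.key_regex and _ts_from_key accept.
def _example_pending_keys(keys, watermark):
    """Return change-file keys newer than `watermark`, oldest first.

    With watermark=None (nothing ingested yet) every change file is returned.
    """
    import re
    from datetime import datetime

    pattern = re.compile(r".*/changes/(\d{8}T\d{6})\.csv$")  # assumed naming scheme
    stamped = []
    for key in keys:
        m = pattern.match(key)
        if not m:
            continue  # not a change file (e.g. a processed/ archive or a stray object)
        ts = datetime.strptime(m.group(1), "%Y%m%dT%H%M%S")
        if watermark is None or ts > watermark:
            stamped.append((ts, key))
    # Ascending timestamp order: the baseline first, then each delta in turn.
    return [key for _, key in sorted(stamped)]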


def _parse_csv(text: str) -> list[dict]:
    """Parse CSV text (header row first) into one dict per data row."""
    return list(csv.DictReader(io.StringIO(text)))


def _load_csv_local(path: str) -> list[dict]:
    """Read a local CSV file (dev path) into one dict per data row."""
    with open(path, encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))


def _move_processed(s3, ds: Dataset, keys: list[str]) -> None:
    """Archive the listed CSV objects to automations/<ds>/processed/ (S3 has no move: copy, then delete)."""
    for key in keys:
        dst = ds.processed_prefix + key.rsplit("/", 1)[-1]
        s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
        s3.delete_object(Bucket=_BUCKET, Key=key)
        log.info("archived %s -> %s", key, dst)
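

# Illustrative sketch only (hypothetical helper, not used by the pipeline): the key
# mapping _move_processed applies, computed without touching S3 (handy for a dry run).
# The archive key keeps only the file name and swaps in ds.processed_prefix. This
# assumes processed_prefix ends with "/", which the plain concatenation above requires.
def _example_archive_plan(keys, processed_prefix):
    """Return (source_key, archive_key) pairs for the given change-file keys."""
    return [(key, processed_prefix + key.rsplit("/", 1)[-1]) for key in keys]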


# ── row preparation (filter · drop columns · normalize) ─────────────────────────
def _keep_row(row: dict) -> bool:
    """Drop alarm rows + the truncation-sentinel; require a real ticket_id."""
    tid = clean(row.get("ticket_id"))
    if not tid or tid.startswith(_SENTINEL_PREFIX):
        return False
    return clean(row.get("is_alarm")) != "true"


def _prepare(row: dict) -> dict:
    """Strip DROP_FIELDS and normalize region/raw_status — returns the `raw` payload."""
    r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
    if r.get("region"):
        r["region"] = r["region"].lower()
    if r.get("raw_status"):
        r["raw_status"] = r["raw_status"].upper()
    return r
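

# Illustrative sketch only (hypothetical, self-contained): the _keep_row then _prepare
# flow applied to in-memory rows. clean(), _SENTINEL_PREFIX and DROP_FIELDS live
# elsewhere in this module. The stand-ins below are assumptions for the demo only:
# a whitespace-trimming cleaner, a made-up sentinel prefix and a one-column drop set.
def _example_filter_and_prepare(rows):
    """Return (ticket_id, raw) pairs for the rows that survive filtering."""
    sentinel_prefix = "TRUNCATED"  # assumed value, not the real _SENTINEL_PREFIX
    drop_fields = {"internal_note"}  # assumed value, not the real DROP_FIELDS

    def clean_value(v):  # assumed behaviour of clean(): trim, empty -> None
        v = (v or "").strip()
        return v or None

    out = []
    for row in rows:
        tid = clean_value(row.get("ticket_id"))
        if not tid or tid.startswith(sentinel_prefix):
            continue  # no usable id, or the truncation sentinel row
        if clean_value(row.get("is_alarm")) == "true":
            continue  # alarm rows are not ingested
        r = {k: v for k, v in row.items() if k not in drop_fields}
        if r.get("region"):
            r["region"] = r["region"].lower()
        if r.get("raw_status"):
            r["raw_status"] = r["raw_status"].upper()
        out.append((tid, r))
    return out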


# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, ds: Dataset, meta: dict, records_ingested: int) -> None:
    """Upsert the snapshot metadata (powers map freshness + holds source_max_key).

    Runs on the caller's cursor so the row upsert and the meta write commit
    together; a half-written state (rows in, meta stale) would leave the
    source_max_key watermark out of step with the rows already applied.
    """
    cur.execute(
        """INSERT INTO tickets.import_meta
               (dataset, export_type, exported_at, snapshot_date, source_schema,
                source_table, row_count, records_ingested, n8n_execution_id, metadata,
                ingested_at)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
           ON CONFLICT (dataset) DO UPDATE
           SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
               snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
               source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
               records_ingested = EXCLUDED.records_ingested,
               n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
               ingested_at = now()""",
        (ds.name, clean(meta.get("export_type")), clean(meta.get("exported_at")),
         clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
         clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
         clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
    )


def upsert(ds: Dataset, rows: list[dict], apply: bool, meta: dict | None = None) -> int:
    """Filter and normalize rows, then upsert them into ds.table as raw JSON keyed by ticket_id.

    With apply=False this is a dry run: counts are logged and nothing is written.
    Returns the number of rows kept (i.e. written, when apply=True).
    """
    meta = meta or {}
    kept = [r for r in rows if _keep_row(r)]
    payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
    log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
             ds.table, len(rows), len(payload), len(rows) - len(payload))
    if not apply:
        log.info("DRY-RUN — nothing written to %s. Use --apply.", ds.table)
        return len(payload)
    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                f"INSERT INTO {ds.table} (ticket_id, raw) VALUES %s "
                "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                payload, page_size=500,
            )
            # same transaction as the upsert: rows + snapshot meta commit atomically
            _record_meta(cur, ds, meta, len(payload))
    log.info("upserted %d rows into %s", len(payload), ds.table)
    return len(payload)
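

# Illustrative sketch only (hypothetical, not part of the pipeline): why upsert() writes
# the rows and tickets.import_meta on one cursor, in one transaction. Stdlib sqlite3
# stands in for Postgres/psycopg2 here so the sketch runs without a server. The table
# names and columns are simplified assumptions. If the meta write fails, the row upsert
# rolls back with it, so the watermark and the data always move together.
def _example_atomic_rows_and_meta(fail_meta: bool) -> tuple[int, int]:
    """Return (row_count, meta_count) after one attempted rows+meta transaction."""
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (ticket_id TEXT PRIMARY KEY, raw TEXT)")
    conn.execute("CREATE TABLE meta (dataset TEXT PRIMARY KEY, source_max_key TEXT)")
    try:
        with conn:  # one transaction: commit on success, roll back on any exception
            conn.executemany(
                "INSERT INTO t (ticket_id, raw) VALUES (?, ?) "
                "ON CONFLICT (ticket_id) DO UPDATE SET raw = excluded.raw",
                [("INC1", "{}"), ("INC2", "{}")],
            )
            if fail_meta:
                raise RuntimeError("simulated meta-write failure")
            conn.execute("INSERT INTO meta VALUES ('inc', 'changes/20260620T120000.csv')")
    except RuntimeError:
        pass  # the rows inserted above were rolled back together with the failed meta
    rows = conn.execute("SELECT count(*) FROM t").fetchone()[0]
    metas = conn.execute("SELECT count(*) FROM meta").fetchone()[0]
    conn.close()
    return rows, metas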


def capture_history() -> None:
    """Append new closures + upsert today's backlog snapshot (tickets.capture_history).

    INC-only today (CRQ install-lifecycle history is a future migration); wired as
    the INC Dataset's post_apply hook.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT tickets.capture_history()")
            log.info("history: %s", cur.fetchone()[0])
|
|
|
|
|
|
|
|
|
|
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
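# Illustrative sketch only: _sketch_pending_keys is a hypothetical pure helper that the
# pipeline does not call. It restates the ordering + watermark filter used in ingest()
# so the rule can be checked in isolation. It assumes each change file has already been
# reduced to a (key, timestamp) pair. The timestamp is an aware datetime, or None when
# the key's timestamp could not be parsed.
# As in ingest(), unparseable keys sort first and are only selected when there is no
# watermark (first run or --reseed). Otherwise only files strictly newer than last_ts
# are returned, oldest first.
def _sketch_pending_keys(files, last_ts):
    # Sort None timestamps ahead of real ones without ever comparing None to a datetime.
    ordered = sorted(files, key=lambda kt: (kt[1] is not None, kt[1] or 0))
    # A file with no parseable timestamp can never be newer than an existing watermark.
    return [key for key, ts in ordered
            if last_ts is None or (ts is not None and ts > last_ts)]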
def ingest(ds: Dataset, args) -> None:
|
|
|
|
|
# Local-file path (dev): ingest a single CSV, no bucket / no archive / no history.
|
|
|
|
|
if args.local_csv:
|
|
|
|
|
rows = _load_csv_local(args.local_csv)
|
|
|
|
|
name = os.path.basename(args.local_csv)
|
|
|
|
|
ts = _ts_from_key(ds, ds.change_prefix + name)
|
2026-06-15 16:33:16 +00:00
|
|
|
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
|
|
|
|
if ts:
|
|
|
|
|
meta["exported_at"] = ts.isoformat()
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
upsert(ds, rows, args.apply, meta=meta)
|
2026-06-15 16:33:16 +00:00
|
|
|
return
|
|
|
|
|
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
|
|
|
|
|
# (baseline first, then each delta), upserting each. The watermark advances and
|
|
|
|
|
# the file is archived PER file, so a mid-run failure leaves a consistent state
|
|
|
|
|
# (folder state matches the watermark) and the next run resumes cleanly.
|
2026-06-15 17:08:05 +00:00
|
|
|
s3 = _s3_client()
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
listing = _list_csvs(s3, ds)
|
2026-06-15 16:33:16 +00:00
|
|
|
if not listing:
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
log.info("no %s change files under %s — nothing to do", ds.name, ds.change_prefix)
|
2026-06-15 16:33:16 +00:00
|
|
|
return
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
listing.sort(key=lambda ke: _ts_from_key(ds, ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
|
|
|
|
|
# watermark: skip anything at/older than the newest file already applied. Archiving
|
|
|
|
|
# normally empties changes/, but this guards a failed archive from re-applying.
|
2026-06-25 15:20:04 +00:00
|
|
|
# --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
|
|
|
|
|
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
|
|
|
|
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
|
|
|
|
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
last_ts = None if args.reseed else _last_processed_ts(ds)
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
_floor = datetime.min.replace(tzinfo=_EAT)
|
|
|
|
|
pending = [(k, e) for k, e in listing
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
if last_ts is None or (_ts_from_key(ds, k) or _floor) > last_ts]
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
if not pending:
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
log.info("all %d %s change file(s) already processed (watermark %s) — nothing new",
|
|
|
|
|
len(listing), ds.name, last_ts and last_ts.isoformat())
|
2026-06-15 16:33:16 +00:00
|
|
|
if args.apply:
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
_move_processed(s3, ds, [k for k, _ in listing]) # archive any stragglers
|
|
|
|
|
if ds.post_apply:
|
|
|
|
|
ds.post_apply()
|
2026-06-15 16:33:16 +00:00
|
|
|
return
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
log.info("%d of %d %s change file(s) to ingest (watermark %s); newest=%s",
|
|
|
|
|
len(pending), len(listing), ds.name, last_ts and last_ts.isoformat(), pending[-1][0])
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
|
|
|
|
|
total = 0
|
|
|
|
|
for i, (key, etag) in enumerate(pending):
|
|
|
|
|
rows = _parse_csv(_get_text(s3, key))
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
ts = _ts_from_key(ds, key)
|
fix(inc): ingest the incremental changes/ stream (baseline + deltas)
The S3 source switched from full hourly snapshots at
automations/inc/<ts>.csv to an incremental CDC stream at
automations/inc/changes/<ts>.csv (first file = full baseline, each later
file = only the rows that changed, keyed by ticket_id; no deletions).
The loader still pointed at the old root path and only ingested the single
newest file, so after the switch it found nothing (no new tickets ingested)
and, even with the path fixed, would silently drop intermediate deltas.
Changes:
- point ingestion at automations/inc/changes/ (_CHANGE_KEY_RE)
- ingest EVERY not-yet-processed file in ascending timestamp order
(baseline first, then each delta), upserting each
- replace the single-ETag skip with a per-file timestamp watermark
(import_meta.metadata->>'source_max_key'); rows + watermark commit in one
txn per file, then archive to processed/ — so a mid-run failure leaves a
consistent, resumable state
- docs: rename n8n-hourly-s3-full-data-exports.md -> n8n-s3-ticket-exports.md
and rewrite it for the incremental stream; fix the reference in
docs/phase-1-ingestion.md
Verified live against prod: re-seeded baseline + 5 deltas (26,529 rows),
files archived to processed/, watermark advanced, re-run is a no-op.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 11:37:17 +00:00
|
|
|
# the first file applied onto an empty watermark is the full baseline; every
|
|
|
|
|
# file after is a delta. export_type is informational (recorded in import_meta).
|
|
|
|
|
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
|
|
|
|
|
"source_s3_key": key, "source_etag": etag,
|
|
|
|
|
"source_max_key": key, "row_count": len(rows)}
|
|
|
|
|
if ts:
|
|
|
|
|
meta["exported_at"] = ts.isoformat()
|
|
|
|
|
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
|
|
|
|
|
# then archive, so the changes/ folder state always matches the watermark.
|
feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.
- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
packages inc/crq. Docs (README, implementation, deployment-and-operations,
n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.
tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 20:16:38 +00:00
|
|
|
        total += upsert(ds, rows, args.apply, meta=meta)
        if args.apply:
            _move_processed(s3, ds, [key])
        else:
            log.info("DRY-RUN — would archive %s to %s", key, ds.processed_prefix)
    log.info("ingested %d %s change file(s); %d rows kept in total", len(pending), ds.name, total)
    if args.apply:
        # FT-BUG-01 (260702): re-resolve geoms after every applied ingest so rows
        # whose location was geocoded AFTER their last upsert regain their precise
        # geom (and, on a DB without migration 17, repair trigger downgrades).
        # Cheap: resolve only touches rows whose geom/geo_source would change.
        n = _resolve()
        if n:
            log.info("post-ingest resolve: %d %s geoms updated", n, ds.name)
        if ds.post_apply:
            ds.post_apply()


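The ingest loop above is fed by the incremental `changes/` stream. Every file that has not yet been processed is applied in ascending timestamp order, baseline first, and a per-file watermark (`source_max_key`) makes a re-run a no-op. The sketch below shows only that selection step. It assumes a fixed-width, lexically sortable timestamp in the key (e.g. `20260623T113717Z.csv`). That key format is hypothetical: the real pattern is `_CHANGE_KEY_RE`, which is defined elsewhere. `pending_change_keys` is an illustrative name, not a function in this module.

```python
from __future__ import annotations

import re

# Hypothetical key shape: automations/<inc|crq>/changes/<fixed-width timestamp>.csv
# (the real pattern is _CHANGE_KEY_RE, defined elsewhere in pipeline.py).
_KEY_RE = re.compile(r"^automations/(?:inc|crq)/changes/(\d{8}T\d{6}Z)\.csv$")


def pending_change_keys(keys: list[str], watermark: str | None) -> list[str]:
    """Return change-file keys newer than `watermark`, oldest first.

    The baseline sorts first, then each delta. Anything at or below the
    watermark was already committed, so it is skipped and a re-run is a no-op.
    """
    matched = [k for k in keys if _KEY_RE.match(k)]
    fresh = [k for k in matched if watermark is None or k > watermark]
    return sorted(fresh)
```

Comparing and sorting whole keys works only because the prefix is constant and the timestamp is fixed-width. With any other key format, you would sort on the parsed timestamp instead.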
# ── place extraction (strip network codes, keep the real place) ───────────────
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
# 'NW' is the one site-code that the source also glues straight onto the place with
# no separator (NWKIAMBU, NWRIDGE, NWTHE — ~1.7k rows in a single snapshot). Safe to
# split because no place/word starts with "NW"; the other codes (CO/NE/SE/DR…) begin
# real words (COAST, NEW, SEASONS, DRIVE) so we only strip THOSE when delimited above.
_GLUED_NW_RE = re.compile(r"^NW(?=[A-Z])")
# Inline network/work-order codes to drop wherever they appear.
_CODE_RE = re.compile(
    r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
)
# Trailing '-<segment>' after the final hyphen: a unit/instruction code, not a place.
# Dropped only when it LOOKS like one — a unit number (37, F32, 3C, 302), a short
# code (<=3 chars: E, NB, KKK), or an instruction phrase (CALL ON ARRIVAL, TBC, NA).
# A real word tail (…-MALL) is kept.
_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$")
_TAIL_INSTRUCTION_TOKENS = frozenset({
    "CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO",
    "OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON",
})


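The trailing-segment rule above boils down to one predicate. The sketch restates it on its own, reusing the same `_UNIT_TAIL_RE` pattern and instruction tokens, so that the keep/drop cases listed in the comments can be checked directly. `drops_tail` is a hypothetical helper name, not a function in this module. Note that `extract_place` additionally requires a non-empty head before it drops anything.

```python
import re

_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$")
_TAIL_INSTRUCTION_TOKENS = frozenset({
    "CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO",
    "OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON",
})


def drops_tail(tail: str) -> bool:
    """True when the text after the last '-' is a unit/instruction code."""
    tail = tail.strip().upper()
    first = tail.split()[0] if tail else ""
    return (not tail                                   # empty tail, e.g. 'X-'
            or len(tail) <= 3                          # short code: E, NB, KKK
            or bool(_UNIT_TAIL_RE.match(tail))         # unit number: 37, F32, 3C, 302
            or first in _TAIL_INSTRUCTION_TOKENS)      # CALL ON ARRIVAL, TBC, NA
```

For example, `drops_tail("CALL ON ARRIVAL")` is true because its first token is an instruction word. `drops_tail("MALL")` is false: the tail is four letters, contains no digit, and is not an instruction token.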
def extract_place(location_name: str | None) -> str:
    """Pull the human place/landmark out of a coded location_name string.

    e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
         'NWKIAMBU_KIRIGITI_MWANJA APARTMENTS-TBC' -> 'KIAMBU KIRIGITI MWANJA APARTMENTS'
    """
    s = (location_name or "").upper().strip()
    if not s:
        return ""
    # drop the trailing '-<segment>' only when it's a unit/instruction code, not a
    # real word (so '…-37'/'…-CALL ON ARRIVAL' drop but '…-MALL' is kept)
    if "-" in s:
        head, _, tail = s.rpartition("-")
        head, tail = head.strip(), tail.strip()
        first = tail.split()[0] if tail else ""
        if head and (not tail or len(tail) <= 3 or _UNIT_TAIL_RE.match(tail)
                     or first in _TAIL_INSTRUCTION_TOKENS):
            s = head
    s = s.replace("_", " ")
    # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…; or glued: NWKIAMBU)
    prev = None
    while prev != s:
        prev = s
        s = _PREFIX_RE.sub("", s).strip()
        s = _GLUED_NW_RE.sub("", s).strip()
    s = _CODE_RE.sub(" ", s)
    s = re.sub(r"\s+", " ", s).strip(" ,-")
    return s


def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
    parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p]
    return ", ".join(dict.fromkeys(parts))  # de-dupe while preserving order


def compose_queries(location_name: str | None, cluster: str | None,
                    region: str | None) -> list[str]:
    """Ordered geocode candidates, most → least specific (estate fallback).

    Building-level location_names (e.g. 'KAHAWA WENDANI ALVO HOUSE') aren't in OSM, so
    the precise query finds nothing (LocationIQ answers 404). We then fall back to the
    estate (the leading two tokens, then the leading token of the place), each still
    constrained to the cluster viewbox + distance check by the caller, so a coarse hit
    lands in the right neighbourhood (tighter than the bare cluster centroid). We
    deliberately do NOT add a pure-cluster candidate: that would just reproduce the
    cluster centroid while mislabelling it geo_source='location'; a truly unmatchable
    ticket should keep its honest cluster-centroid fallback.
    e.g. 'KAHAWA WENDANI ALVO HOUSE' (cluster WENDANI, region nairobi) ->
        ['KAHAWA WENDANI ALVO HOUSE, WENDANI, nairobi, Kenya',
         'KAHAWA WENDANI, nairobi, Kenya', 'KAHAWA, nairobi, Kenya']
    """
    region_part, cluster_part = clean(region), clean(cluster)
    place = extract_place(location_name)
    toks = place.split()
    out: list[str] = []

    def add(*parts: str | None) -> None:
        q = ", ".join(dict.fromkeys([p for p in parts if p] + ["Kenya"]))
        if q and q != "Kenya" and q not in out:
            out.append(q)

    add(place, cluster_part, region_part)     # 1. full precise
    if len(toks) > 2:
        add(" ".join(toks[:2]), region_part)  # 2. estate (leading 2 tokens)
    if len(toks) > 1:
        add(toks[0], region_part)             # 3. leading token (broad estate)
    return out


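To make the candidate ladder concrete, here is a standalone sketch that takes parts which are already cleaned. The module's `extract_place` and `clean` are not reproduced. `candidate_queries` is a hypothetical name, and the sketch is checked against the docstring example above.

```python
from __future__ import annotations


def candidate_queries(place: str, cluster: str | None, region: str | None) -> list[str]:
    """Most → least specific queries from an already-extracted place."""
    toks = place.split()
    out: list[str] = []

    def add(*parts: str | None) -> None:
        # Drop empty parts, append the country, de-duplicate while preserving order.
        q = ", ".join(dict.fromkeys([p for p in parts if p] + ["Kenya"]))
        if q != "Kenya" and q not in out:
            out.append(q)

    add(place, cluster, region)          # 1. full precise
    if len(toks) > 2:
        add(" ".join(toks[:2]), region)  # 2. estate (leading 2 tokens)
    if len(toks) > 1:
        add(toks[0], region)             # 3. leading token (broad estate)
    return out
```

A one-word place whose name equals its cluster (e.g. `candidate_queries("RUIRU", "RUIRU", "central")`) collapses to the single query `"RUIRU, central, Kenya"`, because `dict.fromkeys` drops the repeated part. An empty place with no cluster or region yields no candidates at all.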
# ── keyed geocoder ────────────────────────────────────────────────────────────
def _throttle() -> None:
    global _last_geocode_at
    wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at)
    if wait > 0:
        time.sleep(wait)
    _last_geocode_at = time.monotonic()


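`_throttle` enforces a minimum gap of `_GEOCODE_INTERVAL_S` between provider calls, using a module-level timestamp and the monotonic clock. The sketch below expresses the same idea as a small class whose clock and sleep can be swapped out, so the spacing can be tested without real waiting. `Throttle` is hypothetical and not part of this module.

```python
import time
from typing import Callable


class Throttle:
    """Block until at least `interval_s` seconds have passed since the previous call."""

    def __init__(self, interval_s: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.interval_s = interval_s
        self._clock = clock
        self._sleep = sleep
        self._last = float("-inf")  # the very first call never waits

    def wait(self) -> None:
        remaining = self.interval_s - (self._clock() - self._last)
        if remaining > 0:
            self._sleep(remaining)
        self._last = self._clock()  # stamp after sleeping, as _throttle does
```

Stamping the time after the sleep, rather than before it, means the gap is measured from when the previous request was actually allowed through.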
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


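As a sanity check on the constants used further down (`_VIEWBOX_DEG = 0.2`, described as a ~22 km half-box), the same haversine formula on a 6371 km sphere gives about 111.19 km per degree of latitude. So 0.2° comes to roughly 22.2 km. The formula is copied so the sketch runs on its own. The Nairobi-area coordinates are approximate and serve only as illustration.

```python
import math

EARTH_RADIUS_KM = 6371.0  # same mean radius as _haversine_km


def haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Near the equator (Nairobi is at about 1.3° S), 0.2° of latitude and 0.2° of
# longitude span almost the same ground distance, so the viewbox is close to square.
half_box_lat_km = haversine_km(-1.3, 36.8, -1.1, 36.8)  # 0.2° north
half_box_lng_km = haversine_km(-1.3, 36.8, -1.3, 37.0)  # 0.2° east
```

Both half-box distances come out at about 22.2 km. This is comfortably inside the default 25 km `GEOCODER_MAX_KM` acceptance radius.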
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
    """Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None.

    `viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box
    around the cluster centroid (bounded), which stops the geocoder matching a
    landmark name in the wrong city.
    """
    if not _API_KEY:
        log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
        return None
    _throttle()
    try:
        if _PROVIDER == "opencage":
            params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
            if viewbox:
                params["bounds"] = "%s,%s,%s,%s" % viewbox
            r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
            r.raise_for_status()
            res = (r.json().get("results") or [])
            if res:
                g = res[0]["geometry"]
                # NB: OpenCage 'confidence' is an integer 0-10, while LocationIQ's
                # 'importance' (below) is a 0-1 float; the two scores are not comparable.
                return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
        else:  # locationiq (default)
            params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
            if viewbox:
                params["viewbox"] = "%s,%s,%s,%s" % viewbox
                params["bounded"] = 1
            r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
            if r.status_code == 404:  # LocationIQ returns 404 for "no matches"
                return None
            r.raise_for_status()
            hits = r.json()
            if hits:
                h = hits[0]
                return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
    except (requests.RequestException, ValueError, KeyError) as e:
        log.warning("geocode failed for %r: %s", query, e)
    return None  # no match, or the request failed


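The two providers express the same cluster box differently. OpenCage receives it as `bounds`. LocationIQ receives it as `viewbox` plus `bounded=1`; in Nominatim-style APIs, `bounded=1` turns the viewbox from a preference into a filter. The pure function below rebuilds just the query parameters that `geocode()` sends, so the mapping can be checked without a network call. `provider_params` is a hypothetical refactor, not part of this module.

```python
from __future__ import annotations


def provider_params(provider: str, api_key: str, query: str,
                    viewbox: tuple[float, float, float, float] | None = None) -> dict:
    """Query parameters as geocode() builds them; viewbox = (min_lon, min_lat, max_lon, max_lat)."""
    if provider == "opencage":
        params = {"key": api_key, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
        if viewbox:
            params["bounds"] = "%s,%s,%s,%s" % viewbox
    else:  # locationiq (default)
        params = {"key": api_key, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
        if viewbox:
            params["viewbox"] = "%s,%s,%s,%s" % viewbox
            params["bounded"] = 1  # only meaningful together with a viewbox
    return params
```

Both providers take the box longitude-first, which is why the tuple is ordered `(min_lon, min_lat, max_lon, max_lat)` and not lat/lng.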
# ── cluster gazetteer (coarse fallback; CROSS-DATASET: inc + crq) ─────────────
def geocode_clusters(apply: bool) -> None:
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT key, region FROM (
                    SELECT tickets.norm_cluster(raw->>'cluster') AS key,
                           (array_agg(raw->>'region'))[1] AS region
                    FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                    UNION
                    SELECT tickets.norm_cluster(raw->>'cluster'),
                           (array_agg(raw->>'region'))[1]
                    FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                ) z
                WHERE key IS NOT NULL
                  AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
                                  WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
                """
            )
            todo = cur.fetchall()
    log.info("%d clusters to geocode", len(todo))
    if not apply:
        for key, region in todo:
            log.info(" would geocode cluster: %s (%s)", key, region)
        return
    written = 0
    for key, region in todo:
        hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
        if not hit:
            continue
        lat, lng, _ = hit
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
                       VALUES (%s, %s, %s, %s, %s, false)
                       ON CONFLICT (cluster_key) DO UPDATE
                       SET region = EXCLUDED.region, lat = EXCLUDED.lat,
                           lng = EXCLUDED.lng, source = EXCLUDED.source""",
                    (key, region, lat, lng, _PROVIDER),
                )
        written += 1
    _resolve()
    log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)


# ── per-location geocoding (precise; actionable inc + crq) ────────────────────
# A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid.
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
_VIEWBOX_DEG = 0.2  # ~22 km half-box around the cluster centroid


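For a cluster centroid at `(clat, clng)`, `geocode_locations` turns `_VIEWBOX_DEG` into the `(min_lon, min_lat, max_lon, max_lat)` box that `geocode()` expects. Note that the longitude comes first. If a cluster has no gazetteer centroid yet, no box is built and the query runs unbounded. The sketch below isolates that step. `cluster_viewbox` is a hypothetical helper.

```python
from __future__ import annotations

VIEWBOX_DEG = 0.2  # mirrors _VIEWBOX_DEG


def cluster_viewbox(clat: float | None, clng: float | None,
                    half: float = VIEWBOX_DEG) -> tuple[float, float, float, float] | None:
    """(min_lon, min_lat, max_lon, max_lat) around a centroid, or None without one."""
    if clat is None or clng is None:
        return None  # no centroid yet: geocode() runs without a box
    return (clng - half, clat - half, clng + half, clat + half)
```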
def geocode_locations(apply: bool) -> None:
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
                FROM (
                    SELECT tickets.norm_cluster(src.raw->>'location_name') AS key,
                           (array_agg(src.raw->>'location_name'))[1] AS location_name,
                           (array_agg(src.raw->>'cluster'))[1] AS cluster,
                           (array_agg(src.raw->>'region'))[1] AS region,
                           tickets.norm_cluster((array_agg(src.raw->>'cluster'))[1]) AS ckey
                    FROM (
                        -- CROSS-DATASET: actionable INC + CRQ share one location gazetteer
                        SELECT raw FROM tickets.inc
                        WHERE (raw->>'is_actionable')::boolean
                          AND raw->>'location_name' IS NOT NULL
                          AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
                        UNION ALL
                        SELECT raw FROM tickets.crq
                        WHERE (raw->>'is_actionable')::boolean
                          AND raw->>'location_name' IS NOT NULL
                          AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
                    ) src
                    WHERE NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
                                      WHERE gl.query_key = tickets.norm_cluster(src.raw->>'location_name')
                                        AND gl.geom IS NOT NULL)
                    GROUP BY 1
                ) t
                LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
                """
            )
            todo = cur.fetchall()
    log.info("%d actionable inc+crq locations to geocode (provider=%s)", len(todo), _PROVIDER)
    if not apply:
        for key, loc, cluster, region, clat, clng in todo[:50]:
            log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
        return
    written = missed = coarse = 0
    for key, loc, cluster, region, clat, clng in todo:
        viewbox = None
        if clat is not None and clng is not None:
            viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
        # Try candidates most → least specific (full place → 2-token estate → leading
        # token) and accept the FIRST in-range hit. A wrong-area match (farther than
        # _MAX_KM_FROM_CLUSTER from the cluster centroid) is skipped so the next,
        # coarser query is tried.
        hit = used = None
        for i, cand in enumerate(compose_queries(loc, cluster, region)):
            g = geocode(cand, viewbox)
            if not g:
                continue
            lat, lng, conf = g
            if (clat is not None and clng is not None
                    and _haversine_km(lat, lng, clat, clng) > _MAX_KM_FROM_CLUSTER):
                continue
            hit, used = g, cand
            if i > 0:
                coarse += 1
            break
        if not hit:
            missed += 1  # no match even coarsely — keeps cluster-centroid fallback
            continue
        lat, lng, conf = hit
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_locations
                         (query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                       ON CONFLICT (query_key) DO UPDATE
                       SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
                           region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
                           lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
                    (key, loc, cluster, region, used, lat, lng, conf, _PROVIDER),
                )
        written += 1
        log.info(" geocoded %s -> %.5f, %.5f", used, lat, lng)
    n = _resolve()
    log.info("locations: %d accepted (%d via estate fallback), %d unmatched; "
             "re-resolved geom on %d tickets (unverified — review tickets.geo_locations)",
             written, coarse, missed, n)


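The candidate loop in `geocode_locations` reduces to a few steps. It tries each query in order and skips misses. It also skips hits that land farther than `_MAX_KM_FROM_CLUSTER` from the centroid. It keeps the first hit that survives and counts it as coarse if it did not come from the first candidate. The sketch below passes in the geocoder and the distance function so the logic can be exercised offline; in the module, the viewbox would be bound into the geocoder callable. `pick_candidate` is a hypothetical name. The fake geocoder and the rough distance function in the test are illustrative stand-ins, not real provider results.

```python
from __future__ import annotations

from typing import Callable, Optional, Tuple

Hit = Tuple[float, float, Optional[float]]  # (lat, lng, confidence)


def pick_candidate(candidates: list[str],
                   geocode: Callable[[str], Hit | None],
                   centroid: tuple[float, float] | None,
                   max_km: float,
                   distance_km: Callable[[float, float, float, float], float],
                   ) -> tuple[Hit | None, str | None, bool]:
    """Return (hit, query_used, coarse) for the first in-range candidate."""
    for i, cand in enumerate(candidates):
        g = geocode(cand)
        if not g:
            continue  # no match for this query
        lat, lng, _ = g
        if centroid is not None and distance_km(lat, lng, *centroid) > max_km:
            continue  # matched the name in the wrong area; try a coarser query
        return g, cand, i > 0  # coarse == not the most specific candidate
    return None, None, False   # caller keeps the cluster-centroid fallback
```

Returning on the first accepted hit mirrors the `break` in the original loop. The `(None, None, False)` result corresponds to the `missed` branch.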
def _resolve() -> int:
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT tickets.resolve_ticket_geoms()")
            return cur.fetchone()[0]