fleettickets/inc/import_inc.py
david kiania 5f5d71d500 feat(crq): add CRQ ingestion via shared engine + thin inc/crq entrypoints
Split the INC-only loader into a dataset-agnostic engine (pipeline.py, renamed
from import_tickets.py) parameterized by a Dataset config, with thin per-type
entrypoints inc/import_inc.py and crq/import_crq.py. CRQ shares INC's identical
32-column source schema and CDC change stream, so the engine is fully shared.

- pipeline.py: Dataset config (name/table/prefixes/key_regex/post_apply); INC
  keeps the capture_history post-apply hook, CRQ has none yet. geocode_locations
  now unions tickets.crq (geocoding is cross-dataset: one gazetteer/budget).
- crq/import_crq.py: drains automations/crq/changes/ from isptickets into
  tickets.crq (data layer + map; SLA/dashboard/history deferred).
- migrations/13_crq_columns.sql: CRQ mirror of 03 — typed STORED generated
  columns + indexes on tickets.crq (reuses tickets.eat_ts()).
- Deployment: Dockerfile/run_ingest.sh run both via `python -m`; pyproject
  packages inc/crq. Docs (README, implementation, deployment-and-operations,
  n8n export ref, phase-1) updated for the split + the one-time CRQ seed runbook.

tickets.crq already exists (mig 01, LIKE tickets.inc) and is unioned into
reporting.fn_tickets_for_map + resolve_ticket_geoms, so CRQ appears on the
existing Tickets map once seeded. Verified locally: ruff-clean new files, engine
lists/parses both streams against live S3 (crq=52 files, inc unaffected).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 23:16:38 +03:00

74 lines
3.9 KiB
Python

"""
inc/import_inc.py — Fireside Communications · INC (incident / fault) ingestion.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Thin entrypoint over the shared engine (`pipeline.py`) for the INC dataset:
tickets.inc — incidents / customer faults (FleetOps "Tickets" INC tab)
INC reads the incremental CDC change stream automations/inc/changes/<EAT-ts>.csv
from the `isptickets` bucket, upserts on ticket_id, advances the per-dataset
watermark, archives each file to automations/inc/processed/, and — uniquely to
INC — runs tickets.capture_history() after each --apply run (closure_events +
daily backlog snapshot). Geocoding (--geocode-clusters / --geocode-locations) is
CROSS-DATASET and driven from here (the shared gazetteer covers inc + crq).
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python -m inc.import_inc --from-bucket --apply
python -m inc.import_inc --from-bucket --reseed --apply # one-time bucket cutover
python -m inc.import_inc --inc-csv 2026-06-15T17-00-00.csv --apply
python -m inc.import_inc --geocode-clusters --apply
python -m inc.import_inc --geocode-locations --apply
Pre-requisite: migrations applied (run_migrations.py) — tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import argparse
import pipeline
# INC captures closure/backlog history after every --apply run (CRQ does not yet).
DATASET = pipeline.make_dataset("inc", post_apply=pipeline.capture_history)
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Drain the incremental INC change stream (automations/inc/changes/) "
"from the isptickets S3 bucket: every not-yet-processed file "
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
ap.add_argument("--reseed", action="store_true",
help="Ignore the stored watermark and drain every file in changes/ once "
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
ap.add_argument("--inc-csv", dest="local_csv", default=None,
help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters (inc+crq) into the gazetteer, then re-resolve")
ap.add_argument("--geocode-locations", action="store_true",
help="Geocode actionable inc+crq location_names precisely (keyed provider), "
"then re-resolve")
ap.add_argument("--capture-history", action="store_true",
help="Run tickets.capture_history() standalone "
"(closure_events + daily snapshot)")
args = ap.parse_args()
if args.geocode_clusters:
pipeline.geocode_clusters(apply=args.apply)
return
if args.geocode_locations:
pipeline.geocode_locations(apply=args.apply)
return
if args.capture_history:
pipeline.capture_history()
return
if not (args.from_bucket or args.local_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, "
"--geocode-locations, or --capture-history")
pipeline.ingest(DATASET, args)
if __name__ == "__main__":
main()