Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema). This is config + a one-time reseed, not a rewrite — the loader already drains automations/inc/changes/ oldest->newest with a source_max_key watermark. - default _BUCKET -> isptickets (TICKETS_BUCKET still overrides) - add --reseed: ignore the stored watermark and drain every changes/ file once (the old-bucket watermark may post-date the new bucket's first file). Crash-safe via the existing per-file watermark-advance + archive loop. - refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the CDC reality; .env.example + README updated (new bucket + reseed runbook). Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel filter active, exit 0. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
649 lines
32 KiB
Python
649 lines
32 KiB
Python
"""
|
|
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
|
|
source of the FleetOps "Tickets" map.
|
|
tickets.inc — incidents / customer faults
|
|
|
|
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
|
|
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
|
|
|
|
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
|
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
|
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
|
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
|
|
|
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
|
|
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
|
|
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
|
baseline, and every later file holds only the rows that CHANGED since the prior
|
|
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
|
|
Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
|
|
not-yet-processed file in ASCENDING timestamp order (baseline first, then each
|
|
delta) — taking only the newest would silently drop the intermediate deltas:
|
|
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
|
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
|
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
|
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
|
history accumulates), and advance the watermark in tickets.import_meta
|
|
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
|
|
- on success, MOVE each file to automations/inc/processed/ (copy + delete).
|
|
|
|
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
|
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
|
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
|
|
real place out of location_name (region+cluster+location_name,
|
|
network codes stripped), geocodes it, caches in
|
|
tickets.geo_locations, then re-resolves geoms.
|
|
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
|
|
|
|
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
|
python import_tickets.py --from-bucket --apply
|
|
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
|
|
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
|
python import_tickets.py --geocode-clusters --apply
|
|
python import_tickets.py --geocode-locations --apply
|
|
|
|
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
|
|
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import io
|
|
import math
|
|
import os
|
|
import re
|
|
import time
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import boto3
|
|
import requests
|
|
import psycopg2.extras
|
|
from botocore.config import Config as BotoConfig
|
|
|
|
from shared import clean, get_conn, get_logger
|
|
|
|
log = get_logger("import_tickets")
|
|
|
|
# ── INC ingestion config ──────────────────────────────────────────────────────
|
|
_TABLE = "tickets.inc"
|
|
_DATASET = "inc"
|
|
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
|
|
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
|
|
_PROCESSED_PREFIX = "automations/inc/processed/"
|
|
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
|
|
|
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
|
# message itself. Matched by prefix so position/exact-tail don't matter.
|
|
_SENTINEL_PREFIX = "EXPORT STOPPED"
|
|
|
|
# Columns dropped before building `raw`: derivable (week_*), the client's row-level
|
|
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
|
|
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
|
|
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
|
|
DROP_FIELDS = frozenset({
|
|
"week_start", "week_end",
|
|
"source_s3_bucket", "source_s3_key", "source_snapshot_id",
|
|
"department", "source_type",
|
|
})
|
|
|
|
# Only files matching automations/inc/changes/<EAT-timestamp>.csv — the incremental
|
|
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
|
|
_CHANGE_KEY_RE = re.compile(
|
|
r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
|
|
|
|
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
|
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
|
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
|
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
|
_last_geocode_at = 0.0
|
|
|
|
|
|
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
|
|
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
|
|
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
|
|
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
|
|
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
|
|
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
|
|
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
|
def _s3_client():
|
|
"""boto3 S3 client for the S3 endpoint (force path-style addressing)."""
|
|
return boto3.client(
|
|
"s3",
|
|
endpoint_url=os.environ["RUSTFS_ENDPOINT"],
|
|
aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"],
|
|
aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"],
|
|
region_name=os.getenv("RUSTFS_REGION", "us-east-1"),
|
|
config=BotoConfig(s3={"addressing_style": "path"}, signature_version="s3v4",
|
|
retries={"max_attempts": 3, "mode": "standard"}),
|
|
)
|
|
|
|
|
|
def _ts_from_key(key: str) -> datetime | None:
|
|
"""EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
|
|
m = _CHANGE_KEY_RE.match(key)
|
|
if not m:
|
|
return None
|
|
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
|
|
return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _list_inc_csvs(s3) -> list[tuple[str, str]]:
|
|
"""[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
|
|
out: list[tuple[str, str]] = []
|
|
for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
|
|
for it in page.get("Contents", []):
|
|
if _CHANGE_KEY_RE.match(it["Key"]):
|
|
out.append((it["Key"], (it.get("ETag") or "").strip('"')))
|
|
return out
|
|
|
|
|
|
def _get_text(s3, key: str) -> str:
|
|
"""Download an object's body as UTF-8 text."""
|
|
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
|
|
|
|
|
|
def _last_processed_ts() -> datetime | None:
|
|
"""Watermark: EAT timestamp of the newest change file already ingested.
|
|
|
|
Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
|
|
we drain changes/ oldest→newest). None when nothing has been ingested via the
|
|
changes stream yet (e.g. the first run after the source switched to incremental,
|
|
where the stored key is an old full-snapshot path) — then every file currently in
|
|
changes/ is processed.
|
|
"""
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
|
|
(_DATASET,),
|
|
)
|
|
row = cur.fetchone()
|
|
return _ts_from_key(row[0]) if row and row[0] else None
|
|
|
|
|
|
def _parse_csv(text: str) -> list[dict]:
|
|
return list(csv.DictReader(io.StringIO(text)))
|
|
|
|
|
|
def _load_csv_local(path: str) -> list[dict]:
|
|
with open(path, encoding="utf-8", newline="") as f:
|
|
return list(csv.DictReader(f))
|
|
|
|
|
|
def _move_processed(s3, keys: list[str]) -> None:
|
|
"""Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
|
|
for key in keys:
|
|
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
|
|
s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
|
|
s3.delete_object(Bucket=_BUCKET, Key=key)
|
|
log.info("archived %s -> %s", key, dst)
|
|
|
|
|
|
# ── row preparation (filter · drop columns · normalize) ─────────────────────────
|
|
def _keep_row(row: dict) -> bool:
|
|
"""Drop alarm rows + the truncation-sentinel; require a real ticket_id."""
|
|
tid = clean(row.get("ticket_id"))
|
|
if not tid or tid.startswith(_SENTINEL_PREFIX):
|
|
return False
|
|
return clean(row.get("is_alarm")) != "true"
|
|
|
|
|
|
def _prepare(row: dict) -> dict:
|
|
"""Strip DROP_FIELDS and normalize region/raw_status — returns the `raw` payload."""
|
|
r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
|
|
if r.get("region"):
|
|
r["region"] = r["region"].lower()
|
|
if r.get("raw_status"):
|
|
r["raw_status"] = r["raw_status"].upper()
|
|
return r
|
|
|
|
|
|
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
|
def _record_meta(cur, meta: dict, records_ingested: int) -> None:
|
|
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
|
|
|
|
Runs on the caller's cursor so the row upsert and the meta write commit
|
|
together — a half-written state (rows in, meta stale) breaks skip-if-unchanged.
|
|
"""
|
|
cur.execute(
|
|
"""INSERT INTO tickets.import_meta
|
|
(dataset, export_type, exported_at, snapshot_date, source_schema,
|
|
source_table, row_count, records_ingested, n8n_execution_id, metadata,
|
|
ingested_at)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
|
|
ON CONFLICT (dataset) DO UPDATE
|
|
SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
|
|
snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
|
|
source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
|
|
records_ingested = EXCLUDED.records_ingested,
|
|
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
|
ingested_at = now()""",
|
|
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
|
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
|
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
|
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
|
)
|
|
|
|
|
|
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
|
meta = meta or {}
|
|
kept = [r for r in rows if _keep_row(r)]
|
|
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
|
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
|
_TABLE, len(rows), len(payload), len(rows) - len(payload))
|
|
if not apply:
|
|
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
|
|
return len(payload)
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
psycopg2.extras.execute_values(
|
|
cur,
|
|
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
|
|
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
|
payload, page_size=500,
|
|
)
|
|
# same transaction as the upsert: rows + snapshot meta commit atomically
|
|
_record_meta(cur, meta, len(payload))
|
|
log.info("upserted %d rows into %s", len(payload), _TABLE)
|
|
return len(payload)
|
|
|
|
|
|
def _capture_history() -> None:
|
|
"""Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT tickets.capture_history()")
|
|
log.info("history: %s", cur.fetchone()[0])
|
|
|
|
|
|
def ingest(args) -> None:
|
|
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
|
|
if args.inc_csv:
|
|
rows = _load_csv_local(args.inc_csv)
|
|
name = os.path.basename(args.inc_csv)
|
|
ts = _ts_from_key(_INC_PREFIX + name)
|
|
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
|
if ts:
|
|
meta["exported_at"] = ts.isoformat()
|
|
upsert(rows, args.apply, meta=meta)
|
|
return
|
|
|
|
# --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
|
|
# (baseline first, then each delta), upserting each. The watermark advances and
|
|
# the file is archived PER file, so a mid-run failure leaves a consistent state
|
|
# (folder state matches the watermark) and the next run resumes cleanly.
|
|
s3 = _s3_client()
|
|
listing = _list_inc_csvs(s3)
|
|
if not listing:
|
|
log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
|
|
return
|
|
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
|
|
|
# watermark: skip anything at/older than the newest file already applied. Archiving
|
|
# normally empties changes/, but this guards a failed archive from re-applying.
|
|
# --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
|
|
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
|
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
|
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
|
last_ts = None if args.reseed else _last_processed_ts()
|
|
_floor = datetime.min.replace(tzinfo=_EAT)
|
|
pending = [(k, e) for k, e in listing
|
|
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
|
|
if not pending:
|
|
log.info("all %d change file(s) already processed (watermark %s) — nothing new",
|
|
len(listing), last_ts and last_ts.isoformat())
|
|
if args.apply:
|
|
_move_processed(s3, [k for k, _ in listing]) # archive any stragglers
|
|
_capture_history()
|
|
return
|
|
log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
|
|
len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
|
|
|
|
total = 0
|
|
for i, (key, etag) in enumerate(pending):
|
|
rows = _parse_csv(_get_text(s3, key))
|
|
ts = _ts_from_key(key)
|
|
# the first file applied onto an empty watermark is the full baseline; every
|
|
# file after is a delta. export_type is informational (recorded in import_meta).
|
|
meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
|
|
"source_s3_key": key, "source_etag": etag,
|
|
"source_max_key": key, "row_count": len(rows)}
|
|
if ts:
|
|
meta["exported_at"] = ts.isoformat()
|
|
# rows + watermark (source_max_key) commit in one txn, advancing per file; only
|
|
# then archive, so the changes/ folder state always matches the watermark.
|
|
total += upsert(rows, args.apply, meta=meta)
|
|
if args.apply:
|
|
_move_processed(s3, [key])
|
|
else:
|
|
log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
|
|
log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
|
|
if args.apply:
|
|
_capture_history()
|
|
|
|
|
|
# ── place extraction (strip network codes, keep the real place) ───────────────
|
|
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
|
|
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
|
|
# 'NW' is the one site-code that the source also glues straight onto the place with
|
|
# no separator (NWKIAMBU, NWRIDGE, NWTHE — ~1.7k rows in a single snapshot). Safe to
|
|
# split because no place/word starts with "NW"; the other codes (CO/NE/SE/DR…) begin
|
|
# real words (COAST, NEW, SEASONS, DRIVE) so we only strip THOSE when delimited above.
|
|
_GLUED_NW_RE = re.compile(r"^NW(?=[A-Z])")
|
|
# Inline network/work-order codes to drop wherever they appear.
|
|
_CODE_RE = re.compile(
|
|
r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
|
|
)
|
|
# Trailing '-<segment>' after the final hyphen: a unit/instruction code, not a place.
|
|
# Dropped only when it LOOKS like one — a unit number (37, F32, 3C, 302), a short
|
|
# code (<=3 chars: E, NB, KKK), or an instruction phrase (CALL ON ARRIVAL, TBC, NA).
|
|
# A real word tail (…-MALL) is kept.
|
|
_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$")
|
|
_TAIL_INSTRUCTION_TOKENS = frozenset({
|
|
"CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO",
|
|
"OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON",
|
|
})
|
|
|
|
|
|
def extract_place(location_name: str | None) -> str:
|
|
"""Pull the human place/landmark out of a coded location_name string.
|
|
|
|
e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
|
|
'NWKIAMBU_KIRIGITI_MWANJA APARTMENTS-TBC' -> 'KIAMBU KIRIGITI MWANJA APARTMENTS'
|
|
"""
|
|
s = (location_name or "").upper().strip()
|
|
if not s:
|
|
return ""
|
|
# drop the trailing '-<segment>' only when it's a unit/instruction code, not a
|
|
# real word (so '…-37'/'…-CALL ON ARRIVAL' drop but '…-MALL' is kept)
|
|
if "-" in s:
|
|
head, _, tail = s.rpartition("-")
|
|
head, tail = head.strip(), tail.strip()
|
|
first = tail.split()[0] if tail else ""
|
|
if head and (not tail or len(tail) <= 3 or _UNIT_TAIL_RE.match(tail)
|
|
or first in _TAIL_INSTRUCTION_TOKENS):
|
|
s = head
|
|
s = s.replace("_", " ")
|
|
# strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…; or glued: NWKIAMBU)
|
|
prev = None
|
|
while prev != s:
|
|
prev = s
|
|
s = _PREFIX_RE.sub("", s).strip()
|
|
s = _GLUED_NW_RE.sub("", s).strip()
|
|
s = _CODE_RE.sub(" ", s)
|
|
s = re.sub(r"\s+", " ", s).strip(" ,-")
|
|
return s
|
|
|
|
|
|
def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
|
|
parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p]
|
|
return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order
|
|
|
|
|
|
def compose_queries(location_name: str | None, cluster: str | None,
|
|
region: str | None) -> list[str]:
|
|
"""Ordered geocode candidates, most → least specific (two-pass estate fallback).
|
|
|
|
Building-level location_names (e.g. 'KAHAWA WENDANI ALVO HOUSE') aren't in OSM, so
|
|
the precise query 404s. We then fall back to the estate (leading tokens of the
|
|
place) — each still constrained to the cluster viewbox + distance check by the
|
|
caller, so a coarse hit lands in the right neighbourhood (tighter than the bare
|
|
cluster centroid). We deliberately do NOT add a pure-cluster candidate: that would
|
|
just reproduce the cluster centroid while mislabelling it geo_source='location';
|
|
a truly unmatchable ticket should keep its honest cluster-centroid fallback.
|
|
e.g. 'KAHAWA WENDANI ALVO HOUSE' -> ['KAHAWA WENDANI ALVO HOUSE, WENDANI, nairobi,
|
|
Kenya', 'KAHAWA WENDANI, nairobi, Kenya', 'KAHAWA, nairobi, Kenya']
|
|
"""
|
|
region_part, cluster_part = clean(region), clean(cluster)
|
|
place = extract_place(location_name)
|
|
toks = place.split()
|
|
out: list[str] = []
|
|
|
|
def add(*parts: str | None) -> None:
|
|
q = ", ".join(dict.fromkeys([p for p in parts if p] + ["Kenya"]))
|
|
if q and q != "Kenya" and q not in out:
|
|
out.append(q)
|
|
|
|
add(place, cluster_part, region_part) # 1. full precise
|
|
if len(toks) > 2:
|
|
add(" ".join(toks[:2]), region_part) # 2. estate (leading 2 tokens)
|
|
if len(toks) > 1:
|
|
add(toks[0], region_part) # 3. leading token (broad estate)
|
|
return out
|
|
|
|
|
|
# ── keyed geocoder ────────────────────────────────────────────────────────────
|
|
def _throttle() -> None:
|
|
global _last_geocode_at
|
|
wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at)
|
|
if wait > 0:
|
|
time.sleep(wait)
|
|
_last_geocode_at = time.monotonic()
|
|
|
|
|
|
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
|
|
dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
|
|
a = (math.sin(dlat / 2) ** 2
|
|
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
|
|
return 2 * 6371.0 * math.asin(math.sqrt(a))
|
|
|
|
|
|
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
|
|
"""Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None.
|
|
|
|
`viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box
|
|
around the cluster centroid (bounded), which stops the geocoder matching a
|
|
landmark name in the wrong city.
|
|
"""
|
|
if not _API_KEY:
|
|
log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
|
|
return None
|
|
_throttle()
|
|
try:
|
|
if _PROVIDER == "opencage":
|
|
params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
|
|
if viewbox:
|
|
params["bounds"] = "%s,%s,%s,%s" % viewbox
|
|
r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
|
|
r.raise_for_status()
|
|
res = (r.json().get("results") or [])
|
|
if res:
|
|
g = res[0]["geometry"]
|
|
return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
|
|
else: # locationiq (default)
|
|
params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
|
|
if viewbox:
|
|
params["viewbox"] = "%s,%s,%s,%s" % viewbox
|
|
params["bounded"] = 1
|
|
r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
|
|
if r.status_code == 404: # LocationIQ returns 404 for "no matches"
|
|
return None
|
|
r.raise_for_status()
|
|
hits = r.json()
|
|
if hits:
|
|
h = hits[0]
|
|
return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
|
|
except (requests.RequestException, ValueError, KeyError) as e:
|
|
log.warning("geocode failed for %r: %s", query, e)
|
|
return None
|
|
|
|
|
|
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
|
def geocode_clusters(apply: bool) -> None:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT key, region FROM (
|
|
SELECT tickets.norm_cluster(raw->>'cluster') AS key,
|
|
(array_agg(raw->>'region'))[1] AS region
|
|
FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
|
UNION
|
|
SELECT tickets.norm_cluster(raw->>'cluster'),
|
|
(array_agg(raw->>'region'))[1]
|
|
FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
|
) z
|
|
WHERE key IS NOT NULL
|
|
AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
|
|
WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
|
|
"""
|
|
)
|
|
todo = cur.fetchall()
|
|
log.info("%d clusters to geocode", len(todo))
|
|
if not apply:
|
|
for key, region in todo:
|
|
log.info(" would geocode cluster: %s (%s)", key, region)
|
|
return
|
|
written = 0
|
|
for key, region in todo:
|
|
hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
|
|
if not hit:
|
|
continue
|
|
lat, lng, _ = hit
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
|
|
VALUES (%s, %s, %s, %s, %s, false)
|
|
ON CONFLICT (cluster_key) DO UPDATE
|
|
SET region = EXCLUDED.region, lat = EXCLUDED.lat,
|
|
lng = EXCLUDED.lng, source = EXCLUDED.source""",
|
|
(key, region, lat, lng, _PROVIDER),
|
|
)
|
|
written += 1
|
|
_resolve()
|
|
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
|
|
|
|
|
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
|
# A location geocode is only trusted if it lands within this radius of the
|
|
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
|
# place and we fall back to the cluster centroid.
|
|
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
|
|
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid
|
|
|
|
|
|
def geocode_locations(apply: bool) -> None:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
|
FROM (
|
|
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
|
(array_agg(raw->>'location_name'))[1] AS location_name,
|
|
(array_agg(raw->>'cluster'))[1] AS cluster,
|
|
(array_agg(raw->>'region'))[1] AS region,
|
|
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
|
FROM tickets.inc
|
|
WHERE (raw->>'is_actionable')::boolean
|
|
AND raw->>'location_name' IS NOT NULL
|
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
|
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
|
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
|
AND gl.geom IS NOT NULL)
|
|
GROUP BY 1
|
|
) t
|
|
LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
|
|
"""
|
|
)
|
|
todo = cur.fetchall()
|
|
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
|
if not apply:
|
|
for key, loc, cluster, region, clat, clng in todo[:50]:
|
|
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
|
|
return
|
|
written = missed = coarse = 0
|
|
for key, loc, cluster, region, clat, clng in todo:
|
|
viewbox = None
|
|
if clat is not None and clng is not None:
|
|
viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
|
|
# two-pass: precise → estate → cluster; accept the FIRST in-range hit. A wrong-area
|
|
# match (> MAX_KM from the cluster centroid) is skipped so we try a coarser query.
|
|
hit = used = None
|
|
for i, cand in enumerate(compose_queries(loc, cluster, region)):
|
|
g = geocode(cand, viewbox)
|
|
if not g:
|
|
continue
|
|
lat, lng, conf = g
|
|
if (clat is not None and clng is not None
|
|
and _haversine_km(lat, lng, clat, clng) > _MAX_KM_FROM_CLUSTER):
|
|
continue
|
|
hit, used = g, cand
|
|
if i > 0:
|
|
coarse += 1
|
|
break
|
|
if not hit:
|
|
missed += 1 # no match even coarsely — keeps cluster-centroid fallback
|
|
continue
|
|
lat, lng, conf = hit
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""INSERT INTO tickets.geo_locations
|
|
(query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (query_key) DO UPDATE
|
|
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
|
|
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
|
|
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
|
|
(key, loc, cluster, region, used, lat, lng, conf, _PROVIDER),
|
|
)
|
|
written += 1
|
|
log.info(" geocoded %s -> %.5f, %.5f", used, lat, lng)
|
|
n = _resolve()
|
|
log.info("locations: %d accepted (%d via estate/cluster fallback), %d unmatched; "
|
|
"re-resolved geom on %d tickets (unverified — review tickets.geo_locations)",
|
|
written, coarse, missed, n)
|
|
|
|
|
|
def _resolve() -> int:
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
|
return cur.fetchone()[0]
|
|
|
|
|
|
# ── entrypoint ────────────────────────────────────────────────────────────────
|
|
def main() -> None:
|
|
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
|
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
|
ap.add_argument("--from-bucket", action="store_true",
|
|
help="Drain the incremental INC change stream (automations/inc/changes/) "
|
|
"from the isptickets S3 bucket: every not-yet-processed file "
|
|
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
|
|
ap.add_argument("--reseed", action="store_true",
|
|
help="Ignore the stored watermark and drain every file in changes/ once "
|
|
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
|
|
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
|
ap.add_argument("--geocode-clusters", action="store_true",
|
|
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
|
ap.add_argument("--geocode-locations", action="store_true",
|
|
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
|
|
ap.add_argument("--capture-history", action="store_true",
|
|
help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
|
|
args = ap.parse_args()
|
|
|
|
if args.geocode_clusters:
|
|
geocode_clusters(apply=args.apply)
|
|
return
|
|
if args.geocode_locations:
|
|
geocode_locations(apply=args.apply)
|
|
return
|
|
if args.capture_history:
|
|
_capture_history()
|
|
return
|
|
if not (args.from_bucket or args.inc_csv):
|
|
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
|
|
ingest(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|