fleettickets/import_tickets.py
david kiania 509338c076 feat(import_tickets): migrate INC ingest to isptickets bucket + --reseed cutover
Provider moved the INC CDC feed to a new bucket (tickets -> isptickets, new
per-bucket creds; same s3.rahamafresh.com endpoint, identical 32-col schema).
This is config + a one-time reseed, not a rewrite — the loader already drains
automations/inc/changes/ oldest->newest with a source_max_key watermark.
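Below is a minimal in-memory sketch of why the loader drains every change file oldest->newest instead of taking only the newest one. Each file upserts by ticket_id, so skipping an intermediate delta loses its updates. A plain dict stands in for tickets.inc. The ticket IDs and statuses are made-up illustration data, not real feed rows.

```python
from typing import Dict, List

Row = Dict[str, str]


def apply_files(files: List[List[Row]]) -> Dict[str, Row]:
    """Upsert every file's rows into a dict keyed by ticket_id, in the order given.

    Later rows overwrite earlier ones, like INSERT ... ON CONFLICT (ticket_id) DO UPDATE.
    Nothing is ever deleted, matching a CDC stream that never emits deletions.
    """
    table: Dict[str, Row] = {}
    for rows in files:
        for row in rows:
            table[row["ticket_id"]] = row
    return table


# Hypothetical stream: a full baseline, then two deltas holding only changed rows.
baseline = [{"ticket_id": "INC1", "raw_status": "OPEN"},
            {"ticket_id": "INC2", "raw_status": "OPEN"}]
delta_1 = [{"ticket_id": "INC1", "raw_status": "CLOSED"}]
delta_2 = [{"ticket_id": "INC3", "raw_status": "OPEN"}]

drained = apply_files([baseline, delta_1, delta_2])  # every file, oldest -> newest
skipped = apply_files([baseline, delta_2])            # delta_1 never applied

print(drained["INC1"]["raw_status"])  # prints CLOSED
print(skipped["INC1"]["raw_status"])  # prints OPEN (the closure in delta_1 was lost)
```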

- default _BUCKET -> isptickets (TICKETS_BUCKET still overrides)
- add --reseed: ignore the stored watermark and drain every changes/ file once
  (the old-bucket watermark may post-date the new bucket's first file). Crash-safe
  via the existing per-file watermark-advance + archive loop.
- refresh stale "newest-file / full-snapshot-per-hour" docstring/comments to the
  CDC reality; .env.example + README updated (new bucket + reseed runbook).
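The watermark filter and the --reseed override can be sketched with the standard library alone. The sketch mirrors the loader's key regex and EAT timestamp parsing. However, `pending_keys` is a hypothetical helper written for illustration, and the object keys below are invented.

The sketch shows the cutover hazard: a watermark carried over from the old bucket that is newer than the new bucket's first files hides those files, while reseed drains them all.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import List, Optional

EAT = timezone(timedelta(hours=3))  # filenames carry East Africa Time
CHANGE_KEY_RE = re.compile(
    r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
FLOOR = datetime.min.replace(tzinfo=EAT)  # sort key for unparseable timestamps


def key_ts(key: str) -> Optional[datetime]:
    """EAT timestamp embedded in a changes/<ts>.csv key, or None."""
    m = CHANGE_KEY_RE.match(key)
    if not m:
        return None
    try:
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
    except ValueError:  # regex-shaped but impossible date
        return None


def pending_keys(keys: List[str], watermark_key: Optional[str],
                 reseed: bool = False) -> List[str]:
    """Change files still to apply, oldest first (hypothetical helper)."""
    changes = sorted((k for k in keys if CHANGE_KEY_RE.match(k)),
                     key=lambda k: key_ts(k) or FLOOR)
    # reseed, or a watermark that is not a change-file key, means: drain everything
    last = None if reseed or not watermark_key else key_ts(watermark_key)
    return [k for k in changes if last is None or (key_ts(k) or FLOOR) > last]


new_bucket = [
    "automations/inc/changes/2026-06-24T10-55-44.csv",
    "automations/inc/changes/2026-06-24T09-55-44.csv",
    "automations/inc/changes/2026-06-25T08-00-00.csv",
    "automations/inc/changes/latest.csv",  # not a change file; ignored
]
old_watermark = "automations/inc/changes/2026-06-25T07-00-00.csv"

print(pending_keys(new_bucket, old_watermark))               # only the 08-00-00 file
print(pending_keys(new_bucket, old_watermark, reseed=True))  # all three, oldest first
```

Because each applied file advances the watermark before it is archived, an interrupted reseed can be resumed with a plain run. By then the stored key points into the new bucket's own stream.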

Verified live dry-run: 41/41 files drained (watermark None), alarm/sentinel
filter active, exit 0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 18:20:15 +03:00

649 lines
32 KiB
Python

"""
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
source of the FleetOps "Tickets" map.
tickets.inc — incidents / customer faults
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
baseline, and every later file holds only the rows that CHANGED since the prior
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
Every file shares the identical flat-CSV schema (header + rows). We ingest EVERY
not-yet-processed file in ASCENDING timestamp order (baseline first, then each
delta) — taking only the newest would silently drop the intermediate deltas:
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
history accumulates), and advance the watermark in tickets.import_meta
(metadata->>'source_max_key' = newest file applied) so reruns skip what's done;
- on success, MOVE each file to automations/inc/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
real place out of location_name (region+cluster+location_name,
network codes stripped), geocodes it, caches in
tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import argparse
import csv
import io
import math
import os
import re
import time
from datetime import datetime, timezone, timedelta
import boto3
import requests
import psycopg2.extras
from botocore.config import Config as BotoConfig
from shared import clean, get_conn, get_logger
log = get_logger("import_tickets")
# ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc"
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
# message itself. Matched by prefix so position/exact-tail don't matter.
_SENTINEL_PREFIX = "EXPORT STOPPED"
# Columns dropped before building `raw`: derivable (week_*), the client's row-level
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
DROP_FIELDS = frozenset({
    "week_start", "week_end",
    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
    "department", "source_type",
})
# Only files matching automations/inc/changes/<EAT-timestamp>.csv, i.e. the incremental
# stream (NOT processed/, NOT the leftover latest.csv/, latest.json/, full/ prefixes).
_CHANGE_KEY_RE = re.compile(
    r"^automations/inc/changes/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0


# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).


def _s3_client():
    """boto3 S3 client for the RUSTFS_ENDPOINT S3 endpoint (forces path-style addressing)."""
    return boto3.client(
        "s3",
        endpoint_url=os.environ["RUSTFS_ENDPOINT"],
        aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"],
        aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"],
        region_name=os.getenv("RUSTFS_REGION", "us-east-1"),
        config=BotoConfig(s3={"addressing_style": "path"}, signature_version="s3v4",
                          retries={"max_attempts": 3, "mode": "standard"}),
    )


def _ts_from_key(key: str) -> datetime | None:
    """EAT timestamp embedded in an automations/inc/changes/<ts>.csv key (or None)."""
    m = _CHANGE_KEY_RE.match(key)
    if not m:
        return None
    try:  # the regex can match an impossible date (e.g. 9999-99-99T…); don't crash the sort
        return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
    except ValueError:
        return None


def _list_inc_csvs(s3) -> list[tuple[str, str]]:
    """[(key, etag)] for every automations/inc/changes/<ts>.csv (excludes processed/ + dirs)."""
    out: list[tuple[str, str]] = []
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
        for it in page.get("Contents", []):
            if _CHANGE_KEY_RE.match(it["Key"]):
                out.append((it["Key"], (it.get("ETag") or "").strip('"')))
    return out


def _get_text(s3, key: str) -> str:
    """Download an object's body as UTF-8 text."""
    return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")


def _last_processed_ts() -> datetime | None:
    """Watermark: EAT timestamp of the newest change file already ingested.

    Read from tickets.import_meta (metadata->>'source_max_key', advanced per file as
    we drain changes/ oldest→newest). None when nothing has been ingested via the
    changes stream yet (e.g. the first run after the source switched to incremental,
    where the stored key is an old full-snapshot path); in that case every file
    currently in changes/ is processed.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT metadata->>'source_max_key' FROM tickets.import_meta WHERE dataset = %s",
                (_DATASET,),
            )
            row = cur.fetchone()
    return _ts_from_key(row[0]) if row and row[0] else None


def _parse_csv(text: str) -> list[dict]:
    return list(csv.DictReader(io.StringIO(text)))


def _load_csv_local(path: str) -> list[dict]:
    with open(path, encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))


def _move_processed(s3, keys: list[str]) -> None:
    """Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
    for key in keys:
        dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
        s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
        s3.delete_object(Bucket=_BUCKET, Key=key)
        log.info("archived %s -> %s", key, dst)


# ── row preparation (filter · drop columns · normalize) ─────────────────────────
def _keep_row(row: dict) -> bool:
    """Drop alarm rows + the truncation sentinel; require a real ticket_id."""
    tid = clean(row.get("ticket_id"))
    if not tid or tid.startswith(_SENTINEL_PREFIX):
        return False
    return clean(row.get("is_alarm")) != "true"


def _prepare(row: dict) -> dict:
    """Strip DROP_FIELDS and normalize region/raw_status; returns the `raw` payload."""
    r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
    if r.get("region"):
        r["region"] = r["region"].lower()
    if r.get("raw_status"):
        r["raw_status"] = r["raw_status"].upper()
    return r


# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, meta: dict, records_ingested: int) -> None:
    """Upsert the INC import metadata (powers map freshness; holds source_etag and
    the source_max_key watermark).

    Runs on the caller's cursor so the row upsert and the meta write commit
    together. A half-written state (rows in, meta stale) would let the stored
    watermark disagree with the rows actually applied.
    """
    cur.execute(
        """INSERT INTO tickets.import_meta
               (dataset, export_type, exported_at, snapshot_date, source_schema,
                source_table, row_count, records_ingested, n8n_execution_id, metadata,
                ingested_at)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
           ON CONFLICT (dataset) DO UPDATE
           SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
               snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
               source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
               records_ingested = EXCLUDED.records_ingested,
               n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
               ingested_at = now()""",
        (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
         clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
         clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
         clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
    )


def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
    """Filter + prepare `rows`, then upsert them into tickets.inc (raw-first).

    Returns the number of rows kept. With apply=False (dry-run) nothing is written.
    """
    meta = meta or {}
    kept = [r for r in rows if _keep_row(r)]
    payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
    log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
             _TABLE, len(rows), len(payload), len(rows) - len(payload))
    if not apply:
        log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
        return len(payload)
    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
                "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                payload, page_size=500,
            )
            # same transaction as the upsert: rows + import meta commit atomically
            _record_meta(cur, meta, len(payload))
    log.info("upserted %d rows into %s", len(payload), _TABLE)
    return len(payload)


def _capture_history() -> None:
    """Append new closures + upsert today's backlog snapshot (tickets.capture_history)."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT tickets.capture_history()")
            log.info("history: %s", cur.fetchone()[0])


def ingest(args) -> None:
    # Local-file path (dev): ingest a single CSV, no bucket / no archive.
    # Note: this rewrites tickets.import_meta (including `metadata`), so the bucket
    # watermark (source_max_key) is cleared; the next --from-bucket run then treats
    # every file still in changes/ as pending.
    if args.inc_csv:
        rows = _load_csv_local(args.inc_csv)
        name = os.path.basename(args.inc_csv)
        ts = _ts_from_key(_INC_PREFIX + name)
        meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
        if ts:
            meta["exported_at"] = ts.isoformat()
        upsert(rows, args.apply, meta=meta)
        return

    # --from-bucket: ingest EVERY not-yet-processed change file, oldest→newest
    # (baseline first, then each delta), upserting each. The watermark advances and
    # the file is archived PER file, so a mid-run failure leaves a consistent state
    # (folder state matches the watermark) and the next run resumes cleanly.
    s3 = _s3_client()
    listing = _list_inc_csvs(s3)
    if not listing:
        log.info("no INC change files under %s — nothing to do", _INC_PREFIX)
        return
    _floor = datetime.min.replace(tzinfo=_EAT)  # sort/compare key for unparseable timestamps
    listing.sort(key=lambda ke: _ts_from_key(ke[0]) or _floor)

    # Watermark: skip anything at/older than the newest file already applied. Archiving
    # normally empties changes/, but this guards a failed archive from re-applying.
    # --reseed ignores the stored watermark and drains EVERY file in changes/ once. It is
    # meant for a one-time bucket cutover, where the stored key points at the old bucket's
    # stream and its timestamp may be newer than the new bucket's first file. Crash-safe:
    # each file still advances source_max_key + archives per file, so a plain rerun resumes.
    last_ts = None if args.reseed else _last_processed_ts()
    pending = [(k, e) for k, e in listing
               if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
    if not pending:
        log.info("all %d change file(s) already processed (watermark %s) — nothing new",
                 len(listing), last_ts and last_ts.isoformat())
        if args.apply:
            _move_processed(s3, [k for k, _ in listing])  # archive any stragglers
            _capture_history()
        return

    log.info("%d of %d change file(s) to ingest (watermark %s); newest=%s",
             len(pending), len(listing), last_ts and last_ts.isoformat(), pending[-1][0])
    total = 0
    for i, (key, etag) in enumerate(pending):
        rows = _parse_csv(_get_text(s3, key))
        ts = _ts_from_key(key)
        # The first file applied onto an empty watermark is the full baseline; every
        # file after it is a delta. export_type is informational (recorded in import_meta).
        meta = {"export_type": "baseline" if (last_ts is None and i == 0) else "delta",
                "source_s3_key": key, "source_etag": etag,
                "source_max_key": key, "row_count": len(rows)}
        if ts:
            meta["exported_at"] = ts.isoformat()
        # Rows + watermark (source_max_key) commit in one txn, advancing per file; only
        # then archive, so the changes/ folder state always matches the watermark.
        total += upsert(rows, args.apply, meta=meta)
        if args.apply:
            _move_processed(s3, [key])
        else:
            log.info("DRY-RUN — would archive %s to %s", key, _PROCESSED_PREFIX)
    log.info("ingested %d change file(s); %d rows kept in total", len(pending), total)
    if args.apply:
        _capture_history()


# ── place extraction (strip network codes, keep the real place) ───────────────
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …), applied repeatedly.
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
# 'NW' is the one site code that the source also glues straight onto the place with
# no separator (NWKIAMBU, NWRIDGE, NWTHE; ~1.7k rows in a single snapshot). Safe to
# split because no place/word starts with "NW". The other codes (CO/NE/SE/DR…) begin
# real words (COAST, NEW, SEASONS, DRIVE), so we only strip THOSE when delimited above.
_GLUED_NW_RE = re.compile(r"^NW(?=[A-Z])")
# Inline network/work-order codes to drop wherever they appear.
_CODE_RE = re.compile(
    r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
)
# Trailing '-<segment>' after the final hyphen: a unit/instruction code, not a place.
# Dropped only when it LOOKS like one: a unit number (37, F32, 3C, 302), a short
# code (<=3 chars: E, NB, KKK), or an instruction phrase (CALL ON ARRIVAL, TBC, NA).
# A real word tail (…-MALL) is kept.
_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$")
_TAIL_INSTRUCTION_TOKENS = frozenset({
    "CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO",
    "OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON",
})


def extract_place(location_name: str | None) -> str:
    """Pull the human place/landmark out of a coded location_name string.

    e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
         'NWKIAMBU_KIRIGITI_MWANJA APARTMENTS-TBC' -> 'KIAMBU KIRIGITI MWANJA APARTMENTS'
    """
    s = (location_name or "").upper().strip()
    if not s:
        return ""
    # drop the trailing '-<segment>' only when it's a unit/instruction code, not a
    # real word (so '…-37'/'…-CALL ON ARRIVAL' drop but '…-MALL' is kept)
    if "-" in s:
        head, _, tail = s.rpartition("-")
        head, tail = head.strip(), tail.strip()
        first = tail.split()[0] if tail else ""
        if head and (not tail or len(tail) <= 3 or _UNIT_TAIL_RE.match(tail)
                     or first in _TAIL_INSTRUCTION_TOKENS):
            s = head
    s = s.replace("_", " ")
    # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…; or glued: NWKIAMBU)
    prev = None
    while prev != s:
        prev = s
        s = _PREFIX_RE.sub("", s).strip()
        s = _GLUED_NW_RE.sub("", s).strip()
    s = _CODE_RE.sub(" ", s)
    s = re.sub(r"\s+", " ", s).strip(" ,-")
    return s


def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
    """Single most-specific query: place, cluster, region, 'Kenya' (de-duplicated)."""
    parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p]
    return ", ".join(dict.fromkeys(parts))  # de-dupe while preserving order


def compose_queries(location_name: str | None, cluster: str | None,
                    region: str | None) -> list[str]:
    """Ordered geocode candidates, most → least specific (precise, then estate fallbacks).

    Building-level location_names (e.g. 'KAHAWA WENDANI ALVO HOUSE') aren't in OSM, so
    the precise query finds no match. We then fall back to the estate (leading tokens
    of the place). The caller still constrains each candidate to the cluster viewbox +
    distance check, so a coarse hit lands in the right neighbourhood (tighter than the
    bare cluster centroid). We deliberately do NOT add a pure-cluster candidate: it
    would just reproduce the cluster centroid while mislabelling it geo_source='location'.
    A truly unmatchable ticket should keep its honest cluster-centroid fallback, so an
    empty place yields no candidates at all.

    e.g. with cluster='WENDANI', region='nairobi':
         'KAHAWA WENDANI ALVO HOUSE' -> ['KAHAWA WENDANI ALVO HOUSE, WENDANI, nairobi, Kenya',
                                         'KAHAWA WENDANI, nairobi, Kenya',
                                         'KAHAWA, nairobi, Kenya']
    """
    region_part, cluster_part = clean(region), clean(cluster)
    place = extract_place(location_name)
    toks = place.split()
    out: list[str] = []
    if not place:  # only codes in location_name: a query would be pure cluster/region
        return out

    def add(*parts: str | None) -> None:
        q = ", ".join(dict.fromkeys([p for p in parts if p] + ["Kenya"]))
        if q and q != "Kenya" and q not in out:
            out.append(q)

    add(place, cluster_part, region_part)  # 1. full precise
    if len(toks) > 2:
        add(" ".join(toks[:2]), region_part)  # 2. estate (leading 2 tokens)
    if len(toks) > 1:
        add(toks[0], region_part)  # 3. leading token (broad estate)
    return out


# ── keyed geocoder ────────────────────────────────────────────────────────────
def _throttle() -> None:
    """Sleep so consecutive geocoder calls are at least _GEOCODE_INTERVAL_S apart."""
    global _last_geocode_at
    wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at)
    if wait > 0:
        time.sleep(wait)
    _last_geocode_at = time.monotonic()


def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance in km between two lat/lng points (mean Earth radius 6371 km)."""
    dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
    """Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None.

    `confidence` is OpenCage's `confidence` or LocationIQ's `importance`.
    `viewbox` = (min_lon, min_lat, max_lon, max_lat) is a box around the cluster
    centroid, which stops the geocoder matching a landmark name in the wrong city.
    LocationIQ treats it as a hard bound (bounded=1); OpenCage's `bounds` only biases
    results, so the caller's distance check still applies.
    """
    if not _API_KEY:
        log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
        return None
    _throttle()
    try:
        if _PROVIDER == "opencage":
            params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
            if viewbox:
                params["bounds"] = "%s,%s,%s,%s" % viewbox
            r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
            r.raise_for_status()
            res = r.json().get("results") or []
            if res:
                g = res[0]["geometry"]
                return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
        else:  # locationiq (default)
            params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
            if viewbox:
                params["viewbox"] = "%s,%s,%s,%s" % viewbox
                params["bounded"] = 1
            r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
            if r.status_code == 404:  # LocationIQ returns 404 for "no matches"
                return None
            r.raise_for_status()
            hits = r.json()
            if hits:
                h = hits[0]
                return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
    except (requests.RequestException, ValueError, KeyError) as e:
        log.warning("geocode failed for %r: %s", query, e)
    return None


# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
def geocode_clusters(apply: bool) -> None:
    """Geocode every INC/CRQ cluster that has no gazetteer point yet, then re-resolve geoms."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT key, region FROM (
                    SELECT tickets.norm_cluster(raw->>'cluster') AS key,
                           (array_agg(raw->>'region'))[1] AS region
                    FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                    UNION
                    SELECT tickets.norm_cluster(raw->>'cluster'),
                           (array_agg(raw->>'region'))[1]
                    FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                ) z
                WHERE key IS NOT NULL
                  AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
                                  WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
                """
            )
            todo = cur.fetchall()
    log.info("%d clusters to geocode", len(todo))
    if not apply:
        for key, region in todo:
            log.info(" would geocode cluster: %s (%s)", key, region)
        return
    written = 0
    for key, region in todo:
        hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
        if not hit:
            continue
        lat, lng, _ = hit
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
                       VALUES (%s, %s, %s, %s, %s, false)
                       ON CONFLICT (cluster_key) DO UPDATE
                       SET region = EXCLUDED.region, lat = EXCLUDED.lat,
                           lng = EXCLUDED.lng, source = EXCLUDED.source""",
                    (key, region, lat, lng, _PROVIDER),
                )
        written += 1
    _resolve()
    log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
# A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid.
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid


def geocode_locations(apply: bool) -> None:
    """Geocode actionable-INC location_names not yet cached in tickets.geo_locations,
    then re-resolve ticket geoms."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
                FROM (
                    SELECT tickets.norm_cluster(raw->>'location_name') AS key,
                           (array_agg(raw->>'location_name'))[1] AS location_name,
                           (array_agg(raw->>'cluster'))[1] AS cluster,
                           (array_agg(raw->>'region'))[1] AS region,
                           tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
                    FROM tickets.inc
                    WHERE (raw->>'is_actionable')::boolean
                      AND raw->>'location_name' IS NOT NULL
                      AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
                      AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
                                      WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
                                        AND gl.geom IS NOT NULL)
                    GROUP BY 1
                ) t
                LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
                """
            )
            todo = cur.fetchall()
    log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
    if not apply:
        for key, loc, cluster, region, clat, clng in todo[:50]:  # preview the first 50
            log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
        return
    written = missed = coarse = 0
    for key, loc, cluster, region, clat, clng in todo:
        viewbox = None
        if clat is not None and clng is not None:
            viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
        # Candidates run precise → estate → leading token; accept the FIRST in-range hit.
        # A wrong-area match (> _MAX_KM_FROM_CLUSTER from the cluster centroid) is skipped
        # so the next, coarser query is tried.
        hit = used = None
        for i, cand in enumerate(compose_queries(loc, cluster, region)):
            g = geocode(cand, viewbox)
            if not g:
                continue
            lat, lng, conf = g
            if (clat is not None and clng is not None
                    and _haversine_km(lat, lng, clat, clng) > _MAX_KM_FROM_CLUSTER):
                continue
            hit, used = g, cand
            if i > 0:
                coarse += 1
            break
        if not hit:
            missed += 1  # no match even coarsely; keeps the cluster-centroid fallback
            continue
        lat, lng, conf = hit
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_locations
                           (query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                       ON CONFLICT (query_key) DO UPDATE
                       SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
                           region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
                           lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
                    (key, loc, cluster, region, used, lat, lng, conf, _PROVIDER),
                )
        written += 1
        log.info(" geocoded %s -> %.5f, %.5f", used, lat, lng)
    n = _resolve()
    log.info("locations: %d accepted (%d via estate fallback), %d unmatched; "
             "re-resolved geom on %d tickets (unverified — review tickets.geo_locations)",
             written, coarse, missed, n)


def _resolve() -> int:
    """Re-derive ticket geoms via tickets.resolve_ticket_geoms(); returns its count."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT tickets.resolve_ticket_geoms()")
            return cur.fetchone()[0]


# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
    ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Drain the incremental INC change stream (automations/inc/changes/) "
                         "from the TICKETS_BUCKET S3 bucket (default isptickets): every "
                         "not-yet-processed file oldest→newest, upsert on ticket_id, "
                         "advance the watermark, archive")
    ap.add_argument("--reseed", action="store_true",
                    help="Ignore the stored watermark and drain every file in changes/ once "
                         "(one-time bucket cutover / reseed). Use with --from-bucket --apply")
    ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
    ap.add_argument("--geocode-clusters", action="store_true",
                    help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
    ap.add_argument("--geocode-locations", action="store_true",
                    help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
    ap.add_argument("--capture-history", action="store_true",
                    help="Run tickets.capture_history() standalone (closure_events + daily snapshot)")
    args = ap.parse_args()
    if args.reseed and not args.from_bucket:
        ap.error("--reseed only applies to --from-bucket")
    if args.geocode_clusters:
        geocode_clusters(apply=args.apply)
        return
    if args.geocode_locations:
        geocode_locations(apply=args.apply)
        return
    if args.capture_history:
        _capture_history()
        return
    if not (args.from_bucket or args.inc_csv):
        ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, --geocode-locations, or --capture-history")
    ingest(args)


if __name__ == "__main__":
    main()