- migrations 21->23: dedicated `tickets` schema (tickets.inc / tickets.crq, raw-jsonb-first), geo_clusters + geo_locations gazetteers, geom-resolution trigger (feed -> location -> cluster -> none), reporting.fn_tickets_for_map - dashboard_api: GET /webhook/tickets (INC/CRQ GeoJSON for the FleetOps map) - tools/import_tickets.py: raw-first bucket ingest + cluster/location geocoding (LocationIQ/OpenCage, viewbox-bounded with a cluster-distance sanity guard) - docs/CONNECTIONS.md: geocoder env var names Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
377 lines
18 KiB
Python
377 lines
18 KiB
Python
"""
import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Loads the client's field-ops ticket snapshots into the `tickets` schema — the
source of the FleetOps "Tickets" map. Two categories, one table each:
    tickets.inc — incidents / customer faults
    tickets.crq — new-installation requests

RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
The DB derives `geom` (migration 23): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.

Source data: rustfs `tickets` bucket, full snapshots from the client's email
automation — automations/{inc,crq}/latest.json (array of 32-key objects).

Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
    --geocode-clusters   one coordinate per cluster (coarse fallback; ~50 lookups)
    --geocode-locations  precise per-location for ACTIONABLE INC tickets: parses the
                         real place out of location_name (region+cluster+location_name,
                         network codes stripped), geocodes it, caches in
                         tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.

Usage (run as a module from the image root):
    WK=$(docker ps --filter name=ingest_worker --format "{{.Names}}" | head -1)
    docker exec -it "$WK" python -m tools.import_tickets --from-bucket --apply
    docker exec -it "$WK" python -m tools.import_tickets \
        --inc-json /tmp/inc.json --crq-json /tmp/crq.json --apply
    docker exec -it "$WK" python -m tools.import_tickets --geocode-clusters --apply
    docker exec -it "$WK" python -m tools.import_tickets --geocode-locations --apply

Pre-requisite: migrations 21–23 applied (tickets.inc/crq + geo_clusters +
geo_locations + reporting.fn_tickets_for_map).
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import time
|
||
|
||
import requests
|
||
import psycopg2.extras
|
||
|
||
from ts_shared_rev import clean, get_conn, get_logger
|
||
|
||
# Module-wide logger, obtained from the project's get_logger helper.
log = get_logger("import_tickets")

# Ticket category -> destination table.
TABLE_FOR = dict(inc="tickets.inc", crq="tickets.crq")
# Bucket holding the snapshots (overridable via env) and the object key per category.
_BUCKET = os.environ.get("TICKETS_BUCKET", "tickets")
_BUCKET_KEYS = dict(
    inc="automations/inc/latest.json",
    crq="automations/crq/latest.json",
)

# Keyed geocoder settings — public Nominatim rate-limits bulk lookups, hence
# LocationIQ/OpenCage. Provider name is normalised to lower case.
_PROVIDER = os.environ.get("GEOCODER_PROVIDER", "locationiq").lower()
_API_KEY = os.environ.get("GEOCODER_API_KEY", "")
# Minimum spacing between two geocoder requests, in seconds.
_GEOCODE_INTERVAL_S = float(os.environ.get("GEOCODER_MIN_INTERVAL_S", "1.1"))
# time.monotonic() reading taken at the most recent geocoder call (see _throttle).
_last_geocode_at = 0.0
|
||
|
||
|
||
# ── data loading ──────────────────────────────────────────────────────────────
|
||
def _load_local(path: str) -> list[dict]:
|
||
with open(path, encoding="utf-8") as f:
|
||
data = json.load(f) # json.loads accepts NaN by default
|
||
return data if isinstance(data, list) else []
|
||
|
||
|
||
def _load_bucket(kind: str) -> list[dict]:
    """Download and parse the latest snapshot for *kind* from the tickets bucket.

    Shells out to the ``aws`` CLI (``s3 cp <uri> -``) against the endpoint in
    ``RUSTFS_ENDPOINT``, passing the ``RUSTFS_*`` credentials to the child
    process as the corresponding ``AWS_*`` variables.

    Returns the parsed top-level array, or an empty list when the document's
    top level is not an array.

    Raises:
        KeyError: a required ``RUSTFS_*`` variable is unset, or *kind* is unknown.
        subprocess.CalledProcessError: the CLI exited non-zero.
        subprocess.TimeoutExpired: the download took longer than 180 seconds.
    """
    child_env = dict(os.environ)
    child_env["AWS_ACCESS_KEY_ID"] = os.environ["RUSTFS_ACCESS_KEY"]
    child_env["AWS_SECRET_ACCESS_KEY"] = os.environ["RUSTFS_SECRET_KEY"]
    child_env["AWS_DEFAULT_REGION"] = os.getenv("RUSTFS_REGION", "us-east-1")

    source_uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
    log.info("fetching %s", source_uri)

    # "-" as the destination makes the CLI stream the object to stdout.
    command = ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", source_uri, "-"]
    completed = subprocess.run(
        command, env=child_env, capture_output=True, timeout=180, check=True,
    )

    parsed = json.loads(completed.stdout.decode("utf-8"))
    if isinstance(parsed, list):
        return parsed
    return []
|
||
|
||
|
||
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
||
def _scrub_nan(row: dict) -> dict:
|
||
# Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
|
||
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
|
||
|
||
|
||
def upsert(rows: list[dict], table: str, apply: bool) -> int:
    """Upsert ticket records into *table* as ``(ticket_id, raw)`` rows.

    Args:
        rows: Source records. Entries that are not dicts, or whose
            ``ticket_id`` is empty after ``clean``, are skipped.
        table: Destination table. It is interpolated into the SQL text, so
            pass only trusted names such as the values of ``TABLE_FOR``.
        apply: When false, only log what would be written (dry-run).

    Returns:
        The number of distinct tickets written (or that would be written).
    """
    # Keyed by ticket_id so a snapshot that repeats an id keeps only its last
    # occurrence: a single INSERT ... ON CONFLICT DO UPDATE statement may not
    # touch the same row twice, and Postgres aborts the whole batch if it does.
    latest: dict = {}
    unusable = 0
    for r in rows:
        tid = clean(r.get("ticket_id")) if isinstance(r, dict) else None
        if not tid:
            unusable += 1
            continue
        latest[tid] = r
    payload = [(tid, psycopg2.extras.Json(_scrub_nan(r))) for tid, r in latest.items()]
    duplicates = len(rows) - unusable - len(payload)

    log.info("%s: %d valid rows (skipped %d without ticket_id, %d duplicate ticket_id)",
             table, len(payload), unusable, duplicates)
    if not apply:
        log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
        return len(payload)

    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
                "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                payload, page_size=500,
            )
    log.info("upserted %d rows into %s", len(payload), table)
    return len(payload)
|
||
|
||
|
||
def ingest(args) -> None:
    """Load INC/CRQ snapshots and upsert them into their tables.

    With ``args.from_bucket`` both categories are fetched from the bucket and
    any local file arguments are ignored. Otherwise each of ``args.inc_json``
    / ``args.crq_json`` that is set is loaded from disk. ``args.apply`` is
    forwarded to ``upsert`` (false means dry-run).
    """
    if args.from_bucket:
        for kind in ("inc", "crq"):
            upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
        return

    local_sources = (("inc", args.inc_json), ("crq", args.crq_json))
    for kind, path in local_sources:
        if path:
            upsert(_load_local(path), TABLE_FOR[kind], args.apply)
|
||
|
||
|
||
# ── place extraction (strip network codes, keep the real place) ───────────────
# Site-code prefixes at the start of a name (NW_, CO_, ADR_MNT_, COAST_, …).
# The pattern removes one prefix per application; extract_place re-applies it
# until nothing more comes off.
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
# Network / work-order codes that are dropped wherever they occur in the name.
_CODE_RE = re.compile(
    r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
)


def extract_place(location_name: str | None) -> str:
    """Reduce a coded ``location_name`` to the human place/landmark it contains.

    The name is upper-cased, everything after the last ``-`` is discarded
    (unit numbers / instructions such as ``-37``, ``-CALL CLIENT``, ``-F32``),
    underscores become spaces, stacked leading site-code prefixes are removed
    and inline network codes are dropped. Returns ``""`` for empty/None input
    or when nothing but codes remains.

    e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
    """
    text = (location_name or "").upper().strip()
    if not text:
        return ""

    # Keep only what precedes the final hyphen, when there is one.
    head, hyphen, _tail = text.rpartition("-")
    if hyphen:
        text = head
    text = text.replace("_", " ")

    # Peel leading prefixes until a pass changes nothing (handles ADR MNT KAHAWA…).
    while True:
        peeled = _PREFIX_RE.sub("", text).strip()
        if peeled == text:
            break
        text = peeled

    without_codes = _CODE_RE.sub(" ", text)
    collapsed = re.sub(r"\s+", " ", without_codes)
    return collapsed.strip(" ,-")
|
||
|
||
|
||
def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
    """Build the geocoder query "<place>, <cluster>, <region>, Kenya".

    Empty components are left out and repeated components are collapsed,
    keeping the first occurrence and the original order. The comparison
    ignores case and surrounding/internal whitespace runs: ``extract_place``
    upper-cases its result while cluster/region keep their source casing, so
    an exact-match de-dupe would let "RUIRU, Ruiru" through.
    """
    seen: set = set()
    parts: list = []
    for part in (extract_place(location_name), clean(cluster), clean(region), "Kenya"):
        if not part:
            continue
        fingerprint = " ".join(part.split()).casefold()
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        parts.append(part)
    return ", ".join(parts)
|
||
|
||
|
||
# ── keyed geocoder ────────────────────────────────────────────────────────────
|
||
def _throttle() -> None:
    """Sleep as needed so geocoder calls are at least ``_GEOCODE_INTERVAL_S`` apart.

    Records the monotonic time of this call in the module-level
    ``_last_geocode_at`` for the next invocation to measure against.
    """
    global _last_geocode_at
    since_last = time.monotonic() - _last_geocode_at
    remaining = _GEOCODE_INTERVAL_S - since_last
    if remaining > 0:
        time.sleep(remaining)
    _last_geocode_at = time.monotonic()
|
||
|
||
|
||
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
|
||
dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
|
||
a = (math.sin(dlat / 2) ** 2
|
||
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
|
||
return 2 * 6371.0 * math.asin(math.sqrt(a))
|
||
|
||
|
||
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
    """Forward-geocode *query* via the configured keyed provider.

    Args:
        query: Free-text place query.
        viewbox: Optional ``(min_lon, min_lat, max_lon, max_lat)`` box that
            constrains results to the area around the cluster centroid
            (bounded), which stops the geocoder matching a landmark name in
            the wrong city.

    Returns:
        ``(lat, lng, confidence)`` for the top hit, or ``None`` when the API
        key is missing, nothing matched, or the request/response failed.
        ``confidence`` is the provider's own figure: OpenCage's ``confidence``
        field as returned, or LocationIQ's ``importance`` as a float (0 when
        absent).

    Failures are logged and swallowed (best-effort) so one bad lookup does not
    abort a batch.
    """
    if not _API_KEY:
        log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
        return None
    _throttle()
    try:
        if _PROVIDER == "opencage":
            params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
            if viewbox:
                params["bounds"] = "%s,%s,%s,%s" % viewbox
            r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
            r.raise_for_status()
            res = r.json().get("results") or []
            if res:
                g = res[0]["geometry"]
                return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
        else:  # locationiq (default)
            params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
            if viewbox:
                params["viewbox"] = "%s,%s,%s,%s" % viewbox
                params["bounded"] = 1
            r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
            if r.status_code == 404:  # LocationIQ returns 404 for "no matches"
                return None
            r.raise_for_status()
            hits = r.json()
            if hits:
                h = hits[0]
                return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # TypeError covers a malformed payload (e.g. a null coordinate).
        # requests' error text embeds the request URL, which carries the
        # `key=` query parameter — redact the secret before it hits the logs.
        detail = re.sub(r"(?i)\b(key=)[^&\s]+", r"\1***", str(e)).replace(_API_KEY, "***")
        log.warning("geocode failed for %r: %s", query, detail)
    return None
|
||
|
||
|
||
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
||
def geocode_clusters(apply: bool) -> None:
    """Geocode every cluster that has no coordinate yet into tickets.geo_clusters.

    Collects the normalised cluster keys found in tickets.inc and tickets.crq
    that lack a geo_clusters row with a geom, geocodes "<key>, <region>, Kenya"
    for each, upserts the hits as unverified rows, then re-resolves ticket
    geoms.

    Args:
        apply: When false, only log the clusters that would be geocoded — no
            geocoder calls and no writes.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT key, region FROM (
                    SELECT tickets.norm_cluster(raw->>'cluster') AS key,
                           (array_agg(raw->>'region'))[1] AS region
                    FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                    UNION
                    SELECT tickets.norm_cluster(raw->>'cluster'),
                           (array_agg(raw->>'region'))[1]
                    FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
                ) z
                WHERE key IS NOT NULL
                  AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
                                  WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
                """
            )
            rows = cur.fetchall()

    # UNION only collapses identical (key, region) pairs, so a cluster present
    # in both tables with different sampled regions comes back twice. Keep one
    # entry per key (preferring a non-empty region) so each cluster costs a
    # single geocoder call.
    region_for: dict = {}
    for key, region in rows:
        if key not in region_for or (not region_for[key] and region):
            region_for[key] = region
    todo = list(region_for.items())

    log.info("%d clusters to geocode", len(todo))
    if not apply:
        for key, region in todo:
            log.info("  would geocode cluster: %s (%s)", key, region)
        return

    written = 0
    for key, region in todo:
        hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
        if not hit:
            continue
        lat, lng, _ = hit
        # One short transaction per cluster, so progress survives a later failure.
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
                       VALUES (%s, %s, %s, %s, %s, false)
                       ON CONFLICT (cluster_key) DO UPDATE
                         SET region = EXCLUDED.region, lat = EXCLUDED.lat,
                             lng = EXCLUDED.lng, source = EXCLUDED.source""",
                    (key, region, lat, lng, _PROVIDER),
                )
        written += 1
    _resolve()
    log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
||
|
||
|
||
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
||
# A location geocode is only trusted if it lands within this radius of the
|
||
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
||
# place and we fall back to the cluster centroid.
|
||
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
|
||
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid
|
||
|
||
|
||
def geocode_locations(apply: bool) -> None:
|
||
with get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
||
FROM (
|
||
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
||
(array_agg(raw->>'location_name'))[1] AS location_name,
|
||
(array_agg(raw->>'cluster'))[1] AS cluster,
|
||
(array_agg(raw->>'region'))[1] AS region,
|
||
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
||
FROM tickets.inc
|
||
WHERE (raw->>'is_actionable')::boolean
|
||
AND raw->>'location_name' IS NOT NULL
|
||
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
||
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
||
AND gl.geom IS NOT NULL)
|
||
GROUP BY 1
|
||
) t
|
||
LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
|
||
"""
|
||
)
|
||
todo = cur.fetchall()
|
||
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
||
if not apply:
|
||
for key, loc, cluster, region, clat, clng in todo[:50]:
|
||
log.info(" %s -> %r", key, compose_query(loc, cluster, region))
|
||
return
|
||
written = rejected = 0
|
||
for key, loc, cluster, region, clat, clng in todo:
|
||
query = compose_query(loc, cluster, region)
|
||
viewbox = None
|
||
if clat is not None and clng is not None:
|
||
viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
|
||
hit = geocode(query, viewbox)
|
||
if not hit:
|
||
continue
|
||
lat, lng, conf = hit
|
||
# distance sanity: a result far from the cluster centroid is a wrong-city
|
||
# match — drop it so the ticket keeps the cluster-centroid fallback.
|
||
if clat is not None and clng is not None:
|
||
km = _haversine_km(lat, lng, clat, clng)
|
||
if km > _MAX_KM_FROM_CLUSTER:
|
||
rejected += 1
|
||
log.info(" reject (%.0f km from cluster): %s", km, query)
|
||
continue
|
||
with get_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""INSERT INTO tickets.geo_locations
|
||
(query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
ON CONFLICT (query_key) DO UPDATE
|
||
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
|
||
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
|
||
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
|
||
(key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
|
||
)
|
||
written += 1
|
||
log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)
|
||
n = _resolve()
|
||
log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
|
||
"(unverified — review tickets.geo_locations)", written, rejected, n)
|
||
|
||
|
||
def _resolve() -> int:
    """Run ``tickets.resolve_ticket_geoms()`` in the database.

    Returns the single value that function yields (the callers log it as the
    number of tickets whose geom was re-resolved).
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT tickets.resolve_ticket_geoms()")
        first_row = cur.fetchone()
        resolved = first_row[0]
    return resolved
|
||
|
||
|
||
# ── entrypoint ────────────────────────────────────────────────────────────────
|
||
def main() -> None:
    """Command-line entry point: ingest snapshots and/or geocode.

    Every requested action runs, in dependency order: ingest (bucket or local
    files), then cluster geocoding, then per-location geocoding (which uses
    the cluster centroids). Without ``--apply`` each action is a dry-run.
    Exits with a usage error when no action is requested.
    """
    ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
    ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
    ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
    ap.add_argument("--geocode-clusters", action="store_true",
                    help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
    ap.add_argument("--geocode-locations", action="store_true",
                    help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
    args = ap.parse_args()

    local_files = bool(args.inc_json or args.crq_json)
    wants_ingest = args.from_bucket or local_files
    if not (wants_ingest or args.geocode_clusters or args.geocode_locations):
        ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")

    if wants_ingest:
        if args.from_bucket and local_files:
            # ingest() reads the bucket only in this case — say so instead of
            # silently dropping the file arguments.
            log.warning("--inc-json/--crq-json ignored because --from-bucket was given")
        ingest(args)
    # No early return between the steps: a flag given alongside another one
    # must not be silently discarded.
    if args.geocode_clusters:
        geocode_clusters(apply=args.apply)
    if args.geocode_locations:
        geocode_locations(apply=args.apply)


if __name__ == "__main__":
    main()
|