fleettickets/import_tickets.py

536 lines
25 KiB
Python
Raw Normal View History

"""
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
source of the FleetOps "Tickets" map.
tickets.inc — incidents / customer faults
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
writes a full current-state snapshot CSV per hour to the `tickets` bucket at
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv)
There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a
flat CSV (header + rows). We ingest the NEWEST file:
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
skip the DB write (the export re-emits byte-identical content most hours);
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY — no duplication; never delete, so closure
history accumulates), and record snapshot freshness in tickets.import_meta;
- on success, MOVE every listed INC file (the newest one plus any older,
superseded snapshots) to automations/inc/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
real place out of location_name (region+cluster+location_name,
network codes stripped), geocodes it, caches in
tickets.geo_locations, then re-resolves geoms.
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
"""
from __future__ import annotations
import argparse
import csv
import io
import json
import math
import os
import re
import subprocess
import time
from datetime import datetime, timezone, timedelta
import requests
import psycopg2.extras
from shared import clean, get_conn, get_logger
# Module-wide logger (factory comes from the project's `shared` module).
log = get_logger("import_tickets")
# ── INC ingestion config ──────────────────────────────────────────────────────
# Target table of the raw-first upsert.
_TABLE = "tickets.inc"
# Value of the `dataset` column that identifies this pipeline's row in
# tickets.import_meta (read for the ETag check, written after each upsert).
_DATASET = "inc"
# Bucket holding the hourly exports; overridable via the TICKETS_BUCKET env var.
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
# Key prefix the export writes to, and the prefix ingested files are moved under.
_INC_PREFIX = "automations/inc/"
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
# message itself. Matched by prefix so position/exact-tail don't matter.
_SENTINEL_PREFIX = "EXPORT STOPPED"
# Columns dropped before building `raw`: derivable (week_*), the client's row-level
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
DROP_FIELDS = frozenset({
"week_start", "week_end",
"source_s3_bucket", "source_s3_key", "source_snapshot_id",
"department", "source_type",
})
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
# leftover latest.csv/, latest.json/, full/ prefixes). Group 1 captures the
# timestamp part of the key, which _ts_from_key() parses.
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
# Provider name is lower-cased; any value other than "opencage" takes the
# LocationIQ branch in geocode().
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
# Empty string when unset; geocode() refuses to run without a key.
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
# Minimum number of seconds between two geocoder requests (enforced by _throttle()).
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
# time.monotonic() reading taken at the last geocoder request; updated by _throttle().
_last_geocode_at = 0.0
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
# The n8n hourly export writes a full current-state CSV per hour to
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
# we skip the DB write (the export re-emits byte-identical content most hours).
def _s3_env() -> dict:
    """Build the environment passed to the `aws` CLI subprocess.

    Starts from a copy of the current process environment and overlays the AWS
    variables derived from the RUSTFS_* settings.

    Raises:
        KeyError: if RUSTFS_ACCESS_KEY or RUSTFS_SECRET_KEY is not set.
    """
    env = dict(os.environ)
    env.update({
        "AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
        "AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
        # Region falls back to us-east-1 when RUSTFS_REGION is unset.
        "AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
        # force path-style to match the rustfs endpoint
        "AWS_S3_ADDRESSING_STYLE": "path",
    })
    return env
def _aws(args: list[str], env: dict) -> bytes:
    """Run the `aws` CLI against the RUSTFS endpoint and return its stdout bytes.

    Args:
        args: CLI arguments appended after `aws --endpoint-url <RUSTFS_ENDPOINT>`.
        env: environment for the child process (see `_s3_env`).

    Raises:
        KeyError: if RUSTFS_ENDPOINT is not set.
        subprocess.CalledProcessError: on a non-zero exit status (check=True).
        subprocess.TimeoutExpired: if the command runs longer than 180 seconds.
    """
    command = ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"]]
    command.extend(args)
    completed = subprocess.run(
        command,
        env=env,
        capture_output=True,
        timeout=180,
        check=True,
    )
    return completed.stdout
def _ts_from_key(key: str) -> datetime | None:
    """Parse the timestamp embedded in an automations/inc/<ts>.csv key.

    Returns an EAT-aware datetime, or None when `key` does not match
    `_CSV_KEY_RE`.
    """
    matched = _CSV_KEY_RE.match(key)
    if matched is None:
        return None
    stamp = datetime.strptime(matched.group(1), "%Y-%m-%dT%H-%M-%S")
    # The filename carries no offset; attach the EAT zone explicitly.
    return stamp.replace(tzinfo=_EAT)
def _list_inc_csvs(env: dict) -> list[tuple[str, str]]:
    """List the INC snapshot objects as (key, etag) pairs.

    Only keys matching `_CSV_KEY_RE` are returned, which excludes the
    processed/ prefix and any other objects under automations/inc/. ETags are
    returned with their surrounding double quotes stripped.
    """
    stdout = _aws(
        ["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX,
         "--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"],
        env,
    )
    text = stdout.decode("utf-8").strip()
    # An empty listing can come back as nothing, the literal "None", or JSON null.
    if text and text != "None":
        items = json.loads(text)
    else:
        items = []

    found = []
    for item in items or []:
        if not _CSV_KEY_RE.match(item.get("Key", "")):
            continue
        etag = (item.get("ETag") or "").strip('"')
        found.append((item["Key"], etag))
    return found
def _last_processed_etag() -> str | None:
    """Return the `source_etag` stored in this dataset's tickets.import_meta row.

    Returns None when no row exists for the dataset (the value itself may also
    be NULL/None if the stored metadata has no `source_etag`).
    """
    query = "SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (_DATASET,))
        found = cur.fetchone()
    if found:
        return found[0]
    return None
def _parse_csv(text: str) -> list[dict]:
    """Parse CSV text (header row + data rows) into a list of row dicts.

    A leading UTF-8 byte-order mark is removed first. The bucket path decodes
    the object with plain "utf-8", which leaves a BOM in place as U+FEFF glued
    to the first header name; without this the first column would be named
    "\\ufeffticket_id" and every row would be discarded for lacking a ticket_id.

    Args:
        text: the decoded CSV document.

    Returns:
        One dict per data row, keyed by the header names; [] for empty input.
    """
    if text.startswith("\ufeff"):
        text = text[1:]
    return list(csv.DictReader(io.StringIO(text)))
def _load_csv_local(path: str) -> list[dict]:
    """Read a local CSV file (header row + data rows) into a list of row dicts.

    The file is decoded with "utf-8-sig", which reads plain UTF-8 unchanged and
    additionally drops a leading byte-order mark. With plain "utf-8" a BOM would
    end up inside the first header name, so the `ticket_id` column would not be
    found and every row would be discarded downstream.

    Args:
        path: filesystem path of the CSV file.

    Returns:
        One dict per data row, keyed by the header names.

    Raises:
        OSError: if the file cannot be opened.
    """
    # newline="" lets the csv module handle line endings itself.
    with open(path, encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))
def _move_processed(keys: list[str], env: dict) -> None:
    """Move each listed object under `_PROCESSED_PREFIX`, keeping its file name.

    Uses `aws s3 mv` (copy + delete) and logs one line per archived object.

    Args:
        keys: object keys inside `_BUCKET` to archive.
        env: environment for the `aws` subprocess (see `_s3_env`).
    """
    for source_key in keys:
        file_name = source_key.rsplit("/", 1)[-1]
        target_key = _PROCESSED_PREFIX + file_name
        source_url = f"s3://{_BUCKET}/{source_key}"
        target_url = f"s3://{_BUCKET}/{target_key}"
        _aws(["s3", "mv", source_url, target_url], env)
        log.info("archived %s -> %s", source_key, target_key)
# ── row preparation (filter · drop columns · normalize) ─────────────────────────
def _keep_row(row: dict) -> bool:
    """Decide whether a source row should be ingested.

    A row is kept only when its cleaned ticket_id is non-empty, does not start
    with the truncation-sentinel prefix, and its cleaned `is_alarm` value is
    not the string "true".
    """
    ticket_id = clean(row.get("ticket_id"))
    if ticket_id and not ticket_id.startswith(_SENTINEL_PREFIX):
        return clean(row.get("is_alarm")) != "true"
    return False
def _prepare(row: dict) -> dict:
    """Build the `raw` payload for one source row.

    Copies every column except those named in DROP_FIELDS, then lower-cases a
    non-empty `region` and upper-cases a non-empty `raw_status`. The input row
    is not modified.
    """
    payload = {}
    for column, value in row.items():
        if column in DROP_FIELDS:
            continue
        payload[column] = value

    region = payload.get("region")
    if region:
        payload["region"] = region.lower()

    status = payload.get("raw_status")
    if status:
        payload["raw_status"] = status.upper()
    return payload
# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(meta: dict, records_ingested: int) -> None:
    """Insert or update this dataset's row in tickets.import_meta.

    The whole `meta` dict is stored as JSON in the `metadata` column (this is
    where `source_etag` is later read back from); selected keys are also written
    to their own columns.

    Args:
        meta: snapshot metadata; keys that are absent are written as whatever
            `clean(None)` / `dict.get` yields for them.
        records_ingested: number of rows written by the accompanying upsert.
    """
    statement = """INSERT INTO tickets.import_meta
(dataset, export_type, exported_at, snapshot_date, source_schema,
source_table, row_count, records_ingested, n8n_execution_id, metadata,
ingested_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
ON CONFLICT (dataset) DO UPDATE
SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
records_ingested = EXCLUDED.records_ingested,
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
ingested_at = now()"""
    # Values in the same order as the column list above (ingested_at uses now()).
    values = (
        _DATASET,
        clean(meta.get("export_type")),
        clean(meta.get("exported_at")),
        clean(meta.get("snapshot_date")),
        clean(meta.get("source_schema")),
        clean(meta.get("source_table")),
        meta.get("row_count"),
        records_ingested,
        clean(meta.get("n8n_execution_id")),
        psycopg2.extras.Json(meta),
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(statement, values)
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
    """Filter, prepare and upsert source rows into `_TABLE` (raw-first).

    Rows rejected by `_keep_row` are skipped. The remaining rows are reduced to
    one entry per ticket_id (the LAST occurrence in `rows` wins) before being
    written. This matters because a multi-row INSERT ... ON CONFLICT DO UPDATE
    that contains the same key twice is rejected by PostgreSQL ("command cannot
    affect row a second time"), which would abort the whole import.

    Args:
        rows: parsed source rows (dicts keyed by CSV header).
        apply: when False, only log what would happen and write nothing.
        meta: snapshot metadata forwarded to `_record_meta` after a real write.

    Returns:
        The number of distinct tickets written (or that would be written in a
        dry run).
    """
    meta = meta or {}
    kept = [r for r in rows if _keep_row(r)]

    # De-duplicate on the cleaned ticket_id; later rows overwrite earlier ones.
    by_ticket = {}
    for r in kept:
        by_ticket[clean(r["ticket_id"])] = psycopg2.extras.Json(_prepare(r))
    payload = list(by_ticket.items())

    log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
             _TABLE, len(rows), len(kept), len(rows) - len(kept))
    duplicates = len(kept) - len(payload)
    if duplicates:
        log.warning("%s: %d duplicate ticket_id row(s) collapsed (last occurrence wins)",
                    _TABLE, duplicates)

    if not apply:
        log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
        return len(payload)

    with get_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
                "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                payload, page_size=500,
            )
    # Metadata (including the source ETag) is recorded only after the rows were written.
    _record_meta(meta, len(payload))
    log.info("upserted %d rows into %s", len(payload), _TABLE)
    return len(payload)
def ingest(args) -> None:
    """Ingest one INC snapshot, either from a local file or from the bucket.

    With `args.inc_csv` set, the local file is upserted and nothing is listed
    or archived. Otherwise the newest automations/inc/<ts>.csv object is
    selected; if its ETag equals the last processed ETag the DB write is
    skipped. In both bucket outcomes every listed file is archived when
    `args.apply` is set (a dry run only logs what would be archived).

    Args:
        args: parsed CLI namespace; reads `inc_csv` and `apply`.
    """
    # Local-file path (dev): ingest a single CSV, no bucket / no archive.
    if args.inc_csv:
        local_rows = _load_csv_local(args.inc_csv)
        file_name = os.path.basename(args.inc_csv)
        local_meta = {"export_type": "full", "source_s3_key": file_name,
                      "row_count": len(local_rows)}
        stamp = _ts_from_key(_INC_PREFIX + file_name)
        if stamp:
            local_meta["exported_at"] = stamp.isoformat()
        upsert(local_rows, args.apply, meta=local_meta)
        return

    # --from-bucket: newest INC csv, skip-if-unchanged, ingest, archive.
    env = _s3_env()
    listing = _list_inc_csvs(env)
    if not listing:
        log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
        return

    # Keys without a parseable timestamp sort first.
    earliest = datetime.min.replace(tzinfo=_EAT)
    listing.sort(key=lambda entry: _ts_from_key(entry[0]) or earliest)
    all_keys = [key for key, _etag in listing]
    newest_key, newest_etag = listing[-1]
    log.info("newest INC file: %s (etag=%s; %d file(s) present)",
             newest_key, newest_etag, len(listing))

    def _archive_all() -> None:
        # Archive every listed file, or just report it in a dry run.
        if args.apply:
            _move_processed(all_keys, env)
        else:
            log.info("DRY-RUN — would archive %d file(s) to %s",
                     len(all_keys), _PROCESSED_PREFIX)

    last_etag = _last_processed_etag()
    if newest_etag and newest_etag == last_etag:
        log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
        _archive_all()
        return

    body = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env)
    rows = _parse_csv(body.decode("utf-8"))
    meta = {"export_type": "full", "source_s3_key": newest_key,
            "source_etag": newest_etag, "row_count": len(rows)}
    stamp = _ts_from_key(newest_key)
    if stamp:
        meta["exported_at"] = stamp.isoformat()
    upsert(rows, args.apply, meta=meta)
    _archive_all()
# ── place extraction (strip network codes, keep the real place) ───────────────
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
# Inline network/work-order codes to drop wherever they appear.
_CODE_RE = re.compile(
    r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
)


def extract_place(location_name: str | None) -> str:
    """Reduce a coded location_name to its human-readable place/landmark.

    The input is upper-cased, its last '-<suffix>' segment is removed,
    underscores become spaces, stacked leading site-code prefixes are peeled
    off, inline network codes are blanked, and whitespace is collapsed.

    e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'

    Returns "" for None, empty or whitespace-only input.
    """
    text = (location_name or "").upper().strip()
    if not text:
        return ""

    # drop the trailing '-<unit/instruction>' segment (e.g. -37, -CALL CLIENT, -F32)
    head, dash, _suffix = text.rpartition("-")
    if dash:
        text = head
    text = text.replace("_", " ")

    # strip leading site-code prefixes until nothing changes (may be stacked:
    # ADR MNT KAHAWA…)
    while True:
        peeled = _PREFIX_RE.sub("", text).strip()
        if peeled == text:
            break
        text = peeled

    without_codes = _CODE_RE.sub(" ", text)
    collapsed = re.sub(r"\s+", " ", without_codes)
    return collapsed.strip(" ,-")
def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
    """Build the geocoder query string for one location.

    Joins, in this order and separated by ", ": the place extracted from
    `location_name`, the cleaned cluster, the cleaned region and "Kenya".
    Empty parts are skipped and repeated parts appear only once (first
    position kept).
    """
    candidates = (extract_place(location_name), clean(cluster), clean(region), "Kenya")
    unique_parts = {}
    for part in candidates:
        if part:
            # dicts keep insertion order, so this de-dupes while preserving order
            unique_parts.setdefault(part, None)
    return ", ".join(unique_parts)
# ── keyed geocoder ────────────────────────────────────────────────────────────
def _throttle() -> None:
    """Sleep as needed so calls are at least `_GEOCODE_INTERVAL_S` seconds apart.

    Updates the module-level `_last_geocode_at` with the monotonic clock
    reading taken after any sleep.
    """
    global _last_geocode_at
    elapsed = time.monotonic() - _last_geocode_at
    remaining = _GEOCODE_INTERVAL_S - elapsed
    if remaining > 0:
        time.sleep(remaining)
    _last_geocode_at = time.monotonic()
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Return the haversine great-circle distance in kilometres.

    Inputs are latitude/longitude pairs in decimal degrees; the sphere radius
    used is 6371.0 km.
    """
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlng = math.radians(lng2 - lng1) / 2
    cos_lat1 = math.cos(math.radians(lat1))
    cos_lat2 = math.cos(math.radians(lat2))
    a = (math.sin(half_dlat) ** 2
         + cos_lat1 * cos_lat2 * math.sin(half_dlng) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
    """Forward-geocode `query` via the configured keyed provider.

    Args:
        query: free-text place query.
        viewbox: optional (min_lon, min_lat, max_lon, max_lat) box that
            constrains results to an area around the cluster centroid
            (bounded), which stops the geocoder matching a landmark name in
            the wrong city.

    Returns:
        (lat, lng, confidence) for the top hit, or None when no API key is
        configured, the provider returns no match, or the request/response
        handling fails. `confidence` is OpenCage's `confidence` field (may be
        None) or LocationIQ's `importance` as a float (0 when absent).

    Failures are logged as warnings and never raised. The API key is redacted
    from the logged error text, because `requests` exceptions embed the full
    request URL, which carries the key as a query parameter.
    """
    if not _API_KEY:
        log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
        return None
    _throttle()
    try:
        if _PROVIDER == "opencage":
            params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
            if viewbox:
                params["bounds"] = "%s,%s,%s,%s" % viewbox
            r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
            r.raise_for_status()
            res = (r.json().get("results") or [])
            if res:
                g = res[0]["geometry"]
                return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
        else:  # locationiq (default)
            params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
            if viewbox:
                params["viewbox"] = "%s,%s,%s,%s" % viewbox
                params["bounded"] = 1
            r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
            if r.status_code == 404:  # LocationIQ returns 404 for "no matches"
                return None
            r.raise_for_status()
            hits = r.json()
            if hits:
                h = hits[0]
                return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # TypeError covers malformed payloads (e.g. a null coordinate) so that one
        # bad response cannot abort a whole geocoding batch.
        # Redact the key: the exception text contains the request URL (?key=...).
        log.warning("geocode failed for %r: %s", query, str(e).replace(_API_KEY, "***"))
    return None
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
def geocode_clusters(apply: bool) -> None:
    """Geocode every cluster that has no geometry in tickets.geo_clusters yet.

    Cluster keys are collected from both tickets.inc and tickets.crq. In a dry
    run the pending clusters are only logged. Otherwise each one is geocoded
    (no viewbox), successful hits are upserted as unverified gazetteer rows,
    and ticket geometries are re-resolved afterwards.

    Args:
        apply: when False, log the pending clusters and write nothing.
    """
    pending_sql = """
SELECT key, region FROM (
SELECT tickets.norm_cluster(raw->>'cluster') AS key,
(array_agg(raw->>'region'))[1] AS region
FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
UNION
SELECT tickets.norm_cluster(raw->>'cluster'),
(array_agg(raw->>'region'))[1]
FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
) z
WHERE key IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
"""
    upsert_sql = """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
VALUES (%s, %s, %s, %s, %s, false)
ON CONFLICT (cluster_key) DO UPDATE
SET region = EXCLUDED.region, lat = EXCLUDED.lat,
lng = EXCLUDED.lng, source = EXCLUDED.source"""

    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(pending_sql)
        pending = cur.fetchall()
    log.info("%d clusters to geocode", len(pending))

    if not apply:
        for cluster_key, cluster_region in pending:
            log.info(" would geocode cluster: %s (%s)", cluster_key, cluster_region)
        return

    written = 0
    for cluster_key, cluster_region in pending:
        if cluster_region:
            question = f"{cluster_key}, {cluster_region}, Kenya"
        else:
            question = f"{cluster_key}, Kenya"
        hit = geocode(question)
        if not hit:
            continue
        lat, lng, _confidence = hit
        # One short-lived connection per written row, as obtained from get_conn().
        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(upsert_sql, (cluster_key, cluster_region, lat, lng, _PROVIDER))
        written += 1

    _resolve()
    log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
# A location geocode is only trusted if it lands within this radius of the
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
# place and we fall back to the cluster centroid.
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid


def geocode_locations(apply: bool) -> None:
    """Geocode actionable INC location_names that have no cached geometry yet.

    Pending locations are grouped by their normalized location_name and joined
    to the cluster centroid (when one exists). In a dry run the first 50
    composed queries are logged. Otherwise each location is geocoded inside a
    viewbox around its cluster centroid, hits farther than
    `_MAX_KM_FROM_CLUSTER` from the centroid are rejected, accepted hits are
    upserted into tickets.geo_locations, and ticket geometries are re-resolved.

    Args:
        apply: when False, log a sample of the pending queries and write nothing.
    """
    pending_sql = """
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
FROM (
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
(array_agg(raw->>'location_name'))[1] AS location_name,
(array_agg(raw->>'cluster'))[1] AS cluster,
(array_agg(raw->>'region'))[1] AS region,
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
FROM tickets.inc
WHERE (raw->>'is_actionable')::boolean
AND raw->>'location_name' IS NOT NULL
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
AND gl.geom IS NOT NULL)
GROUP BY 1
) t
LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
"""
    upsert_sql = """INSERT INTO tickets.geo_locations
(query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (query_key) DO UPDATE
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider"""

    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(pending_sql)
        pending = cur.fetchall()
    log.info("%d actionable-INC locations to geocode (provider=%s)", len(pending), _PROVIDER)

    if not apply:
        # Dry run: show at most 50 of the queries that would be sent.
        for query_key, loc, cluster, region, _clat, _clng in pending[:50]:
            log.info(" %s -> %r", query_key, compose_query(loc, cluster, region))
        return

    written = 0
    rejected = 0
    for query_key, loc, cluster, region, clat, clng in pending:
        query = compose_query(loc, cluster, region)
        has_centroid = clat is not None and clng is not None

        if has_centroid:
            viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG,
                       clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
        else:
            viewbox = None

        hit = geocode(query, viewbox)
        if not hit:
            continue
        lat, lng, conf = hit

        # distance sanity: a result far from the cluster centroid is a wrong-city
        # match — drop it so the ticket keeps the cluster-centroid fallback.
        if has_centroid:
            km = _haversine_km(lat, lng, clat, clng)
            if km > _MAX_KM_FROM_CLUSTER:
                rejected += 1
                log.info(" reject (%.0f km from cluster): %s", km, query)
                continue

        with get_conn() as conn, conn.cursor() as cur:
            cur.execute(
                upsert_sql,
                (query_key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
            )
        written += 1
        log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)

    resolved = _resolve()
    log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
             "(unverified — review tickets.geo_locations)", written, rejected, resolved)
def _resolve() -> int:
    """Run tickets.resolve_ticket_geoms() and return the value it yields.

    The callers log this value as the number of tickets whose geom was
    re-resolved.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT tickets.resolve_ticket_geoms()")
        result_row = cur.fetchone()
        return result_row[0]
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse the command line and run exactly one mode.

    Precedence: --geocode-clusters, then --geocode-locations, then ingestion
    (--from-bucket / --inc-csv). With none of these given the parser reports a
    usage error. --apply switches every mode from dry-run to writing.
    """
    parser = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
    parser.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    parser.add_argument(
        "--from-bucket", action="store_true",
        help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
             "skips if unchanged (ETag) and archives processed files",
    )
    parser.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
    parser.add_argument(
        "--geocode-clusters", action="store_true",
        help="Geocode distinct clusters into the gazetteer, then re-resolve geoms",
    )
    parser.add_argument(
        "--geocode-locations", action="store_true",
        help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve",
    )
    opts = parser.parse_args()

    # Geocoding modes take priority over ingestion; the first requested one runs alone.
    geocode_modes = (
        (opts.geocode_clusters, geocode_clusters),
        (opts.geocode_locations, geocode_locations),
    )
    for requested, run_mode in geocode_modes:
        if requested:
            run_mode(apply=opts.apply)
            return

    if not opts.from_bucket and not opts.inc_csv:
        parser.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations")
    ingest(opts)


if __name__ == "__main__":
    main()