commit 4631cc63820eccfa3008f54e3fbb6783583e22ff Author: david kiania Date: Thu Jun 11 20:13:50 2026 +0300 feat: fleettickets — INC/CRQ ticket ingestion, geocoding + read-schema Standalone module extracted from the tracksolid repo (was migrations 21-23 + tools/import_tickets.py). Owns the `tickets` schema in the shared tracksolid_db. - migrations/01_tickets_schema.sql: consolidated final-state schema (tickets.inc/ crq raw-jsonb-first, geo_clusters + geo_locations gazetteers, geom trigger, reporting.fn_tickets_for_map) - import_tickets.py: rustfs bucket ingest + cluster/location geocoding (LocationIQ/OpenCage, viewbox-bounded + cluster-distance guard) - run_migrations.py, shared.py (self-contained), pyproject, .env.example, README The DB stays in tracksolid_db; dashboard_api keeps serving /webhook/tickets; the Tickets map stays a FleetOps tab. Co-Authored-By: Claude Opus 4.8 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..b0fd306 --- /dev/null +++ b/.env.example @@ -0,0 +1,17 @@ +# fleettickets — copy to .env and fill in. NEVER commit the real .env. 
+ +# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host) +DATABASE_URL=postgresql://tracksolid_owner:@timescale_db:5432/tracksolid_db + +# rustfs / S3 — source ticket snapshots (automations/{inc,crq}/latest.json) +RUSTFS_ENDPOINT=https://s3.rahamafresh.com +RUSTFS_ACCESS_KEY= +RUSTFS_SECRET_KEY= +RUSTFS_REGION=us-east-1 +TICKETS_BUCKET=tickets + +# Geocoder (keyed — public Nominatim rate-limits bulk) +GEOCODER_PROVIDER=locationiq # locationiq | opencage +GEOCODER_API_KEY= +GEOCODER_MIN_INTERVAL_S=1.1 # throttle to provider TOS +GEOCODER_MAX_KM=25 # reject a location geocode this far from its cluster centroid diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c678c77 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.env +__pycache__/ +*.pyc +.venv/ +uv.lock +*.json +!.*.json +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..e1aa5fc --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# fleettickets + +Field-ops **INC / CRQ ticket** ingestion, geocoding, and read-schema that powers the +**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module +(it previously lived there as migrations 21–23 + `tools/import_tickets.py`). 
+ +- **INC** — incident / customer-fault tickets +- **CRQ** — new-installation requests + +## What this owns + +| Piece | What | +|---|---| +| `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) | +| `import_tickets.py` | Pulls ticket snapshots from the rustfs `tickets` bucket and upserts them; geocodes clusters + INC locations | +| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) | +| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) | + +## What this does NOT own (stays where it is) + +- **The DB** — the `tickets` schema lives in the shared `tracksolid_db`. +- **The read-API** — `dashboard_api` (in the tracksolid stack) serves + `GET /webhook/tickets`, which calls `reporting.fn_tickets_for_map` (defined here). +- **The frontend** — the Tickets map is a tab in the **FleetOps** SPA (`fleetops` repo). + +## Data model (raw-first) + +Each row is just `ticket_id` + `raw` (the full source record as `jsonb`) + a derived +`geom` / `geo_source`. Everything reads from `raw`, so a change to the source schema +needs no migration. `geom` is resolved: **feed** coords (`raw` lat/lng) → **location** +(geocoded `location_name`) → **cluster** centroid → **none**. + +Source coordinates are empty in the feed, so geocoding is required: +- `--geocode-clusters` — one coordinate per cluster (coarse fallback). +- `--geocode-locations` — precise per-location for **actionable INC** tickets: strips the + network codes from `location_name` (e.g. `NW_`, `ADR_MNT_`, `FDT`, `SDUS`), geocodes + the real place via a **keyed** provider (LocationIQ / OpenCage), and **rejects any result + >25 km from the cluster centroid** (wrong-city guard). Results cache in + `tickets.geo_locations`. 
+ +## Setup + +```bash +uv sync +cp .env.example .env # fill in DATABASE_URL, RUSTFS_*, GEOCODER_* +python run_migrations.py # apply the schema (idempotent) +``` + +## Run + +```bash +# ingest the latest snapshots from the bucket +python import_tickets.py --from-bucket --apply + +# geocode (needs GEOCODER_API_KEY) +python import_tickets.py --geocode-clusters --apply # coarse, once +python import_tickets.py --geocode-locations --apply # precise, actionable INC + +# from local files instead of the bucket +python import_tickets.py --inc-json inc.json --crq-json crq.json --apply +``` + +Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to +the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency). + +## Notes + +- The `changes/` subdirectory in the bucket holds **full timestamped snapshots** (not + deltas) — ingest `latest.json` only; don't process `changes/`. +- The curated/geocoded coordinates are written `verified = false` — review + `tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked. diff --git a/import_tickets.py b/import_tickets.py new file mode 100644 index 0000000..39da3a4 --- /dev/null +++ b/import_tickets.py @@ -0,0 +1,375 @@ +""" +import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first) +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Loads the client's field-ops ticket snapshots into the `tickets` schema — the +source of the FleetOps "Tickets" map. Two categories, one table each: + tickets.inc — incidents / customer faults + tickets.crq — new-installation requests + +RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as +jsonb). Everything downstream reads from `raw` (resilient to source schema drift). +The DB derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode +(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. 
+ +Source data: rustfs `tickets` bucket, full snapshots from the client's email +automation — automations/{inc,crq}/latest.json (array of 32-key objects). + +Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits): + --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups) + --geocode-locations precise per-location for ACTIONABLE INC tickets: parses the + real place out of location_name (region+cluster+location_name, + network codes stripped), geocodes it, caches in + tickets.geo_locations, then re-resolves geoms. +Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY. + +Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example): + python import_tickets.py --from-bucket --apply + python import_tickets.py --inc-json inc.json --crq-json crq.json --apply + python import_tickets.py --geocode-clusters --apply + python import_tickets.py --geocode-locations --apply + +Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq + +geo_clusters + geo_locations + reporting.fn_tickets_for_map. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import subprocess +import time + +import requests +import psycopg2.extras + +from shared import clean, get_conn, get_logger + +log = get_logger("import_tickets") + +TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"} +_BUCKET = os.getenv("TICKETS_BUCKET", "tickets") +_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"} + +# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. 
+_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() +_API_KEY = os.getenv("GEOCODER_API_KEY", "") +_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1")) +_last_geocode_at = 0.0 + + +# ── data loading ────────────────────────────────────────────────────────────── +def _load_local(path: str) -> list[dict]: + with open(path, encoding="utf-8") as f: + data = json.load(f) # json.loads accepts NaN by default + return data if isinstance(data, list) else [] + + +def _load_bucket(kind: str) -> list[dict]: + env = { + **os.environ, + "AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"], + "AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"], + "AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"), + } + uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}" + log.info("fetching %s", uri) + out = subprocess.run( + ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"], + env=env, capture_output=True, timeout=180, check=True, + ).stdout + data = json.loads(out.decode("utf-8")) + return data if isinstance(data, list) else [] + + +# ── upsert (raw-first) ──────────────────────────────────────────────────────── +def _scrub_nan(row: dict) -> dict: + # Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null. + return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()} + + +def upsert(rows: list[dict], table: str, apply: bool) -> int: + payload = [ + (tid, psycopg2.extras.Json(_scrub_nan(r))) + for r in rows + if (tid := clean(r.get("ticket_id"))) + ] + log.info("%s: %d valid rows (skipped %d without ticket_id)", + table, len(payload), len(rows) - len(payload)) + if not apply: + log.info("DRY-RUN — nothing written to %s. 
Use --apply.", table) + return len(payload) + with get_conn() as conn: + with conn.cursor() as cur: + psycopg2.extras.execute_values( + cur, + f"INSERT INTO {table} (ticket_id, raw) VALUES %s " + "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", + payload, page_size=500, + ) + log.info("upserted %d rows into %s", len(payload), table) + return len(payload) + + +def ingest(args) -> None: + if args.from_bucket: + for kind in ("inc", "crq"): + upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply) + else: + if args.inc_json: + upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply) + if args.crq_json: + upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply) + + +# ── place extraction (strip network codes, keep the real place) ─────────────── +# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly. +_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+") +# Inline network/work-order codes to drop wherever they appear. +_CODE_RE = re.compile( + r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b" +) + + +def extract_place(location_name: str | None) -> str: + """Pull the human place/landmark out of a coded location_name string. + + e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT' + """ + s = (location_name or "").upper().strip() + if not s: + return "" + # drop the trailing '-' segment (e.g. 
-37, -CALL CLIENT, -F32) + if "-" in s: + s = s.rsplit("-", 1)[0] + s = s.replace("_", " ") + # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…) + prev = None + while prev != s: + prev = s + s = _PREFIX_RE.sub("", s).strip() + s = _CODE_RE.sub(" ", s) + s = re.sub(r"\s+", " ", s).strip(" ,-") + return s + + +def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str: + parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p] + return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order + + +# ── keyed geocoder ──────────────────────────────────────────────────────────── +def _throttle() -> None: + global _last_geocode_at + wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at) + if wait > 0: + time.sleep(wait) + _last_geocode_at = time.monotonic() + + +def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float: + dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1) + a = (math.sin(dlat / 2) ** 2 + + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2) + return 2 * 6371.0 * math.asin(math.sqrt(a)) + + +def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None: + """Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None. + + `viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box + around the cluster centroid (bounded), which stops the geocoder matching a + landmark name in the wrong city. 
+ """ + if not _API_KEY: + log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER) + return None + _throttle() + try: + if _PROVIDER == "opencage": + params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1} + if viewbox: + params["bounds"] = "%s,%s,%s,%s" % viewbox + r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15) + r.raise_for_status() + res = (r.json().get("results") or []) + if res: + g = res[0]["geometry"] + return float(g["lat"]), float(g["lng"]), res[0].get("confidence") + else: # locationiq (default) + params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"} + if viewbox: + params["viewbox"] = "%s,%s,%s,%s" % viewbox + params["bounded"] = 1 + r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15) + if r.status_code == 404: # LocationIQ returns 404 for "no matches" + return None + r.raise_for_status() + hits = r.json() + if hits: + h = hits[0] + return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0) + except (requests.RequestException, ValueError, KeyError) as e: + log.warning("geocode failed for %r: %s", query, e) + return None + + +# ── cluster gazetteer (coarse fallback) ─────────────────────────────────────── +def geocode_clusters(apply: bool) -> None: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT key, region FROM ( + SELECT tickets.norm_cluster(raw->>'cluster') AS key, + (array_agg(raw->>'region'))[1] AS region + FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1 + UNION + SELECT tickets.norm_cluster(raw->>'cluster'), + (array_agg(raw->>'region'))[1] + FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1 + ) z + WHERE key IS NOT NULL + AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g + WHERE g.cluster_key = z.key AND g.geom IS NOT NULL) + """ + ) + todo = cur.fetchall() + log.info("%d clusters to geocode", 
len(todo)) + if not apply: + for key, region in todo: + log.info(" would geocode cluster: %s (%s)", key, region) + return + written = 0 + for key, region in todo: + hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya") + if not hit: + continue + lat, lng, _ = hit + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified) + VALUES (%s, %s, %s, %s, %s, false) + ON CONFLICT (cluster_key) DO UPDATE + SET region = EXCLUDED.region, lat = EXCLUDED.lat, + lng = EXCLUDED.lng, source = EXCLUDED.source""", + (key, region, lat, lng, _PROVIDER), + ) + written += 1 + _resolve() + log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) + + +# ── per-location geocoding (precise; actionable INC) ────────────────────────── +# A location geocode is only trusted if it lands within this radius of the +# cluster centroid; otherwise the geocoder matched the landmark in the wrong +# place and we fall back to the cluster centroid. 
# Maximum accepted distance (km) between a location geocode and its cluster centroid.
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
_VIEWBOX_DEG = 0.2  # ~22 km half-box around the cluster centroid


def geocode_locations(apply: bool) -> None:
    """Geocode actionable-INC location_names into tickets.geo_locations, then re-resolve geoms.

    Only locations without a usable cached geom are selected. With apply=False
    the first 50 composed queries are logged and nothing is geocoded or written.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            # One row per normalized location_name, joined to its cluster
            # centroid (clat/clng are NULL when the cluster is not in the gazetteer).
            cur.execute(
                """
                SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
                FROM (
                    SELECT tickets.norm_cluster(raw->>'location_name') AS key,
                           (array_agg(raw->>'location_name'))[1] AS location_name,
                           (array_agg(raw->>'cluster'))[1] AS cluster,
                           (array_agg(raw->>'region'))[1] AS region,
                           tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
                    FROM tickets.inc
                    WHERE (raw->>'is_actionable')::boolean
                      AND raw->>'location_name' IS NOT NULL
                      AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
                      AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
                                      WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
                                        AND gl.geom IS NOT NULL)
                    GROUP BY 1
                ) t
                LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
                """
            )
            todo = cur.fetchall()
    log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
    if not apply:
        # Dry-run: preview at most 50 queries.
        for key, loc, cluster, region, clat, clng in todo[:50]:
            log.info(" %s -> %r", key, compose_query(loc, cluster, region))
        return
    written = rejected = 0
    for key, loc, cluster, region, clat, clng in todo:
        query = compose_query(loc, cluster, region)
        viewbox = None
        if clat is not None and clng is not None:
            # (min_lon, min_lat, max_lon, max_lat) box centred on the cluster centroid
            viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
        hit = geocode(query, viewbox)
        if not hit:
            continue
        lat, lng, conf = hit
        # distance sanity: a result far from the cluster centroid is a wrong-city
        # match — drop it so the ticket keeps the cluster-centroid fallback.
        if clat is not None and clng is not None:
            km = _haversine_km(lat, lng, clat, clng)
            if km > _MAX_KM_FROM_CLUSTER:
                rejected += 1
                log.info(" reject (%.0f km from cluster): %s", km, query)
                continue
        # A fresh connection (and commit) per accepted location.
        with get_conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO tickets.geo_locations
                       (query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                       ON CONFLICT (query_key) DO UPDATE
                       SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
                           region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
                           lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
                    (key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
                )
        written += 1
        log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)
    n = _resolve()
    log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
             "(unverified — review tickets.geo_locations)", written, rejected, n)


def _resolve() -> int:
    """Run tickets.resolve_ticket_geoms() and return the integer it reports."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT tickets.resolve_ticket_geoms()")
            return cur.fetchone()[0]


# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse CLI flags and run exactly one mode: geocode-clusters, geocode-locations, or ingest."""
    ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
    ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
    ap.add_argument("--from-bucket", action="store_true",
                    help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
    ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
    ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
    ap.add_argument("--geocode-clusters", action="store_true",
                    help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
    ap.add_argument("--geocode-locations", action="store_true",
                    help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
    args = ap.parse_args()

    # The geocode modes return early, so any ingest flags passed alongside them are ignored.
    if args.geocode_clusters:
        geocode_clusters(apply=args.apply)
        return
    if args.geocode_locations:
        geocode_locations(apply=args.apply)
        return
    if not (args.from_bucket or args.inc_json or args.crq_json):
        ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
    ingest(args)


if __name__ == "__main__":
    main()
diff --git a/migrations/01_tickets_schema.sql b/migrations/01_tickets_schema.sql
new file mode 100644
index 0000000..f9e9e94
--- /dev/null
+++ b/migrations/01_tickets_schema.sql
@@ -0,0 +1,276 @@
-- 01_tickets_schema.sql — fleettickets · INC/CRQ ticket store (raw-jsonb-first)
-- ─────────────────────────────────────────────────────────────────────────────
-- Consolidated final-state schema for the field-ops ticket layer. This supersedes
-- the historical tracksolid migrations 21→23 (where this feature was first built);
-- fleettickets now owns the `tickets` schema. The schema lives in the shared
-- database (`tracksolid_db` today) so the existing dashboard_api read-API and the
-- FleetOps Tickets map keep working unchanged.
--
--   tickets.inc / tickets.crq   one raw-jsonb row per ticket (ticket_id + raw +
--                               derived geom/geo_source). INC = incident/fault,
--                               CRQ = new-installation.
--   tickets.geo_clusters        cluster -> coordinate gazetteer (coarse fallback)
--   tickets.geo_locations       cleaned-location -> coordinate cache (precise)
--   reporting.fn_tickets_for_map  GeoJSON read function consumed by dashboard_api
--
-- geom resolution: feed coords (raw lat/lng) -> location cache -> cluster centroid
-- -> none. Idempotent: safe on a fresh DB and re-appliable on the live DB.
-- Requires PostGIS (present on the shared DB; on a brand-new DB a superuser must
-- run CREATE EXTENSION postgis first).
-- ─────────────────────────────────────────────────────────────────────────────

CREATE EXTENSION IF NOT EXISTS postgis;
CREATE SCHEMA IF NOT EXISTS tickets;
CREATE SCHEMA IF NOT EXISTS reporting; -- shared read layer (the fn lives here for dashboard_api)
-- Applies to the rest of the session that runs this script.
SET search_path = tickets, public;

-- ── normalize helper (generic upper/collapse/trim key) ───────────────────────
-- Trims, collapses whitespace runs to one space, upper-cases; NULL for NULL/blank input.
-- Used for both cluster keys and location keys.
CREATE OR REPLACE FUNCTION tickets.norm_cluster(p text)
  RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE
AS $fn$ SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '') $fn$;

-- ── gazetteer: cluster -> coordinates (coarse fallback) ──────────────────────
-- cluster_key is the norm_cluster() form of the ticket's cluster; geom is
-- derived from lat/lng by the trigger below (never set it by hand).
CREATE TABLE IF NOT EXISTS tickets.geo_clusters (
    cluster_key text PRIMARY KEY,
    region      text,
    lat         double precision,
    lng         double precision,
    geom        geometry(Point, 4326),
    source      text,
    verified    boolean NOT NULL DEFAULT false,
    updated_at  timestamptz NOT NULL DEFAULT now()
);

-- Keeps geom in sync with lat/lng on every write: a point when both are present,
-- in range and not (0, 0); NULL otherwise. Also stamps updated_at.
CREATE OR REPLACE FUNCTION tickets.tg_geo_clusters_geom()
  RETURNS trigger LANGUAGE plpgsql AS $fn$
BEGIN
    IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
       AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
       AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
        NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
    ELSE
        NEW.geom := NULL;
    END IF;
    NEW.updated_at := now();
    RETURN NEW;
END $fn$;
DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tickets.geo_clusters;
CREATE TRIGGER trg_geo_clusters_geom BEFORE INSERT OR UPDATE ON tickets.geo_clusters
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_clusters_geom();

-- ── location geocode cache: cleaned location_name -> coordinates (precise) ───
-- query_key = tickets.norm_cluster(location_name); resolve joins on it without
-- re-deriving the place-extraction regex (that lives in the loader).
CREATE TABLE IF NOT EXISTS tickets.geo_locations (
    query_key     text PRIMARY KEY,
    location_name text,
    cluster       text,
    region        text,
    query         text,
    lat           double precision,
    lng           double precision,
    geom          geometry(Point, 4326),
    confidence    numeric,
    provider      text,
    verified      boolean NOT NULL DEFAULT false,
    updated_at    timestamptz NOT NULL DEFAULT now()
);

-- Keeps geom in sync with lat/lng on every write: a point when both are present,
-- in range and not (0, 0); NULL otherwise. Also stamps updated_at.
CREATE OR REPLACE FUNCTION tickets.tg_geo_locations_geom()
  RETURNS trigger LANGUAGE plpgsql AS $fn$
BEGIN
    IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
       AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
       AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
        NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
    ELSE
        NEW.geom := NULL;
    END IF;
    NEW.updated_at := now();
    RETURN NEW;
END $fn$;
DROP TRIGGER IF EXISTS trg_geo_locations_geom ON tickets.geo_locations;
CREATE TRIGGER trg_geo_locations_geom BEFORE INSERT OR UPDATE ON tickets.geo_locations
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_locations_geom();

-- ── per-type ticket tables (raw-jsonb-first) ─────────────────────────────────
CREATE TABLE IF NOT EXISTS tickets.inc (
    ticket_id   text PRIMARY KEY,
    raw         jsonb NOT NULL,
    geom        geometry(Point, 4326),
    geo_source  text,                       -- 'feed' | 'location' | 'cluster' | 'none'
    ingested_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS tickets.crq (LIKE tickets.inc INCLUDING ALL);

-- ── geom trigger — read from raw; never clobber a deliberate geom-only update ─
-- Resolution order: feed coords -> location cache -> cluster centroid -> none.
-- The location-cache step matters on re-ingest: without it, a ticket whose raw
-- changed would drop from its precise 'location' geom back to the cluster
-- centroid until resolve_ticket_geoms() is run again.
-- NOTE(review): the lat/lng casts in DECLARE raise on a non-numeric string in
-- raw->>'latitude' / 'longitude', which aborts the whole statement — confirm the
-- feed never sends such values.
CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom()
  RETURNS trigger LANGUAGE plpgsql AS $fn$
DECLARE
    v_lat double precision := NULLIF(NEW.raw->>'latitude','')::double precision;
    v_lng double precision := NULLIF(NEW.raw->>'longitude','')::double precision;
    g     geometry(Point, 4326);
BEGIN
    IF TG_OP = 'UPDATE' AND NEW.raw IS NOT DISTINCT FROM OLD.raw THEN
        RETURN NEW;   -- geom/geo_source-only update — keep caller's value
    END IF;
    IF v_lat IS NOT NULL AND v_lng IS NOT NULL
       AND v_lat BETWEEN -90 AND 90 AND v_lng BETWEEN -180 AND 180
       AND NOT (v_lat = 0 AND v_lng = 0) THEN
        NEW.geom := ST_SetSRID(ST_MakePoint(v_lng, v_lat), 4326);
        NEW.geo_source := 'feed';
    ELSE
        -- precise: cached geocode of this ticket's location_name
        SELECT gl.geom INTO g FROM tickets.geo_locations gl
         WHERE gl.query_key = tickets.norm_cluster(NEW.raw->>'location_name')
           AND gl.geom IS NOT NULL LIMIT 1;
        IF g IS NOT NULL THEN
            NEW.geom := g; NEW.geo_source := 'location';
        ELSE
            -- coarse: cluster centroid
            SELECT gc.geom INTO g FROM tickets.geo_clusters gc
             WHERE gc.cluster_key = tickets.norm_cluster(NEW.raw->>'cluster')
               AND gc.geom IS NOT NULL LIMIT 1;
            IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster';
            ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF;
        END IF;
    END IF;
    RETURN NEW;
END $fn$;

DROP TRIGGER IF EXISTS trg_inc_geom ON tickets.inc;
CREATE TRIGGER trg_inc_geom BEFORE INSERT OR UPDATE ON tickets.inc
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
    FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();

-- ── resolve — prefer location cache, else cluster centroid (non-feed rows) ───
-- Re-derives geom/geo_source for every non-'feed' ticket from the two gazetteers
-- and returns how many rows (inc + crq) actually changed. These are geom-only
-- updates, so tg_ticket_geom leaves them untouched.
CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms()
  RETURNS integer LANGUAGE plpgsql AS $fn$
DECLARE n integer; m integer;
BEGIN
    UPDATE tickets.inc t
       SET geom = COALESCE(loc.geom, gc.geom),
           geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
                             WHEN gc.geom  IS NOT NULL THEN 'cluster' ELSE 'none' END
      FROM tickets.inc base
      LEFT JOIN tickets.geo_locations loc
             ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
      LEFT JOIN tickets.geo_clusters gc
             ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
     WHERE t.ticket_id = base.ticket_id
       AND t.geo_source IS DISTINCT FROM 'feed'
       AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
            OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
                                                  WHEN gc.geom  IS NOT NULL THEN 'cluster' ELSE 'none' END);
    GET DIAGNOSTICS n = ROW_COUNT;

    UPDATE tickets.crq t
       SET geom = COALESCE(loc.geom, gc.geom),
           geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
                             WHEN gc.geom  IS NOT NULL THEN 'cluster' ELSE 'none' END
      FROM tickets.crq base
      LEFT JOIN tickets.geo_locations loc
             ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
      LEFT JOIN tickets.geo_clusters gc
             ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
     WHERE t.ticket_id = base.ticket_id
       AND t.geo_source IS DISTINCT FROM 'feed'
       AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
            OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
                                                  WHEN gc.geom  IS NOT NULL THEN 'cluster' ELSE 'none' END);
    GET DIAGNOSTICS m = ROW_COUNT;
    RETURN n + m;
END $fn$;

-- ── indexes ───────────────────────────────────────────────────────────────────
CREATE INDEX IF NOT EXISTS ix_inc_status_raw     ON tickets.inc ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_inc_actionable_raw ON tickets.inc (((raw->>'is_actionable')::boolean))
    WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_inc_cluster_raw    ON tickets.inc (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_inc_loc_raw        ON tickets.inc (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_inc_geom           ON tickets.inc USING gist (geom);
CREATE INDEX IF NOT EXISTS ix_crq_status_raw     ON tickets.crq ((raw->>'normalized_status'));
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
    WHERE (raw->>'is_actionable')::boolean;
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw    ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw        ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
CREATE INDEX IF NOT EXISTS ix_crq_geom           ON tickets.crq USING gist (geom);

-- ── read function (GeoJSON; consumed by dashboard_api GET /webhook/tickets) ──
-- p_service_type: 'inc' | 'crq' | NULL/'' for both (case-insensitive)
-- p_status:       exact normalized_status to keep, NULL/'' for all
-- p_open_only:    when true, only tickets whose raw is_actionable is true
-- Returns { summary: {...}, geojson: FeatureCollection } over tickets that have a geom.
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
    p_service_type text    DEFAULT NULL,
    p_status       text    DEFAULT NULL,
    p_open_only    boolean DEFAULT true
)
  RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
DECLARE v_result jsonb;
BEGIN
    p_service_type := lower(NULLIF(p_service_type, ''));
    p_status       := NULLIF(p_status, '');
    WITH filtered AS (
        SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
         WHERE geom IS NOT NULL
           AND (p_service_type IS NULL OR p_service_type = 'inc')
           AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
           AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
        UNION ALL
        SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
         WHERE geom IS NOT NULL
           AND (p_service_type IS NULL OR p_service_type = 'crq')
           AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
           AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
    )
    SELECT jsonb_build_object(
        'summary', jsonb_build_object(
            'ticket_count', COUNT(*),
            'inc',  COUNT(*) FILTER (WHERE service_type = 'inc'),
            'crq',  COUNT(*) FILTER (WHERE service_type = 'crq'),
            'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
            -- jsonb_object_agg raises on a NULL key, so tickets without a
            -- normalized_status are bucketed as 'unknown'; an empty result
            -- yields {} rather than NULL.
            'by_status', COALESCE(
                (SELECT jsonb_object_agg(s, c)
                   FROM (SELECT COALESCE(raw->>'normalized_status', 'unknown') AS s, COUNT(*) AS c
                           FROM filtered GROUP BY 1) z),
                '{}'::jsonb)
        ),
        'geojson', jsonb_build_object(
            'type', 'FeatureCollection',
            'features', COALESCE(jsonb_agg(
                jsonb_build_object(
                    'type', 'Feature',
                    'properties', jsonb_build_object(
                        'ticket_id',     raw->>'ticket_id',
                        'service_type',  service_type,
                        'status',        raw->>'normalized_status',
                        'raw_status',    raw->>'raw_status',
                        'cluster',       raw->>'cluster',
                        'region',        raw->>'region',
                        'location_name', raw->>'location_name',
                        'department',    raw->>'department',
                        'owner',         raw->>'owner',
                        'assigned_team', raw->>'assigned_team',
                        'sla_status',    raw->>'sla_status',
                        'is_actionable', (raw->>'is_actionable')::boolean,
                        'geo_source',    geo_source,
                        'created_at',    raw->>'created_at_service',
                        'scheduled_at',  raw->>'scheduled_at'
                    ),
                    'geometry', ST_AsGeoJSON(geom)::jsonb
                )
            ), '[]'::jsonb)
        )
    ) INTO v_result FROM filtered;
    RETURN v_result;
END $fn$;

COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
    'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON. fleettickets 01.';

-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
DO $grants$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
        GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner;
        GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner;
        GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
        GRANT USAGE ON SCHEMA tickets TO dashboard_ro;
        GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO dashboard_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro;
    END IF;
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
        GRANT USAGE ON SCHEMA tickets TO grafana_ro;
        GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO grafana_ro;
        GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro;
    END IF;
END $grants$;
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..e35980a
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,22 @@
[project]
name = "fleettickets"
version = "0.1.0"
description = "Field-ops INC/CRQ ticket ingestion, geocoding, and read-schema for the FleetOps Tickets map"
requires-python = ">=3.12"
dependencies = [
    "psycopg2-binary>=2.9.9",   # DB driver
    "requests>=2.32.3",         # geocoder HTTP
]

[project.optional-dependencies]
dev = [
    "ruff>=0.4",
]

[tool.uv]
managed = true

[tool.ruff]
target-version = "py312"
line-length = 100
# Directory holding the *.sql migration files; resolved relative to this script so the
# runner works regardless of the current working directory.
MIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")


def main() -> None:
    """Apply every migration in MIG_DIR that is not yet recorded as applied.

    Files matching ``*.sql`` are processed in lexical order. Each file and its
    bookkeeping row in ``tickets.schema_migrations`` are committed together, so a
    failure leaves that file unrecorded and it is retried on the next run.

    Raises:
        SystemExit: if the DATABASE_URL environment variable is unset or empty.
        Exception: whatever the driver or file I/O raised for a failing migration,
            re-raised unchanged after the failing file name has been printed.
    """
    dsn = os.environ.get("DATABASE_URL")
    if not dsn:
        raise SystemExit("DATABASE_URL is not set")

    conn = psycopg2.connect(dsn)
    conn.autocommit = False
    try:
        with conn.cursor() as cur:
            # Bootstrap the bookkeeping table before reading from it.
            cur.execute("CREATE SCHEMA IF NOT EXISTS tickets")
            cur.execute(
                "CREATE TABLE IF NOT EXISTS tickets.schema_migrations "
                "(filename text PRIMARY KEY, applied_at timestamptz NOT NULL DEFAULT now())"
            )
            conn.commit()

            cur.execute("SELECT filename FROM tickets.schema_migrations")
            applied = {row[0] for row in cur.fetchall()}

            for path in sorted(glob.glob(os.path.join(MIG_DIR, "*.sql"))):
                fn = os.path.basename(path)
                if fn in applied:
                    print(f" skip {fn}")
                    continue
                print(f" apply {fn}")
                try:
                    with open(path, encoding="utf-8") as f:
                        sql = f.read()
                    # A blank file has nothing to run; the driver rejects an empty
                    # query, so record the file without executing it.
                    if sql.strip():
                        cur.execute(sql)
                    cur.execute(
                        "INSERT INTO tickets.schema_migrations (filename) VALUES (%s) "
                        "ON CONFLICT DO NOTHING",
                        (fn,),
                    )
                    conn.commit()
                except Exception:
                    # Name the culprit; the uncommitted work is dropped when the
                    # connection is closed in the finally block below.
                    print(f" failed {fn}")
                    raise
        print("migrations up to date.")
    finally:
        conn.close()


if __name__ == "__main__":
    main()
def _resolve_level(raw: Optional[str]) -> int:
    """Translate a LOG_LEVEL value into a numeric logging level, defaulting to INFO."""
    name = (raw or "").strip().upper()
    if name.isdigit():
        return int(name)
    # getLevelName returns the numeric level for a registered name and a
    # "Level <name>" string for anything it does not recognise.
    level = logging.getLevelName(name)
    return level if isinstance(level, int) else logging.INFO


def get_logger(name: str) -> logging.Logger:
    """Return the ``fleettickets.<name>`` logger, configuring it on first use.

    On first use a stream handler with a timestamped format is attached and the
    level is taken from the LOG_LEVEL environment variable. The value is matched
    case-insensitively; an unset or unrecognised value falls back to INFO rather
    than raising.
    """
    logger = logging.getLogger(f"fleettickets.{name}")
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s [%(levelname)s] %(name)s — %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(handler)
        logger.setLevel(_resolve_level(os.getenv("LOG_LEVEL", "INFO")))
    return logger


@contextmanager
def get_conn():
    """Yield a database connection opened from DATABASE_URL.

    The transaction is committed when the ``with`` body finishes normally and
    rolled back when it raises; the connection is always closed afterwards.

    Raises:
        RuntimeError: if the DATABASE_URL environment variable is unset or empty.
    """
    dsn = os.environ.get("DATABASE_URL")
    if not dsn:
        raise RuntimeError("DATABASE_URL is not set")
    conn = psycopg2.connect(dsn)
    try:
        conn.autocommit = False
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            # The connection is already unusable; swallowing this keeps the
            # caller's original exception instead of replacing it.
            pass
        raise
    finally:
        conn.close()


def clean(v: Any) -> Optional[str]:
    """Return ``str(v)`` with surrounding whitespace removed, or None if nothing is left.

    None is passed through as None; any other value is stringified first, so
    ``0`` becomes ``"0"``.
    """
    if v is None:
        return None
    text = str(v).strip()
    return text if text else None