From 868960c39cd96fa11d69a59845787fbbb72b1c0c Mon Sep 17 00:00:00 2001 From: david kiania Date: Thu, 11 Jun 2026 20:15:23 +0300 Subject: [PATCH] chore(tickets): hand the tickets schema + ingest to the fleettickets repo The `tickets` schema (INC/CRQ map), its migrations, and the ingest/geocode loader moved to their own repo: repo.rahamafresh.com/kianiadee/fleettickets.git. - remove migrations 21-23 and tools/import_tickets.py - run_migrations.py: drop the 21-23 entries (fleettickets owns them now) - dashboard_api keeps GET /webhook/tickets, calling reporting.fn_tickets_for_map which fleettickets defines - move geocoder env-var docs to fleettickets Co-Authored-By: Claude Opus 4.8 --- dashboard_api_rev.py | 9 +- docs/CONNECTIONS.md | 14 -- migrations/21_tickets.sql | 247 ------------------ migrations/22_tickets_schema.sql | 309 ----------------------- migrations/23_tickets_raw_first.sql | 246 ------------------ run_migrations.py | 7 +- tools/README.md | 4 +- tools/import_tickets.py | 377 ---------------------------- 8 files changed, 12 insertions(+), 1201 deletions(-) delete mode 100644 migrations/21_tickets.sql delete mode 100644 migrations/22_tickets_schema.sql delete mode 100644 migrations/23_tickets_raw_first.sql delete mode 100644 tools/import_tickets.py diff --git a/dashboard_api_rev.py b/dashboard_api_rev.py index feca70f..74009b8 100644 --- a/dashboard_api_rev.py +++ b/dashboard_api_rev.py @@ -265,12 +265,13 @@ def vehicle_track(vehicle_number: str | None = None, hours: int = 1): return JSONResponse({"error": "vehicle-track unavailable"}) -# ── Tickets (#21 — FleetOps Tickets map) ───────────────────────────────────── +# ── Tickets (FleetOps Tickets map) ─────────────────────────────────────────── # INC (incident / customer fault) + CRQ (new-installation) tickets as a GeoJSON # FeatureCollection for the FleetOps Tickets tab (FleetNow-style map). Backed by -# reporting.fn_tickets_for_map (migration 21) over tracksolid.tickets, which is -# fed from the rustfs `tickets` bucket by tools/import_tickets.py. Only geocoded -# rows are mapped; open_only (default true) restricts to actionable tickets. +# reporting.fn_tickets_for_map over tickets.inc / tickets.crq. The schema, ingest, +# and that read function are owned by the separate `fleettickets` repo +# (repo.rahamafresh.com/kianiadee/fleettickets.git); this endpoint just calls it. +# Only geocoded rows are mapped; open_only (default true) restricts to actionable. @app.get("/webhook/tickets") def tickets( service_type: str | None = None, # 'inc' | 'crq' | None (both) diff --git a/docs/CONNECTIONS.md b/docs/CONNECTIONS.md index ec81e6c..593edbb 100644 --- a/docs/CONNECTIONS.md +++ b/docs/CONNECTIONS.md @@ -66,20 +66,6 @@ docker exec -i $DB psql -U postgres -d tracksolid_db < migration.sql --- -## Geocoder (tickets location geocoding) - -Used by `tools/import_tickets.py --geocode-locations` / `--geocode-clusters` to turn ticket -`location_name`/`cluster` text into coordinates (`tickets.geo_locations` / `tickets.geo_clusters`). -A **keyed** provider is required — the public Nominatim server rate-limits bulk lookups. - -| Parameter | Env var | -|---|---| -| Provider | `GEOCODER_PROVIDER` (`locationiq` (default) \| `opencage`) | -| API key | `GEOCODER_API_KEY` | -| Min interval | `GEOCODER_MIN_INTERVAL_S` (default `1.1` — throttle to provider TOS) | - ---- - ## Container Name Resolution Coolify appends a random suffix to all container names. Never hardcode. Always resolve: diff --git a/migrations/21_tickets.sql b/migrations/21_tickets.sql deleted file mode 100644 index 99040a6..0000000 --- a/migrations/21_tickets.sql +++ /dev/null @@ -1,247 +0,0 @@ --- 21_tickets.sql --- Field-ops TICKET layer for FleetOps: INC (incident / customer fault) + CRQ --- (new-installation request). Tickets are produced by the client's email --- automation as full-snapshot files in the rustfs `tickets` bucket --- (automations/{inc,crq}/latest.json) and ingested by tools/import_tickets.py --- into tracksolid.tickets. The map read layer is reporting.fn_tickets_for_map, --- served by dashboard_api GET /webhook/tickets and rendered as INC/CRQ overlay --- layers in the FleetOps "Tickets" tab (FleetNow-style map). --- --- Geocoding: source rows carry NO coordinates (latitude/longitude are null in --- every row observed). geom is resolved from a small cluster gazetteer --- (tracksolid.geo_clusters), or directly from the feed lat/lng if the upstream --- ever starts populating them. Until the gazetteer is seeded, geom is NULL and --- tickets are simply not mapped (fn_tickets_for_map filters geom IS NOT NULL). --- --- Safe to re-apply (IF NOT EXISTS / CREATE OR REPLACE / DROP TRIGGER IF EXISTS). - -SET search_path = tracksolid, reporting, public; - --- ── normalize helper (cluster_key) ──────────────────────────────────────────── --- upper + collapse internal whitespace + trim; '' → NULL. IMMUTABLE so it can --- back a functional index. -CREATE OR REPLACE FUNCTION tracksolid.norm_cluster(p text) - RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE -AS $function$ - SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '') -$function$; - --- ── gazetteer: normalized cluster → coordinates ────────────────────────────── -CREATE TABLE IF NOT EXISTS tracksolid.geo_clusters ( - cluster_key text PRIMARY KEY, -- tracksolid.norm_cluster(cluster) - region text, - lat double precision, - lng double precision, - geom geometry(Point, 4326), - source text, -- 'nominatim' | 'manual' | … - verified boolean NOT NULL DEFAULT false, - updated_at timestamptz NOT NULL DEFAULT now() -); - -CREATE OR REPLACE FUNCTION tracksolid.tg_geo_clusters_geom() - RETURNS trigger LANGUAGE plpgsql -AS $function$ -BEGIN - IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL - AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180 - AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326); - ELSE - NEW.geom := NULL; - END IF; - NEW.updated_at := now(); - RETURN NEW; -END $function$; - -DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tracksolid.geo_clusters; -CREATE TRIGGER trg_geo_clusters_geom - BEFORE INSERT OR UPDATE ON tracksolid.geo_clusters - FOR EACH ROW EXECUTE FUNCTION tracksolid.tg_geo_clusters_geom(); - --- ── tickets ─────────────────────────────────────────────────────────────────── --- Columns mirror the source snapshot schema (32 fields, identical for INC/CRQ) --- 1:1, plus locally-derived geom / geo_source / raw / ingested_at. -CREATE TABLE IF NOT EXISTS tracksolid.tickets ( - ticket_id text PRIMARY KEY, - source_type text, - service_type text, -- 'inc' | 'crq' (type discriminator) - bucket text, - raw_status text, - normalized_status text, - created_at_service timestamptz, - scheduled_at timestamptz, - closed_at timestamptz, - last_seen_at timestamptz, - first_seen_at timestamptz, - week_start date, - week_end date, - cluster text, - region text, - location_name text, - latitude double precision, - longitude double precision, - department text, - assigned_team text, - owner text, - sla_status text, - mttr double precision, - is_auto_created boolean, - is_auto_closed boolean, - is_alarm boolean, - is_actionable boolean, - source_s3_bucket text, - source_s3_key text, - source_snapshot_id bigint, - created_at timestamptz, - updated_at timestamptz, - -- locally derived - geom geometry(Point, 4326), - geo_source text, -- 'feed' | 'cluster' | 'none' - raw jsonb, -- full source row (hedge for new fields) - ingested_at timestamptz NOT NULL DEFAULT now(), - CONSTRAINT tickets_service_type_chk - CHECK (service_type IS NULL OR service_type IN ('inc', 'crq')) -); - --- geom resolution: feed coords first, else cluster gazetteer, else none. -CREATE OR REPLACE FUNCTION tracksolid.tg_tickets_geom() - RETURNS trigger LANGUAGE plpgsql -AS $function$ -DECLARE - g geometry(Point, 4326); -BEGIN - IF NEW.latitude IS NOT NULL AND NEW.longitude IS NOT NULL - AND NEW.latitude BETWEEN -90 AND 90 - AND NEW.longitude BETWEEN -180 AND 180 - AND NOT (NEW.latitude = 0 AND NEW.longitude = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(NEW.longitude, NEW.latitude), 4326); - NEW.geo_source := 'feed'; - ELSE - SELECT gc.geom INTO g - FROM tracksolid.geo_clusters gc - WHERE gc.cluster_key = tracksolid.norm_cluster(NEW.cluster) - AND gc.geom IS NOT NULL - LIMIT 1; - IF g IS NOT NULL THEN - NEW.geom := g; NEW.geo_source := 'cluster'; - ELSE - NEW.geom := NULL; NEW.geo_source := 'none'; - END IF; - END IF; - RETURN NEW; -END $function$; - -DROP TRIGGER IF EXISTS trg_tickets_geom ON tracksolid.tickets; -CREATE TRIGGER trg_tickets_geom - BEFORE INSERT OR UPDATE ON tracksolid.tickets - FOR EACH ROW EXECUTE FUNCTION tracksolid.tg_tickets_geom(); - --- Re-resolve geoms for already-loaded tickets after the gazetteer is (re)seeded, --- without re-running ingestion. Skips rows already pinned by feed coordinates. --- Returns the number of rows updated. Call: SELECT tracksolid.resolve_ticket_geoms(); -CREATE OR REPLACE FUNCTION tracksolid.resolve_ticket_geoms() - RETURNS integer LANGUAGE plpgsql -AS $function$ -DECLARE n integer; -BEGIN - UPDATE tracksolid.tickets t - SET geom = gc.geom, geo_source = 'cluster' - FROM tracksolid.geo_clusters gc - WHERE gc.cluster_key = tracksolid.norm_cluster(t.cluster) - AND gc.geom IS NOT NULL - AND t.geo_source IS DISTINCT FROM 'feed' - AND t.geom IS DISTINCT FROM gc.geom; - GET DIAGNOSTICS n = ROW_COUNT; - RETURN n; -END $function$; - --- ── indexes ─────────────────────────────────────────────────────────────────── -CREATE INDEX IF NOT EXISTS ix_tickets_service_type ON tracksolid.tickets (service_type); -CREATE INDEX IF NOT EXISTS ix_tickets_status ON tracksolid.tickets (normalized_status); -CREATE INDEX IF NOT EXISTS ix_tickets_type_status ON tracksolid.tickets (service_type, normalized_status); -CREATE INDEX IF NOT EXISTS ix_tickets_actionable ON tracksolid.tickets (is_actionable) WHERE is_actionable; -CREATE INDEX IF NOT EXISTS ix_tickets_geom ON tracksolid.tickets USING gist (geom); -CREATE INDEX IF NOT EXISTS ix_tickets_cluster ON tracksolid.tickets (tracksolid.norm_cluster(cluster)); - --- ── map read function ───────────────────────────────────────────────────────── --- Modeled on reporting.fn_live_positions: NULLIF-cleaned params, single filtered --- CTE, returns { summary, geojson:FeatureCollection }. Maps only geocoded rows --- (geom IS NOT NULL). p_open_only restricts to actionable (open) tickets — the --- map default, since the bulk of INC are CLOSED. -CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map( - p_service_type text DEFAULT NULL, - p_status text DEFAULT NULL, - p_open_only boolean DEFAULT true -) - RETURNS jsonb LANGUAGE plpgsql STABLE -AS $function$ -DECLARE - v_result jsonb; -BEGIN - p_service_type := lower(NULLIF(p_service_type, '')); - p_status := NULLIF(p_status, ''); - WITH filtered AS ( - SELECT * - FROM tracksolid.tickets - WHERE geom IS NOT NULL - AND (p_service_type IS NULL OR service_type = p_service_type) - AND (p_status IS NULL OR normalized_status = p_status) - AND (NOT p_open_only OR is_actionable IS TRUE) - ) - SELECT jsonb_build_object( - 'summary', jsonb_build_object( - 'ticket_count', COUNT(*), - 'inc', COUNT(*) FILTER (WHERE service_type = 'inc'), - 'crq', COUNT(*) FILTER (WHERE service_type = 'crq'), - 'open', COUNT(*) FILTER (WHERE is_actionable IS TRUE), - 'by_status', (SELECT jsonb_object_agg(s, c) - FROM (SELECT normalized_status AS s, COUNT(*) AS c - FROM filtered GROUP BY normalized_status) z) - ), - 'geojson', jsonb_build_object( - 'type', 'FeatureCollection', - 'features', COALESCE(jsonb_agg( - jsonb_build_object( - 'type', 'Feature', - 'properties', jsonb_build_object( - 'ticket_id', ticket_id, - 'service_type', service_type, - 'status', normalized_status, - 'raw_status', raw_status, - 'cluster', cluster, - 'region', region, - 'location_name', location_name, - 'department', department, - 'owner', owner, - 'assigned_team', assigned_team, - 'sla_status', sla_status, - 'is_actionable', is_actionable, - 'geo_source', geo_source, - 'created_at', to_char(created_at_service, 'YYYY-MM-DD HH24:MI:SS'), - 'scheduled_at', to_char(scheduled_at, 'YYYY-MM-DD HH24:MI:SS') - ), - 'geometry', ST_AsGeoJSON(geom)::jsonb - ) - ), '[]'::jsonb) - ) - ) INTO v_result FROM filtered; - - RETURN v_result; -END $function$; - -COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS - 'INC/CRQ tickets as a GeoJSON FeatureCollection for the FleetOps Tickets map. Migration 21.'; - --- ── grants (guarded: roles may not exist on a fresh DB) ─────────────────────── -DO $grants$ -BEGIN - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN - GRANT USAGE ON SCHEMA tracksolid TO dashboard_ro; - GRANT SELECT ON tracksolid.tickets, tracksolid.geo_clusters TO dashboard_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro; - END IF; - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN - GRANT SELECT ON tracksolid.tickets, tracksolid.geo_clusters TO grafana_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro; - END IF; -END $grants$; diff --git a/migrations/22_tickets_schema.sql b/migrations/22_tickets_schema.sql deleted file mode 100644 index 12af2b4..0000000 --- a/migrations/22_tickets_schema.sql +++ /dev/null @@ -1,309 +0,0 @@ --- 22_tickets_schema.sql --- Promote the ticket layer into its own `tickets` schema and SPLIT by type: --- migration-21 tracksolid.tickets (one table, service_type discriminator) --> --- tickets.inc + tickets.crq (one table per ticket type) --- tracksolid.geo_clusters --> tickets.geo_clusters (shared gazetteer) --- --- The read function reporting.fn_tickets_for_map keeps its SAME signature --- (so dashboard_api GET /webhook/tickets and the FleetOps map are unchanged) but --- now UNIONs the two per-type tables. The tracksolid.* objects from migration 21 --- are dropped after the data is copied across. Safe to re-apply. --- --- NOTE: this intentionally introduces a schema beyond `tracksolid`/`reporting` --- (a deliberate exception to the single-live-schema convention) to give tickets --- their own namespace + access boundary. - -CREATE SCHEMA IF NOT EXISTS tickets; -SET search_path = tickets, public; - --- ── normalize helper (cluster_key) ──────────────────────────────────────────── -CREATE OR REPLACE FUNCTION tickets.norm_cluster(p text) - RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE -AS $fn$ SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '') $fn$; - --- ── gazetteer: move from tracksolid if present, else create fresh ───────────── -DO $mv$ -BEGIN - IF to_regclass('tracksolid.geo_clusters') IS NOT NULL - AND to_regclass('tickets.geo_clusters') IS NULL THEN - ALTER TABLE tracksolid.geo_clusters SET SCHEMA tickets; - END IF; -END $mv$; - -CREATE TABLE IF NOT EXISTS tickets.geo_clusters ( - cluster_key text PRIMARY KEY, - region text, - lat double precision, - lng double precision, - geom geometry(Point, 4326), - source text, - verified boolean NOT NULL DEFAULT false, - updated_at timestamptz NOT NULL DEFAULT now() -); - -CREATE OR REPLACE FUNCTION tickets.tg_geo_clusters_geom() - RETURNS trigger LANGUAGE plpgsql AS $fn$ -BEGIN - IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL - AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180 - AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326); - ELSE - NEW.geom := NULL; - END IF; - NEW.updated_at := now(); - RETURN NEW; -END $fn$; -DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tickets.geo_clusters; -CREATE TRIGGER trg_geo_clusters_geom BEFORE INSERT OR UPDATE ON tickets.geo_clusters - FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_clusters_geom(); - --- ── per-type ticket tables (identical shape; CHECK-locked to their type) ────── -CREATE TABLE IF NOT EXISTS tickets.inc ( - ticket_id text PRIMARY KEY, - source_type text, - service_type text, - bucket text, - raw_status text, - normalized_status text, - created_at_service timestamptz, - scheduled_at timestamptz, - closed_at timestamptz, - last_seen_at timestamptz, - first_seen_at timestamptz, - week_start date, - week_end date, - cluster text, - region text, - location_name text, - latitude double precision, - longitude double precision, - department text, - assigned_team text, - owner text, - sla_status text, - mttr double precision, - is_auto_created boolean, - is_auto_closed boolean, - is_alarm boolean, - is_actionable boolean, - source_s3_bucket text, - source_s3_key text, - source_snapshot_id bigint, - created_at timestamptz, - updated_at timestamptz, - geom geometry(Point, 4326), - geo_source text, - raw jsonb, - ingested_at timestamptz NOT NULL DEFAULT now(), - CONSTRAINT inc_service_type_chk CHECK (service_type = 'inc') -); - --- crq mirrors inc's columns/defaults; add its own PK + type CHECK. -CREATE TABLE IF NOT EXISTS tickets.crq (LIKE tickets.inc INCLUDING DEFAULTS); -DO $crq$ -BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_constraint - WHERE conrelid = 'tickets.crq'::regclass AND contype = 'p') THEN - ALTER TABLE tickets.crq ADD PRIMARY KEY (ticket_id); - END IF; - IF NOT EXISTS (SELECT 1 FROM pg_constraint - WHERE conrelid = 'tickets.crq'::regclass AND conname = 'crq_service_type_chk') THEN - ALTER TABLE tickets.crq ADD CONSTRAINT crq_service_type_chk CHECK (service_type = 'crq'); - END IF; -END $crq$; - --- ── shared geom-resolution trigger (feed coords → cluster gazetteer → none) ─── -CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom() - RETURNS trigger LANGUAGE plpgsql AS $fn$ -DECLARE g geometry(Point, 4326); -BEGIN - IF NEW.latitude IS NOT NULL AND NEW.longitude IS NOT NULL - AND NEW.latitude BETWEEN -90 AND 90 - AND NEW.longitude BETWEEN -180 AND 180 - AND NOT (NEW.latitude = 0 AND NEW.longitude = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(NEW.longitude, NEW.latitude), 4326); - NEW.geo_source := 'feed'; - ELSE - SELECT gc.geom INTO g FROM tickets.geo_clusters gc - WHERE gc.cluster_key = tickets.norm_cluster(NEW.cluster) AND gc.geom IS NOT NULL LIMIT 1; - IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster'; - ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF; - END IF; - RETURN NEW; -END $fn$; - -DROP TRIGGER IF EXISTS trg_inc_geom ON tickets.inc; -CREATE TRIGGER trg_inc_geom BEFORE INSERT OR UPDATE ON tickets.inc - FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom(); -DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq; -CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq - FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom(); - --- re-resolve geoms across both tables after the gazetteer is (re)seeded. -CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms() - RETURNS integer LANGUAGE plpgsql AS $fn$ -DECLARE n integer; m integer; -BEGIN - UPDATE tickets.inc t SET geom = gc.geom, geo_source = 'cluster' - FROM tickets.geo_clusters gc - WHERE gc.cluster_key = tickets.norm_cluster(t.cluster) AND gc.geom IS NOT NULL - AND t.geo_source IS DISTINCT FROM 'feed' AND t.geom IS DISTINCT FROM gc.geom; - GET DIAGNOSTICS n = ROW_COUNT; - UPDATE tickets.crq t SET geom = gc.geom, geo_source = 'cluster' - FROM tickets.geo_clusters gc - WHERE gc.cluster_key = tickets.norm_cluster(t.cluster) AND gc.geom IS NOT NULL - AND t.geo_source IS DISTINCT FROM 'feed' AND t.geom IS DISTINCT FROM gc.geom; - GET DIAGNOSTICS m = ROW_COUNT; - RETURN n + m; -END $fn$; - --- ── indexes (per table) ─────────────────────────────────────────────────────── -CREATE INDEX IF NOT EXISTS ix_inc_status ON tickets.inc (normalized_status); -CREATE INDEX IF NOT EXISTS ix_inc_actionable ON tickets.inc (is_actionable) WHERE is_actionable; -CREATE INDEX IF NOT EXISTS ix_inc_geom ON tickets.inc USING gist (geom); -CREATE INDEX IF NOT EXISTS ix_inc_cluster ON tickets.inc (tickets.norm_cluster(cluster)); -CREATE INDEX IF NOT EXISTS ix_crq_status ON tickets.crq (normalized_status); -CREATE INDEX IF NOT EXISTS ix_crq_actionable ON tickets.crq (is_actionable) WHERE is_actionable; -CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom); -CREATE INDEX IF NOT EXISTS ix_crq_cluster ON tickets.crq (tickets.norm_cluster(cluster)); - --- ── backfill from migration-21 tracksolid.tickets, then retire it ───────────── -DO $bf$ -BEGIN - IF to_regclass('tracksolid.tickets') IS NOT NULL THEN - INSERT INTO tickets.inc ( - ticket_id, source_type, service_type, bucket, raw_status, normalized_status, - created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, - week_start, week_end, cluster, region, location_name, latitude, longitude, - department, assigned_team, owner, sla_status, mttr, is_auto_created, - is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, - source_snapshot_id, created_at, updated_at, raw) - SELECT - ticket_id, source_type, service_type, bucket, raw_status, normalized_status, - created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, - week_start, week_end, cluster, region, location_name, latitude, longitude, - department, assigned_team, owner, sla_status, mttr, is_auto_created, - is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, - source_snapshot_id, created_at, updated_at, raw - FROM tracksolid.tickets WHERE service_type = 'inc' - ON CONFLICT (ticket_id) DO NOTHING; - - INSERT INTO tickets.crq ( - ticket_id, source_type, service_type, bucket, raw_status, normalized_status, - created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, - week_start, week_end, cluster, region, location_name, latitude, longitude, - department, assigned_team, owner, sla_status, mttr, is_auto_created, - is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, - source_snapshot_id, created_at, updated_at, raw) - SELECT - ticket_id, source_type, service_type, bucket, raw_status, normalized_status, - created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at, - week_start, week_end, cluster, region, location_name, latitude, longitude, - department, assigned_team, owner, sla_status, mttr, is_auto_created, - is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key, - source_snapshot_id, created_at, updated_at, raw - FROM tracksolid.tickets WHERE service_type = 'crq' - ON CONFLICT (ticket_id) DO NOTHING; - END IF; -END $bf$; - --- retire migration-21 objects (data now lives in tickets.*) -DROP TABLE IF EXISTS tracksolid.tickets; -DROP FUNCTION IF EXISTS tracksolid.resolve_ticket_geoms(); -DROP FUNCTION IF EXISTS tracksolid.tg_tickets_geom(); -DROP FUNCTION IF EXISTS tracksolid.tg_geo_clusters_geom(); -DROP FUNCTION IF EXISTS tracksolid.norm_cluster(text); - --- ── read function (same signature; now UNIONs the two tables) ───────────────── -CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map( - p_service_type text DEFAULT NULL, - p_status text DEFAULT NULL, - p_open_only boolean DEFAULT true -) - RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$ -DECLARE v_result jsonb; -BEGIN - p_service_type := lower(NULLIF(p_service_type, '')); - p_status := NULLIF(p_status, ''); - WITH filtered AS ( - SELECT ticket_id, service_type, normalized_status, raw_status, cluster, region, - location_name, department, owner, assigned_team, sla_status, is_actionable, - geo_source, created_at_service, scheduled_at, geom - FROM tickets.inc - WHERE geom IS NOT NULL - AND (p_service_type IS NULL OR p_service_type = 'inc') - AND (p_status IS NULL OR normalized_status = p_status) - AND (NOT p_open_only OR is_actionable IS TRUE) - UNION ALL - SELECT ticket_id, service_type, normalized_status, raw_status, cluster, region, - location_name, department, owner, assigned_team, sla_status, is_actionable, - geo_source, created_at_service, scheduled_at, geom - FROM tickets.crq - WHERE geom IS NOT NULL - AND (p_service_type IS NULL OR p_service_type = 'crq') - AND (p_status IS NULL OR normalized_status = p_status) - AND (NOT p_open_only OR is_actionable IS TRUE) - ) - SELECT jsonb_build_object( - 'summary', jsonb_build_object( - 'ticket_count', COUNT(*), - 'inc', COUNT(*) FILTER (WHERE service_type = 'inc'), - 'crq', COUNT(*) FILTER (WHERE service_type = 'crq'), - 'open', COUNT(*) FILTER (WHERE is_actionable IS TRUE), - 'by_status', (SELECT jsonb_object_agg(s, c) - FROM (SELECT normalized_status AS s, COUNT(*) AS c - FROM filtered GROUP BY normalized_status) z) - ), - 'geojson', jsonb_build_object( - 'type', 'FeatureCollection', - 'features', COALESCE(jsonb_agg( - jsonb_build_object( - 'type', 'Feature', - 'properties', jsonb_build_object( - 'ticket_id', ticket_id, - 'service_type', service_type, - 'status', normalized_status, - 'raw_status', raw_status, - 'cluster', cluster, - 'region', region, - 'location_name', location_name, - 'department', department, - 'owner', owner, - 'assigned_team', assigned_team, - 'sla_status', sla_status, - 'is_actionable', is_actionable, - 'geo_source', geo_source, - 'created_at', to_char(created_at_service, 'YYYY-MM-DD HH24:MI:SS'), - 'scheduled_at', to_char(scheduled_at, 'YYYY-MM-DD HH24:MI:SS') - ), - 'geometry', ST_AsGeoJSON(geom)::jsonb - ) - ), '[]'::jsonb) - ) - ) INTO v_result FROM filtered; - RETURN v_result; -END $fn$; - -COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS - 'INC/CRQ tickets (tickets.inc + tickets.crq) as GeoJSON for the FleetOps map. Migration 22.'; - --- ── grants ──────────────────────────────────────────────────────────────────── -DO $grants$ -BEGIN - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN - GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner; - GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner; - GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner; - END IF; - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN - GRANT USAGE ON SCHEMA tickets TO dashboard_ro; - GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters TO dashboard_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro; - END IF; - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN - GRANT USAGE ON SCHEMA tickets TO grafana_ro; - GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters TO grafana_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro; - END IF; -END $grants$; diff --git a/migrations/23_tickets_raw_first.sql b/migrations/23_tickets_raw_first.sql deleted file mode 100644 index 9343903..0000000 --- a/migrations/23_tickets_raw_first.sql +++ /dev/null @@ -1,246 +0,0 @@ --- 23_tickets_raw_first.sql --- Make `raw` (jsonb) the single source of truth for tickets, and add a --- location-level geocode cache so INC pins can be placed precisely (parsed from --- region + cluster + location_name) instead of all stacking on a cluster centroid. --- --- * tickets.inc / tickets.crq --> slim to (ticket_id, raw, geom, geo_source, ingested_at) --- (the ~31 flattened typed columns are DROPPED; raw keeps them) --- * tickets.geo_locations --> NEW: cleaned-location -> coordinates cache --- * geom resolution --> feed(raw lat/lng) -> location(cache) -> cluster(gazetteer) -> none --- --- The trigger/resolve/read-function are repointed to read from `raw`. The read --- function reporting.fn_tickets_for_map keeps its SAME signature, so dashboard_api --- and the FleetOps map need no change. Safe to re-apply. - -SET search_path = tickets, public; - --- ── 1. slim the per-type tables to raw-first ───────────────────────────────── -DO $slim$ -DECLARE c text; -BEGIN - FOREACH c IN ARRAY ARRAY[ - 'source_type','service_type','bucket','raw_status','normalized_status', - 'created_at_service','scheduled_at','closed_at','last_seen_at','first_seen_at', - 'week_start','week_end','cluster','region','location_name','latitude','longitude', - 'department','assigned_team','owner','sla_status','mttr','is_auto_created', - 'is_auto_closed','is_alarm','is_actionable','source_s3_bucket','source_s3_key', - 'source_snapshot_id','created_at','updated_at'] - LOOP - EXECUTE format('ALTER TABLE tickets.inc DROP COLUMN IF EXISTS %I', c); - EXECUTE format('ALTER TABLE tickets.crq DROP COLUMN IF EXISTS %I', c); - END LOOP; -END $slim$; - --- raw is now mandatory (all rows already carry it). -ALTER TABLE tickets.inc ALTER COLUMN raw SET NOT NULL; -ALTER TABLE tickets.crq ALTER COLUMN raw SET NOT NULL; - --- ── 2. location geocode cache ──────────────────────────────────────────────── --- query_key = tickets.norm_cluster(location_name): a normalised key on the source --- location string (norm_cluster just upper/collapse/trims — generic). The loader --- geocodes the *cleaned* place (region+cluster+location_name, codes stripped) but --- caches under the raw location_name key so resolve_ticket_geoms can join in SQL --- without re-deriving the regex. -CREATE TABLE IF NOT EXISTS tickets.geo_locations ( - query_key text PRIMARY KEY, - location_name text, - cluster text, - region text, - query text, -- the cleaned string actually sent to the geocoder - lat double precision, - lng double precision, - geom geometry(Point, 4326), - confidence numeric, - provider text, - verified boolean NOT NULL DEFAULT false, - updated_at timestamptz NOT NULL DEFAULT now() -); - -CREATE OR REPLACE FUNCTION tickets.tg_geo_locations_geom() - RETURNS trigger LANGUAGE plpgsql AS $fn$ -BEGIN - IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL - AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180 - AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326); - ELSE - NEW.geom := NULL; - END IF; - NEW.updated_at := now(); - RETURN NEW; -END $fn$; -DROP TRIGGER IF EXISTS trg_geo_locations_geom ON tickets.geo_locations; -CREATE TRIGGER trg_geo_locations_geom BEFORE INSERT OR UPDATE ON tickets.geo_locations - FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_locations_geom(); - --- ── 3. geom trigger — read from raw; never clobber a deliberate geom-only update ─ -CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom() - RETURNS trigger LANGUAGE plpgsql AS $fn$ -DECLARE - v_lat double precision := NULLIF(NEW.raw->>'latitude','')::double precision; - v_lng double precision := NULLIF(NEW.raw->>'longitude','')::double precision; - g geometry(Point, 4326); -BEGIN - -- A geom/geo_source-only UPDATE (raw unchanged — e.g. resolve_ticket_geoms or the - -- location geocode pass) must keep the caller's geom, not recompute it. - IF TG_OP = 'UPDATE' AND NEW.raw IS NOT DISTINCT FROM OLD.raw THEN - RETURN NEW; - END IF; - IF v_lat IS NOT NULL AND v_lng IS NOT NULL - AND v_lat BETWEEN -90 AND 90 AND v_lng BETWEEN -180 AND 180 - AND NOT (v_lat = 0 AND v_lng = 0) THEN - NEW.geom := ST_SetSRID(ST_MakePoint(v_lng, v_lat), 4326); - NEW.geo_source := 'feed'; - ELSE - SELECT gc.geom INTO g FROM tickets.geo_clusters gc - WHERE gc.cluster_key = tickets.norm_cluster(NEW.raw->>'cluster') AND gc.geom IS NOT NULL LIMIT 1; - IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster'; - ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF; - END IF; - RETURN NEW; -END $fn$; --- (triggers trg_inc_geom / trg_crq_geom from migration 22 already call this fn.) - --- ── 4. resolve — prefer location cache, else cluster centroid (non-feed rows) ── -CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms() - RETURNS integer LANGUAGE plpgsql AS $fn$ -DECLARE n integer; m integer; -BEGIN - UPDATE tickets.inc t - SET geom = COALESCE(loc.geom, gc.geom), - geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location' - WHEN gc.geom IS NOT NULL THEN 'cluster' - ELSE 'none' END - FROM tickets.inc base - LEFT JOIN tickets.geo_locations loc - ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL - LEFT JOIN tickets.geo_clusters gc - ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL - WHERE t.ticket_id = base.ticket_id - AND t.geo_source IS DISTINCT FROM 'feed' - AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom) - OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location' - WHEN gc.geom IS NOT NULL THEN 'cluster' - ELSE 'none' END); - GET DIAGNOSTICS n = ROW_COUNT; - - UPDATE tickets.crq t - SET geom = COALESCE(loc.geom, gc.geom), - geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location' - WHEN gc.geom IS NOT NULL THEN 'cluster' - ELSE 'none' END - FROM tickets.crq base - LEFT JOIN tickets.geo_locations loc - ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL - LEFT JOIN tickets.geo_clusters gc - ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL - WHERE t.ticket_id = base.ticket_id - AND t.geo_source IS DISTINCT FROM 'feed' - AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom) - OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location' - WHEN gc.geom IS NOT NULL THEN 'cluster' - ELSE 'none' END); - GET DIAGNOSTICS m = ROW_COUNT; - RETURN n + m; -END $fn$; - --- ── 5. expression indexes (replace the dropped column indexes) ──────────────── -CREATE INDEX IF NOT EXISTS ix_inc_status_raw ON tickets.inc ((raw->>'normalized_status')); -CREATE INDEX IF NOT EXISTS ix_inc_actionable_raw ON tickets.inc (((raw->>'is_actionable')::boolean)) - WHERE (raw->>'is_actionable')::boolean; -CREATE INDEX IF NOT EXISTS ix_inc_cluster_raw ON tickets.inc (tickets.norm_cluster(raw->>'cluster')); -CREATE INDEX IF NOT EXISTS ix_inc_loc_raw ON tickets.inc (tickets.norm_cluster(raw->>'location_name')); -CREATE INDEX IF NOT EXISTS ix_inc_geom ON tickets.inc USING gist (geom); -CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status')); -CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean)) - WHERE (raw->>'is_actionable')::boolean; -CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster')); -CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name')); -CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom); - --- ── 6. read function — extract every property from raw (same signature) ─────── -CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map( - p_service_type text DEFAULT NULL, - p_status text DEFAULT NULL, - p_open_only boolean DEFAULT true -) - RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$ -DECLARE v_result jsonb; -BEGIN - p_service_type := lower(NULLIF(p_service_type, '')); - p_status := NULLIF(p_status, ''); - WITH filtered AS ( - SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc - WHERE geom IS NOT NULL - AND (p_service_type IS NULL OR p_service_type = 'inc') - AND (p_status IS NULL OR raw->>'normalized_status' = p_status) - AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE) - UNION ALL - SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq - WHERE geom IS NOT NULL - AND (p_service_type IS NULL OR p_service_type = 'crq') - AND (p_status IS NULL OR raw->>'normalized_status' = p_status) - AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE) - ) - SELECT jsonb_build_object( - 'summary', jsonb_build_object( - 'ticket_count', COUNT(*), - 'inc', COUNT(*) FILTER (WHERE service_type = 'inc'), - 'crq', COUNT(*) FILTER (WHERE service_type = 'crq'), - 'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE), - 'by_status', (SELECT jsonb_object_agg(s, c) - FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c - FROM filtered GROUP BY raw->>'normalized_status') z) - ), - 'geojson', jsonb_build_object( - 'type', 'FeatureCollection', - 'features', COALESCE(jsonb_agg( - jsonb_build_object( - 'type', 'Feature', - 'properties', jsonb_build_object( - 'ticket_id', raw->>'ticket_id', - 'service_type', service_type, - 'status', raw->>'normalized_status', - 'raw_status', raw->>'raw_status', - 'cluster', raw->>'cluster', - 'region', raw->>'region', - 'location_name', raw->>'location_name', - 'department', raw->>'department', - 'owner', raw->>'owner', - 'assigned_team', raw->>'assigned_team', - 'sla_status', raw->>'sla_status', - 'is_actionable', (raw->>'is_actionable')::boolean, - 'geo_source', geo_source, - 'created_at', raw->>'created_at_service', - 'scheduled_at', raw->>'scheduled_at' - ), - 'geometry', ST_AsGeoJSON(geom)::jsonb - ) - ), '[]'::jsonb) - ) - ) INTO v_result FROM filtered; - RETURN v_result; -END $fn$; - -COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS - 'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON. Migration 23.'; - --- ── 7. grants ───────────────────────────────────────────────────────────────── -DO $grants$ -BEGIN - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN - GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner; - GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner; - GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner; - END IF; - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN - GRANT USAGE ON SCHEMA tickets TO dashboard_ro; - GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO dashboard_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro; - END IF; - IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN - GRANT USAGE ON SCHEMA tickets TO grafana_ro; - GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO grafana_ro; - GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro; - END IF; -END $grants$; diff --git a/run_migrations.py b/run_migrations.py index f642261..d94389d 100644 --- a/run_migrations.py +++ b/run_migrations.py @@ -44,9 +44,10 @@ MIGRATIONS = [ "18_grant_reporting_ro.sql", # grant SELECT on reporting.* to grafana_ro (staging read-only role) "19_v_ingest_health.sql", # reporting.v_ingest_health — pipeline freshness (replaces Grafana panels) "20_restore_live_feed.sql", # re-assert v_live_positions exclusion + fn_live_positions vehicle_type (migration-order regression fix) - "21_tickets.sql", # tracksolid.tickets + geo_clusters gazetteer + reporting.fn_tickets_for_map (FleetOps Tickets map) - "22_tickets_schema.sql", # move tickets into `tickets` schema; split into tickets.inc + tickets.crq; fn_tickets_for_map unions them - "23_tickets_raw_first.sql", # slim tickets.inc/crq to raw-jsonb-first; geo_locations cache; location-level geocoding; fn reads from raw + # The `tickets` schema (INC/CRQ map) was migrations 21-23; it now lives in its + # own repo — repo.rahamafresh.com/kianiadee/fleettickets.git (run its + # run_migrations.py). dashboard_api still serves GET /webhook/tickets via + # reporting.fn_tickets_for_map, which fleettickets defines. ] # ── Tables that must exist before the service is allowed to start ───────────── diff --git a/tools/README.md b/tools/README.md index 0ebd49f..4863cdc 100644 --- a/tools/README.md +++ b/tools/README.md @@ -18,6 +18,8 @@ docker exec -it "$WK" python -m tools. [args] | `audit_device_reconciliation.py` | Read-only: reconcile the vehicle CSV (`data/`) against `tracksolid.devices`; reports gaps + NULL fields | re-runnable audit | | `import_drivers_csv.py` | Populate device names/plates from CSV (`--apply` to commit). The registry is already populated — kept for future bulk re-imports | one-shot (done) | | `backfill_trips_enrichment.py` | One-shot backfill of historical `tracksolid.trips` (route_geom/addresses/plate) for rows predating migration 09 (`--apply` to commit) | one-shot (done) | -| `import_tickets.py` | Ingest INC/CRQ ticket snapshots (rustfs `tickets` bucket → `tracksolid.tickets`); `--from-bucket --apply`. `--geocode-clusters` seeds the cluster gazetteer that puts tickets on the FleetOps map. Needs migration 21 | re-runnable (per snapshot) | + +> INC/CRQ ticket ingestion (`import_tickets.py`) moved to its own repo — +> `repo.rahamafresh.com/kianiadee/fleettickets.git`. `data/` holds the source CSVs the import/audit scripts read (default: `data/20260427_FSG_Vehicles_mitieng.csv`). diff --git a/tools/import_tickets.py b/tools/import_tickets.py deleted file mode 100644 index 783d67c..0000000 --- a/tools/import_tickets.py +++ /dev/null @@ -1,377 +0,0 @@ -""" -import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first) -━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ -Loads the client's field-ops ticket snapshots into the `tickets` schema — the -source of the FleetOps "Tickets" map. Two categories, one table each: - tickets.inc — incidents / customer faults - tickets.crq — new-installation requests - -RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as -jsonb). Everything downstream reads from `raw` (resilient to source schema drift). -The DB derives `geom` (migration 23): feed coords (raw lat/lng) -> location geocode -(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none. - -Source data: rustfs `tickets` bucket, full snapshots from the client's email -automation — automations/{inc,crq}/latest.json (array of 32-key objects). - -Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits): - --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups) - --geocode-locations precise per-location for ACTIONABLE INC tickets: parses the - real place out of location_name (region+cluster+location_name, - network codes stripped), geocodes it, caches in - tickets.geo_locations, then re-resolves geoms. -Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY. - -Usage (run as a module from the image root): - WK=$(docker ps --filter name=ingest_worker --format "{{.Names}}" | head -1) - docker exec -it "$WK" python -m tools.import_tickets --from-bucket --apply - docker exec -it "$WK" python -m tools.import_tickets \ - --inc-json /tmp/inc.json --crq-json /tmp/crq.json --apply - docker exec -it "$WK" python -m tools.import_tickets --geocode-clusters --apply - docker exec -it "$WK" python -m tools.import_tickets --geocode-locations --apply - -Pre-requisite: migrations 21–23 applied (tickets.inc/crq + geo_clusters + -geo_locations + reporting.fn_tickets_for_map). -━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ -""" - -from __future__ import annotations - -import argparse -import json -import math -import os -import re -import subprocess -import time - -import requests -import psycopg2.extras - -from ts_shared_rev import clean, get_conn, get_logger - -log = get_logger("import_tickets") - -TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"} -_BUCKET = os.getenv("TICKETS_BUCKET", "tickets") -_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"} - -# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage. -_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower() -_API_KEY = os.getenv("GEOCODER_API_KEY", "") -_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1")) -_last_geocode_at = 0.0 - - -# ── data loading ────────────────────────────────────────────────────────────── -def _load_local(path: str) -> list[dict]: - with open(path, encoding="utf-8") as f: - data = json.load(f) # json.loads accepts NaN by default - return data if isinstance(data, list) else [] - - -def _load_bucket(kind: str) -> list[dict]: - env = { - **os.environ, - "AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"], - "AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"], - "AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"), - } - uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}" - log.info("fetching %s", uri) - out = subprocess.run( - ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"], - env=env, capture_output=True, timeout=180, check=True, - ).stdout - data = json.loads(out.decode("utf-8")) - return data if isinstance(data, list) else [] - - -# ── upsert (raw-first) ──────────────────────────────────────────────────────── -def _scrub_nan(row: dict) -> dict: - # Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null. - return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()} - - -def upsert(rows: list[dict], table: str, apply: bool) -> int: - payload = [ - (tid, psycopg2.extras.Json(_scrub_nan(r))) - for r in rows - if (tid := clean(r.get("ticket_id"))) - ] - log.info("%s: %d valid rows (skipped %d without ticket_id)", - table, len(payload), len(rows) - len(payload)) - if not apply: - log.info("DRY-RUN — nothing written to %s. Use --apply.", table) - return len(payload) - with get_conn() as conn: - with conn.cursor() as cur: - psycopg2.extras.execute_values( - cur, - f"INSERT INTO {table} (ticket_id, raw) VALUES %s " - "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", - payload, page_size=500, - ) - log.info("upserted %d rows into %s", len(payload), table) - return len(payload) - - -def ingest(args) -> None: - if args.from_bucket: - for kind in ("inc", "crq"): - upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply) - else: - if args.inc_json: - upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply) - if args.crq_json: - upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply) - - -# ── place extraction (strip network codes, keep the real place) ─────────────── -# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly. -_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+") -# Inline network/work-order codes to drop wherever they appear. -_CODE_RE = re.compile( - r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b" -) - - -def extract_place(location_name: str | None) -> str: - """Pull the human place/landmark out of a coded location_name string. - - e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT' - """ - s = (location_name or "").upper().strip() - if not s: - return "" - # drop the trailing '-' segment (e.g. -37, -CALL CLIENT, -F32) - if "-" in s: - s = s.rsplit("-", 1)[0] - s = s.replace("_", " ") - # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…) - prev = None - while prev != s: - prev = s - s = _PREFIX_RE.sub("", s).strip() - s = _CODE_RE.sub(" ", s) - s = re.sub(r"\s+", " ", s).strip(" ,-") - return s - - -def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str: - parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p] - return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order - - -# ── keyed geocoder ──────────────────────────────────────────────────────────── -def _throttle() -> None: - global _last_geocode_at - wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at) - if wait > 0: - time.sleep(wait) - _last_geocode_at = time.monotonic() - - -def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float: - dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1) - a = (math.sin(dlat / 2) ** 2 - + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2) - return 2 * 6371.0 * math.asin(math.sqrt(a)) - - -def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None: - """Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None. - - `viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box - around the cluster centroid (bounded), which stops the geocoder matching a - landmark name in the wrong city. - """ - if not _API_KEY: - log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER) - return None - _throttle() - try: - if _PROVIDER == "opencage": - params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1} - if viewbox: - params["bounds"] = "%s,%s,%s,%s" % viewbox - r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15) - r.raise_for_status() - res = (r.json().get("results") or []) - if res: - g = res[0]["geometry"] - return float(g["lat"]), float(g["lng"]), res[0].get("confidence") - else: # locationiq (default) - params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"} - if viewbox: - params["viewbox"] = "%s,%s,%s,%s" % viewbox - params["bounded"] = 1 - r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15) - if r.status_code == 404: # LocationIQ returns 404 for "no matches" - return None - r.raise_for_status() - hits = r.json() - if hits: - h = hits[0] - return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0) - except (requests.RequestException, ValueError, KeyError) as e: - log.warning("geocode failed for %r: %s", query, e) - return None - - -# ── cluster gazetteer (coarse fallback) ─────────────────────────────────────── -def geocode_clusters(apply: bool) -> None: - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute( - """ - SELECT key, region FROM ( - SELECT tickets.norm_cluster(raw->>'cluster') AS key, - (array_agg(raw->>'region'))[1] AS region - FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1 - UNION - SELECT tickets.norm_cluster(raw->>'cluster'), - (array_agg(raw->>'region'))[1] - FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1 - ) z - WHERE key IS NOT NULL - AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g - WHERE g.cluster_key = z.key AND g.geom IS NOT NULL) - """ - ) - todo = cur.fetchall() - log.info("%d clusters to geocode", len(todo)) - if not apply: - for key, region in todo: - log.info(" would geocode cluster: %s (%s)", key, region) - return - written = 0 - for key, region in todo: - hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya") - if not hit: - continue - lat, lng, _ = hit - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute( - """INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified) - VALUES (%s, %s, %s, %s, %s, false) - ON CONFLICT (cluster_key) DO UPDATE - SET region = EXCLUDED.region, lat = EXCLUDED.lat, - lng = EXCLUDED.lng, source = EXCLUDED.source""", - (key, region, lat, lng, _PROVIDER), - ) - written += 1 - _resolve() - log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written) - - -# ── per-location geocoding (precise; actionable INC) ────────────────────────── -# A location geocode is only trusted if it lands within this radius of the -# cluster centroid; otherwise the geocoder matched the landmark in the wrong -# place and we fall back to the cluster centroid. -_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25")) -_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid - - -def geocode_locations(apply: bool) -> None: - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute( - """ - SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng - FROM ( - SELECT tickets.norm_cluster(raw->>'location_name') AS key, - (array_agg(raw->>'location_name'))[1] AS location_name, - (array_agg(raw->>'cluster'))[1] AS cluster, - (array_agg(raw->>'region'))[1] AS region, - tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey - FROM tickets.inc - WHERE (raw->>'is_actionable')::boolean - AND raw->>'location_name' IS NOT NULL - AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL - AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl - WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name') - AND gl.geom IS NOT NULL) - GROUP BY 1 - ) t - LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey - """ - ) - todo = cur.fetchall() - log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER) - if not apply: - for key, loc, cluster, region, clat, clng in todo[:50]: - log.info(" %s -> %r", key, compose_query(loc, cluster, region)) - return - written = rejected = 0 - for key, loc, cluster, region, clat, clng in todo: - query = compose_query(loc, cluster, region) - viewbox = None - if clat is not None and clng is not None: - viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG) - hit = geocode(query, viewbox) - if not hit: - continue - lat, lng, conf = hit - # distance sanity: a result far from the cluster centroid is a wrong-city - # match — drop it so the ticket keeps the cluster-centroid fallback. - if clat is not None and clng is not None: - km = _haversine_km(lat, lng, clat, clng) - if km > _MAX_KM_FROM_CLUSTER: - rejected += 1 - log.info(" reject (%.0f km from cluster): %s", km, query) - continue - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute( - """INSERT INTO tickets.geo_locations - (query_key, location_name, cluster, region, query, lat, lng, confidence, provider) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (query_key) DO UPDATE - SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster, - region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat, - lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""", - (key, loc, cluster, region, query, lat, lng, conf, _PROVIDER), - ) - written += 1 - log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng) - n = _resolve() - log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets " - "(unverified — review tickets.geo_locations)", written, rejected, n) - - -def _resolve() -> int: - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute("SELECT tickets.resolve_ticket_geoms()") - return cur.fetchone()[0] - - -# ── entrypoint ──────────────────────────────────────────────────────────────── -def main() -> None: - ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode") - ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)") - ap.add_argument("--from-bucket", action="store_true", - help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)") - ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file") - ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file") - ap.add_argument("--geocode-clusters", action="store_true", - help="Geocode distinct clusters into the gazetteer, then re-resolve geoms") - ap.add_argument("--geocode-locations", action="store_true", - help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve") - args = ap.parse_args() - - if args.geocode_clusters: - geocode_clusters(apply=args.apply) - return - if args.geocode_locations: - geocode_locations(apply=args.apply) - return - if not (args.from_bucket or args.inc_json or args.crq_json): - ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations") - ingest(args) - - -if __name__ == "__main__": - main()