chore(tickets): hand the tickets schema + ingest to the fleettickets repo
The `tickets` schema (INC/CRQ map), its migrations, and the ingest/geocode loader moved to their own repo: repo.rahamafresh.com/kianiadee/fleettickets.git. - remove migrations 21-23 and tools/import_tickets.py - run_migrations.py: drop the 21-23 entries (fleettickets owns them now) - dashboard_api keeps GET /webhook/tickets, calling reporting.fn_tickets_for_map which fleettickets defines - move geocoder env-var docs to fleettickets Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
cae64167eb
commit
868960c39c
8 changed files with 12 additions and 1201 deletions
|
|
@ -265,12 +265,13 @@ def vehicle_track(vehicle_number: str | None = None, hours: int = 1):
|
||||||
return JSONResponse({"error": "vehicle-track unavailable"})
|
return JSONResponse({"error": "vehicle-track unavailable"})
|
||||||
|
|
||||||
|
|
||||||
# ── Tickets (#21 — FleetOps Tickets map) ─────────────────────────────────────
|
# ── Tickets (FleetOps Tickets map) ───────────────────────────────────────────
|
||||||
# INC (incident / customer fault) + CRQ (new-installation) tickets as a GeoJSON
|
# INC (incident / customer fault) + CRQ (new-installation) tickets as a GeoJSON
|
||||||
# FeatureCollection for the FleetOps Tickets tab (FleetNow-style map). Backed by
|
# FeatureCollection for the FleetOps Tickets tab (FleetNow-style map). Backed by
|
||||||
# reporting.fn_tickets_for_map (migration 21) over tracksolid.tickets, which is
|
# reporting.fn_tickets_for_map over tickets.inc / tickets.crq. The schema, ingest,
|
||||||
# fed from the rustfs `tickets` bucket by tools/import_tickets.py. Only geocoded
|
# and that read function are owned by the separate `fleettickets` repo
|
||||||
# rows are mapped; open_only (default true) restricts to actionable tickets.
|
# (repo.rahamafresh.com/kianiadee/fleettickets.git); this endpoint just calls it.
|
||||||
|
# Only geocoded rows are mapped; open_only (default true) restricts to actionable.
|
||||||
@app.get("/webhook/tickets")
|
@app.get("/webhook/tickets")
|
||||||
def tickets(
|
def tickets(
|
||||||
service_type: str | None = None, # 'inc' | 'crq' | None (both)
|
service_type: str | None = None, # 'inc' | 'crq' | None (both)
|
||||||
|
|
|
||||||
|
|
@ -66,20 +66,6 @@ docker exec -i $DB psql -U postgres -d tracksolid_db < migration.sql
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Geocoder (tickets location geocoding)
|
|
||||||
|
|
||||||
Used by `tools/import_tickets.py --geocode-locations` / `--geocode-clusters` to turn ticket
|
|
||||||
`location_name`/`cluster` text into coordinates (`tickets.geo_locations` / `tickets.geo_clusters`).
|
|
||||||
A **keyed** provider is required — the public Nominatim server rate-limits bulk lookups.
|
|
||||||
|
|
||||||
| Parameter | Env var |
|
|
||||||
|---|---|
|
|
||||||
| Provider | `GEOCODER_PROVIDER` (`locationiq` (default) \| `opencage`) |
|
|
||||||
| API key | `GEOCODER_API_KEY` |
|
|
||||||
| Min interval | `GEOCODER_MIN_INTERVAL_S` (default `1.1` — throttle to provider TOS) |
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Container Name Resolution
|
## Container Name Resolution
|
||||||
|
|
||||||
Coolify appends a random suffix to all container names. Never hardcode. Always resolve:
|
Coolify appends a random suffix to all container names. Never hardcode. Always resolve:
|
||||||
|
|
|
||||||
|
|
@ -1,247 +0,0 @@
|
||||||
-- 21_tickets.sql
|
|
||||||
-- Field-ops TICKET layer for FleetOps: INC (incident / customer fault) + CRQ
|
|
||||||
-- (new-installation request). Tickets are produced by the client's email
|
|
||||||
-- automation as full-snapshot files in the rustfs `tickets` bucket
|
|
||||||
-- (automations/{inc,crq}/latest.json) and ingested by tools/import_tickets.py
|
|
||||||
-- into tracksolid.tickets. The map read layer is reporting.fn_tickets_for_map,
|
|
||||||
-- served by dashboard_api GET /webhook/tickets and rendered as INC/CRQ overlay
|
|
||||||
-- layers in the FleetOps "Tickets" tab (FleetNow-style map).
|
|
||||||
--
|
|
||||||
-- Geocoding: source rows carry NO coordinates (latitude/longitude are null in
|
|
||||||
-- every row observed). geom is resolved from a small cluster gazetteer
|
|
||||||
-- (tracksolid.geo_clusters), or directly from the feed lat/lng if the upstream
|
|
||||||
-- ever starts populating them. Until the gazetteer is seeded, geom is NULL and
|
|
||||||
-- tickets are simply not mapped (fn_tickets_for_map filters geom IS NOT NULL).
|
|
||||||
--
|
|
||||||
-- Safe to re-apply (IF NOT EXISTS / CREATE OR REPLACE / DROP TRIGGER IF EXISTS).
|
|
||||||
|
|
||||||
SET search_path = tracksolid, reporting, public;
|
|
||||||
|
|
||||||
-- ── normalize helper (cluster_key) ────────────────────────────────────────────
|
|
||||||
-- upper + collapse internal whitespace + trim; '' → NULL. IMMUTABLE so it can
|
|
||||||
-- back a functional index.
|
|
||||||
CREATE OR REPLACE FUNCTION tracksolid.norm_cluster(p text)
|
|
||||||
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE
|
|
||||||
AS $function$
|
|
||||||
SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '')
|
|
||||||
$function$;
|
|
||||||
|
|
||||||
-- ── gazetteer: normalized cluster → coordinates ──────────────────────────────
|
|
||||||
CREATE TABLE IF NOT EXISTS tracksolid.geo_clusters (
|
|
||||||
cluster_key text PRIMARY KEY, -- tracksolid.norm_cluster(cluster)
|
|
||||||
region text,
|
|
||||||
lat double precision,
|
|
||||||
lng double precision,
|
|
||||||
geom geometry(Point, 4326),
|
|
||||||
source text, -- 'nominatim' | 'manual' | …
|
|
||||||
verified boolean NOT NULL DEFAULT false,
|
|
||||||
updated_at timestamptz NOT NULL DEFAULT now()
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION tracksolid.tg_geo_clusters_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql
|
|
||||||
AS $function$
|
|
||||||
BEGIN
|
|
||||||
IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
|
|
||||||
AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
|
|
||||||
AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
|
|
||||||
ELSE
|
|
||||||
NEW.geom := NULL;
|
|
||||||
END IF;
|
|
||||||
NEW.updated_at := now();
|
|
||||||
RETURN NEW;
|
|
||||||
END $function$;
|
|
||||||
|
|
||||||
DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tracksolid.geo_clusters;
|
|
||||||
CREATE TRIGGER trg_geo_clusters_geom
|
|
||||||
BEFORE INSERT OR UPDATE ON tracksolid.geo_clusters
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tracksolid.tg_geo_clusters_geom();
|
|
||||||
|
|
||||||
-- ── tickets ───────────────────────────────────────────────────────────────────
|
|
||||||
-- Columns mirror the source snapshot schema (32 fields, identical for INC/CRQ)
|
|
||||||
-- 1:1, plus locally-derived geom / geo_source / raw / ingested_at.
|
|
||||||
CREATE TABLE IF NOT EXISTS tracksolid.tickets (
|
|
||||||
ticket_id text PRIMARY KEY,
|
|
||||||
source_type text,
|
|
||||||
service_type text, -- 'inc' | 'crq' (type discriminator)
|
|
||||||
bucket text,
|
|
||||||
raw_status text,
|
|
||||||
normalized_status text,
|
|
||||||
created_at_service timestamptz,
|
|
||||||
scheduled_at timestamptz,
|
|
||||||
closed_at timestamptz,
|
|
||||||
last_seen_at timestamptz,
|
|
||||||
first_seen_at timestamptz,
|
|
||||||
week_start date,
|
|
||||||
week_end date,
|
|
||||||
cluster text,
|
|
||||||
region text,
|
|
||||||
location_name text,
|
|
||||||
latitude double precision,
|
|
||||||
longitude double precision,
|
|
||||||
department text,
|
|
||||||
assigned_team text,
|
|
||||||
owner text,
|
|
||||||
sla_status text,
|
|
||||||
mttr double precision,
|
|
||||||
is_auto_created boolean,
|
|
||||||
is_auto_closed boolean,
|
|
||||||
is_alarm boolean,
|
|
||||||
is_actionable boolean,
|
|
||||||
source_s3_bucket text,
|
|
||||||
source_s3_key text,
|
|
||||||
source_snapshot_id bigint,
|
|
||||||
created_at timestamptz,
|
|
||||||
updated_at timestamptz,
|
|
||||||
-- locally derived
|
|
||||||
geom geometry(Point, 4326),
|
|
||||||
geo_source text, -- 'feed' | 'cluster' | 'none'
|
|
||||||
raw jsonb, -- full source row (hedge for new fields)
|
|
||||||
ingested_at timestamptz NOT NULL DEFAULT now(),
|
|
||||||
CONSTRAINT tickets_service_type_chk
|
|
||||||
CHECK (service_type IS NULL OR service_type IN ('inc', 'crq'))
|
|
||||||
);
|
|
||||||
|
|
||||||
-- geom resolution: feed coords first, else cluster gazetteer, else none.
|
|
||||||
CREATE OR REPLACE FUNCTION tracksolid.tg_tickets_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql
|
|
||||||
AS $function$
|
|
||||||
DECLARE
|
|
||||||
g geometry(Point, 4326);
|
|
||||||
BEGIN
|
|
||||||
IF NEW.latitude IS NOT NULL AND NEW.longitude IS NOT NULL
|
|
||||||
AND NEW.latitude BETWEEN -90 AND 90
|
|
||||||
AND NEW.longitude BETWEEN -180 AND 180
|
|
||||||
AND NOT (NEW.latitude = 0 AND NEW.longitude = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.longitude, NEW.latitude), 4326);
|
|
||||||
NEW.geo_source := 'feed';
|
|
||||||
ELSE
|
|
||||||
SELECT gc.geom INTO g
|
|
||||||
FROM tracksolid.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tracksolid.norm_cluster(NEW.cluster)
|
|
||||||
AND gc.geom IS NOT NULL
|
|
||||||
LIMIT 1;
|
|
||||||
IF g IS NOT NULL THEN
|
|
||||||
NEW.geom := g; NEW.geo_source := 'cluster';
|
|
||||||
ELSE
|
|
||||||
NEW.geom := NULL; NEW.geo_source := 'none';
|
|
||||||
END IF;
|
|
||||||
END IF;
|
|
||||||
RETURN NEW;
|
|
||||||
END $function$;
|
|
||||||
|
|
||||||
DROP TRIGGER IF EXISTS trg_tickets_geom ON tracksolid.tickets;
|
|
||||||
CREATE TRIGGER trg_tickets_geom
|
|
||||||
BEFORE INSERT OR UPDATE ON tracksolid.tickets
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tracksolid.tg_tickets_geom();
|
|
||||||
|
|
||||||
-- Re-resolve geoms for already-loaded tickets after the gazetteer is (re)seeded,
|
|
||||||
-- without re-running ingestion. Skips rows already pinned by feed coordinates.
|
|
||||||
-- Returns the number of rows updated. Call: SELECT tracksolid.resolve_ticket_geoms();
|
|
||||||
CREATE OR REPLACE FUNCTION tracksolid.resolve_ticket_geoms()
|
|
||||||
RETURNS integer LANGUAGE plpgsql
|
|
||||||
AS $function$
|
|
||||||
DECLARE n integer;
|
|
||||||
BEGIN
|
|
||||||
UPDATE tracksolid.tickets t
|
|
||||||
SET geom = gc.geom, geo_source = 'cluster'
|
|
||||||
FROM tracksolid.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tracksolid.norm_cluster(t.cluster)
|
|
||||||
AND gc.geom IS NOT NULL
|
|
||||||
AND t.geo_source IS DISTINCT FROM 'feed'
|
|
||||||
AND t.geom IS DISTINCT FROM gc.geom;
|
|
||||||
GET DIAGNOSTICS n = ROW_COUNT;
|
|
||||||
RETURN n;
|
|
||||||
END $function$;
|
|
||||||
|
|
||||||
-- ── indexes ───────────────────────────────────────────────────────────────────
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_service_type ON tracksolid.tickets (service_type);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_status ON tracksolid.tickets (normalized_status);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_type_status ON tracksolid.tickets (service_type, normalized_status);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_actionable ON tracksolid.tickets (is_actionable) WHERE is_actionable;
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_geom ON tracksolid.tickets USING gist (geom);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_tickets_cluster ON tracksolid.tickets (tracksolid.norm_cluster(cluster));
|
|
||||||
|
|
||||||
-- ── map read function ─────────────────────────────────────────────────────────
|
|
||||||
-- Modeled on reporting.fn_live_positions: NULLIF-cleaned params, single filtered
|
|
||||||
-- CTE, returns { summary, geojson:FeatureCollection }. Maps only geocoded rows
|
|
||||||
-- (geom IS NOT NULL). p_open_only restricts to actionable (open) tickets — the
|
|
||||||
-- map default, since the bulk of INC are CLOSED.
|
|
||||||
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
|
|
||||||
p_service_type text DEFAULT NULL,
|
|
||||||
p_status text DEFAULT NULL,
|
|
||||||
p_open_only boolean DEFAULT true
|
|
||||||
)
|
|
||||||
RETURNS jsonb LANGUAGE plpgsql STABLE
|
|
||||||
AS $function$
|
|
||||||
DECLARE
|
|
||||||
v_result jsonb;
|
|
||||||
BEGIN
|
|
||||||
p_service_type := lower(NULLIF(p_service_type, ''));
|
|
||||||
p_status := NULLIF(p_status, '');
|
|
||||||
WITH filtered AS (
|
|
||||||
SELECT *
|
|
||||||
FROM tracksolid.tickets
|
|
||||||
WHERE geom IS NOT NULL
|
|
||||||
AND (p_service_type IS NULL OR service_type = p_service_type)
|
|
||||||
AND (p_status IS NULL OR normalized_status = p_status)
|
|
||||||
AND (NOT p_open_only OR is_actionable IS TRUE)
|
|
||||||
)
|
|
||||||
SELECT jsonb_build_object(
|
|
||||||
'summary', jsonb_build_object(
|
|
||||||
'ticket_count', COUNT(*),
|
|
||||||
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
|
|
||||||
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
|
|
||||||
'open', COUNT(*) FILTER (WHERE is_actionable IS TRUE),
|
|
||||||
'by_status', (SELECT jsonb_object_agg(s, c)
|
|
||||||
FROM (SELECT normalized_status AS s, COUNT(*) AS c
|
|
||||||
FROM filtered GROUP BY normalized_status) z)
|
|
||||||
),
|
|
||||||
'geojson', jsonb_build_object(
|
|
||||||
'type', 'FeatureCollection',
|
|
||||||
'features', COALESCE(jsonb_agg(
|
|
||||||
jsonb_build_object(
|
|
||||||
'type', 'Feature',
|
|
||||||
'properties', jsonb_build_object(
|
|
||||||
'ticket_id', ticket_id,
|
|
||||||
'service_type', service_type,
|
|
||||||
'status', normalized_status,
|
|
||||||
'raw_status', raw_status,
|
|
||||||
'cluster', cluster,
|
|
||||||
'region', region,
|
|
||||||
'location_name', location_name,
|
|
||||||
'department', department,
|
|
||||||
'owner', owner,
|
|
||||||
'assigned_team', assigned_team,
|
|
||||||
'sla_status', sla_status,
|
|
||||||
'is_actionable', is_actionable,
|
|
||||||
'geo_source', geo_source,
|
|
||||||
'created_at', to_char(created_at_service, 'YYYY-MM-DD HH24:MI:SS'),
|
|
||||||
'scheduled_at', to_char(scheduled_at, 'YYYY-MM-DD HH24:MI:SS')
|
|
||||||
),
|
|
||||||
'geometry', ST_AsGeoJSON(geom)::jsonb
|
|
||||||
)
|
|
||||||
), '[]'::jsonb)
|
|
||||||
)
|
|
||||||
) INTO v_result FROM filtered;
|
|
||||||
|
|
||||||
RETURN v_result;
|
|
||||||
END $function$;
|
|
||||||
|
|
||||||
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
|
|
||||||
'INC/CRQ tickets as a GeoJSON FeatureCollection for the FleetOps Tickets map. Migration 21.';
|
|
||||||
|
|
||||||
-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
|
|
||||||
DO $grants$
|
|
||||||
BEGIN
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
|
||||||
GRANT USAGE ON SCHEMA tracksolid TO dashboard_ro;
|
|
||||||
GRANT SELECT ON tracksolid.tickets, tracksolid.geo_clusters TO dashboard_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro;
|
|
||||||
END IF;
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
|
||||||
GRANT SELECT ON tracksolid.tickets, tracksolid.geo_clusters TO grafana_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro;
|
|
||||||
END IF;
|
|
||||||
END $grants$;
|
|
||||||
|
|
@ -1,309 +0,0 @@
|
||||||
-- 22_tickets_schema.sql
|
|
||||||
-- Promote the ticket layer into its own `tickets` schema and SPLIT by type:
|
|
||||||
-- migration-21 tracksolid.tickets (one table, service_type discriminator) -->
|
|
||||||
-- tickets.inc + tickets.crq (one table per ticket type)
|
|
||||||
-- tracksolid.geo_clusters --> tickets.geo_clusters (shared gazetteer)
|
|
||||||
--
|
|
||||||
-- The read function reporting.fn_tickets_for_map keeps its SAME signature
|
|
||||||
-- (so dashboard_api GET /webhook/tickets and the FleetOps map are unchanged) but
|
|
||||||
-- now UNIONs the two per-type tables. The tracksolid.* objects from migration 21
|
|
||||||
-- are dropped after the data is copied across. Safe to re-apply.
|
|
||||||
--
|
|
||||||
-- NOTE: this intentionally introduces a schema beyond `tracksolid`/`reporting`
|
|
||||||
-- (a deliberate exception to the single-live-schema convention) to give tickets
|
|
||||||
-- their own namespace + access boundary.
|
|
||||||
|
|
||||||
CREATE SCHEMA IF NOT EXISTS tickets;
|
|
||||||
SET search_path = tickets, public;
|
|
||||||
|
|
||||||
-- ── normalize helper (cluster_key) ────────────────────────────────────────────
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.norm_cluster(p text)
|
|
||||||
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE
|
|
||||||
AS $fn$ SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '') $fn$;
|
|
||||||
|
|
||||||
-- ── gazetteer: move from tracksolid if present, else create fresh ─────────────
|
|
||||||
DO $mv$
|
|
||||||
BEGIN
|
|
||||||
IF to_regclass('tracksolid.geo_clusters') IS NOT NULL
|
|
||||||
AND to_regclass('tickets.geo_clusters') IS NULL THEN
|
|
||||||
ALTER TABLE tracksolid.geo_clusters SET SCHEMA tickets;
|
|
||||||
END IF;
|
|
||||||
END $mv$;
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS tickets.geo_clusters (
|
|
||||||
cluster_key text PRIMARY KEY,
|
|
||||||
region text,
|
|
||||||
lat double precision,
|
|
||||||
lng double precision,
|
|
||||||
geom geometry(Point, 4326),
|
|
||||||
source text,
|
|
||||||
verified boolean NOT NULL DEFAULT false,
|
|
||||||
updated_at timestamptz NOT NULL DEFAULT now()
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.tg_geo_clusters_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
|
||||||
BEGIN
|
|
||||||
IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
|
|
||||||
AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
|
|
||||||
AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
|
|
||||||
ELSE
|
|
||||||
NEW.geom := NULL;
|
|
||||||
END IF;
|
|
||||||
NEW.updated_at := now();
|
|
||||||
RETURN NEW;
|
|
||||||
END $fn$;
|
|
||||||
DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tickets.geo_clusters;
|
|
||||||
CREATE TRIGGER trg_geo_clusters_geom BEFORE INSERT OR UPDATE ON tickets.geo_clusters
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_clusters_geom();
|
|
||||||
|
|
||||||
-- ── per-type ticket tables (identical shape; CHECK-locked to their type) ──────
|
|
||||||
CREATE TABLE IF NOT EXISTS tickets.inc (
|
|
||||||
ticket_id text PRIMARY KEY,
|
|
||||||
source_type text,
|
|
||||||
service_type text,
|
|
||||||
bucket text,
|
|
||||||
raw_status text,
|
|
||||||
normalized_status text,
|
|
||||||
created_at_service timestamptz,
|
|
||||||
scheduled_at timestamptz,
|
|
||||||
closed_at timestamptz,
|
|
||||||
last_seen_at timestamptz,
|
|
||||||
first_seen_at timestamptz,
|
|
||||||
week_start date,
|
|
||||||
week_end date,
|
|
||||||
cluster text,
|
|
||||||
region text,
|
|
||||||
location_name text,
|
|
||||||
latitude double precision,
|
|
||||||
longitude double precision,
|
|
||||||
department text,
|
|
||||||
assigned_team text,
|
|
||||||
owner text,
|
|
||||||
sla_status text,
|
|
||||||
mttr double precision,
|
|
||||||
is_auto_created boolean,
|
|
||||||
is_auto_closed boolean,
|
|
||||||
is_alarm boolean,
|
|
||||||
is_actionable boolean,
|
|
||||||
source_s3_bucket text,
|
|
||||||
source_s3_key text,
|
|
||||||
source_snapshot_id bigint,
|
|
||||||
created_at timestamptz,
|
|
||||||
updated_at timestamptz,
|
|
||||||
geom geometry(Point, 4326),
|
|
||||||
geo_source text,
|
|
||||||
raw jsonb,
|
|
||||||
ingested_at timestamptz NOT NULL DEFAULT now(),
|
|
||||||
CONSTRAINT inc_service_type_chk CHECK (service_type = 'inc')
|
|
||||||
);
|
|
||||||
|
|
||||||
-- crq mirrors inc's columns/defaults; add its own PK + type CHECK.
|
|
||||||
CREATE TABLE IF NOT EXISTS tickets.crq (LIKE tickets.inc INCLUDING DEFAULTS);
|
|
||||||
DO $crq$
|
|
||||||
BEGIN
|
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint
|
|
||||||
WHERE conrelid = 'tickets.crq'::regclass AND contype = 'p') THEN
|
|
||||||
ALTER TABLE tickets.crq ADD PRIMARY KEY (ticket_id);
|
|
||||||
END IF;
|
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_constraint
|
|
||||||
WHERE conrelid = 'tickets.crq'::regclass AND conname = 'crq_service_type_chk') THEN
|
|
||||||
ALTER TABLE tickets.crq ADD CONSTRAINT crq_service_type_chk CHECK (service_type = 'crq');
|
|
||||||
END IF;
|
|
||||||
END $crq$;
|
|
||||||
|
|
||||||
-- ── shared geom-resolution trigger (feed coords → cluster gazetteer → none) ───
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
|
||||||
DECLARE g geometry(Point, 4326);
|
|
||||||
BEGIN
|
|
||||||
IF NEW.latitude IS NOT NULL AND NEW.longitude IS NOT NULL
|
|
||||||
AND NEW.latitude BETWEEN -90 AND 90
|
|
||||||
AND NEW.longitude BETWEEN -180 AND 180
|
|
||||||
AND NOT (NEW.latitude = 0 AND NEW.longitude = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.longitude, NEW.latitude), 4326);
|
|
||||||
NEW.geo_source := 'feed';
|
|
||||||
ELSE
|
|
||||||
SELECT gc.geom INTO g FROM tickets.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tickets.norm_cluster(NEW.cluster) AND gc.geom IS NOT NULL LIMIT 1;
|
|
||||||
IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster';
|
|
||||||
ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF;
|
|
||||||
END IF;
|
|
||||||
RETURN NEW;
|
|
||||||
END $fn$;
|
|
||||||
|
|
||||||
DROP TRIGGER IF EXISTS trg_inc_geom ON tickets.inc;
|
|
||||||
CREATE TRIGGER trg_inc_geom BEFORE INSERT OR UPDATE ON tickets.inc
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
|
|
||||||
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
|
|
||||||
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
|
|
||||||
|
|
||||||
-- re-resolve geoms across both tables after the gazetteer is (re)seeded.
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms()
|
|
||||||
RETURNS integer LANGUAGE plpgsql AS $fn$
|
|
||||||
DECLARE n integer; m integer;
|
|
||||||
BEGIN
|
|
||||||
UPDATE tickets.inc t SET geom = gc.geom, geo_source = 'cluster'
|
|
||||||
FROM tickets.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tickets.norm_cluster(t.cluster) AND gc.geom IS NOT NULL
|
|
||||||
AND t.geo_source IS DISTINCT FROM 'feed' AND t.geom IS DISTINCT FROM gc.geom;
|
|
||||||
GET DIAGNOSTICS n = ROW_COUNT;
|
|
||||||
UPDATE tickets.crq t SET geom = gc.geom, geo_source = 'cluster'
|
|
||||||
FROM tickets.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tickets.norm_cluster(t.cluster) AND gc.geom IS NOT NULL
|
|
||||||
AND t.geo_source IS DISTINCT FROM 'feed' AND t.geom IS DISTINCT FROM gc.geom;
|
|
||||||
GET DIAGNOSTICS m = ROW_COUNT;
|
|
||||||
RETURN n + m;
|
|
||||||
END $fn$;
|
|
||||||
|
|
||||||
-- ── indexes (per table) ───────────────────────────────────────────────────────
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_status ON tickets.inc (normalized_status);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_actionable ON tickets.inc (is_actionable) WHERE is_actionable;
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_geom ON tickets.inc USING gist (geom);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_cluster ON tickets.inc (tickets.norm_cluster(cluster));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_status ON tickets.crq (normalized_status);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_actionable ON tickets.crq (is_actionable) WHERE is_actionable;
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_cluster ON tickets.crq (tickets.norm_cluster(cluster));
|
|
||||||
|
|
||||||
-- ── backfill from migration-21 tracksolid.tickets, then retire it ─────────────
|
|
||||||
DO $bf$
|
|
||||||
BEGIN
|
|
||||||
IF to_regclass('tracksolid.tickets') IS NOT NULL THEN
|
|
||||||
INSERT INTO tickets.inc (
|
|
||||||
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
|
|
||||||
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
|
|
||||||
week_start, week_end, cluster, region, location_name, latitude, longitude,
|
|
||||||
department, assigned_team, owner, sla_status, mttr, is_auto_created,
|
|
||||||
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
|
|
||||||
source_snapshot_id, created_at, updated_at, raw)
|
|
||||||
SELECT
|
|
||||||
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
|
|
||||||
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
|
|
||||||
week_start, week_end, cluster, region, location_name, latitude, longitude,
|
|
||||||
department, assigned_team, owner, sla_status, mttr, is_auto_created,
|
|
||||||
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
|
|
||||||
source_snapshot_id, created_at, updated_at, raw
|
|
||||||
FROM tracksolid.tickets WHERE service_type = 'inc'
|
|
||||||
ON CONFLICT (ticket_id) DO NOTHING;
|
|
||||||
|
|
||||||
INSERT INTO tickets.crq (
|
|
||||||
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
|
|
||||||
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
|
|
||||||
week_start, week_end, cluster, region, location_name, latitude, longitude,
|
|
||||||
department, assigned_team, owner, sla_status, mttr, is_auto_created,
|
|
||||||
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
|
|
||||||
source_snapshot_id, created_at, updated_at, raw)
|
|
||||||
SELECT
|
|
||||||
ticket_id, source_type, service_type, bucket, raw_status, normalized_status,
|
|
||||||
created_at_service, scheduled_at, closed_at, last_seen_at, first_seen_at,
|
|
||||||
week_start, week_end, cluster, region, location_name, latitude, longitude,
|
|
||||||
department, assigned_team, owner, sla_status, mttr, is_auto_created,
|
|
||||||
is_auto_closed, is_alarm, is_actionable, source_s3_bucket, source_s3_key,
|
|
||||||
source_snapshot_id, created_at, updated_at, raw
|
|
||||||
FROM tracksolid.tickets WHERE service_type = 'crq'
|
|
||||||
ON CONFLICT (ticket_id) DO NOTHING;
|
|
||||||
END IF;
|
|
||||||
END $bf$;
|
|
||||||
|
|
||||||
-- retire migration-21 objects (data now lives in tickets.*)
|
|
||||||
DROP TABLE IF EXISTS tracksolid.tickets;
|
|
||||||
DROP FUNCTION IF EXISTS tracksolid.resolve_ticket_geoms();
|
|
||||||
DROP FUNCTION IF EXISTS tracksolid.tg_tickets_geom();
|
|
||||||
DROP FUNCTION IF EXISTS tracksolid.tg_geo_clusters_geom();
|
|
||||||
DROP FUNCTION IF EXISTS tracksolid.norm_cluster(text);
|
|
||||||
|
|
||||||
-- ── read function (same signature; now UNIONs the two tables) ─────────────────
|
|
||||||
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
|
|
||||||
p_service_type text DEFAULT NULL,
|
|
||||||
p_status text DEFAULT NULL,
|
|
||||||
p_open_only boolean DEFAULT true
|
|
||||||
)
|
|
||||||
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
|
|
||||||
DECLARE v_result jsonb;
|
|
||||||
BEGIN
|
|
||||||
p_service_type := lower(NULLIF(p_service_type, ''));
|
|
||||||
p_status := NULLIF(p_status, '');
|
|
||||||
WITH filtered AS (
|
|
||||||
SELECT ticket_id, service_type, normalized_status, raw_status, cluster, region,
|
|
||||||
location_name, department, owner, assigned_team, sla_status, is_actionable,
|
|
||||||
geo_source, created_at_service, scheduled_at, geom
|
|
||||||
FROM tickets.inc
|
|
||||||
WHERE geom IS NOT NULL
|
|
||||||
AND (p_service_type IS NULL OR p_service_type = 'inc')
|
|
||||||
AND (p_status IS NULL OR normalized_status = p_status)
|
|
||||||
AND (NOT p_open_only OR is_actionable IS TRUE)
|
|
||||||
UNION ALL
|
|
||||||
SELECT ticket_id, service_type, normalized_status, raw_status, cluster, region,
|
|
||||||
location_name, department, owner, assigned_team, sla_status, is_actionable,
|
|
||||||
geo_source, created_at_service, scheduled_at, geom
|
|
||||||
FROM tickets.crq
|
|
||||||
WHERE geom IS NOT NULL
|
|
||||||
AND (p_service_type IS NULL OR p_service_type = 'crq')
|
|
||||||
AND (p_status IS NULL OR normalized_status = p_status)
|
|
||||||
AND (NOT p_open_only OR is_actionable IS TRUE)
|
|
||||||
)
|
|
||||||
SELECT jsonb_build_object(
|
|
||||||
'summary', jsonb_build_object(
|
|
||||||
'ticket_count', COUNT(*),
|
|
||||||
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
|
|
||||||
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
|
|
||||||
'open', COUNT(*) FILTER (WHERE is_actionable IS TRUE),
|
|
||||||
'by_status', (SELECT jsonb_object_agg(s, c)
|
|
||||||
FROM (SELECT normalized_status AS s, COUNT(*) AS c
|
|
||||||
FROM filtered GROUP BY normalized_status) z)
|
|
||||||
),
|
|
||||||
'geojson', jsonb_build_object(
|
|
||||||
'type', 'FeatureCollection',
|
|
||||||
'features', COALESCE(jsonb_agg(
|
|
||||||
jsonb_build_object(
|
|
||||||
'type', 'Feature',
|
|
||||||
'properties', jsonb_build_object(
|
|
||||||
'ticket_id', ticket_id,
|
|
||||||
'service_type', service_type,
|
|
||||||
'status', normalized_status,
|
|
||||||
'raw_status', raw_status,
|
|
||||||
'cluster', cluster,
|
|
||||||
'region', region,
|
|
||||||
'location_name', location_name,
|
|
||||||
'department', department,
|
|
||||||
'owner', owner,
|
|
||||||
'assigned_team', assigned_team,
|
|
||||||
'sla_status', sla_status,
|
|
||||||
'is_actionable', is_actionable,
|
|
||||||
'geo_source', geo_source,
|
|
||||||
'created_at', to_char(created_at_service, 'YYYY-MM-DD HH24:MI:SS'),
|
|
||||||
'scheduled_at', to_char(scheduled_at, 'YYYY-MM-DD HH24:MI:SS')
|
|
||||||
),
|
|
||||||
'geometry', ST_AsGeoJSON(geom)::jsonb
|
|
||||||
)
|
|
||||||
), '[]'::jsonb)
|
|
||||||
)
|
|
||||||
) INTO v_result FROM filtered;
|
|
||||||
RETURN v_result;
|
|
||||||
END $fn$;
|
|
||||||
|
|
||||||
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
|
|
||||||
'INC/CRQ tickets (tickets.inc + tickets.crq) as GeoJSON for the FleetOps map. Migration 22.';
|
|
||||||
|
|
||||||
-- ── grants ────────────────────────────────────────────────────────────────────
|
|
||||||
DO $grants$
|
|
||||||
BEGIN
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
|
|
||||||
GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner;
|
|
||||||
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner;
|
|
||||||
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner;
|
|
||||||
END IF;
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
|
||||||
GRANT USAGE ON SCHEMA tickets TO dashboard_ro;
|
|
||||||
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters TO dashboard_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro;
|
|
||||||
END IF;
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
|
||||||
GRANT USAGE ON SCHEMA tickets TO grafana_ro;
|
|
||||||
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters TO grafana_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro;
|
|
||||||
END IF;
|
|
||||||
END $grants$;
|
|
||||||
|
|
@ -1,246 +0,0 @@
|
||||||
-- 23_tickets_raw_first.sql
|
|
||||||
-- Make `raw` (jsonb) the single source of truth for tickets, and add a
|
|
||||||
-- location-level geocode cache so INC pins can be placed precisely (parsed from
|
|
||||||
-- region + cluster + location_name) instead of all stacking on a cluster centroid.
|
|
||||||
--
|
|
||||||
-- * tickets.inc / tickets.crq --> slim to (ticket_id, raw, geom, geo_source, ingested_at)
|
|
||||||
-- (the ~31 flattened typed columns are DROPPED; raw keeps them)
|
|
||||||
-- * tickets.geo_locations --> NEW: cleaned-location -> coordinates cache
|
|
||||||
-- * geom resolution --> feed(raw lat/lng) -> location(cache) -> cluster(gazetteer) -> none
|
|
||||||
--
|
|
||||||
-- The trigger/resolve/read-function are repointed to read from `raw`. The read
|
|
||||||
-- function reporting.fn_tickets_for_map keeps its SAME signature, so dashboard_api
|
|
||||||
-- and the FleetOps map need no change. Safe to re-apply.
|
|
||||||
|
|
||||||
SET search_path = tickets, public;
|
|
||||||
|
|
||||||
-- ── 1. slim the per-type tables to raw-first ─────────────────────────────────
|
|
||||||
DO $slim$
|
|
||||||
DECLARE c text;
|
|
||||||
BEGIN
|
|
||||||
FOREACH c IN ARRAY ARRAY[
|
|
||||||
'source_type','service_type','bucket','raw_status','normalized_status',
|
|
||||||
'created_at_service','scheduled_at','closed_at','last_seen_at','first_seen_at',
|
|
||||||
'week_start','week_end','cluster','region','location_name','latitude','longitude',
|
|
||||||
'department','assigned_team','owner','sla_status','mttr','is_auto_created',
|
|
||||||
'is_auto_closed','is_alarm','is_actionable','source_s3_bucket','source_s3_key',
|
|
||||||
'source_snapshot_id','created_at','updated_at']
|
|
||||||
LOOP
|
|
||||||
EXECUTE format('ALTER TABLE tickets.inc DROP COLUMN IF EXISTS %I', c);
|
|
||||||
EXECUTE format('ALTER TABLE tickets.crq DROP COLUMN IF EXISTS %I', c);
|
|
||||||
END LOOP;
|
|
||||||
END $slim$;
|
|
||||||
|
|
||||||
-- raw is now mandatory (all rows already carry it).
|
|
||||||
ALTER TABLE tickets.inc ALTER COLUMN raw SET NOT NULL;
|
|
||||||
ALTER TABLE tickets.crq ALTER COLUMN raw SET NOT NULL;
|
|
||||||
|
|
||||||
-- ── 2. location geocode cache ────────────────────────────────────────────────
|
|
||||||
-- query_key = tickets.norm_cluster(location_name): a normalised key on the source
|
|
||||||
-- location string (norm_cluster just upper/collapse/trims — generic). The loader
|
|
||||||
-- geocodes the *cleaned* place (region+cluster+location_name, codes stripped) but
|
|
||||||
-- caches under the raw location_name key so resolve_ticket_geoms can join in SQL
|
|
||||||
-- without re-deriving the regex.
|
|
||||||
CREATE TABLE IF NOT EXISTS tickets.geo_locations (
|
|
||||||
query_key text PRIMARY KEY,
|
|
||||||
location_name text,
|
|
||||||
cluster text,
|
|
||||||
region text,
|
|
||||||
query text, -- the cleaned string actually sent to the geocoder
|
|
||||||
lat double precision,
|
|
||||||
lng double precision,
|
|
||||||
geom geometry(Point, 4326),
|
|
||||||
confidence numeric,
|
|
||||||
provider text,
|
|
||||||
verified boolean NOT NULL DEFAULT false,
|
|
||||||
updated_at timestamptz NOT NULL DEFAULT now()
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.tg_geo_locations_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
|
||||||
BEGIN
|
|
||||||
IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
|
|
||||||
AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
|
|
||||||
AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
|
|
||||||
ELSE
|
|
||||||
NEW.geom := NULL;
|
|
||||||
END IF;
|
|
||||||
NEW.updated_at := now();
|
|
||||||
RETURN NEW;
|
|
||||||
END $fn$;
|
|
||||||
DROP TRIGGER IF EXISTS trg_geo_locations_geom ON tickets.geo_locations;
|
|
||||||
CREATE TRIGGER trg_geo_locations_geom BEFORE INSERT OR UPDATE ON tickets.geo_locations
|
|
||||||
FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_locations_geom();
|
|
||||||
|
|
||||||
-- ── 3. geom trigger — read from raw; never clobber a deliberate geom-only update ─
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom()
|
|
||||||
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
|
||||||
DECLARE
|
|
||||||
v_lat double precision := NULLIF(NEW.raw->>'latitude','')::double precision;
|
|
||||||
v_lng double precision := NULLIF(NEW.raw->>'longitude','')::double precision;
|
|
||||||
g geometry(Point, 4326);
|
|
||||||
BEGIN
|
|
||||||
-- A geom/geo_source-only UPDATE (raw unchanged — e.g. resolve_ticket_geoms or the
|
|
||||||
-- location geocode pass) must keep the caller's geom, not recompute it.
|
|
||||||
IF TG_OP = 'UPDATE' AND NEW.raw IS NOT DISTINCT FROM OLD.raw THEN
|
|
||||||
RETURN NEW;
|
|
||||||
END IF;
|
|
||||||
IF v_lat IS NOT NULL AND v_lng IS NOT NULL
|
|
||||||
AND v_lat BETWEEN -90 AND 90 AND v_lng BETWEEN -180 AND 180
|
|
||||||
AND NOT (v_lat = 0 AND v_lng = 0) THEN
|
|
||||||
NEW.geom := ST_SetSRID(ST_MakePoint(v_lng, v_lat), 4326);
|
|
||||||
NEW.geo_source := 'feed';
|
|
||||||
ELSE
|
|
||||||
SELECT gc.geom INTO g FROM tickets.geo_clusters gc
|
|
||||||
WHERE gc.cluster_key = tickets.norm_cluster(NEW.raw->>'cluster') AND gc.geom IS NOT NULL LIMIT 1;
|
|
||||||
IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster';
|
|
||||||
ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF;
|
|
||||||
END IF;
|
|
||||||
RETURN NEW;
|
|
||||||
END $fn$;
|
|
||||||
-- (triggers trg_inc_geom / trg_crq_geom from migration 22 already call this fn.)
|
|
||||||
|
|
||||||
-- ── 4. resolve — prefer location cache, else cluster centroid (non-feed rows) ──
|
|
||||||
CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms()
|
|
||||||
RETURNS integer LANGUAGE plpgsql AS $fn$
|
|
||||||
DECLARE n integer; m integer;
|
|
||||||
BEGIN
|
|
||||||
UPDATE tickets.inc t
|
|
||||||
SET geom = COALESCE(loc.geom, gc.geom),
|
|
||||||
geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
|
||||||
WHEN gc.geom IS NOT NULL THEN 'cluster'
|
|
||||||
ELSE 'none' END
|
|
||||||
FROM tickets.inc base
|
|
||||||
LEFT JOIN tickets.geo_locations loc
|
|
||||||
ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
|
|
||||||
LEFT JOIN tickets.geo_clusters gc
|
|
||||||
ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
|
|
||||||
WHERE t.ticket_id = base.ticket_id
|
|
||||||
AND t.geo_source IS DISTINCT FROM 'feed'
|
|
||||||
AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
|
|
||||||
OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
|
||||||
WHEN gc.geom IS NOT NULL THEN 'cluster'
|
|
||||||
ELSE 'none' END);
|
|
||||||
GET DIAGNOSTICS n = ROW_COUNT;
|
|
||||||
|
|
||||||
UPDATE tickets.crq t
|
|
||||||
SET geom = COALESCE(loc.geom, gc.geom),
|
|
||||||
geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
|
||||||
WHEN gc.geom IS NOT NULL THEN 'cluster'
|
|
||||||
ELSE 'none' END
|
|
||||||
FROM tickets.crq base
|
|
||||||
LEFT JOIN tickets.geo_locations loc
|
|
||||||
ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
|
|
||||||
LEFT JOIN tickets.geo_clusters gc
|
|
||||||
ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
|
|
||||||
WHERE t.ticket_id = base.ticket_id
|
|
||||||
AND t.geo_source IS DISTINCT FROM 'feed'
|
|
||||||
AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
|
|
||||||
OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
|
||||||
WHEN gc.geom IS NOT NULL THEN 'cluster'
|
|
||||||
ELSE 'none' END);
|
|
||||||
GET DIAGNOSTICS m = ROW_COUNT;
|
|
||||||
RETURN n + m;
|
|
||||||
END $fn$;
|
|
||||||
|
|
||||||
-- ── 5. expression indexes (replace the dropped column indexes) ────────────────
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_status_raw ON tickets.inc ((raw->>'normalized_status'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_actionable_raw ON tickets.inc (((raw->>'is_actionable')::boolean))
|
|
||||||
WHERE (raw->>'is_actionable')::boolean;
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_cluster_raw ON tickets.inc (tickets.norm_cluster(raw->>'cluster'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_loc_raw ON tickets.inc (tickets.norm_cluster(raw->>'location_name'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_inc_geom ON tickets.inc USING gist (geom);
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
|
|
||||||
WHERE (raw->>'is_actionable')::boolean;
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
|
|
||||||
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
|
|
||||||
|
|
||||||
-- ── 6. read function — extract every property from raw (same signature) ───────
|
|
||||||
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
|
|
||||||
p_service_type text DEFAULT NULL,
|
|
||||||
p_status text DEFAULT NULL,
|
|
||||||
p_open_only boolean DEFAULT true
|
|
||||||
)
|
|
||||||
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
|
|
||||||
DECLARE v_result jsonb;
|
|
||||||
BEGIN
|
|
||||||
p_service_type := lower(NULLIF(p_service_type, ''));
|
|
||||||
p_status := NULLIF(p_status, '');
|
|
||||||
WITH filtered AS (
|
|
||||||
SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
|
|
||||||
WHERE geom IS NOT NULL
|
|
||||||
AND (p_service_type IS NULL OR p_service_type = 'inc')
|
|
||||||
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
|
||||||
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
|
||||||
UNION ALL
|
|
||||||
SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
|
|
||||||
WHERE geom IS NOT NULL
|
|
||||||
AND (p_service_type IS NULL OR p_service_type = 'crq')
|
|
||||||
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
|
||||||
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
|
||||||
)
|
|
||||||
SELECT jsonb_build_object(
|
|
||||||
'summary', jsonb_build_object(
|
|
||||||
'ticket_count', COUNT(*),
|
|
||||||
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
|
|
||||||
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
|
|
||||||
'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
|
|
||||||
'by_status', (SELECT jsonb_object_agg(s, c)
|
|
||||||
FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c
|
|
||||||
FROM filtered GROUP BY raw->>'normalized_status') z)
|
|
||||||
),
|
|
||||||
'geojson', jsonb_build_object(
|
|
||||||
'type', 'FeatureCollection',
|
|
||||||
'features', COALESCE(jsonb_agg(
|
|
||||||
jsonb_build_object(
|
|
||||||
'type', 'Feature',
|
|
||||||
'properties', jsonb_build_object(
|
|
||||||
'ticket_id', raw->>'ticket_id',
|
|
||||||
'service_type', service_type,
|
|
||||||
'status', raw->>'normalized_status',
|
|
||||||
'raw_status', raw->>'raw_status',
|
|
||||||
'cluster', raw->>'cluster',
|
|
||||||
'region', raw->>'region',
|
|
||||||
'location_name', raw->>'location_name',
|
|
||||||
'department', raw->>'department',
|
|
||||||
'owner', raw->>'owner',
|
|
||||||
'assigned_team', raw->>'assigned_team',
|
|
||||||
'sla_status', raw->>'sla_status',
|
|
||||||
'is_actionable', (raw->>'is_actionable')::boolean,
|
|
||||||
'geo_source', geo_source,
|
|
||||||
'created_at', raw->>'created_at_service',
|
|
||||||
'scheduled_at', raw->>'scheduled_at'
|
|
||||||
),
|
|
||||||
'geometry', ST_AsGeoJSON(geom)::jsonb
|
|
||||||
)
|
|
||||||
), '[]'::jsonb)
|
|
||||||
)
|
|
||||||
) INTO v_result FROM filtered;
|
|
||||||
RETURN v_result;
|
|
||||||
END $fn$;
|
|
||||||
|
|
||||||
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
|
|
||||||
'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON. Migration 23.';
|
|
||||||
|
|
||||||
-- ── 7. grants ─────────────────────────────────────────────────────────────────
|
|
||||||
DO $grants$
|
|
||||||
BEGIN
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
|
|
||||||
GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner;
|
|
||||||
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner;
|
|
||||||
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner;
|
|
||||||
END IF;
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
|
||||||
GRANT USAGE ON SCHEMA tickets TO dashboard_ro;
|
|
||||||
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO dashboard_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro;
|
|
||||||
END IF;
|
|
||||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
|
||||||
GRANT USAGE ON SCHEMA tickets TO grafana_ro;
|
|
||||||
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO grafana_ro;
|
|
||||||
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro;
|
|
||||||
END IF;
|
|
||||||
END $grants$;
|
|
||||||
|
|
@ -44,9 +44,10 @@ MIGRATIONS = [
|
||||||
"18_grant_reporting_ro.sql", # grant SELECT on reporting.* to grafana_ro (staging read-only role)
|
"18_grant_reporting_ro.sql", # grant SELECT on reporting.* to grafana_ro (staging read-only role)
|
||||||
"19_v_ingest_health.sql", # reporting.v_ingest_health — pipeline freshness (replaces Grafana panels)
|
"19_v_ingest_health.sql", # reporting.v_ingest_health — pipeline freshness (replaces Grafana panels)
|
||||||
"20_restore_live_feed.sql", # re-assert v_live_positions exclusion + fn_live_positions vehicle_type (migration-order regression fix)
|
"20_restore_live_feed.sql", # re-assert v_live_positions exclusion + fn_live_positions vehicle_type (migration-order regression fix)
|
||||||
"21_tickets.sql", # tracksolid.tickets + geo_clusters gazetteer + reporting.fn_tickets_for_map (FleetOps Tickets map)
|
# The `tickets` schema (INC/CRQ map) was migrations 21-23; it now lives in its
|
||||||
"22_tickets_schema.sql", # move tickets into `tickets` schema; split into tickets.inc + tickets.crq; fn_tickets_for_map unions them
|
# own repo — repo.rahamafresh.com/kianiadee/fleettickets.git (run its
|
||||||
"23_tickets_raw_first.sql", # slim tickets.inc/crq to raw-jsonb-first; geo_locations cache; location-level geocoding; fn reads from raw
|
# run_migrations.py). dashboard_api still serves GET /webhook/tickets via
|
||||||
|
# reporting.fn_tickets_for_map, which fleettickets defines.
|
||||||
]
|
]
|
||||||
|
|
||||||
# ── Tables that must exist before the service is allowed to start ─────────────
|
# ── Tables that must exist before the service is allowed to start ─────────────
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ docker exec -it "$WK" python -m tools.<name> [args]
|
||||||
| `audit_device_reconciliation.py` | Read-only: reconcile the vehicle CSV (`data/`) against `tracksolid.devices`; reports gaps + NULL fields | re-runnable audit |
|
| `audit_device_reconciliation.py` | Read-only: reconcile the vehicle CSV (`data/`) against `tracksolid.devices`; reports gaps + NULL fields | re-runnable audit |
|
||||||
| `import_drivers_csv.py` | Populate device names/plates from CSV (`--apply` to commit). The registry is already populated — kept for future bulk re-imports | one-shot (done) |
|
| `import_drivers_csv.py` | Populate device names/plates from CSV (`--apply` to commit). The registry is already populated — kept for future bulk re-imports | one-shot (done) |
|
||||||
| `backfill_trips_enrichment.py` | One-shot backfill of historical `tracksolid.trips` (route_geom/addresses/plate) for rows predating migration 09 (`--apply` to commit) | one-shot (done) |
|
| `backfill_trips_enrichment.py` | One-shot backfill of historical `tracksolid.trips` (route_geom/addresses/plate) for rows predating migration 09 (`--apply` to commit) | one-shot (done) |
|
||||||
| `import_tickets.py` | Ingest INC/CRQ ticket snapshots (rustfs `tickets` bucket → `tracksolid.tickets`); `--from-bucket --apply`. `--geocode-clusters` seeds the cluster gazetteer that puts tickets on the FleetOps map. Needs migration 21 | re-runnable (per snapshot) |
|
|
||||||
|
> INC/CRQ ticket ingestion (`import_tickets.py`) moved to its own repo —
|
||||||
|
> `repo.rahamafresh.com/kianiadee/fleettickets.git`.
|
||||||
|
|
||||||
`data/` holds the source CSVs the import/audit scripts read (default: `data/20260427_FSG_Vehicles_mitieng.csv`).
|
`data/` holds the source CSVs the import/audit scripts read (default: `data/20260427_FSG_Vehicles_mitieng.csv`).
|
||||||
|
|
|
||||||
|
|
@ -1,377 +0,0 @@
|
||||||
"""
|
|
||||||
import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first)
|
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
||||||
Loads the client's field-ops ticket snapshots into the `tickets` schema — the
|
|
||||||
source of the FleetOps "Tickets" map. Two categories, one table each:
|
|
||||||
tickets.inc — incidents / customer faults
|
|
||||||
tickets.crq — new-installation requests
|
|
||||||
|
|
||||||
RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
|
|
||||||
jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
|
|
||||||
The DB derives `geom` (migration 23): feed coords (raw lat/lng) -> location geocode
|
|
||||||
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
|
||||||
|
|
||||||
Source data: rustfs `tickets` bucket, full snapshots from the client's email
|
|
||||||
automation — automations/{inc,crq}/latest.json (array of 32-key objects).
|
|
||||||
|
|
||||||
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
|
||||||
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
|
||||||
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
|
|
||||||
real place out of location_name (region+cluster+location_name,
|
|
||||||
network codes stripped), geocodes it, caches in
|
|
||||||
tickets.geo_locations, then re-resolves geoms.
|
|
||||||
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
|
|
||||||
|
|
||||||
Usage (run as a module from the image root):
|
|
||||||
WK=$(docker ps --filter name=ingest_worker --format "{{.Names}}" | head -1)
|
|
||||||
docker exec -it "$WK" python -m tools.import_tickets --from-bucket --apply
|
|
||||||
docker exec -it "$WK" python -m tools.import_tickets \
|
|
||||||
--inc-json /tmp/inc.json --crq-json /tmp/crq.json --apply
|
|
||||||
docker exec -it "$WK" python -m tools.import_tickets --geocode-clusters --apply
|
|
||||||
docker exec -it "$WK" python -m tools.import_tickets --geocode-locations --apply
|
|
||||||
|
|
||||||
Pre-requisite: migrations 21–23 applied (tickets.inc/crq + geo_clusters +
|
|
||||||
geo_locations + reporting.fn_tickets_for_map).
|
|
||||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import json
|
|
||||||
import math
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import subprocess
|
|
||||||
import time
|
|
||||||
|
|
||||||
import requests
|
|
||||||
import psycopg2.extras
|
|
||||||
|
|
||||||
from ts_shared_rev import clean, get_conn, get_logger
|
|
||||||
|
|
||||||
log = get_logger("import_tickets")
|
|
||||||
|
|
||||||
TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"}
|
|
||||||
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
|
|
||||||
_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"}
|
|
||||||
|
|
||||||
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
|
||||||
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
|
||||||
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
|
||||||
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
|
||||||
_last_geocode_at = 0.0
|
|
||||||
|
|
||||||
|
|
||||||
# ── data loading ──────────────────────────────────────────────────────────────
|
|
||||||
def _load_local(path: str) -> list[dict]:
|
|
||||||
with open(path, encoding="utf-8") as f:
|
|
||||||
data = json.load(f) # json.loads accepts NaN by default
|
|
||||||
return data if isinstance(data, list) else []
|
|
||||||
|
|
||||||
|
|
||||||
def _load_bucket(kind: str) -> list[dict]:
|
|
||||||
env = {
|
|
||||||
**os.environ,
|
|
||||||
"AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
|
|
||||||
"AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
|
|
||||||
"AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
|
|
||||||
}
|
|
||||||
uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
|
|
||||||
log.info("fetching %s", uri)
|
|
||||||
out = subprocess.run(
|
|
||||||
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"],
|
|
||||||
env=env, capture_output=True, timeout=180, check=True,
|
|
||||||
).stdout
|
|
||||||
data = json.loads(out.decode("utf-8"))
|
|
||||||
return data if isinstance(data, list) else []
|
|
||||||
|
|
||||||
|
|
||||||
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
|
||||||
def _scrub_nan(row: dict) -> dict:
|
|
||||||
# Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
|
|
||||||
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
|
|
||||||
|
|
||||||
|
|
||||||
def upsert(rows: list[dict], table: str, apply: bool) -> int:
|
|
||||||
payload = [
|
|
||||||
(tid, psycopg2.extras.Json(_scrub_nan(r)))
|
|
||||||
for r in rows
|
|
||||||
if (tid := clean(r.get("ticket_id")))
|
|
||||||
]
|
|
||||||
log.info("%s: %d valid rows (skipped %d without ticket_id)",
|
|
||||||
table, len(payload), len(rows) - len(payload))
|
|
||||||
if not apply:
|
|
||||||
log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
|
|
||||||
return len(payload)
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
psycopg2.extras.execute_values(
|
|
||||||
cur,
|
|
||||||
f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
|
|
||||||
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
|
||||||
payload, page_size=500,
|
|
||||||
)
|
|
||||||
log.info("upserted %d rows into %s", len(payload), table)
|
|
||||||
return len(payload)
|
|
||||||
|
|
||||||
|
|
||||||
def ingest(args) -> None:
|
|
||||||
if args.from_bucket:
|
|
||||||
for kind in ("inc", "crq"):
|
|
||||||
upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
|
|
||||||
else:
|
|
||||||
if args.inc_json:
|
|
||||||
upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply)
|
|
||||||
if args.crq_json:
|
|
||||||
upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply)
|
|
||||||
|
|
||||||
|
|
||||||
# ── place extraction (strip network codes, keep the real place) ───────────────
|
|
||||||
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
|
|
||||||
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
|
|
||||||
# Inline network/work-order codes to drop wherever they appear.
|
|
||||||
_CODE_RE = re.compile(
|
|
||||||
r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def extract_place(location_name: str | None) -> str:
|
|
||||||
"""Pull the human place/landmark out of a coded location_name string.
|
|
||||||
|
|
||||||
e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
|
|
||||||
"""
|
|
||||||
s = (location_name or "").upper().strip()
|
|
||||||
if not s:
|
|
||||||
return ""
|
|
||||||
# drop the trailing '-<unit/instruction>' segment (e.g. -37, -CALL CLIENT, -F32)
|
|
||||||
if "-" in s:
|
|
||||||
s = s.rsplit("-", 1)[0]
|
|
||||||
s = s.replace("_", " ")
|
|
||||||
# strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…)
|
|
||||||
prev = None
|
|
||||||
while prev != s:
|
|
||||||
prev = s
|
|
||||||
s = _PREFIX_RE.sub("", s).strip()
|
|
||||||
s = _CODE_RE.sub(" ", s)
|
|
||||||
s = re.sub(r"\s+", " ", s).strip(" ,-")
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
|
|
||||||
parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p]
|
|
||||||
return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order
|
|
||||||
|
|
||||||
|
|
||||||
# ── keyed geocoder ────────────────────────────────────────────────────────────
|
|
||||||
def _throttle() -> None:
|
|
||||||
global _last_geocode_at
|
|
||||||
wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at)
|
|
||||||
if wait > 0:
|
|
||||||
time.sleep(wait)
|
|
||||||
_last_geocode_at = time.monotonic()
|
|
||||||
|
|
||||||
|
|
||||||
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
|
|
||||||
dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
|
|
||||||
a = (math.sin(dlat / 2) ** 2
|
|
||||||
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
|
|
||||||
return 2 * 6371.0 * math.asin(math.sqrt(a))
|
|
||||||
|
|
||||||
|
|
||||||
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
|
|
||||||
"""Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None.
|
|
||||||
|
|
||||||
`viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box
|
|
||||||
around the cluster centroid (bounded), which stops the geocoder matching a
|
|
||||||
landmark name in the wrong city.
|
|
||||||
"""
|
|
||||||
if not _API_KEY:
|
|
||||||
log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
|
|
||||||
return None
|
|
||||||
_throttle()
|
|
||||||
try:
|
|
||||||
if _PROVIDER == "opencage":
|
|
||||||
params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
|
|
||||||
if viewbox:
|
|
||||||
params["bounds"] = "%s,%s,%s,%s" % viewbox
|
|
||||||
r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
|
|
||||||
r.raise_for_status()
|
|
||||||
res = (r.json().get("results") or [])
|
|
||||||
if res:
|
|
||||||
g = res[0]["geometry"]
|
|
||||||
return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
|
|
||||||
else: # locationiq (default)
|
|
||||||
params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
|
|
||||||
if viewbox:
|
|
||||||
params["viewbox"] = "%s,%s,%s,%s" % viewbox
|
|
||||||
params["bounded"] = 1
|
|
||||||
r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
|
|
||||||
if r.status_code == 404: # LocationIQ returns 404 for "no matches"
|
|
||||||
return None
|
|
||||||
r.raise_for_status()
|
|
||||||
hits = r.json()
|
|
||||||
if hits:
|
|
||||||
h = hits[0]
|
|
||||||
return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
|
|
||||||
except (requests.RequestException, ValueError, KeyError) as e:
|
|
||||||
log.warning("geocode failed for %r: %s", query, e)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
|
||||||
def geocode_clusters(apply: bool) -> None:
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
SELECT key, region FROM (
|
|
||||||
SELECT tickets.norm_cluster(raw->>'cluster') AS key,
|
|
||||||
(array_agg(raw->>'region'))[1] AS region
|
|
||||||
FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
|
||||||
UNION
|
|
||||||
SELECT tickets.norm_cluster(raw->>'cluster'),
|
|
||||||
(array_agg(raw->>'region'))[1]
|
|
||||||
FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
|
||||||
) z
|
|
||||||
WHERE key IS NOT NULL
|
|
||||||
AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
|
|
||||||
WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
todo = cur.fetchall()
|
|
||||||
log.info("%d clusters to geocode", len(todo))
|
|
||||||
if not apply:
|
|
||||||
for key, region in todo:
|
|
||||||
log.info(" would geocode cluster: %s (%s)", key, region)
|
|
||||||
return
|
|
||||||
written = 0
|
|
||||||
for key, region in todo:
|
|
||||||
hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
|
|
||||||
if not hit:
|
|
||||||
continue
|
|
||||||
lat, lng, _ = hit
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute(
|
|
||||||
"""INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
|
|
||||||
VALUES (%s, %s, %s, %s, %s, false)
|
|
||||||
ON CONFLICT (cluster_key) DO UPDATE
|
|
||||||
SET region = EXCLUDED.region, lat = EXCLUDED.lat,
|
|
||||||
lng = EXCLUDED.lng, source = EXCLUDED.source""",
|
|
||||||
(key, region, lat, lng, _PROVIDER),
|
|
||||||
)
|
|
||||||
written += 1
|
|
||||||
_resolve()
|
|
||||||
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
|
||||||
|
|
||||||
|
|
||||||
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
|
||||||
# A location geocode is only trusted if it lands within this radius of the
|
|
||||||
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
|
||||||
# place and we fall back to the cluster centroid.
|
|
||||||
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
|
|
||||||
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid
|
|
||||||
|
|
||||||
|
|
||||||
def geocode_locations(apply: bool) -> None:
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
|
||||||
FROM (
|
|
||||||
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
|
||||||
(array_agg(raw->>'location_name'))[1] AS location_name,
|
|
||||||
(array_agg(raw->>'cluster'))[1] AS cluster,
|
|
||||||
(array_agg(raw->>'region'))[1] AS region,
|
|
||||||
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
|
||||||
FROM tickets.inc
|
|
||||||
WHERE (raw->>'is_actionable')::boolean
|
|
||||||
AND raw->>'location_name' IS NOT NULL
|
|
||||||
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
|
||||||
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
|
||||||
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
|
||||||
AND gl.geom IS NOT NULL)
|
|
||||||
GROUP BY 1
|
|
||||||
) t
|
|
||||||
LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
todo = cur.fetchall()
|
|
||||||
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
|
||||||
if not apply:
|
|
||||||
for key, loc, cluster, region, clat, clng in todo[:50]:
|
|
||||||
log.info(" %s -> %r", key, compose_query(loc, cluster, region))
|
|
||||||
return
|
|
||||||
written = rejected = 0
|
|
||||||
for key, loc, cluster, region, clat, clng in todo:
|
|
||||||
query = compose_query(loc, cluster, region)
|
|
||||||
viewbox = None
|
|
||||||
if clat is not None and clng is not None:
|
|
||||||
viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
|
|
||||||
hit = geocode(query, viewbox)
|
|
||||||
if not hit:
|
|
||||||
continue
|
|
||||||
lat, lng, conf = hit
|
|
||||||
# distance sanity: a result far from the cluster centroid is a wrong-city
|
|
||||||
# match — drop it so the ticket keeps the cluster-centroid fallback.
|
|
||||||
if clat is not None and clng is not None:
|
|
||||||
km = _haversine_km(lat, lng, clat, clng)
|
|
||||||
if km > _MAX_KM_FROM_CLUSTER:
|
|
||||||
rejected += 1
|
|
||||||
log.info(" reject (%.0f km from cluster): %s", km, query)
|
|
||||||
continue
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute(
|
|
||||||
"""INSERT INTO tickets.geo_locations
|
|
||||||
(query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
|
|
||||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
||||||
ON CONFLICT (query_key) DO UPDATE
|
|
||||||
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
|
|
||||||
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
|
|
||||||
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
|
|
||||||
(key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
|
|
||||||
)
|
|
||||||
written += 1
|
|
||||||
log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)
|
|
||||||
n = _resolve()
|
|
||||||
log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
|
|
||||||
"(unverified — review tickets.geo_locations)", written, rejected, n)
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve() -> int:
|
|
||||||
with get_conn() as conn:
|
|
||||||
with conn.cursor() as cur:
|
|
||||||
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
|
||||||
return cur.fetchone()[0]
|
|
||||||
|
|
||||||
|
|
||||||
# ── entrypoint ────────────────────────────────────────────────────────────────
|
|
||||||
def main() -> None:
|
|
||||||
ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
|
|
||||||
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
|
||||||
ap.add_argument("--from-bucket", action="store_true",
|
|
||||||
help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
|
|
||||||
ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
|
|
||||||
ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
|
|
||||||
ap.add_argument("--geocode-clusters", action="store_true",
|
|
||||||
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
|
||||||
ap.add_argument("--geocode-locations", action="store_true",
|
|
||||||
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
|
|
||||||
args = ap.parse_args()
|
|
||||||
|
|
||||||
if args.geocode_clusters:
|
|
||||||
geocode_clusters(apply=args.apply)
|
|
||||||
return
|
|
||||||
if args.geocode_locations:
|
|
||||||
geocode_locations(apply=args.apply)
|
|
||||||
return
|
|
||||||
if not (args.from_bucket or args.inc_json or args.crq_json):
|
|
||||||
ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
|
|
||||||
ingest(args)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
Loading…
Reference in a new issue