feat: fleettickets — INC/CRQ ticket ingestion, geocoding + read-schema
Standalone module extracted from the tracksolid repo (was migrations 21-23 + tools/import_tickets.py). Owns the `tickets` schema in the shared tracksolid_db. - migrations/01_tickets_schema.sql: consolidated final-state schema (tickets.inc/ crq raw-jsonb-first, geo_clusters + geo_locations gazetteers, geom trigger, reporting.fn_tickets_for_map) - import_tickets.py: rustfs bucket ingest + cluster/location geocoding (LocationIQ/OpenCage, viewbox-bounded + cluster-distance guard) - run_migrations.py, shared.py (self-contained), pyproject, .env.example, README The DB stays in tracksolid_db; dashboard_api keeps serving /webhook/tickets; the Tickets map stays a FleetOps tab. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
commit
4631cc6382
8 changed files with 879 additions and 0 deletions
17
.env.example
Normal file
17
.env.example
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
# fleettickets — copy to .env and fill in. NEVER commit the real .env.
|
||||||
|
|
||||||
|
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
||||||
|
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
||||||
|
|
||||||
|
# rustfs / S3 — source ticket snapshots (automations/{inc,crq}/latest.json)
|
||||||
|
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
||||||
|
RUSTFS_ACCESS_KEY=<key>
|
||||||
|
RUSTFS_SECRET_KEY=<secret>
|
||||||
|
RUSTFS_REGION=us-east-1
|
||||||
|
TICKETS_BUCKET=tickets
|
||||||
|
|
||||||
|
# Geocoder (keyed — public Nominatim rate-limits bulk)
|
||||||
|
GEOCODER_PROVIDER=locationiq # locationiq | opencage
|
||||||
|
GEOCODER_API_KEY=<key>
|
||||||
|
GEOCODER_MIN_INTERVAL_S=1.1 # throttle to provider TOS
|
||||||
|
GEOCODER_MAX_KM=25 # reject a location geocode this far from its cluster centroid
|
||||||
8
.gitignore
vendored
Normal file
8
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
.env
|
||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
.venv/
|
||||||
|
uv.lock
|
||||||
|
*.json
|
||||||
|
!.*.json
|
||||||
|
.DS_Store
|
||||||
71
README.md
Normal file
71
README.md
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
# fleettickets
|
||||||
|
|
||||||
|
Field-ops **INC / CRQ ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
|
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
||||||
|
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
||||||
|
|
||||||
|
- **INC** — incident / customer-fault tickets
|
||||||
|
- **CRQ** — new-installation requests
|
||||||
|
|
||||||
|
## What this owns
|
||||||
|
|
||||||
|
| Piece | What |
|
||||||
|
|---|---|
|
||||||
|
| `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) |
|
||||||
|
| `import_tickets.py` | Pulls ticket snapshots from the rustfs `tickets` bucket and upserts them; geocodes clusters + INC locations |
|
||||||
|
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
||||||
|
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
||||||
|
|
||||||
|
## What this does NOT own (stays where it is)
|
||||||
|
|
||||||
|
- **The DB** — the `tickets` schema lives in the shared `tracksolid_db`.
|
||||||
|
- **The read-API** — `dashboard_api` (in the tracksolid stack) serves
|
||||||
|
`GET /webhook/tickets`, which calls `reporting.fn_tickets_for_map` (defined here).
|
||||||
|
- **The frontend** — the Tickets map is a tab in the **FleetOps** SPA (`fleetops` repo).
|
||||||
|
|
||||||
|
## Data model (raw-first)
|
||||||
|
|
||||||
|
Each row is just `ticket_id` + `raw` (the full source record as `jsonb`) + a derived
|
||||||
|
`geom` / `geo_source`. Everything reads from `raw`, so a change to the source schema
|
||||||
|
needs no migration. `geom` is resolved: **feed** coords (`raw` lat/lng) → **location**
|
||||||
|
(geocoded `location_name`) → **cluster** centroid → **none**.
|
||||||
|
|
||||||
|
Source coordinates are empty in the feed, so geocoding is required:
|
||||||
|
- `--geocode-clusters` — one coordinate per cluster (coarse fallback).
|
||||||
|
- `--geocode-locations` — precise per-location for **actionable INC** tickets: strips the
|
||||||
|
network codes from `location_name` (e.g. `NW_`, `ADR_MNT_`, `FDT<n>`, `SDUS`), geocodes
|
||||||
|
the real place via a **keyed** provider (LocationIQ / OpenCage), and **rejects any result
|
||||||
|
>25 km from the cluster centroid** (wrong-city guard). Results cache in
|
||||||
|
`tickets.geo_locations`.
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv sync
|
||||||
|
cp .env.example .env # fill in DATABASE_URL, RUSTFS_*, GEOCODER_*
|
||||||
|
python run_migrations.py # apply the schema (idempotent)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Run
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# ingest the latest snapshots from the bucket
|
||||||
|
python import_tickets.py --from-bucket --apply
|
||||||
|
|
||||||
|
# geocode (needs GEOCODER_API_KEY)
|
||||||
|
python import_tickets.py --geocode-clusters --apply # coarse, once
|
||||||
|
python import_tickets.py --geocode-locations --apply # precise, actionable INC
|
||||||
|
|
||||||
|
# from local files instead of the bucket
|
||||||
|
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
|
||||||
|
```
|
||||||
|
|
||||||
|
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to
|
||||||
|
the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency).
|
||||||
|
|
||||||
|
## Notes
|
||||||
|
|
||||||
|
- The `changes/` subdirectory in the bucket holds **full timestamped snapshots** (not
|
||||||
|
deltas) — ingest `latest.json` only; don't process `changes/`.
|
||||||
|
- The curated/geocoded coordinates are written `verified = false` — review
|
||||||
|
`tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked.
|
||||||
375
import_tickets.py
Normal file
375
import_tickets.py
Normal file
|
|
@ -0,0 +1,375 @@
|
||||||
|
"""
|
||||||
|
import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first)
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
Loads the client's field-ops ticket snapshots into the `tickets` schema — the
|
||||||
|
source of the FleetOps "Tickets" map. Two categories, one table each:
|
||||||
|
tickets.inc — incidents / customer faults
|
||||||
|
tickets.crq — new-installation requests
|
||||||
|
|
||||||
|
RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
|
||||||
|
jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
|
||||||
|
The DB derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||||
|
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
||||||
|
|
||||||
|
Source data: rustfs `tickets` bucket, full snapshots from the client's email
|
||||||
|
automation — automations/{inc,crq}/latest.json (array of 32-key objects).
|
||||||
|
|
||||||
|
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
||||||
|
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
||||||
|
--geocode-locations precise per-location for ACTIONABLE INC tickets: parses the
|
||||||
|
real place out of location_name (region+cluster+location_name,
|
||||||
|
network codes stripped), geocodes it, caches in
|
||||||
|
tickets.geo_locations, then re-resolves geoms.
|
||||||
|
Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY.
|
||||||
|
|
||||||
|
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
||||||
|
python import_tickets.py --from-bucket --apply
|
||||||
|
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
|
||||||
|
python import_tickets.py --geocode-clusters --apply
|
||||||
|
python import_tickets.py --geocode-locations --apply
|
||||||
|
|
||||||
|
Pre-requisite: migration applied (run_migrations.py) — tickets.inc/crq +
|
||||||
|
geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
||||||
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import psycopg2.extras
|
||||||
|
|
||||||
|
from shared import clean, get_conn, get_logger
|
||||||
|
|
||||||
|
log = get_logger("import_tickets")
|
||||||
|
|
||||||
|
TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"}
|
||||||
|
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
|
||||||
|
_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"}
|
||||||
|
|
||||||
|
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
||||||
|
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
||||||
|
_API_KEY = os.getenv("GEOCODER_API_KEY", "")
|
||||||
|
_GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
||||||
|
_last_geocode_at = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── data loading ──────────────────────────────────────────────────────────────
|
||||||
|
def _load_local(path: str) -> list[dict]:
|
||||||
|
with open(path, encoding="utf-8") as f:
|
||||||
|
data = json.load(f) # json.loads accepts NaN by default
|
||||||
|
return data if isinstance(data, list) else []
|
||||||
|
|
||||||
|
|
||||||
|
def _load_bucket(kind: str) -> list[dict]:
|
||||||
|
env = {
|
||||||
|
**os.environ,
|
||||||
|
"AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
|
||||||
|
"AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
|
||||||
|
"AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
|
||||||
|
}
|
||||||
|
uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
|
||||||
|
log.info("fetching %s", uri)
|
||||||
|
out = subprocess.run(
|
||||||
|
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"],
|
||||||
|
env=env, capture_output=True, timeout=180, check=True,
|
||||||
|
).stdout
|
||||||
|
data = json.loads(out.decode("utf-8"))
|
||||||
|
return data if isinstance(data, list) else []
|
||||||
|
|
||||||
|
|
||||||
|
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
||||||
|
def _scrub_nan(row: dict) -> dict:
|
||||||
|
# Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
|
||||||
|
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
|
||||||
|
|
||||||
|
|
||||||
|
def upsert(rows: list[dict], table: str, apply: bool) -> int:
|
||||||
|
payload = [
|
||||||
|
(tid, psycopg2.extras.Json(_scrub_nan(r)))
|
||||||
|
for r in rows
|
||||||
|
if (tid := clean(r.get("ticket_id")))
|
||||||
|
]
|
||||||
|
log.info("%s: %d valid rows (skipped %d without ticket_id)",
|
||||||
|
table, len(payload), len(rows) - len(payload))
|
||||||
|
if not apply:
|
||||||
|
log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
|
||||||
|
return len(payload)
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
psycopg2.extras.execute_values(
|
||||||
|
cur,
|
||||||
|
f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
|
||||||
|
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
||||||
|
payload, page_size=500,
|
||||||
|
)
|
||||||
|
log.info("upserted %d rows into %s", len(payload), table)
|
||||||
|
return len(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def ingest(args) -> None:
|
||||||
|
if args.from_bucket:
|
||||||
|
for kind in ("inc", "crq"):
|
||||||
|
upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
|
||||||
|
else:
|
||||||
|
if args.inc_json:
|
||||||
|
upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply)
|
||||||
|
if args.crq_json:
|
||||||
|
upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply)
|
||||||
|
|
||||||
|
|
||||||
|
# ── place extraction (strip network codes, keep the real place) ───────────────
|
||||||
|
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
|
||||||
|
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
|
||||||
|
# Inline network/work-order codes to drop wherever they appear.
|
||||||
|
_CODE_RE = re.compile(
|
||||||
|
r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_place(location_name: str | None) -> str:
|
||||||
|
"""Pull the human place/landmark out of a coded location_name string.
|
||||||
|
|
||||||
|
e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
|
||||||
|
"""
|
||||||
|
s = (location_name or "").upper().strip()
|
||||||
|
if not s:
|
||||||
|
return ""
|
||||||
|
# drop the trailing '-<unit/instruction>' segment (e.g. -37, -CALL CLIENT, -F32)
|
||||||
|
if "-" in s:
|
||||||
|
s = s.rsplit("-", 1)[0]
|
||||||
|
s = s.replace("_", " ")
|
||||||
|
# strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…)
|
||||||
|
prev = None
|
||||||
|
while prev != s:
|
||||||
|
prev = s
|
||||||
|
s = _PREFIX_RE.sub("", s).strip()
|
||||||
|
s = _CODE_RE.sub(" ", s)
|
||||||
|
s = re.sub(r"\s+", " ", s).strip(" ,-")
|
||||||
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
def compose_query(location_name: str | None, cluster: str | None, region: str | None) -> str:
|
||||||
|
parts = [p for p in (extract_place(location_name), clean(cluster), clean(region), "Kenya") if p]
|
||||||
|
return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order
|
||||||
|
|
||||||
|
|
||||||
|
# ── keyed geocoder ────────────────────────────────────────────────────────────
|
||||||
|
def _throttle() -> None:
|
||||||
|
global _last_geocode_at
|
||||||
|
wait = _GEOCODE_INTERVAL_S - (time.monotonic() - _last_geocode_at)
|
||||||
|
if wait > 0:
|
||||||
|
time.sleep(wait)
|
||||||
|
_last_geocode_at = time.monotonic()
|
||||||
|
|
||||||
|
|
||||||
|
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
|
||||||
|
dlat, dlng = math.radians(lat2 - lat1), math.radians(lng2 - lng1)
|
||||||
|
a = (math.sin(dlat / 2) ** 2
|
||||||
|
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlng / 2) ** 2)
|
||||||
|
return 2 * 6371.0 * math.asin(math.sqrt(a))
|
||||||
|
|
||||||
|
|
||||||
|
def geocode(query: str, viewbox: tuple | None = None) -> tuple[float, float, float | None] | None:
|
||||||
|
"""Forward-geocode via the configured keyed provider. (lat, lng, confidence) | None.
|
||||||
|
|
||||||
|
`viewbox` = (min_lon, min_lat, max_lon, max_lat) constrains results to a box
|
||||||
|
around the cluster centroid (bounded), which stops the geocoder matching a
|
||||||
|
landmark name in the wrong city.
|
||||||
|
"""
|
||||||
|
if not _API_KEY:
|
||||||
|
log.error("GEOCODER_API_KEY is not set — cannot geocode (provider=%s)", _PROVIDER)
|
||||||
|
return None
|
||||||
|
_throttle()
|
||||||
|
try:
|
||||||
|
if _PROVIDER == "opencage":
|
||||||
|
params = {"key": _API_KEY, "q": query, "limit": 1, "countrycode": "ke", "no_annotations": 1}
|
||||||
|
if viewbox:
|
||||||
|
params["bounds"] = "%s,%s,%s,%s" % viewbox
|
||||||
|
r = requests.get("https://api.opencagedata.com/geocode/v1/json", params=params, timeout=15)
|
||||||
|
r.raise_for_status()
|
||||||
|
res = (r.json().get("results") or [])
|
||||||
|
if res:
|
||||||
|
g = res[0]["geometry"]
|
||||||
|
return float(g["lat"]), float(g["lng"]), res[0].get("confidence")
|
||||||
|
else: # locationiq (default)
|
||||||
|
params = {"key": _API_KEY, "q": query, "format": "json", "limit": 1, "countrycodes": "ke"}
|
||||||
|
if viewbox:
|
||||||
|
params["viewbox"] = "%s,%s,%s,%s" % viewbox
|
||||||
|
params["bounded"] = 1
|
||||||
|
r = requests.get("https://us1.locationiq.com/v1/search", params=params, timeout=15)
|
||||||
|
if r.status_code == 404: # LocationIQ returns 404 for "no matches"
|
||||||
|
return None
|
||||||
|
r.raise_for_status()
|
||||||
|
hits = r.json()
|
||||||
|
if hits:
|
||||||
|
h = hits[0]
|
||||||
|
return float(h["lat"]), float(h["lon"]), float(h.get("importance") or 0)
|
||||||
|
except (requests.RequestException, ValueError, KeyError) as e:
|
||||||
|
log.warning("geocode failed for %r: %s", query, e)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── cluster gazetteer (coarse fallback) ───────────────────────────────────────
|
||||||
|
def geocode_clusters(apply: bool) -> None:
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
SELECT key, region FROM (
|
||||||
|
SELECT tickets.norm_cluster(raw->>'cluster') AS key,
|
||||||
|
(array_agg(raw->>'region'))[1] AS region
|
||||||
|
FROM tickets.inc WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
||||||
|
UNION
|
||||||
|
SELECT tickets.norm_cluster(raw->>'cluster'),
|
||||||
|
(array_agg(raw->>'region'))[1]
|
||||||
|
FROM tickets.crq WHERE raw->>'cluster' IS NOT NULL GROUP BY 1
|
||||||
|
) z
|
||||||
|
WHERE key IS NOT NULL
|
||||||
|
AND NOT EXISTS (SELECT 1 FROM tickets.geo_clusters g
|
||||||
|
WHERE g.cluster_key = z.key AND g.geom IS NOT NULL)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
todo = cur.fetchall()
|
||||||
|
log.info("%d clusters to geocode", len(todo))
|
||||||
|
if not apply:
|
||||||
|
for key, region in todo:
|
||||||
|
log.info(" would geocode cluster: %s (%s)", key, region)
|
||||||
|
return
|
||||||
|
written = 0
|
||||||
|
for key, region in todo:
|
||||||
|
hit = geocode(f"{key}, {region}, Kenya" if region else f"{key}, Kenya")
|
||||||
|
if not hit:
|
||||||
|
continue
|
||||||
|
lat, lng, _ = hit
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""INSERT INTO tickets.geo_clusters (cluster_key, region, lat, lng, source, verified)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, false)
|
||||||
|
ON CONFLICT (cluster_key) DO UPDATE
|
||||||
|
SET region = EXCLUDED.region, lat = EXCLUDED.lat,
|
||||||
|
lng = EXCLUDED.lng, source = EXCLUDED.source""",
|
||||||
|
(key, region, lat, lng, _PROVIDER),
|
||||||
|
)
|
||||||
|
written += 1
|
||||||
|
_resolve()
|
||||||
|
log.info("gazetteer: %d clusters written (unverified — review tickets.geo_clusters)", written)
|
||||||
|
|
||||||
|
|
||||||
|
# ── per-location geocoding (precise; actionable INC) ──────────────────────────
|
||||||
|
# A location geocode is only trusted if it lands within this radius of the
|
||||||
|
# cluster centroid; otherwise the geocoder matched the landmark in the wrong
|
||||||
|
# place and we fall back to the cluster centroid.
|
||||||
|
_MAX_KM_FROM_CLUSTER = float(os.getenv("GEOCODER_MAX_KM", "25"))
|
||||||
|
_VIEWBOX_DEG = 0.2 # ~22 km half-box around the cluster centroid
|
||||||
|
|
||||||
|
|
||||||
|
def geocode_locations(apply: bool) -> None:
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
SELECT t.key, t.location_name, t.cluster, t.region, gc.lat AS clat, gc.lng AS clng
|
||||||
|
FROM (
|
||||||
|
SELECT tickets.norm_cluster(raw->>'location_name') AS key,
|
||||||
|
(array_agg(raw->>'location_name'))[1] AS location_name,
|
||||||
|
(array_agg(raw->>'cluster'))[1] AS cluster,
|
||||||
|
(array_agg(raw->>'region'))[1] AS region,
|
||||||
|
tickets.norm_cluster((array_agg(raw->>'cluster'))[1]) AS ckey
|
||||||
|
FROM tickets.inc
|
||||||
|
WHERE (raw->>'is_actionable')::boolean
|
||||||
|
AND raw->>'location_name' IS NOT NULL
|
||||||
|
AND tickets.norm_cluster(raw->>'location_name') IS NOT NULL
|
||||||
|
AND NOT EXISTS (SELECT 1 FROM tickets.geo_locations gl
|
||||||
|
WHERE gl.query_key = tickets.norm_cluster(raw->>'location_name')
|
||||||
|
AND gl.geom IS NOT NULL)
|
||||||
|
GROUP BY 1
|
||||||
|
) t
|
||||||
|
LEFT JOIN tickets.geo_clusters gc ON gc.cluster_key = t.ckey
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
todo = cur.fetchall()
|
||||||
|
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
|
||||||
|
if not apply:
|
||||||
|
for key, loc, cluster, region, clat, clng in todo[:50]:
|
||||||
|
log.info(" %s -> %r", key, compose_query(loc, cluster, region))
|
||||||
|
return
|
||||||
|
written = rejected = 0
|
||||||
|
for key, loc, cluster, region, clat, clng in todo:
|
||||||
|
query = compose_query(loc, cluster, region)
|
||||||
|
viewbox = None
|
||||||
|
if clat is not None and clng is not None:
|
||||||
|
viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
|
||||||
|
hit = geocode(query, viewbox)
|
||||||
|
if not hit:
|
||||||
|
continue
|
||||||
|
lat, lng, conf = hit
|
||||||
|
# distance sanity: a result far from the cluster centroid is a wrong-city
|
||||||
|
# match — drop it so the ticket keeps the cluster-centroid fallback.
|
||||||
|
if clat is not None and clng is not None:
|
||||||
|
km = _haversine_km(lat, lng, clat, clng)
|
||||||
|
if km > _MAX_KM_FROM_CLUSTER:
|
||||||
|
rejected += 1
|
||||||
|
log.info(" reject (%.0f km from cluster): %s", km, query)
|
||||||
|
continue
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""INSERT INTO tickets.geo_locations
|
||||||
|
(query_key, location_name, cluster, region, query, lat, lng, confidence, provider)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
|
ON CONFLICT (query_key) DO UPDATE
|
||||||
|
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
|
||||||
|
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
|
||||||
|
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
|
||||||
|
(key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
|
||||||
|
)
|
||||||
|
written += 1
|
||||||
|
log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)
|
||||||
|
n = _resolve()
|
||||||
|
log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
|
||||||
|
"(unverified — review tickets.geo_locations)", written, rejected, n)
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve() -> int:
|
||||||
|
with get_conn() as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("SELECT tickets.resolve_ticket_geoms()")
|
||||||
|
return cur.fetchone()[0]
|
||||||
|
|
||||||
|
|
||||||
|
# ── entrypoint ────────────────────────────────────────────────────────────────
|
||||||
|
def main() -> None:
|
||||||
|
ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
|
||||||
|
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
||||||
|
ap.add_argument("--from-bucket", action="store_true",
|
||||||
|
help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
|
||||||
|
ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
|
||||||
|
ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
|
||||||
|
ap.add_argument("--geocode-clusters", action="store_true",
|
||||||
|
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
||||||
|
ap.add_argument("--geocode-locations", action="store_true",
|
||||||
|
help="Geocode actionable-INC location_names precisely (keyed provider), then re-resolve")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
if args.geocode_clusters:
|
||||||
|
geocode_clusters(apply=args.apply)
|
||||||
|
return
|
||||||
|
if args.geocode_locations:
|
||||||
|
geocode_locations(apply=args.apply)
|
||||||
|
return
|
||||||
|
if not (args.from_bucket or args.inc_json or args.crq_json):
|
||||||
|
ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
|
||||||
|
ingest(args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
276
migrations/01_tickets_schema.sql
Normal file
276
migrations/01_tickets_schema.sql
Normal file
|
|
@ -0,0 +1,276 @@
|
||||||
|
-- 01_tickets_schema.sql — fleettickets · INC/CRQ ticket store (raw-jsonb-first)
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
-- Consolidated final-state schema for the field-ops ticket layer. This supersedes
|
||||||
|
-- the historical tracksolid migrations 21→23 (where this feature was first built);
|
||||||
|
-- fleettickets now owns the `tickets` schema. The schema lives in the shared
|
||||||
|
-- database (`tracksolid_db` today) so the existing dashboard_api read-API and the
|
||||||
|
-- FleetOps Tickets map keep working unchanged.
|
||||||
|
--
|
||||||
|
-- tickets.inc / tickets.crq one raw-jsonb row per ticket (ticket_id + raw +
|
||||||
|
-- derived geom/geo_source). INC = incident/fault,
|
||||||
|
-- CRQ = new-installation.
|
||||||
|
-- tickets.geo_clusters cluster -> coordinate gazetteer (coarse fallback)
|
||||||
|
-- tickets.geo_locations cleaned-location -> coordinate cache (precise)
|
||||||
|
-- reporting.fn_tickets_for_map GeoJSON read function consumed by dashboard_api
|
||||||
|
--
|
||||||
|
-- geom resolution: feed coords (raw lat/lng) -> location cache -> cluster centroid
|
||||||
|
-- -> none. Idempotent: safe on a fresh DB and re-appliable on the live DB.
|
||||||
|
-- Requires PostGIS (present on the shared DB; on a brand-new DB a superuser must
|
||||||
|
-- run CREATE EXTENSION postgis first).
|
||||||
|
-- ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
CREATE EXTENSION IF NOT EXISTS postgis;
|
||||||
|
CREATE SCHEMA IF NOT EXISTS tickets;
|
||||||
|
CREATE SCHEMA IF NOT EXISTS reporting; -- shared read layer (the fn lives here for dashboard_api)
|
||||||
|
SET search_path = tickets, public;
|
||||||
|
|
||||||
|
-- ── normalize helper (generic upper/collapse/trim key) ───────────────────────
|
||||||
|
CREATE OR REPLACE FUNCTION tickets.norm_cluster(p text)
|
||||||
|
RETURNS text LANGUAGE sql IMMUTABLE PARALLEL SAFE
|
||||||
|
AS $fn$ SELECT NULLIF(upper(regexp_replace(trim(COALESCE(p, '')), '\s+', ' ', 'g')), '') $fn$;
|
||||||
|
|
||||||
|
-- ── gazetteer: cluster -> coordinates (coarse fallback) ──────────────────────
|
||||||
|
CREATE TABLE IF NOT EXISTS tickets.geo_clusters (
|
||||||
|
cluster_key text PRIMARY KEY,
|
||||||
|
region text,
|
||||||
|
lat double precision,
|
||||||
|
lng double precision,
|
||||||
|
geom geometry(Point, 4326),
|
||||||
|
source text,
|
||||||
|
verified boolean NOT NULL DEFAULT false,
|
||||||
|
updated_at timestamptz NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION tickets.tg_geo_clusters_geom()
|
||||||
|
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
||||||
|
BEGIN
|
||||||
|
IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
|
||||||
|
AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
|
||||||
|
AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
|
||||||
|
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
|
||||||
|
ELSE
|
||||||
|
NEW.geom := NULL;
|
||||||
|
END IF;
|
||||||
|
NEW.updated_at := now();
|
||||||
|
RETURN NEW;
|
||||||
|
END $fn$;
|
||||||
|
DROP TRIGGER IF EXISTS trg_geo_clusters_geom ON tickets.geo_clusters;
|
||||||
|
CREATE TRIGGER trg_geo_clusters_geom BEFORE INSERT OR UPDATE ON tickets.geo_clusters
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_clusters_geom();
|
||||||
|
|
||||||
|
-- ── location geocode cache: cleaned location_name -> coordinates (precise) ───
|
||||||
|
-- query_key = tickets.norm_cluster(location_name); resolve joins on it without
|
||||||
|
-- re-deriving the place-extraction regex (that lives in the loader).
|
||||||
|
CREATE TABLE IF NOT EXISTS tickets.geo_locations (
|
||||||
|
query_key text PRIMARY KEY,
|
||||||
|
location_name text,
|
||||||
|
cluster text,
|
||||||
|
region text,
|
||||||
|
query text,
|
||||||
|
lat double precision,
|
||||||
|
lng double precision,
|
||||||
|
geom geometry(Point, 4326),
|
||||||
|
confidence numeric,
|
||||||
|
provider text,
|
||||||
|
verified boolean NOT NULL DEFAULT false,
|
||||||
|
updated_at timestamptz NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION tickets.tg_geo_locations_geom()
|
||||||
|
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
||||||
|
BEGIN
|
||||||
|
IF NEW.lat IS NOT NULL AND NEW.lng IS NOT NULL
|
||||||
|
AND NEW.lat BETWEEN -90 AND 90 AND NEW.lng BETWEEN -180 AND 180
|
||||||
|
AND NOT (NEW.lat = 0 AND NEW.lng = 0) THEN
|
||||||
|
NEW.geom := ST_SetSRID(ST_MakePoint(NEW.lng, NEW.lat), 4326);
|
||||||
|
ELSE
|
||||||
|
NEW.geom := NULL;
|
||||||
|
END IF;
|
||||||
|
NEW.updated_at := now();
|
||||||
|
RETURN NEW;
|
||||||
|
END $fn$;
|
||||||
|
DROP TRIGGER IF EXISTS trg_geo_locations_geom ON tickets.geo_locations;
|
||||||
|
CREATE TRIGGER trg_geo_locations_geom BEFORE INSERT OR UPDATE ON tickets.geo_locations
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION tickets.tg_geo_locations_geom();
|
||||||
|
|
||||||
|
-- ── per-type ticket tables (raw-jsonb-first) ─────────────────────────────────
|
||||||
|
CREATE TABLE IF NOT EXISTS tickets.inc (
|
||||||
|
ticket_id text PRIMARY KEY,
|
||||||
|
raw jsonb NOT NULL,
|
||||||
|
geom geometry(Point, 4326),
|
||||||
|
geo_source text, -- 'feed' | 'location' | 'cluster' | 'none'
|
||||||
|
ingested_at timestamptz NOT NULL DEFAULT now()
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS tickets.crq (LIKE tickets.inc INCLUDING ALL);
|
||||||
|
|
||||||
|
-- ── geom trigger — read from raw; never clobber a deliberate geom-only update ─
|
||||||
|
CREATE OR REPLACE FUNCTION tickets.tg_ticket_geom()
|
||||||
|
RETURNS trigger LANGUAGE plpgsql AS $fn$
|
||||||
|
DECLARE
|
||||||
|
v_lat double precision := NULLIF(NEW.raw->>'latitude','')::double precision;
|
||||||
|
v_lng double precision := NULLIF(NEW.raw->>'longitude','')::double precision;
|
||||||
|
g geometry(Point, 4326);
|
||||||
|
BEGIN
|
||||||
|
IF TG_OP = 'UPDATE' AND NEW.raw IS NOT DISTINCT FROM OLD.raw THEN
|
||||||
|
RETURN NEW; -- geom/geo_source-only update — keep caller's value
|
||||||
|
END IF;
|
||||||
|
IF v_lat IS NOT NULL AND v_lng IS NOT NULL
|
||||||
|
AND v_lat BETWEEN -90 AND 90 AND v_lng BETWEEN -180 AND 180
|
||||||
|
AND NOT (v_lat = 0 AND v_lng = 0) THEN
|
||||||
|
NEW.geom := ST_SetSRID(ST_MakePoint(v_lng, v_lat), 4326);
|
||||||
|
NEW.geo_source := 'feed';
|
||||||
|
ELSE
|
||||||
|
SELECT gc.geom INTO g FROM tickets.geo_clusters gc
|
||||||
|
WHERE gc.cluster_key = tickets.norm_cluster(NEW.raw->>'cluster') AND gc.geom IS NOT NULL LIMIT 1;
|
||||||
|
IF g IS NOT NULL THEN NEW.geom := g; NEW.geo_source := 'cluster';
|
||||||
|
ELSE NEW.geom := NULL; NEW.geo_source := 'none'; END IF;
|
||||||
|
END IF;
|
||||||
|
RETURN NEW;
|
||||||
|
END $fn$;
|
||||||
|
|
||||||
|
DROP TRIGGER IF EXISTS trg_inc_geom ON tickets.inc;
|
||||||
|
CREATE TRIGGER trg_inc_geom BEFORE INSERT OR UPDATE ON tickets.inc
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
|
||||||
|
DROP TRIGGER IF EXISTS trg_crq_geom ON tickets.crq;
|
||||||
|
CREATE TRIGGER trg_crq_geom BEFORE INSERT OR UPDATE ON tickets.crq
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION tickets.tg_ticket_geom();
|
||||||
|
|
||||||
|
-- ── resolve — prefer location cache, else cluster centroid (non-feed rows) ───
|
||||||
|
CREATE OR REPLACE FUNCTION tickets.resolve_ticket_geoms()
|
||||||
|
RETURNS integer LANGUAGE plpgsql AS $fn$
|
||||||
|
DECLARE n integer; m integer;
|
||||||
|
BEGIN
|
||||||
|
UPDATE tickets.inc t
|
||||||
|
SET geom = COALESCE(loc.geom, gc.geom),
|
||||||
|
geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
||||||
|
WHEN gc.geom IS NOT NULL THEN 'cluster' ELSE 'none' END
|
||||||
|
FROM tickets.inc base
|
||||||
|
LEFT JOIN tickets.geo_locations loc
|
||||||
|
ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
|
||||||
|
LEFT JOIN tickets.geo_clusters gc
|
||||||
|
ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
|
||||||
|
WHERE t.ticket_id = base.ticket_id
|
||||||
|
AND t.geo_source IS DISTINCT FROM 'feed'
|
||||||
|
AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
|
||||||
|
OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
||||||
|
WHEN gc.geom IS NOT NULL THEN 'cluster' ELSE 'none' END);
|
||||||
|
GET DIAGNOSTICS n = ROW_COUNT;
|
||||||
|
|
||||||
|
UPDATE tickets.crq t
|
||||||
|
SET geom = COALESCE(loc.geom, gc.geom),
|
||||||
|
geo_source = CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
||||||
|
WHEN gc.geom IS NOT NULL THEN 'cluster' ELSE 'none' END
|
||||||
|
FROM tickets.crq base
|
||||||
|
LEFT JOIN tickets.geo_locations loc
|
||||||
|
ON loc.query_key = tickets.norm_cluster(base.raw->>'location_name') AND loc.geom IS NOT NULL
|
||||||
|
LEFT JOIN tickets.geo_clusters gc
|
||||||
|
ON gc.cluster_key = tickets.norm_cluster(base.raw->>'cluster') AND gc.geom IS NOT NULL
|
||||||
|
WHERE t.ticket_id = base.ticket_id
|
||||||
|
AND t.geo_source IS DISTINCT FROM 'feed'
|
||||||
|
AND (t.geom IS DISTINCT FROM COALESCE(loc.geom, gc.geom)
|
||||||
|
OR t.geo_source IS DISTINCT FROM CASE WHEN loc.geom IS NOT NULL THEN 'location'
|
||||||
|
WHEN gc.geom IS NOT NULL THEN 'cluster' ELSE 'none' END);
|
||||||
|
GET DIAGNOSTICS m = ROW_COUNT;
|
||||||
|
RETURN n + m;
|
||||||
|
END $fn$;
|
||||||
|
|
||||||
|
-- ── indexes ───────────────────────────────────────────────────────────────────
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_inc_status_raw ON tickets.inc ((raw->>'normalized_status'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_inc_actionable_raw ON tickets.inc (((raw->>'is_actionable')::boolean))
|
||||||
|
WHERE (raw->>'is_actionable')::boolean;
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_inc_cluster_raw ON tickets.inc (tickets.norm_cluster(raw->>'cluster'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_inc_loc_raw ON tickets.inc (tickets.norm_cluster(raw->>'location_name'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_inc_geom ON tickets.inc USING gist (geom);
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_status_raw ON tickets.crq ((raw->>'normalized_status'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_actionable_raw ON tickets.crq (((raw->>'is_actionable')::boolean))
|
||||||
|
WHERE (raw->>'is_actionable')::boolean;
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_cluster_raw ON tickets.crq (tickets.norm_cluster(raw->>'cluster'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_loc_raw ON tickets.crq (tickets.norm_cluster(raw->>'location_name'));
|
||||||
|
CREATE INDEX IF NOT EXISTS ix_crq_geom ON tickets.crq USING gist (geom);
|
||||||
|
|
||||||
|
-- ── read function (GeoJSON; consumed by dashboard_api GET /webhook/tickets) ──
|
||||||
|
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
|
||||||
|
p_service_type text DEFAULT NULL,
|
||||||
|
p_status text DEFAULT NULL,
|
||||||
|
p_open_only boolean DEFAULT true
|
||||||
|
)
|
||||||
|
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
|
||||||
|
DECLARE v_result jsonb;
|
||||||
|
BEGIN
|
||||||
|
p_service_type := lower(NULLIF(p_service_type, ''));
|
||||||
|
p_status := NULLIF(p_status, '');
|
||||||
|
WITH filtered AS (
|
||||||
|
SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
|
||||||
|
WHERE geom IS NOT NULL
|
||||||
|
AND (p_service_type IS NULL OR p_service_type = 'inc')
|
||||||
|
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
||||||
|
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
||||||
|
UNION ALL
|
||||||
|
SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
|
||||||
|
WHERE geom IS NOT NULL
|
||||||
|
AND (p_service_type IS NULL OR p_service_type = 'crq')
|
||||||
|
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
||||||
|
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object(
|
||||||
|
'summary', jsonb_build_object(
|
||||||
|
'ticket_count', COUNT(*),
|
||||||
|
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
|
||||||
|
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
|
||||||
|
'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
|
||||||
|
'by_status', (SELECT jsonb_object_agg(s, c)
|
||||||
|
FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c
|
||||||
|
FROM filtered GROUP BY raw->>'normalized_status') z)
|
||||||
|
),
|
||||||
|
'geojson', jsonb_build_object(
|
||||||
|
'type', 'FeatureCollection',
|
||||||
|
'features', COALESCE(jsonb_agg(
|
||||||
|
jsonb_build_object(
|
||||||
|
'type', 'Feature',
|
||||||
|
'properties', jsonb_build_object(
|
||||||
|
'ticket_id', raw->>'ticket_id',
|
||||||
|
'service_type', service_type,
|
||||||
|
'status', raw->>'normalized_status',
|
||||||
|
'raw_status', raw->>'raw_status',
|
||||||
|
'cluster', raw->>'cluster',
|
||||||
|
'region', raw->>'region',
|
||||||
|
'location_name', raw->>'location_name',
|
||||||
|
'department', raw->>'department',
|
||||||
|
'owner', raw->>'owner',
|
||||||
|
'assigned_team', raw->>'assigned_team',
|
||||||
|
'sla_status', raw->>'sla_status',
|
||||||
|
'is_actionable', (raw->>'is_actionable')::boolean,
|
||||||
|
'geo_source', geo_source,
|
||||||
|
'created_at', raw->>'created_at_service',
|
||||||
|
'scheduled_at', raw->>'scheduled_at'
|
||||||
|
),
|
||||||
|
'geometry', ST_AsGeoJSON(geom)::jsonb
|
||||||
|
)
|
||||||
|
), '[]'::jsonb)
|
||||||
|
)
|
||||||
|
) INTO v_result FROM filtered;
|
||||||
|
RETURN v_result;
|
||||||
|
END $fn$;
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
|
||||||
|
'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON. fleettickets 01.';
|
||||||
|
|
||||||
|
-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
|
||||||
|
DO $grants$
|
||||||
|
BEGIN
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
|
||||||
|
GRANT USAGE, CREATE ON SCHEMA tickets TO tracksolid_owner;
|
||||||
|
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA tickets TO tracksolid_owner;
|
||||||
|
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA tickets TO tracksolid_owner;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||||
|
GRANT USAGE ON SCHEMA tickets TO dashboard_ro;
|
||||||
|
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO dashboard_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO dashboard_ro;
|
||||||
|
END IF;
|
||||||
|
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||||
|
GRANT USAGE ON SCHEMA tickets TO grafana_ro;
|
||||||
|
GRANT SELECT ON tickets.inc, tickets.crq, tickets.geo_clusters, tickets.geo_locations TO grafana_ro;
|
||||||
|
GRANT EXECUTE ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) TO grafana_ro;
|
||||||
|
END IF;
|
||||||
|
END $grants$;
|
||||||
22
pyproject.toml
Normal file
22
pyproject.toml
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
[project]
|
||||||
|
name = "fleettickets"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Field-ops INC/CRQ ticket ingestion, geocoding, and read-schema for the FleetOps Tickets map"
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
dependencies = [
|
||||||
|
"psycopg2-binary>=2.9.9", # DB driver
|
||||||
|
"requests>=2.32.3", # geocoder HTTP
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"ruff>=0.4",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.uv]
|
||||||
|
managed = true
|
||||||
|
|
||||||
|
[tool.ruff]
|
||||||
|
target-version = "py312"
|
||||||
|
line-length = 100
|
||||||
|
lint.select = ["E", "W", "F", "B", "UP"]
|
||||||
56
run_migrations.py
Normal file
56
run_migrations.py
Normal file
|
|
@ -0,0 +1,56 @@
|
||||||
|
"""
|
||||||
|
run_migrations.py — fleettickets · apply SQL migrations in order.
|
||||||
|
|
||||||
|
Applies migrations/*.sql (lexical order) against DATABASE_URL, tracking applied
|
||||||
|
files in tickets.schema_migrations. Migrations are idempotent, so re-running is
|
||||||
|
safe. Run: `python run_migrations.py`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import glob
|
||||||
|
import os
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
MIG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
dsn = os.environ.get("DATABASE_URL")
|
||||||
|
if not dsn:
|
||||||
|
raise SystemExit("DATABASE_URL is not set")
|
||||||
|
conn = psycopg2.connect(dsn)
|
||||||
|
conn.autocommit = False
|
||||||
|
try:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute("CREATE SCHEMA IF NOT EXISTS tickets")
|
||||||
|
cur.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS tickets.schema_migrations "
|
||||||
|
"(filename text PRIMARY KEY, applied_at timestamptz NOT NULL DEFAULT now())"
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
cur.execute("SELECT filename FROM tickets.schema_migrations")
|
||||||
|
applied = {r[0] for r in cur.fetchall()}
|
||||||
|
|
||||||
|
for path in sorted(glob.glob(os.path.join(MIG_DIR, "*.sql"))):
|
||||||
|
fn = os.path.basename(path)
|
||||||
|
if fn in applied:
|
||||||
|
print(f" skip {fn}")
|
||||||
|
continue
|
||||||
|
print(f" apply {fn}")
|
||||||
|
with open(path, encoding="utf-8") as f:
|
||||||
|
cur.execute(f.read())
|
||||||
|
cur.execute(
|
||||||
|
"INSERT INTO tickets.schema_migrations (filename) VALUES (%s) "
|
||||||
|
"ON CONFLICT DO NOTHING",
|
||||||
|
(fn,),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
print("migrations up to date.")
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
54
shared.py
Normal file
54
shared.py
Normal file
|
|
@ -0,0 +1,54 @@
|
||||||
|
"""
|
||||||
|
shared.py — fleettickets · minimal DB + helper utilities.
|
||||||
|
|
||||||
|
Self-contained replacement for the handful of helpers the loader used to import
|
||||||
|
from the tracksolid repo's `ts_shared_rev`, so fleettickets stands alone.
|
||||||
|
Connection comes from the DATABASE_URL env var (points at the shared
|
||||||
|
`tracksolid_db`, where the `tickets` schema lives).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
|
||||||
|
def get_logger(name: str) -> logging.Logger:
|
||||||
|
logger = logging.getLogger(f"fleettickets.{name}")
|
||||||
|
if not logger.handlers:
|
||||||
|
h = logging.StreamHandler()
|
||||||
|
h.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s — %(message)s",
|
||||||
|
datefmt="%Y-%m-%d %H:%M:%S"))
|
||||||
|
logger.addHandler(h)
|
||||||
|
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
|
||||||
|
return logger
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_conn():
|
||||||
|
"""DB connection context manager. Auto-commits on success, rolls back on error."""
|
||||||
|
dsn = os.environ.get("DATABASE_URL")
|
||||||
|
if not dsn:
|
||||||
|
raise RuntimeError("DATABASE_URL is not set")
|
||||||
|
conn = psycopg2.connect(dsn)
|
||||||
|
try:
|
||||||
|
conn.autocommit = False
|
||||||
|
yield conn
|
||||||
|
conn.commit()
|
||||||
|
except Exception:
|
||||||
|
conn.rollback()
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def clean(v: Any) -> Optional[str]:
|
||||||
|
"""Trimmed string, or None if empty/None."""
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
s = str(v).strip()
|
||||||
|
return s if s != "" else None
|
||||||
Loading…
Reference in a new issue