Compare commits

..

No commits in common. "e71c8914f18b7ba5d043be912e94e2d2d47a05da" and "764dee986f3dc689125e6bbe7f9bdef3f3954072" have entirely different histories.

6 changed files with 48 additions and 312 deletions

6
.gitignore vendored
View file

@ -3,9 +3,7 @@ __pycache__/
*.pyc
.venv/
uv.lock
build/
*.egg-info/
# local/secret config (e.g. .claude/settings.local.json) — but allow real .json fixtures
*.local.json
*.json
!.*.json
*.csv
.DS_Store

View file

@ -16,9 +16,10 @@ RUN apt-get update \
WORKDIR /app
# Install from pyproject.toml (single source of truth for deps — no manual mirror).
# Dependencies (mirror pyproject.toml) — separate layer for build caching.
RUN pip install "psycopg2-binary>=2.9.9" "requests>=2.32.3" "boto3>=1.34"
COPY . .
RUN pip install .
# Keep the container alive so Coolify Scheduled Tasks can exec into it.
CMD ["tail", "-f", "/dev/null"]

View file

@ -123,10 +123,7 @@ def _ts_from_key(key: str) -> datetime | None:
m = _CSV_KEY_RE.match(key)
if not m:
return None
try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort
return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
except ValueError:
return None
def _list_inc_csvs(s3) -> list[tuple[str, str]]:
@ -194,12 +191,10 @@ def _prepare(row: dict) -> dict:
# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _record_meta(cur, meta: dict, records_ingested: int) -> None:
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag).
Runs on the caller's cursor so the row upsert and the meta write commit
together a half-written state (rows in, meta stale) breaks skip-if-unchanged.
"""
def _record_meta(meta: dict, records_ingested: int) -> None:
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag)."""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO tickets.import_meta
(dataset, export_type, exported_at, snapshot_date, source_schema,
@ -237,8 +232,7 @@ def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
payload, page_size=500,
)
# same transaction as the upsert: rows + snapshot meta commit atomically
_record_meta(cur, meta, len(payload))
_record_meta(meta, len(payload))
log.info("upserted %d rows into %s", len(payload), _TABLE)
return len(payload)
@ -302,51 +296,29 @@ def ingest(args) -> None:
# ── place extraction (strip network codes, keep the real place) ───────────────
# Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly.
_PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+")
# 'NW' is the one site-code that the source also glues straight onto the place with
# no separator (NWKIAMBU, NWRIDGE, NWTHE — ~1.7k rows in a single snapshot). Safe to
# split because no place/word starts with "NW"; the other codes (CO/NE/SE/DR…) begin
# real words (COAST, NEW, SEASONS, DRIVE) so we only strip THOSE when delimited above.
_GLUED_NW_RE = re.compile(r"^NW(?=[A-Z])")
# Inline network/work-order codes to drop wherever they appear.
_CODE_RE = re.compile(
r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b"
)
# Trailing '-<segment>' after the final hyphen: a unit/instruction code, not a place.
# Dropped only when it LOOKS like one — a unit number (37, F32, 3C, 302), a short
# code (<=3 chars: E, NB, KKK), or an instruction phrase (CALL ON ARRIVAL, TBC, NA).
# A real word tail (…-MALL) is kept.
_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$")
_TAIL_INSTRUCTION_TOKENS = frozenset({
"CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO",
"OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON",
})
def extract_place(location_name: str | None) -> str:
"""Pull the human place/landmark out of a coded location_name string.
e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT'
'NWKIAMBU_KIRIGITI_MWANJA APARTMENTS-TBC' -> 'KIAMBU KIRIGITI MWANJA APARTMENTS'
"""
s = (location_name or "").upper().strip()
if not s:
return ""
# drop the trailing '-<segment>' only when it's a unit/instruction code, not a
# real word (so '…-37'/'…-CALL ON ARRIVAL' drop but '…-MALL' is kept)
# drop the trailing '-<unit/instruction>' segment (e.g. -37, -CALL CLIENT, -F32)
if "-" in s:
head, _, tail = s.rpartition("-")
head, tail = head.strip(), tail.strip()
first = tail.split()[0] if tail else ""
if head and (not tail or len(tail) <= 3 or _UNIT_TAIL_RE.match(tail)
or first in _TAIL_INSTRUCTION_TOKENS):
s = head
s = s.rsplit("-", 1)[0]
s = s.replace("_", " ")
# strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…; or glued: NWKIAMBU)
# strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…)
prev = None
while prev != s:
prev = s
s = _PREFIX_RE.sub("", s).strip()
s = _GLUED_NW_RE.sub("", s).strip()
s = _CODE_RE.sub(" ", s)
s = re.sub(r"\s+", " ", s).strip(" ,-")
return s
@ -357,38 +329,6 @@ def compose_query(location_name: str | None, cluster: str | None, region: str |
return ", ".join(dict.fromkeys(parts)) # de-dupe while preserving order
def compose_queries(location_name: str | None, cluster: str | None,
region: str | None) -> list[str]:
"""Ordered geocode candidates, most → least specific (two-pass estate fallback).
Building-level location_names (e.g. 'KAHAWA WENDANI ALVO HOUSE') aren't in OSM, so
the precise query 404s. We then fall back to the estate (leading tokens of the
place) each still constrained to the cluster viewbox + distance check by the
caller, so a coarse hit lands in the right neighbourhood (tighter than the bare
cluster centroid). We deliberately do NOT add a pure-cluster candidate: that would
just reproduce the cluster centroid while mislabelling it geo_source='location';
a truly unmatchable ticket should keep its honest cluster-centroid fallback.
e.g. 'KAHAWA WENDANI ALVO HOUSE' -> ['KAHAWA WENDANI ALVO HOUSE, WENDANI, nairobi,
Kenya', 'KAHAWA WENDANI, nairobi, Kenya', 'KAHAWA, nairobi, Kenya']
"""
region_part, cluster_part = clean(region), clean(cluster)
place = extract_place(location_name)
toks = place.split()
out: list[str] = []
def add(*parts: str | None) -> None:
q = ", ".join(dict.fromkeys([p for p in parts if p] + ["Kenya"]))
if q and q != "Kenya" and q not in out:
out.append(q)
add(place, cluster_part, region_part) # 1. full precise
if len(toks) > 2:
add(" ".join(toks[:2]), region_part) # 2. estate (leading 2 tokens)
if len(toks) > 1:
add(toks[0], region_part) # 3. leading token (broad estate)
return out
# ── keyed geocoder ────────────────────────────────────────────────────────────
def _throttle() -> None:
global _last_geocode_at
@ -528,32 +468,26 @@ def geocode_locations(apply: bool) -> None:
log.info("%d actionable-INC locations to geocode (provider=%s)", len(todo), _PROVIDER)
if not apply:
for key, loc, cluster, region, clat, clng in todo[:50]:
log.info(" %s -> %s", key, " | ".join(compose_queries(loc, cluster, region)))
log.info(" %s -> %r", key, compose_query(loc, cluster, region))
return
written = missed = coarse = 0
written = rejected = 0
for key, loc, cluster, region, clat, clng in todo:
query = compose_query(loc, cluster, region)
viewbox = None
if clat is not None and clng is not None:
viewbox = (clng - _VIEWBOX_DEG, clat - _VIEWBOX_DEG, clng + _VIEWBOX_DEG, clat + _VIEWBOX_DEG)
# two-pass: precise → estate → cluster; accept the FIRST in-range hit. A wrong-area
# match (> MAX_KM from the cluster centroid) is skipped so we try a coarser query.
hit = used = None
for i, cand in enumerate(compose_queries(loc, cluster, region)):
g = geocode(cand, viewbox)
if not g:
continue
lat, lng, conf = g
if (clat is not None and clng is not None
and _haversine_km(lat, lng, clat, clng) > _MAX_KM_FROM_CLUSTER):
continue
hit, used = g, cand
if i > 0:
coarse += 1
break
hit = geocode(query, viewbox)
if not hit:
missed += 1 # no match even coarsely — keeps cluster-centroid fallback
continue
lat, lng, conf = hit
# distance sanity: a result far from the cluster centroid is a wrong-city
# match — drop it so the ticket keeps the cluster-centroid fallback.
if clat is not None and clng is not None:
km = _haversine_km(lat, lng, clat, clng)
if km > _MAX_KM_FROM_CLUSTER:
rejected += 1
log.info(" reject (%.0f km from cluster): %s", km, query)
continue
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
@ -564,14 +498,13 @@ def geocode_locations(apply: bool) -> None:
SET location_name = EXCLUDED.location_name, cluster = EXCLUDED.cluster,
region = EXCLUDED.region, query = EXCLUDED.query, lat = EXCLUDED.lat,
lng = EXCLUDED.lng, confidence = EXCLUDED.confidence, provider = EXCLUDED.provider""",
(key, loc, cluster, region, used, lat, lng, conf, _PROVIDER),
(key, loc, cluster, region, query, lat, lng, conf, _PROVIDER),
)
written += 1
log.info(" geocoded %s -> %.5f, %.5f", used, lat, lng)
log.info(" geocoded %s -> %.5f, %.5f", query, lat, lng)
n = _resolve()
log.info("locations: %d accepted (%d via estate/cluster fallback), %d unmatched; "
"re-resolved geom on %d tickets (unverified — review tickets.geo_locations)",
written, coarse, missed, n)
log.info("locations: %d accepted, %d rejected (too far); re-resolved geom on %d tickets "
"(unverified — review tickets.geo_locations)", written, rejected, n)
def _resolve() -> int:

View file

@ -17,11 +17,6 @@
SET search_path = tickets, public;
-- EAT (Africa/Nairobi) text -> timestamptz; IMMUTABLE so it can back generated cols.
-- FOOTGUN: 'AT TIME ZONE' resolves against the OS tzdata, so IMMUTABLE is a slight
-- lie. It's safe here because Kenya is fixed at UTC+3 (no DST, no pending changes),
-- so the result is genuinely invariant. If that ever changes, a tzdata update will
-- NOT recompute the STORED generated columns below — they'd need a manual rebuild
-- (PG17+: ALTER TABLE tickets.inc ALTER COLUMN <col> SET EXPRESSION AS (...same...)).
CREATE OR REPLACE FUNCTION tickets.eat_ts(p text)
RETURNS timestamptz LANGUAGE sql IMMUTABLE PARALLEL SAFE
AS $fn$ SELECT (NULLIF(p, '')::timestamp) AT TIME ZONE 'Africa/Nairobi' $fn$;

View file

@ -1,182 +0,0 @@
-- 12_inc_dashboard_by_owner.sql — fleettickets · closure-by-engineer analytics
-- ─────────────────────────────────────────────────────────────────────────────
-- CREATE OR REPLACE of reporting.fn_inc_dashboard (supersedes migration 09) to add
-- closure-by-engineer analytics for the FleetOps Tickets dashboard:
--
-- metrics.by_owner — over the windowed CLOSED set, who closed how many, grouped by
-- a CASE-NORMALIZED owner. The offline closing system emits the
-- same engineer in mixed case ('Elikana Mabonga' vs
-- 'ELIKANA MABONGA'); normalizing with initcap(lower(...)) merges
-- those duplicates (102 -> 58 distinct owners observed). Returns
-- an array of { owner, closed, breached, avg_mttr_min } sorted by
-- closed desc — a ready-to-render leaderboard.
--
-- The owner field on the open/closed GeoJSON features is normalized the SAME way, so
-- the leaderboard and the per-ticket drill-down (the closed features already carry
-- owner + closed_at) agree on casing.
--
-- Everything else is unchanged from migration 09. The dashboard_api route is a plain
-- passthrough, so no API change is needed — only this DB function. Idempotent.
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
CREATE OR REPLACE FUNCTION reporting.fn_inc_dashboard(
p_cluster text DEFAULT NULL,
p_status text DEFAULT NULL,
p_window text DEFAULT 'today',
p_from timestamptz DEFAULT NULL,
p_to timestamptz DEFAULT NULL
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
DECLARE
v_now_eat timestamp;
v_from timestamptz;
v_to timestamptz;
v_preset text;
v_days numeric;
v_result jsonb;
BEGIN
p_cluster := NULLIF(p_cluster, '');
p_status := NULLIF(p_status, '');
v_now_eat := now() AT TIME ZONE 'Africa/Nairobi';
-- ── resolve the window ──────────────────────────────────────────────────────
IF p_from IS NOT NULL OR p_to IS NOT NULL THEN
v_preset := 'custom';
v_from := COALESCE(p_from, '-infinity'::timestamptz);
v_to := COALESCE(p_to, 'infinity'::timestamptz);
ELSE
v_preset := lower(COALESCE(NULLIF(p_window, ''), 'today'));
IF v_preset = 'week' THEN
v_from := date_trunc('week', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('week', v_now_eat) + interval '1 week') AT TIME ZONE 'Africa/Nairobi';
ELSIF v_preset = 'month' THEN
v_from := date_trunc('month', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('month', v_now_eat) + interval '1 month') AT TIME ZONE 'Africa/Nairobi';
ELSE
v_preset := 'today';
v_from := date_trunc('day', v_now_eat) AT TIME ZONE 'Africa/Nairobi';
v_to := (date_trunc('day', v_now_eat) + interval '1 day') AT TIME ZONE 'Africa/Nairobi';
END IF;
END IF;
IF v_from > '-infinity'::timestamptz AND v_to < 'infinity'::timestamptz THEN
v_days := GREATEST(EXTRACT(EPOCH FROM (v_to - v_from)) / 86400.0, 1);
ELSE
v_days := NULL; -- open-ended custom window → per-day average not meaningful
END IF;
-- ── build payload ───────────────────────────────────────────────────────────
WITH open_t AS (
SELECT * FROM tickets.inc_open_sla
WHERE (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
),
closed_t AS (
SELECT ticket_id, normalized_status, cluster, region, location_name,
assigned_team, owner, closed_at, mttr, sla_status, geo_source, geom
FROM tickets.inc
WHERE NOT COALESCE(is_actionable, false)
AND closed_at IS NOT NULL
AND closed_at >= v_from AND closed_at < v_to
AND (p_cluster IS NULL OR cluster = p_cluster)
AND (p_status IS NULL OR normalized_status = p_status)
)
SELECT jsonb_build_object(
'window', jsonb_build_object('from', v_from, 'to', v_to, 'preset', v_preset),
'open', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'sla_state', sla_state, 'hours_open', hours_open),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM open_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'closed', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE((
SELECT jsonb_agg(jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', ticket_id, 'normalized_status', normalized_status,
'cluster', cluster, 'region', region, 'location_name', location_name,
'assigned_team', assigned_team, 'owner', initcap(lower(NULLIF(owner, ''))),
'geo_source', geo_source,
'closed_at', closed_at, 'mttr', mttr, 'sla_status', sla_status),
'geometry', ST_AsGeoJSON(geom)::jsonb))
FROM closed_t WHERE geom IS NOT NULL), '[]'::jsonb)
),
'metrics', jsonb_build_object(
'open_now', (SELECT count(*) FROM open_t),
'closed_in_window', (SELECT count(*) FROM closed_t),
'sla', jsonb_build_object(
'open', (SELECT jsonb_build_object(
'breached', count(*) FILTER (WHERE sla_state = 'breached'),
'at_risk', count(*) FILTER (WHERE sla_state = 'at_risk'),
'ok', count(*) FILTER (WHERE sla_state = 'ok'),
'unknown', count(*) FILTER (WHERE sla_state = 'unknown')) FROM open_t),
'closed', (SELECT jsonb_build_object(
'compliant', count(*) FILTER (WHERE sla_status = 'Compliant'),
'breached', count(*) FILTER (WHERE sla_status = 'Breached')) FROM closed_t)
),
'by_status', COALESCE((SELECT jsonb_object_agg(s, c) FROM (
SELECT COALESCE(normalized_status, '(none)') AS s, count(*) AS c FROM (
SELECT normalized_status FROM open_t
UNION ALL SELECT normalized_status FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
'by_cluster', COALESCE((SELECT jsonb_object_agg(cl, c) FROM (
SELECT COALESCE(cluster, '(none)') AS cl, count(*) AS c FROM (
SELECT cluster FROM open_t
UNION ALL SELECT cluster FROM closed_t) u GROUP BY 1) z), '{}'::jsonb),
-- closures by engineer (CASE-NORMALIZED owner) — leaderboard for "who closed".
'by_owner', COALESCE((SELECT jsonb_agg(jsonb_build_object(
'owner', o, 'closed', c, 'breached', b, 'avg_mttr_min', a) ORDER BY c DESC, o)
FROM (
SELECT COALESCE(initcap(lower(NULLIF(owner, ''))), '(unattributed)') AS o,
count(*) AS c,
count(*) FILTER (WHERE sla_status = 'Breached') AS b,
round(avg(mttr) FILTER (WHERE mttr IS NOT NULL), 1) AS a
FROM closed_t GROUP BY 1) z), '[]'::jsonb),
'closure_rate', jsonb_build_object(
'per_day_avg', CASE WHEN v_days IS NULL THEN NULL
ELSE round((SELECT count(*) FROM closed_t)::numeric / v_days, 2) END,
'series', COALESCE((SELECT jsonb_agg(jsonb_build_object('day', d, 'count', c) ORDER BY d) FROM (
SELECT (closed_at AT TIME ZONE 'Africa/Nairobi')::date AS d, count(*) AS c
FROM closed_t GROUP BY 1) z), '[]'::jsonb)
),
'avg_mttr_min', (SELECT round(avg(mttr), 1) FROM closed_t WHERE mttr IS NOT NULL)
),
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
'export_type', export_type, 'exported_at', exported_at,
'records_ingested', records_ingested, 'ingested_at', ingested_at))
FROM tickets.import_meta)
) INTO v_result;
RETURN v_result;
END $fn$;
COMMENT ON FUNCTION reporting.fn_inc_dashboard(text, text, text, timestamptz, timestamptz) IS
'FleetOps INC operations dashboard: open (live) + closed (windowed) GeoJSON + ticket '
'metrics incl. by_owner closure leaderboard (case-normalized), filtered by '
'cluster/status/time (EAT). fleettickets 12 (supersedes 09).';
-- grants (guarded: roles may not exist on a fresh DB)
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_dashboard(text, text, text, timestamptz, timestamptz) TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT EXECUTE ON FUNCTION reporting.fn_inc_dashboard(text, text, text, timestamptz, timestamptz) TO grafana_ro;
END IF;
END $grants$;

View file

@ -1,7 +1,3 @@
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
[project]
name = "fleettickets"
version = "0.1.0"
@ -18,11 +14,6 @@ dev = [
"ruff>=0.4",
]
# Flat-module project (no package dir) — list the top-level modules explicitly so
# `pip install .` works (the Docker image installs the project to pull its deps).
[tool.setuptools]
py-modules = ["import_tickets", "shared", "run_migrations"]
[tool.uv]
managed = true