diff --git a/.gitignore b/.gitignore index e1ef631..1cc95d1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,9 @@ __pycache__/ *.pyc .venv/ uv.lock -*.json -!.*.json +build/ +*.egg-info/ +# local/secret config (e.g. .claude/settings.local.json) — but allow real .json fixtures +*.local.json *.csv .DS_Store diff --git a/Dockerfile b/Dockerfile index d922fc6..d91eb92 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,10 +16,9 @@ RUN apt-get update \ WORKDIR /app -# Dependencies (mirror pyproject.toml) — separate layer for build caching. -RUN pip install "psycopg2-binary>=2.9.9" "requests>=2.32.3" "boto3>=1.34" - +# Install from pyproject.toml (single source of truth for deps — no manual mirror). COPY . . +RUN pip install . # Keep the container alive so Coolify Scheduled Tasks can exec into it. CMD ["tail", "-f", "/dev/null"] diff --git a/import_tickets.py b/import_tickets.py index d8263d6..353048a 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -123,7 +123,10 @@ def _ts_from_key(key: str) -> datetime | None: m = _CSV_KEY_RE.match(key) if not m: return None - return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT) + try: # regex shape can match an impossible date (e.g. 9999-99-99T…) — don't crash the sort + return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT) + except ValueError: + return None def _list_inc_csvs(s3) -> list[tuple[str, str]]: @@ -191,28 +194,30 @@ def _prepare(row: dict) -> dict: # ── upsert (raw-first) ──────────────────────────────────────────────────────── -def _record_meta(meta: dict, records_ingested: int) -> None: - """Upsert the INC snapshot metadata (powers map freshness + holds source_etag).""" - with get_conn() as conn: - with conn.cursor() as cur: - cur.execute( - """INSERT INTO tickets.import_meta - (dataset, export_type, exported_at, snapshot_date, source_schema, - source_table, row_count, records_ingested, n8n_execution_id, metadata, - ingested_at) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now()) - ON CONFLICT (dataset) DO UPDATE - SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at, - snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema, - source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count, - records_ingested = EXCLUDED.records_ingested, - n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, - ingested_at = now()""", - (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")), - clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), - clean(meta.get("source_table")), meta.get("row_count"), records_ingested, - clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), - ) +def _record_meta(cur, meta: dict, records_ingested: int) -> None: + """Upsert the INC snapshot metadata (powers map freshness + holds source_etag). + + Runs on the caller's cursor so the row upsert and the meta write commit + together — a half-written state (rows in, meta stale) breaks skip-if-unchanged. + """ + cur.execute( + """INSERT INTO tickets.import_meta + (dataset, export_type, exported_at, snapshot_date, source_schema, + source_table, row_count, records_ingested, n8n_execution_id, metadata, + ingested_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now()) + ON CONFLICT (dataset) DO UPDATE + SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at, + snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema, + source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count, + records_ingested = EXCLUDED.records_ingested, + n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata, + ingested_at = now()""", + (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")), + clean(meta.get("snapshot_date")), clean(meta.get("source_schema")), + clean(meta.get("source_table")), meta.get("row_count"), records_ingested, + clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)), + ) def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: @@ -232,7 +237,8 @@ def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int: "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()", payload, page_size=500, ) - _record_meta(meta, len(payload)) + # same transaction as the upsert: rows + snapshot meta commit atomically + _record_meta(cur, meta, len(payload)) log.info("upserted %d rows into %s", len(payload), _TABLE) return len(payload) @@ -296,29 +302,51 @@ def ingest(args) -> None: # ── place extraction (strip network codes, keep the real place) ─────────────── # Leading site-code prefixes (NW_, CO_, ADR_MNT_, COAST_, …) — applied repeatedly. _PREFIX_RE = re.compile(r"^(?:NW|NE|NM|SW|SE|CO|COAST|ADR|MNT|CMT|DR|NAIROBI|FIBER\w*)[\s_]+") +# 'NW' is the one site-code that the source also glues straight onto the place with +# no separator (NWKIAMBU, NWRIDGE, NWTHE — ~1.7k rows in a single snapshot). Safe to +# split because no place/word starts with "NW"; the other codes (CO/NE/SE/DR…) begin +# real words (COAST, NEW, SEASONS, DRIVE) so we only strip THOSE when delimited above. +_GLUED_NW_RE = re.compile(r"^NW(?=[A-Z])") # Inline network/work-order codes to drop wherever they appear. _CODE_RE = re.compile( r"\b(?:SDUS|SDU|MDUS|MDU|FDT\s*\d*|AP|CLUSTER\s*\d*[A-Z]?|PHASE\s*\d+|CL\s*\d+|MNT|SITE|ADR)\b" ) +# Trailing '-' after the final hyphen: a unit/instruction code, not a place. +# Dropped only when it LOOKS like one — a unit number (37, F32, 3C, 302), a short +# code (<=3 chars: E, NB, KKK), or an instruction phrase (CALL ON ARRIVAL, TBC, NA). +# A real word tail (…-MALL) is kept. +_UNIT_TAIL_RE = re.compile(r"^[A-Z]{0,2}\d+[A-Z]{0,3}$") +_TAIL_INSTRUCTION_TOKENS = frozenset({ + "CALL", "TO", "NA", "NB", "TBC", "NULL", "NONE", "NIL", "OOO", + "OBT", "PENDING", "CONFIRM", "CHECK", "CLIENT", "ON", +}) def extract_place(location_name: str | None) -> str: """Pull the human place/landmark out of a coded location_name string. e.g. 'NW_RUIRU KAMAKIS_DEEP EAST APARTMENT-37' -> 'RUIRU KAMAKIS DEEP EAST APARTMENT' + 'NWKIAMBU_KIRIGITI_MWANJA APARTMENTS-TBC' -> 'KIAMBU KIRIGITI MWANJA APARTMENTS' """ s = (location_name or "").upper().strip() if not s: return "" - # drop the trailing '-' segment (e.g. -37, -CALL CLIENT, -F32) + # drop the trailing '-' only when it's a unit/instruction code, not a + # real word (so '…-37'/'…-CALL ON ARRIVAL' drop but '…-MALL' is kept) if "-" in s: - s = s.rsplit("-", 1)[0] + head, _, tail = s.rpartition("-") + head, tail = head.strip(), tail.strip() + first = tail.split()[0] if tail else "" + if head and (not tail or len(tail) <= 3 or _UNIT_TAIL_RE.match(tail) + or first in _TAIL_INSTRUCTION_TOKENS): + s = head s = s.replace("_", " ") - # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…) + # strip leading site-code prefixes (may be stacked: ADR MNT KAHAWA…; or glued: NWKIAMBU) prev = None while prev != s: prev = s s = _PREFIX_RE.sub("", s).strip() + s = _GLUED_NW_RE.sub("", s).strip() s = _CODE_RE.sub(" ", s) s = re.sub(r"\s+", " ", s).strip(" ,-") return s diff --git a/migrations/03_inc_columns.sql b/migrations/03_inc_columns.sql index 3948843..8649977 100644 --- a/migrations/03_inc_columns.sql +++ b/migrations/03_inc_columns.sql @@ -17,6 +17,11 @@ SET search_path = tickets, public; -- EAT (Africa/Nairobi) text -> timestamptz; IMMUTABLE so it can back generated cols. +-- FOOTGUN: 'AT TIME ZONE' resolves against the OS tzdata, so IMMUTABLE is a slight +-- lie. It's safe here because Kenya is fixed at UTC+3 (no DST, no pending changes), +-- so the result is genuinely invariant. If that ever changes, a tzdata update will +-- NOT recompute the STORED generated columns below — they'd need a manual rebuild +-- (PG17+: ALTER TABLE tickets.inc ALTER COLUMN SET EXPRESSION AS (...same...)). CREATE OR REPLACE FUNCTION tickets.eat_ts(p text) RETURNS timestamptz LANGUAGE sql IMMUTABLE PARALLEL SAFE AS $fn$ SELECT (NULLIF(p, '')::timestamp) AT TIME ZONE 'Africa/Nairobi' $fn$; diff --git a/pyproject.toml b/pyproject.toml index 4050e51..952f3c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,7 @@ +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + [project] name = "fleettickets" version = "0.1.0" @@ -14,6 +18,11 @@ dev = [ "ruff>=0.4", ] +# Flat-module project (no package dir) — list the top-level modules explicitly so +# `pip install .` works (the Docker image installs the project to pull its deps). +[tool.setuptools] +py-modules = ["import_tickets", "shared", "run_migrations"] + [tool.uv] managed = true