diff --git a/.env.example b/.env.example
index b0fd306..5de8686 100644
--- a/.env.example
+++ b/.env.example
@@ -3,7 +3,7 @@
 # Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
 DATABASE_URL=postgresql://tracksolid_owner:@timescale_db:5432/tracksolid_db

-# rustfs / S3 — source ticket snapshots (automations/{inc,crq}/latest.json)
+# rustfs / S3 — source ticket snapshots (automations/inc/.csv)
 RUSTFS_ENDPOINT=https://s3.rahamafresh.com
 RUSTFS_ACCESS_KEY=
 RUSTFS_SECRET_KEY=
diff --git a/.gitignore b/.gitignore
index c678c77..e1ef631 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,5 @@ __pycache__/
 uv.lock
 *.json
 !.*.json
+*.csv
 .DS_Store
diff --git a/README.md b/README.md
index e1aa5fc..d6a9e17 100644
--- a/README.md
+++ b/README.md
@@ -1,18 +1,19 @@
 # fleettickets

-Field-ops **INC / CRQ ticket** ingestion, geocoding, and read-schema that powers the
+Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
 **Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
 (it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
-- **INC** — incident / customer-fault tickets
-- **CRQ** — new-installation requests
+- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
+- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*

 ## What this owns

 | Piece | What |
 |---|---|
 | `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) |
-| `import_tickets.py` | Pulls ticket snapshots from the rustfs `tickets` bucket and upserts them; geocodes clusters + INC locations |
+| `migrations/02_import_meta.sql` | `tickets.import_meta` (per-dataset snapshot envelope metadata) + `fn_tickets_for_map` re-defined to expose it as `summary.freshness` (same signature — dashboard_api unchanged) |
+| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
 | `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
 | `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |

@@ -49,23 +50,37 @@ python run_migrations.py # apply the schema (idempotent)
 ## Run

 ```bash
-# ingest the latest snapshots from the bucket
+# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
 python import_tickets.py --from-bucket --apply

 # geocode (needs GEOCODER_API_KEY)
 python import_tickets.py --geocode-clusters --apply # coarse, once
 python import_tickets.py --geocode-locations --apply # precise, actionable INC

-# from local files instead of the bucket
-python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
+# from a local CSV instead of the bucket (dev)
+python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
 ```

 Dry-run is the default
(omit `--apply`). `import_tickets.py --from-bucket` shells out to
-the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency).
+the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency). Hourly on the instance via
+cron `5 * * * *` (a few minutes after each export).

 ## Notes

-- The `changes/` subdirectory in the bucket holds **full timestamped snapshots** (not
-  deltas) — ingest `latest.json` only; don't process `changes/`.
+- The n8n export writes a **full current-state CSV per hour** to
+  `automations/inc/.csv` — no `latest` pointer, no metadata envelope, no
+  deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
+- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
+  file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
+  is skipped (the export re-emits byte-identical content most hours).
+- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
+  deleted, so closed-ticket history accumulates. On success the file is **moved** to
+  `automations/inc/processed/`.
+- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
+  `week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
+  normalize `region` → lowercase and `raw_status` → UPPERCASE. `service_type` and `bucket`
+  (a `closed`/`pending` flag) are kept.
+- `tickets.import_meta` captures snapshot freshness (surfaced as `summary.freshness` by
+  `fn_tickets_for_map`).
 - The curated/geocoded coordinates are written `verified = false` — review
   `tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked.
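The README notes above describe three ingest rules: pick the newest file by its filename timestamp, skip the write when the ETag is unchanged, and clean rows before building `raw`. A minimal, self-contained sketch of that logic is shown here. The helper names `pick_newest`, `should_skip`, and `clean_rows` are illustrative only and are not the names used in `import_tickets.py`; trimming values with `strip()` is an assumption standing in for the repo's `clean` helper.

```python
import re
from datetime import datetime, timedelta, timezone

# Filenames carry an Africa/Nairobi (UTC+3) timestamp, per the README notes.
EAT = timezone(timedelta(hours=3))
KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")

# Columns dropped at ingest (same set as DROP_FIELDS in this PR).
DROP = {"week_start", "week_end", "source_s3_bucket", "source_s3_key",
        "source_snapshot_id", "department", "source_type"}


def pick_newest(listing):
    """Return the (key, etag) pair with the latest filename timestamp, or None."""
    stamped = []
    for key, etag in listing:
        m = KEY_RE.match(key)
        if m:  # keys under processed/ (or any other shape) do not match
            ts = datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=EAT)
            stamped.append((ts, key, etag))
    if not stamped:
        return None
    _, key, etag = max(stamped)
    return key, etag


def should_skip(newest_etag, last_etag):
    """Skip the DB write only when an ETag is present and equals the stored one."""
    return bool(newest_etag) and newest_etag == last_etag


def clean_rows(rows):
    """Apply the cleaning rules to csv.DictReader-style rows (dicts of strings)."""
    out = []
    for row in rows:
        tid = (row.get("ticket_id") or "").strip()
        if not tid or tid.startswith("EXPORT STOPPED"):
            continue  # sentinel row or missing id
        if (row.get("is_alarm") or "").strip() == "true":
            continue  # alarm rows are not ingested
        kept = {k: v for k, v in row.items() if k not in DROP}
        if kept.get("region"):
            kept["region"] = kept["region"].lower()
        if kept.get("raw_status"):
            kept["raw_status"] = kept["raw_status"].upper()
        out.append(kept)
    return out
```

Sorting on the parsed timestamp rather than the raw key keeps the choice independent of listing order, and requiring a non-empty ETag means a listing without ETags never triggers a skip.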
diff --git a/import_tickets.py b/import_tickets.py
index 39da3a4..8dc1e63 100644
--- a/import_tickets.py
+++ b/import_tickets.py
@@ -1,18 +1,31 @@
 """
-import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first)
+import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
-Loads the client's field-ops ticket snapshots into the `tickets` schema — the
-source of the FleetOps "Tickets" map. Two categories, one table each:
+Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
+source of the FleetOps "Tickets" map.
   tickets.inc — incidents / customer faults
-  tickets.crq — new-installation requests
-RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
-jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
-The DB derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
+STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
+here. `tickets.crq` stays in the schema but is not fed by this pipeline.
+
+RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
+Everything downstream reads from `raw` (resilient to source schema drift). The DB
+derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
 (tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
-Source data: rustfs `tickets` bucket, full snapshots from the client's email
-automation — automations/{inc,crq}/latest.json (array of 32-key objects).
+Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
+writes a full current-state snapshot CSV per hour to the `tickets` bucket at
+  automations/inc/.csv (e.g. 2026-06-15T17-00-00.csv)
+There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a
+flat CSV (header + rows).
We ingest the NEWEST file:
+  - skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
+    skip the DB write (the export re-emits byte-identical content most hours);
+  - drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
+  - drop derivable / provenance / zero-info columns (see DROP_FIELDS);
+  - normalize region -> lowercase, raw_status -> UPPERCASE;
+  - upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
+    history accumulates), and record snapshot freshness in tickets.import_meta;
+  - on success, MOVE the file to automations/inc/processed/ (copy + delete).

 Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
   --geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
@@ -24,7 +37,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY

 Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
   python import_tickets.py --from-bucket --apply
-  python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
+  python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
   python import_tickets.py --geocode-clusters --apply
   python import_tickets.py --geocode-locations --apply

@@ -36,12 +49,15 @@ geo_clusters + geo_locations + reporting.fn_tickets_for_map.
 from __future__ import annotations

 import argparse
+import csv
+import io
 import json
 import math
 import os
 import re
 import subprocess
 import time
+from datetime import datetime, timezone, timedelta

 import requests
 import psycopg2.extras
@@ -50,9 +66,31 @@ from shared import clean, get_conn, get_logger

 log = get_logger("import_tickets")

-TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"}
+# ── INC ingestion config ──────────────────────────────────────────────────────
+_TABLE = "tickets.inc"
+_DATASET = "inc"
 _BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
-_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"}
+_INC_PREFIX = "automations/inc/"
+_PROCESSED_PREFIX = "automations/inc/processed/"
+_EAT = timezone(timedelta(hours=3))  # Africa/Nairobi — filenames + data are EAT
+
+# Garbage row the source leaks (commonly the first data line): its ticket_id is the
+# message itself. Matched by prefix so position/exact-tail don't matter.
+_SENTINEL_PREFIX = "EXPORT STOPPED"
+
+# Columns dropped before building `raw`: derivable (week_*), the client's row-level
+# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
+# (department=always FTTH, source_type=duplicate of service_type). We KEEP
+# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
+DROP_FIELDS = frozenset({
+    "week_start", "week_end",
+    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
+    "department", "source_type",
+})
+
+# Only files matching automations/inc/.csv (NOT processed/, NOT the
+# leftover latest.csv/, latest.json/, full/ prefixes).
+_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")

 # Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
 _PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
@@ -61,68 +99,190 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
 _last_geocode_at = 0.0


-# ── data loading ──────────────────────────────────────────────────────────────
-def _load_local(path: str) -> list[dict]:
-    with open(path, encoding="utf-8") as f:
-        data = json.load(f)  # json.loads accepts NaN by default
-    return data if isinstance(data, list) else []
-
-
-def _load_bucket(kind: str) -> list[dict]:
-    env = {
+# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
+# The n8n hourly export writes a full current-state CSV per hour to
+# automations/inc/.csv (no latest pointer, no envelope, no deltas).
+# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
+# we skip the DB write (the export re-emits byte-identical content most hours).
+def _s3_env() -> dict:
+    return {
         **os.environ,
         "AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
         "AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
         "AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
+        "AWS_S3_ADDRESSING_STYLE": "path",  # force path-style to match the rustfs endpoint
     }
-    uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
-    log.info("fetching %s", uri)
-    out = subprocess.run(
-        ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"],
+
+
+def _aws(args: list[str], env: dict) -> bytes:
+    return subprocess.run(
+        ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], *args],
         env=env, capture_output=True, timeout=180, check=True,
     ).stdout
-    data = json.loads(out.decode("utf-8"))
-    return data if isinstance(data, list) else []
+
+
+def _ts_from_key(key: str) -> datetime | None:
+    """EAT timestamp embedded in an automations/inc/.csv key (or None)."""
+    m = _CSV_KEY_RE.match(key)
+    if not m:
+        return None
+    return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
+
+
+def _list_inc_csvs(env: dict) ->
list[tuple[str, str]]:
+    """[(key, etag)] for every automations/inc/.csv (excludes processed/ + dirs)."""
+    out = _aws(
+        ["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX,
+         "--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"],
+        env,
+    ).decode("utf-8").strip()
+    items = json.loads(out) if out and out != "None" else []
+    return [
+        (it["Key"], (it.get("ETag") or "").strip('"'))
+        for it in (items or []) if _CSV_KEY_RE.match(it.get("Key", ""))
+    ]
+
+
+def _last_processed_etag() -> str | None:
+    """ETag of the most recently ingested INC file (from tickets.import_meta)."""
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute(
+                "SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s",
+                (_DATASET,),
+            )
+            row = cur.fetchone()
+    return row[0] if row else None
+
+
+def _parse_csv(text: str) -> list[dict]:
+    return list(csv.DictReader(io.StringIO(text)))
+
+
+def _load_csv_local(path: str) -> list[dict]:
+    with open(path, encoding="utf-8", newline="") as f:
+        return list(csv.DictReader(f))
+
+
+def _move_processed(keys: list[str], env: dict) -> None:
+    """Archive listed INC csv objects to automations/inc/processed/ (S3 mv = copy+delete)."""
+    for key in keys:
+        dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
+        _aws(["s3", "mv", f"s3://{_BUCKET}/{key}", f"s3://{_BUCKET}/{dst}"], env)
+        log.info("archived %s -> %s", key, dst)
+
+
+# ── row preparation (filter · drop columns · normalize) ─────────────────────────
+def _keep_row(row: dict) -> bool:
+    """Drop alarm rows + the truncation-sentinel; require a real ticket_id."""
+    tid = clean(row.get("ticket_id"))
+    if not tid or tid.startswith(_SENTINEL_PREFIX):
+        return False
+    return clean(row.get("is_alarm")) != "true"
+
+
+def _prepare(row: dict) -> dict:
+    """Strip DROP_FIELDS and normalize region/raw_status — returns the `raw` payload."""
+    r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
+    if r.get("region"):
+        r["region"] =
r["region"].lower()
+    if r.get("raw_status"):
+        r["raw_status"] = r["raw_status"].upper()
+    return r


 # ── upsert (raw-first) ────────────────────────────────────────────────────────
-def _scrub_nan(row: dict) -> dict:
-    # Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
-    return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
+def _record_meta(meta: dict, records_ingested: int) -> None:
+    """Upsert the INC snapshot metadata (powers map freshness + holds source_etag)."""
+    with get_conn() as conn:
+        with conn.cursor() as cur:
+            cur.execute(
+                """INSERT INTO tickets.import_meta
+                   (dataset, export_type, exported_at, snapshot_date, source_schema,
+                    source_table, row_count, records_ingested, n8n_execution_id, metadata,
+                    ingested_at)
+                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
+                   ON CONFLICT (dataset) DO UPDATE
+                   SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
+                       snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
+                       source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
+                       records_ingested = EXCLUDED.records_ingested,
+                       n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
+                       ingested_at = now()""",
+                (_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
+                 clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
+                 clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
+                 clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
+            )


-def upsert(rows: list[dict], table: str, apply: bool) -> int:
-    payload = [
-        (tid, psycopg2.extras.Json(_scrub_nan(r)))
-        for r in rows
-        if (tid := clean(r.get("ticket_id")))
-    ]
-    log.info("%s: %d valid rows (skipped %d without ticket_id)",
-             table, len(payload), len(rows) - len(payload))
+def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
+    meta = meta or {}
+    kept = [r for r in rows if
_keep_row(r)]
+    payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
+    log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
+             _TABLE, len(rows), len(payload), len(rows) - len(payload))
     if not apply:
-        log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
+        log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
         return len(payload)
     with get_conn() as conn:
         with conn.cursor() as cur:
             psycopg2.extras.execute_values(
                 cur,
-                f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
+                f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
                 "ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
                 payload, page_size=500,
             )
-    log.info("upserted %d rows into %s", len(payload), table)
+    _record_meta(meta, len(payload))
+    log.info("upserted %d rows into %s", len(payload), _TABLE)
     return len(payload)


 def ingest(args) -> None:
-    if args.from_bucket:
-        for kind in ("inc", "crq"):
-            upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
+    # Local-file path (dev): ingest a single CSV, no bucket / no archive.
+    if args.inc_csv:
+        rows = _load_csv_local(args.inc_csv)
+        name = os.path.basename(args.inc_csv)
+        ts = _ts_from_key(_INC_PREFIX + name)
+        meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
+        if ts:
+            meta["exported_at"] = ts.isoformat()
+        upsert(rows, args.apply, meta=meta)
+        return
+
+    # --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
+    env = _s3_env()
+    listing = _list_inc_csvs(env)
+    if not listing:
+        log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
+        return
+    listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
+    all_keys = [k for k, _ in listing]
+    newest_key, newest_etag = listing[-1]
+    log.info("newest INC file: %s (etag=%s; %d file(s) present)",
+             newest_key, newest_etag, len(listing))
+
+    last_etag = _last_processed_etag()
+    if newest_etag and newest_etag == last_etag:
+        log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
+        if args.apply:
+            _move_processed(all_keys, env)
+        else:
+            log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
+        return
+
+    text = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env).decode("utf-8")
+    rows = _parse_csv(text)
+    ts = _ts_from_key(newest_key)
+    meta = {"export_type": "full", "source_s3_key": newest_key,
+            "source_etag": newest_etag, "row_count": len(rows)}
+    if ts:
+        meta["exported_at"] = ts.isoformat()
+    upsert(rows, args.apply, meta=meta)
+    if args.apply:
+        _move_processed(all_keys, env)
     else:
-        if args.inc_json:
-            upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply)
-        if args.crq_json:
-            upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply)
+        log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)


 # ── place extraction (strip network codes, keep the real place) ───────────────
@@ -348,12 +508,12 @@ def _resolve() -> int:

 # ── entrypoint ────────────────────────────────────────────────────────────────
 def main() -> None:
-    ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
+    ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
     ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
     ap.add_argument("--from-bucket", action="store_true",
-                    help="Fetch latest.json for
inc+crq from the rustfs tickets bucket (aws CLI)")
-    ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
-    ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
+                    help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
+                         "skips if unchanged (ETag) and archives processed files")
+    ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
     ap.add_argument("--geocode-clusters", action="store_true",
                     help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
     ap.add_argument("--geocode-locations", action="store_true",
@@ -366,8 +526,8 @@ def main() -> None:
     if args.geocode_locations:
         geocode_locations(apply=args.apply)
         return
-    if not (args.from_bucket or args.inc_json or args.crq_json):
-        ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
+    if not (args.from_bucket or args.inc_csv):
+        ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations")
     ingest(args)


diff --git a/migrations/02_import_meta.sql b/migrations/02_import_meta.sql
new file mode 100644
index 0000000..c021375
--- /dev/null
+++ b/migrations/02_import_meta.sql
@@ -0,0 +1,118 @@
+-- 02_import_meta.sql — fleettickets · snapshot metadata + map freshness
+-- ─────────────────────────────────────────────────────────────────────────────
+-- The n8n S3 export now wraps each dataset in a metadata envelope
+-- ({ "metadata": {...}, "records": [...] }; see n8n-s3-export-workflows.md).
+-- We capture that envelope per dataset at ingest so the map can show how fresh
+-- the snapshot is, and re-define reporting.fn_tickets_for_map (same signature —
+-- dashboard_api unchanged) to expose it under summary.freshness.
+--
+-- Idempotent: safe on a fresh DB and re-appliable on the live DB.
+-- ─────────────────────────────────────────────────────────────────────────────
+
+SET search_path = tickets, public;
+
+-- ── per-dataset snapshot metadata (one row per dataset; upserted each ingest) ─
+CREATE TABLE IF NOT EXISTS tickets.import_meta (
+    dataset          text PRIMARY KEY,  -- 'inc' | 'crq'
+    export_type      text,              -- 'delta' | 'full'
+    exported_at      timestamptz,       -- metadata.exported_at (source)
+    snapshot_date    date,              -- metadata.snapshot_date (full runs)
+    source_schema    text,
+    source_table     text,
+    row_count        integer,           -- metadata.row_count (source count)
+    records_ingested integer,           -- rows we actually read/upserted
+    n8n_execution_id text,
+    metadata         jsonb,             -- full envelope metadata (audit)
+    ingested_at      timestamptz NOT NULL DEFAULT now()
+);
+
+-- ── read function — add summary.freshness (signature unchanged) ──────────────
+CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
+    p_service_type text DEFAULT NULL,
+    p_status text DEFAULT NULL,
+    p_open_only boolean DEFAULT true
+)
+ RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
+DECLARE v_result jsonb;
+BEGIN
+    p_service_type := lower(NULLIF(p_service_type, ''));
+    p_status := NULLIF(p_status, '');
+    WITH filtered AS (
+        SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
+        WHERE geom IS NOT NULL
+          AND (p_service_type IS NULL OR p_service_type = 'inc')
+          AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
+          AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
+        UNION ALL
+        SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
+        WHERE geom IS NOT NULL
+          AND (p_service_type IS NULL OR p_service_type = 'crq')
+          AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
+          AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
+    )
+    SELECT jsonb_build_object(
+        'summary', jsonb_build_object(
+            'ticket_count', COUNT(*),
+            'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
+            'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
+            'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
+            'by_status', (SELECT jsonb_object_agg(s, c)
+                          FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c
+                                FROM filtered GROUP BY raw->>'normalized_status') z),
+            'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
+                              'export_type', export_type,
+                              'exported_at', exported_at,
+                              'snapshot_date', snapshot_date,
+                              'row_count', row_count,
+                              'records_ingested', records_ingested,
+                              'ingested_at', ingested_at))
+                          FROM tickets.import_meta
+                          WHERE p_service_type IS NULL OR dataset = p_service_type)
+        ),
+        'geojson', jsonb_build_object(
+            'type', 'FeatureCollection',
+            'features', COALESCE(jsonb_agg(
+                jsonb_build_object(
+                    'type', 'Feature',
+                    'properties', jsonb_build_object(
+                        'ticket_id', raw->>'ticket_id',
+                        'service_type', service_type,
+                        'status', raw->>'normalized_status',
+                        'raw_status', raw->>'raw_status',
+                        'cluster', raw->>'cluster',
+                        'region', raw->>'region',
+                        'location_name', raw->>'location_name',
+                        'department', raw->>'department',
+                        'owner', raw->>'owner',
+                        'assigned_team', raw->>'assigned_team',
+                        'sla_status', raw->>'sla_status',
+                        'is_actionable', (raw->>'is_actionable')::boolean,
+                        'geo_source', geo_source,
+                        'created_at', raw->>'created_at_service',
+                        'scheduled_at', raw->>'scheduled_at'
+                    ),
+                    'geometry', ST_AsGeoJSON(geom)::jsonb
+                )
+            ), '[]'::jsonb)
+        )
+    ) INTO v_result FROM filtered;
+    RETURN v_result;
+END $fn$;
+
+COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
+    'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON, with '
+    'summary.freshness from tickets.import_meta.
fleettickets 02.';
+
+-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
+DO $grants$
+BEGIN
+    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
+        GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.import_meta TO tracksolid_owner;
+    END IF;
+    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
+        GRANT SELECT ON tickets.import_meta TO dashboard_ro;
+    END IF;
+    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
+        GRANT SELECT ON tickets.import_meta TO grafana_ro;
+    END IF;
+END $grants$;
diff --git a/n8n-hourly-s3-full-data-exports.md b/n8n-hourly-s3-full-data-exports.md
new file mode 100644
index 0000000..1563162
--- /dev/null
+++ b/n8n-hourly-s3-full-data-exports.md
@@ -0,0 +1,156 @@
+# n8n Hourly S3 Full-Data Exports
+
+Updated on June 15, 2026.
+
+## Overview
+
+Two active n8n workflows export complete datasets to S3 every hour:
+
+1. `FTTH Automation Ticket S3 Export`
+2. `Fuel Records S3 Export`
+
+Each execution creates CSV files only. Filenames use the actual execution time
+in the `Africa/Nairobi` timezone.
+
+No delta files, JSON files, `latest` files, `changes/` directories, `full/`
+directories, or midnight-specific exports are created.
+
+## Hourly Output
+
+Together, the two workflows create exactly three files during their hourly
+executions:
+
+```text
+automations/crq/YYYY-MM-DDTHH-mm-ss.csv
+automations/inc/YYYY-MM-DDTHH-mm-ss.csv
+fuel_records/YYYY-MM-DDTHH-mm-ss.csv
+```
+
+The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
+uploaded to the `fuel` bucket.
+
+## FTTH Automation Ticket S3 Export
+
+Workflow ID: `JI3QkcJeHk9eYRsY`
+
+The workflow:
+
+1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
+2. Creates one execution timestamp.
+3. Calls the existing authenticated Scoreboard export endpoint with
+   `export_type: full`.
+4. Reads all CRQ and INC rows returned by the endpoint.
+5.
Converts each complete dataset to CSV.
+6. Uploads exactly two files:
+   - `automations/crq/.csv`
+   - `automations/inc/.csv`
+7. Fails the execution if exactly two successful upload results are not
+   returned.
+
+The workflow still has its existing manual webhook for operational testing.
+
+## Fuel Records S3 Export
+
+Workflow ID: `IP2KNAfFazAjTesh`
+
+The workflow:
+
+1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
+2. Creates one execution timestamp.
+3. Reads the complete `logistics_department.fuel_records` table.
+4. Converts all returned rows to one CSV.
+5. Uploads exactly one file:
+   - `fuel_records/.csv`
+6. Fails the execution if the S3 upload reports an error.
+
+The workflow still has its existing manual webhook for operational testing.
+
+## Timestamp Format
+
+The timestamp format is:
+
+```text
+YYYY-MM-DDTHH-mm-ss
+```
+
+Example:
+
+```text
+2026-06-15T14-39-53
+```
+
+The timestamp is generated once at the start of each workflow execution and is
+formatted in `Africa/Nairobi`.
+
+## Credentials and Safety
+
+- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
+  reused.
+- No S3 credentials or API secrets are hard-coded in workflow code.
+- Secrets are not included in workflow result messages.
+- Source database queries are read-only.
+- The workflows do not delete or update source database rows.
+- S3 upload nodes retain retry handling. A failed hourly execution can also be
+  recovered naturally by the next full-data run.
+
+## Removed Behavior
+
+The workflows no longer contain:
+
+- Delta export logic or stored delta pointers
+- Midnight full-export schedules
+- `latest.json` or `latest.csv`
+- JSON output
+- `changes/` keys
+- `full/` keys
+- Multipart or additional export files
+- FTTH mark-sent state handling
+
+## Deployment Status
+
+Both workflows were saved, published, and activated on June 15, 2026.
+
+Active versions:
+
+```text
+Fuel Records S3 Export:
+60cf5824-9345-45bb-a2eb-3b20b877fd32
+
+FTTH Automation Ticket S3 Export:
+68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
+```
+
+## Manual Test Evidence
+
+### Fuel Records S3 Export
+
+Execution ID: `404079`
+
+Rows exported: `2001`
+
+Exact S3 key:
+
+```text
+fuel_records/2026-06-15T14-39-50.csv
+```
+
+### FTTH Automation Ticket S3 Export
+
+Execution ID: `404080`
+
+Rows exported:
+
+```text
+CRQ: 12680
+INC: 31434
+```
+
+Exact S3 keys:
+
+```text
+automations/crq/2026-06-15T14-39-53.csv
+automations/inc/2026-06-15T14-39-53.csv
+```
+
+Both manual tests completed successfully. Their upload builders generated one
+Fuel item and exactly two FTTH items, matching the required three output files.
diff --git a/n8n-s3-export-workflows.md b/n8n-s3-export-workflows.md
new file mode 100644
index 0000000..38427b1
--- /dev/null
+++ b/n8n-s3-export-workflows.md
@@ -0,0 +1,256 @@
+# n8n S3 Export Workflows
+
+## Overview
+
+Both workflows run in the `Africa/Nairobi` timezone and use the
+`Rahamafresh Tickets S3` credential.
+
+| Workflow | n8n ID | Source | Bucket |
+| --- | --- | --- | --- |
+| Fuel Records S3 Export | `IP2KNAfFazAjTesh` | `logistics_department.fuel_records` | `fuel` |
+| FTTH Automation Ticket S3 Export | `JI3QkcJeHk9eYRsY` | `isp_department_crq.automations` and `isp_department_osp.automations` | `tickets` |
+
+## Schedules
+
+- Hourly delta export: `10` minutes after each hour, from `01:10` through
+  `23:10` (`0 10 1-23 * * *`).
+- Daily full export: `00:05` (`0 5 0 * * *`).
+- The `00:05` run exports rows up to the end of the previous local day. For
+  example, Wednesday's run exports the snapshot through Tuesday `23:59:59`.
+- The full export has its own state and does not read or advance the hourly
+  delta pointer.
+
+## Fuel Records S3 Export
+
+### Source and change tracking
+
+The source is `logistics_department.fuel_records`.
The table has an indexed
+`updated_at` column and an update trigger that refreshes it whenever a row is
+changed.
+
+Hourly runs select rows where:
+
+```sql
+updated_at > last_successful_delta_export_at
+AND updated_at <= requested_at
+```
+
+The delta pointer advances to the maximum exported `updated_at` only after all
+S3 uploads complete and the downloaded `latest.json` passes validation. A
+failed or empty run does not move the pointer incorrectly.
+
+The full run selects all rows created before local midnight, independently of
+the delta pointer.
+
+### Fuel object keys
+
+Every successful run updates:
+
+- `fuel_records/latest.json`
+- `fuel_records/latest.csv`
+
+Delta runs with changed rows also write:
+
+- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.json`
+- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.csv`
+
+Full runs write:
+
+- `fuel_records/full/YYYY-MM-DD.json`
+- `fuel_records/full/YYYY-MM-DD.csv`
+
+Exports larger than 5,000 rows additionally produce numbered JSON and CSV
+parts such as `-part-0001`.
+
+### Fuel state
+
+Fuel state is stored in n8n workflow static data and is updated only after S3
+read-back validation:
+
+- `last_successful_delta_export_at`
+- `last_successful_full_export_date`
+- `rows_exported`
+- `destination_key` and `destination_keys`
+- `n8n_execution_id`
+- `success`
+- `error_message`
+- `completed_at`
+
+## FTTH Automation Ticket S3 Export
+
+### Source and change tracking
+
+The workflow requests an export package from the active SCOREBOARD service:
+
+```text
+POST /api/v1/ftth/automation-export/package
+```
+
+Datasets:
+
+- CRQ: `isp_department_crq.automations`
+- INC: `isp_department_osp.automations`
+
+Hourly delta packages select records changed after the last successful delta
+export. The SCOREBOARD service creates an export run before returning the
+package.
n8n uploads every returned object and then calls: + +```text +POST /api/v1/ftth/automation-export/mark-sent +``` + +Only a `SUCCESS` acknowledgement advances the delta pointer. Upload failure +marks the run `FAILED` and preserves the previous pointer. + +Full packages use the previous local date as `snapshot_date`, select the +complete current-state dataset through the previous day, and update only the +full-export date after successful upload. + +### FTTH object keys + +CRQ: + +- `automations/crq/latest.json` +- `automations/crq/latest.csv` +- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.json` +- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv` +- `automations/crq/full/YYYY-MM-DD.json` +- `automations/crq/full/YYYY-MM-DD.csv` + +INC: + +- `automations/inc/latest.json` +- `automations/inc/latest.csv` +- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.json` +- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv` +- `automations/inc/full/YYYY-MM-DD.json` +- `automations/inc/full/YYYY-MM-DD.csv` + +Exports larger than 5,000 rows additionally produce numbered JSON and CSV +parts. + +### FTTH state + +State and audit history are stored in +`ftth_automation.automation_export_runs`. Each run records: + +- export type and requested timestamp +- last successful delta timestamp +- last successful full export date +- snapshot date +- row count +- destination object keys +- n8n execution ID +- status (`PENDING`, `SUCCESS`, or `FAILED`) +- completion timestamp and error summary + +## File Contents + +JSON files contain: + +```json +{ + "metadata": { + "exported_at": "...", + "export_type": "delta or full", + "source_schema": "...", + "source_table": "...", + "dataset": "crq, inc, or omitted for fuel", + "row_count": 0, + "last_successful_delta_export_at": "...", + "last_successful_full_export_date": "...", + "snapshot_date": "...", + "n8n_execution_id": "..." + }, + "records": [] +} +``` + +Fuel metadata names the previous and candidate delta pointers explicitly. 
+CSV files contain the same exported records as tabular rows with a header
+line. CSV files do not contain the JSON metadata envelope.
+
+## Manual Tests
+
+Both workflows expose production POST webhooks.
+
+Fuel:
+
+```bash
+curl -X POST \
+  -H 'Content-Type: application/json' \
+  -d '{"export_type":"delta"}' \
+  https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
+
+curl -X POST \
+  -H 'Content-Type: application/json' \
+  -d '{"export_type":"full"}' \
+  https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
+```
+
+FTTH:
+
+```bash
+curl -X POST \
+  -H 'Content-Type: application/json' \
+  -d '{"export_type":"delta"}' \
+  https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
+
+curl -X POST \
+  -H 'Content-Type: application/json' \
+  -d '{"export_type":"full","force":true}' \
+  https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
+```
+
+After a test:
+
+1. Confirm the webhook response has `success: true`.
+2. Open the execution ID in n8n and confirm every upload succeeded.
+3. Confirm the response lists the expected bucket and destination keys.
+4. Check the S3 object timestamps and inspect the JSON metadata and row count.
+5. For FTTH, confirm the matching export run is `SUCCESS`.
+
+### Production test record
+
+Tests run on June 15, 2026:
+
+| Workflow | Type | Execution | Result |
+| --- | --- | --- | --- |
+| Fuel Records S3 Export | Delta | `402524` | Success; 0 changed rows; latest JSON and CSV validated |
+| Fuel Records S3 Export | Full | `402527` | Success; 1,965 rows; snapshot date `2026-06-14` |
+| FTTH Automation Ticket S3 Export | Delta | `402530` | Success; CRQ and INC latest/change JSON and CSV written |
+| FTTH Automation Ticket S3 Export | Full | `402536` | Success; 44,114 rows; snapshot date `2026-06-14`; 28 objects including batch parts |
+
+## Troubleshooting
+
+1. Check the n8n execution and identify whether the source query/package,
+   upload, read-back validation, or mark-sent step failed.
+2. Confirm the `Rahamafresh Tickets S3` credential can write to the configured
+   bucket.
+3. For fuel, inspect workflow static data. Do not manually advance
+   `last_successful_delta_export_at` after a failed run.
+4. Verify `fuel_records.updated_at` is populated and its update trigger exists
+   if fuel changes are missing.
+5. For FTTH, inspect `ftth_automation.automation_export_runs`, including
+   `status`, `destination_object_keys`, `n8n_execution_id`, and
+   `error_summary`.
+6. Confirm the SCOREBOARD health endpoint is healthy and that the configured
+   export token and base URL are correct.
+7. Re-run the appropriate manual webhook after fixing the failure. A failed
+   run leaves the last successful pointer unchanged, so the rows are retried.
+
+## Published Version Check
+
+In n8n, open each workflow and confirm:
+
+- `Active` is enabled.
+- The saved `versionId` equals `activeVersionId`.
+- The trigger list contains the hourly schedule, daily `00:05` schedule, and
+  manual webhook.
+- A new production webhook execution uses the same active version and returns
+  the expected destination keys.
+
+Current published versions as of June 15, 2026:
+
+- Fuel Records S3 Export: `6833e5e5-97a0-41be-8f82-9ec612de92ce`
+- FTTH Automation Ticket S3 Export: `b2171088-eac2-439b-97e8-83dfa8117783`
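
Reviewer note: the first two bullets of the Published Version Check could also be scripted. The sketch below is not part of this diff, and it assumes the workflow is available as a plain dictionary carrying `active`, `versionId`, and `activeVersionId` keys. That dictionary shape, the helper name `version_check_problems`, and its `expected_version` parameter are all hypothetical; how the dictionary is obtained from n8n is left out on purpose.

```python
from typing import Any, Mapping, Optional


def version_check_problems(
    workflow: Mapping[str, Any],
    expected_version: Optional[str] = None,
) -> list[str]:
    """Return a list of problems; an empty list means the check passed.

    `workflow` is an assumed dict shape, not a confirmed n8n API response.
    """
    problems: list[str] = []

    # Bullet 1: `Active` is enabled.
    if workflow.get("active") is not True:
        problems.append("workflow is not active")

    # Bullet 2: the saved `versionId` equals `activeVersionId`.
    saved = workflow.get("versionId")
    published = workflow.get("activeVersionId")
    if not saved or saved != published:
        problems.append(
            f"saved versionId {saved!r} != activeVersionId {published!r}"
        )

    # Optional: compare against a version ID recorded in the documentation.
    if expected_version is not None and published != expected_version:
        problems.append(
            f"activeVersionId {published!r} is not the expected {expected_version!r}"
        )

    return problems
```

Passing the documented version ID as `expected_version` rather than hard-coding it keeps the helper usable after the next publish, when the recorded IDs will have changed. The trigger list and the webhook execution still need a manual look.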