Compare commits

...

2 commits

Author SHA1 Message Date
david kiania
4532643247 chore: add hourly INC ingest cron wrapper + schedule docs
run_ingest.sh loads .env and runs `import_tickets.py --from-bucket --apply`.
Documented crontab: `15 7-19 * * *` in Africa/Nairobi (ingest at :15, 07:00–19:00).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 19:40:50 +03:00
david kiania
df054c92be feat: INC hourly-CSV ingestion (newest-file, ETag dedup, clean + archive)
Rework import_tickets.py from the retired JSON `latest.json` model to the new
hourly full-snapshot CSV export. Strictly INC (CRQ out of scope).

- Ingest the newest automations/inc/<EAT-timestamp>.csv; skip-if-unchanged by
  comparing S3 ETag to tickets.import_meta.metadata.source_etag.
- Upsert on ticket_id (PK; no dups, never delete -> closure history accrues).
  No truncate. On success, move processed files to automations/inc/processed/.
- Clean at ingest: drop is_alarm=true + the "EXPORT STOPPED..." sentinel; drop
  week_*, source_s3_*/source_snapshot_id, department/source_type; lowercase
  region, uppercase raw_status; keep service_type + bucket.
- Force path-style S3 addressing; --inc-csv for local dev; --from-bucket for cron.
- Add migrations/02 (import_meta + freshness); refresh README/.env.example/docs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 19:33:16 +03:00
8 changed files with 809 additions and 64 deletions


@ -3,7 +3,7 @@
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
# rustfs / S3 — source ticket snapshots (automations/{inc,crq}/latest.json)
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv)
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
RUSTFS_ACCESS_KEY=<key>
RUSTFS_SECRET_KEY=<secret>

.gitignore vendored

@ -5,4 +5,5 @@ __pycache__/
uv.lock
*.json
!.*.json
*.csv
.DS_Store


@ -1,18 +1,19 @@
# fleettickets
Field-ops **INC / CRQ ticket** ingestion, geocoding, and read-schema that powers the
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
- **INC** — incident / customer-fault tickets
- **CRQ** — new-installation requests
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
## What this owns
| Piece | What |
|---|---|
| `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) |
| `import_tickets.py` | Pulls ticket snapshots from the rustfs `tickets` bucket and upserts them; geocodes clusters + INC locations |
| `migrations/02_import_meta.sql` | `tickets.import_meta` (per-dataset snapshot envelope metadata) + `fn_tickets_for_map` re-defined to expose it as `summary.freshness` (same signature — dashboard_api unchanged) |
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
@ -49,23 +50,49 @@ python run_migrations.py # apply the schema (idempotent)
## Run
```bash
# ingest the latest snapshots from the bucket
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
python import_tickets.py --from-bucket --apply
# geocode (needs GEOCODER_API_KEY)
python import_tickets.py --geocode-clusters --apply # coarse, once
python import_tickets.py --geocode-locations --apply # precise, actionable INC
# from local files instead of the bucket
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
# from a local CSV instead of the bucket (dev)
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
```
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to
the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency).
## Schedule (cron)
On the instance, ingest at **:15 past every hour, 07:00–19:00 EAT** via
[`run_ingest.sh`](run_ingest.sh) (loads `.env`, runs `--from-bucket --apply`):
```cron
CRON_TZ=Africa/Nairobi
15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
```
`CRON_TZ` matters — the export filenames and this schedule are in `Africa/Nairobi`.
Skip-if-unchanged means a run on an already-ingested snapshot is a cheap no-op.
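
As a quick sanity check of the schedule above, the following minimal sketch expands the `15 7-19 * * *` spec into its daily fire times. The helper name `cron_fire_times` is hypothetical and not part of this repo; it only models the minute and hour fields.

```python
def cron_fire_times(minute: int, first_hour: int, last_hour: int) -> list[str]:
    """Expand a 'M H1-H2 * * *' cron spec into its daily HH:MM fire times."""
    # Cron hour ranges are inclusive at both ends, hence last_hour + 1.
    return [f"{hour:02d}:{minute:02d}" for hour in range(first_hour, last_hour + 1)]


runs = cron_fire_times(15, 7, 19)
print(len(runs), runs[0], runs[-1])
```

The range is inclusive, so the last ingest of the day runs at 19:15 EAT, for 13 runs per day.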
## Notes
- The `changes/` subdirectory in the bucket holds **full timestamped snapshots** (not
deltas) — ingest `latest.json` only; don't process `changes/`.
- The n8n export writes a **full current-state CSV per hour** to
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no
deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
is skipped (the export re-emits byte-identical content most hours).
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
deleted, so closed-ticket history accumulates. On success the file is **moved** to
`automations/inc/processed/`.
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
normalize `region` → lowercase and `raw_status` → UPPERCASE. `service_type` and `bucket`
(a `closed`/`pending` flag) are kept.
- `tickets.import_meta` captures snapshot freshness (surfaced as `summary.freshness` by
`fn_tickets_for_map`).
- The curated/geocoded coordinates are written `verified = false` — review
`tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked.
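
To illustrate the cleaning rules in the notes above, here is a minimal, self-contained sketch that applies them to a tiny in-memory CSV. The sample rows, the sentinel's tail text, and the helper name `clean_rows` are invented for illustration; the loader's own implementation lives in `import_tickets.py`.

```python
import csv
import io

# Columns dropped at ingest, as listed in the notes above.
DROP_FIELDS = {
    "week_start", "week_end",
    "source_s3_bucket", "source_s3_key", "source_snapshot_id",
    "department", "source_type",
}

# Hypothetical sample export: one sentinel row, one real ticket, one alarm row.
SAMPLE = """ticket_id,is_alarm,region,raw_status,department,bucket
EXPORT STOPPED (hypothetical tail),,,,,
INC-1,false,Nairobi,closed,FTTH,closed
INC-2,true,Coast,open,FTTH,pending
"""


def clean_rows(text: str) -> list[dict]:
    """Filter and normalize rows the way the notes describe."""
    kept = []
    for row in csv.DictReader(io.StringIO(text)):
        ticket_id = (row.get("ticket_id") or "").strip()
        if not ticket_id or ticket_id.startswith("EXPORT STOPPED"):
            continue  # sentinel row or missing id
        if (row.get("is_alarm") or "").strip().lower() == "true":
            continue  # alarm rows are not tickets
        raw = {k: v for k, v in row.items() if k not in DROP_FIELDS}
        raw["region"] = (raw.get("region") or "").lower()
        raw["raw_status"] = (raw.get("raw_status") or "").upper()
        kept.append(raw)
    return kept


cleaned = clean_rows(SAMPLE)
print(cleaned)
```

Only `INC-1` survives: the sentinel and the alarm row are dropped, `department` is removed, and `region` / `raw_status` are normalized while `bucket` is kept.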


@ -1,18 +1,31 @@
"""
import_tickets.py - Fireside Communications · INC/CRQ ticket ingestion (raw-first)
import_tickets.py - Fireside Communications · INC ticket ingestion (raw-first)
Loads the client's field-ops ticket snapshots into the `tickets` schema — the
source of the FleetOps "Tickets" map. Two categories, one table each:
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
source of the FleetOps "Tickets" map.
tickets.inc incidents / customer faults
tickets.crq new-installation requests
RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
The DB derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
Everything downstream reads from `raw` (resilient to source schema drift). The DB
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
Source data: rustfs `tickets` bucket, full snapshots from the client's email
automation automations/{inc,crq}/latest.json (array of 32-key objects).
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
writes a full current-state snapshot CSV per hour to the `tickets` bucket at
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv)
There is NO latest pointer, NO metadata envelope, and NO deltas: each file is a
flat CSV (header + rows). We ingest the NEWEST file:
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
skip the DB write (the export re-emits byte-identical content most hours);
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
- normalize region -> lowercase, raw_status -> UPPERCASE;
- upsert on ticket_id (PRIMARY KEY: no duplication; never delete, so closure
history accumulates), and record snapshot freshness in tickets.import_meta;
- on success, MOVE the file to automations/inc/processed/ (copy + delete).
Geocoding (two layers, both via a KEYED provider; public Nominatim rate-limits):
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
@ -24,7 +37,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
python import_tickets.py --from-bucket --apply
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
python import_tickets.py --geocode-clusters --apply
python import_tickets.py --geocode-locations --apply
@ -36,12 +49,15 @@ geo_clusters + geo_locations + reporting.fn_tickets_for_map.
from __future__ import annotations
import argparse
import csv
import io
import json
import math
import os
import re
import subprocess
import time
from datetime import datetime, timezone, timedelta
import requests
import psycopg2.extras
@ -50,9 +66,31 @@ from shared import clean, get_conn, get_logger
log = get_logger("import_tickets")
TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"}
# ── INC ingestion config ──────────────────────────────────────────────────────
_TABLE = "tickets.inc"
_DATASET = "inc"
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"}
_INC_PREFIX = "automations/inc/"
_PROCESSED_PREFIX = "automations/inc/processed/"
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
# message itself. Matched by prefix so position/exact-tail don't matter.
_SENTINEL_PREFIX = "EXPORT STOPPED"
# Columns dropped before building `raw`: derivable (week_*), the client's row-level
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
DROP_FIELDS = frozenset({
"week_start", "week_end",
"source_s3_bucket", "source_s3_key", "source_snapshot_id",
"department", "source_type",
})
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
# leftover latest.csv/, latest.json/, full/ prefixes).
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
@ -61,68 +99,190 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
_last_geocode_at = 0.0
# ── data loading ──────────────────────────────────────────────────────────────
def _load_local(path: str) -> list[dict]:
with open(path, encoding="utf-8") as f:
data = json.load(f) # json.loads accepts NaN by default
return data if isinstance(data, list) else []
def _load_bucket(kind: str) -> list[dict]:
env = {
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
# The n8n hourly export writes a full current-state CSV per hour to
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
# we skip the DB write (the export re-emits byte-identical content most hours).
def _s3_env() -> dict:
return {
**os.environ,
"AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
"AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
"AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
"AWS_S3_ADDRESSING_STYLE": "path", # force path-style to match the rustfs endpoint
}
uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
log.info("fetching %s", uri)
out = subprocess.run(
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"],
def _aws(args: list[str], env: dict) -> bytes:
return subprocess.run(
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], *args],
env=env, capture_output=True, timeout=180, check=True,
).stdout
data = json.loads(out.decode("utf-8"))
return data if isinstance(data, list) else []
def _ts_from_key(key: str) -> datetime | None:
"""EAT timestamp embedded in an automations/inc/<ts>.csv key (or None)."""
m = _CSV_KEY_RE.match(key)
if not m:
return None
return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
def _list_inc_csvs(env: dict) -> list[tuple[str, str]]:
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs)."""
out = _aws(
["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX,
"--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"],
env,
).decode("utf-8").strip()
items = json.loads(out) if out and out != "None" else []
return [
(it["Key"], (it.get("ETag") or "").strip('"'))
for it in (items or []) if _CSV_KEY_RE.match(it.get("Key", ""))
]
def _last_processed_etag() -> str | None:
"""ETag of the most recently ingested INC file (from tickets.import_meta)."""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s",
(_DATASET,),
)
row = cur.fetchone()
return row[0] if row else None
def _parse_csv(text: str) -> list[dict]:
return list(csv.DictReader(io.StringIO(text)))
def _load_csv_local(path: str) -> list[dict]:
with open(path, encoding="utf-8", newline="") as f:
return list(csv.DictReader(f))
def _move_processed(keys: list[str], env: dict) -> None:
"""Archive listed INC csv objects to automations/inc/processed/ (S3 mv = copy+delete)."""
for key in keys:
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
_aws(["s3", "mv", f"s3://{_BUCKET}/{key}", f"s3://{_BUCKET}/{dst}"], env)
log.info("archived %s -> %s", key, dst)
# ── row preparation (filter · drop columns · normalize) ─────────────────────────
def _keep_row(row: dict) -> bool:
"""Drop alarm rows + the truncation-sentinel; require a real ticket_id."""
tid = clean(row.get("ticket_id"))
if not tid or tid.startswith(_SENTINEL_PREFIX):
return False
# Case-insensitive: a CSV export may serialize the flag as "True" or "TRUE".
return (clean(row.get("is_alarm")) or "").lower() != "true"
def _prepare(row: dict) -> dict:
"""Strip DROP_FIELDS and normalize region/raw_status — returns the `raw` payload."""
r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
if r.get("region"):
r["region"] = r["region"].lower()
if r.get("raw_status"):
r["raw_status"] = r["raw_status"].upper()
return r
# ── upsert (raw-first) ────────────────────────────────────────────────────────
def _scrub_nan(row: dict) -> dict:
# Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
def _record_meta(meta: dict, records_ingested: int) -> None:
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag)."""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO tickets.import_meta
(dataset, export_type, exported_at, snapshot_date, source_schema,
source_table, row_count, records_ingested, n8n_execution_id, metadata,
ingested_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
ON CONFLICT (dataset) DO UPDATE
SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
records_ingested = EXCLUDED.records_ingested,
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
ingested_at = now()""",
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
)
def upsert(rows: list[dict], table: str, apply: bool) -> int:
payload = [
(tid, psycopg2.extras.Json(_scrub_nan(r)))
for r in rows
if (tid := clean(r.get("ticket_id")))
]
log.info("%s: %d valid rows (skipped %d without ticket_id)",
table, len(payload), len(rows) - len(payload))
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
meta = meta or {}
kept = [r for r in rows if _keep_row(r)]
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
_TABLE, len(rows), len(payload), len(rows) - len(payload))
if not apply:
log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
return len(payload)
with get_conn() as conn:
with conn.cursor() as cur:
psycopg2.extras.execute_values(
cur,
f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
payload, page_size=500,
)
log.info("upserted %d rows into %s", len(payload), table)
_record_meta(meta, len(payload))
log.info("upserted %d rows into %s", len(payload), _TABLE)
return len(payload)
def ingest(args) -> None:
if args.from_bucket:
for kind in ("inc", "crq"):
upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
if args.inc_csv:
rows = _load_csv_local(args.inc_csv)
name = os.path.basename(args.inc_csv)
ts = _ts_from_key(_INC_PREFIX + name)
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
if ts:
meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta)
return
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
env = _s3_env()
listing = _list_inc_csvs(env)
if not listing:
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
return
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
all_keys = [k for k, _ in listing]
newest_key, newest_etag = listing[-1]
log.info("newest INC file: %s (etag=%s; %d file(s) present)",
newest_key, newest_etag, len(listing))
last_etag = _last_processed_etag()
if newest_etag and newest_etag == last_etag:
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
if args.apply:
_move_processed(all_keys, env)
else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
return
text = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env).decode("utf-8")
rows = _parse_csv(text)
ts = _ts_from_key(newest_key)
meta = {"export_type": "full", "source_s3_key": newest_key,
"source_etag": newest_etag, "row_count": len(rows)}
if ts:
meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta)
if args.apply:
_move_processed(all_keys, env)
else:
if args.inc_json:
upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply)
if args.crq_json:
upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply)
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
# ── place extraction (strip network codes, keep the real place) ───────────────
@ -348,12 +508,12 @@ def _resolve() -> int:
# ── entrypoint ────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
ap.add_argument("--from-bucket", action="store_true",
help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
"skips if unchanged (ETag) and archives processed files")
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
ap.add_argument("--geocode-clusters", action="store_true",
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
ap.add_argument("--geocode-locations", action="store_true",
@ -366,8 +526,8 @@ def main() -> None:
if args.geocode_locations:
geocode_locations(apply=args.apply)
return
if not (args.from_bucket or args.inc_json or args.crq_json):
ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
if not (args.from_bucket or args.inc_csv):
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations")
ingest(args)


@ -0,0 +1,118 @@
-- 02_import_meta.sql — fleettickets · snapshot metadata + map freshness
-- ─────────────────────────────────────────────────────────────────────────────
-- The n8n S3 export now wraps each dataset in a metadata envelope
-- ({ "metadata": {...}, "records": [...] }; see n8n-s3-export-workflows.md).
-- We capture that envelope per dataset at ingest so the map can show how fresh
-- the snapshot is, and re-define reporting.fn_tickets_for_map (same signature —
-- dashboard_api unchanged) to expose it under summary.freshness.
--
-- Idempotent: safe on a fresh DB and re-appliable on the live DB.
-- ─────────────────────────────────────────────────────────────────────────────
SET search_path = tickets, public;
-- ── per-dataset snapshot metadata (one row per dataset; upserted each ingest) ─
CREATE TABLE IF NOT EXISTS tickets.import_meta (
dataset text PRIMARY KEY, -- 'inc' | 'crq'
export_type text, -- 'delta' | 'full'
exported_at timestamptz, -- metadata.exported_at (source)
snapshot_date date, -- metadata.snapshot_date (full runs)
source_schema text,
source_table text,
row_count integer, -- metadata.row_count (source count)
records_ingested integer, -- rows we actually read/upserted
n8n_execution_id text,
metadata jsonb, -- full envelope metadata (audit)
ingested_at timestamptz NOT NULL DEFAULT now()
);
-- ── read function — add summary.freshness (signature unchanged) ──────────────
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
p_service_type text DEFAULT NULL,
p_status text DEFAULT NULL,
p_open_only boolean DEFAULT true
)
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
DECLARE v_result jsonb;
BEGIN
p_service_type := lower(NULLIF(p_service_type, ''));
p_status := NULLIF(p_status, '');
WITH filtered AS (
SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
WHERE geom IS NOT NULL
AND (p_service_type IS NULL OR p_service_type = 'inc')
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
UNION ALL
SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
WHERE geom IS NOT NULL
AND (p_service_type IS NULL OR p_service_type = 'crq')
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
)
SELECT jsonb_build_object(
'summary', jsonb_build_object(
'ticket_count', COUNT(*),
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
'by_status', (SELECT jsonb_object_agg(s, c)
FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c
FROM filtered GROUP BY raw->>'normalized_status') z),
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
'export_type', export_type,
'exported_at', exported_at,
'snapshot_date', snapshot_date,
'row_count', row_count,
'records_ingested', records_ingested,
'ingested_at', ingested_at))
FROM tickets.import_meta
WHERE p_service_type IS NULL OR dataset = p_service_type)
),
'geojson', jsonb_build_object(
'type', 'FeatureCollection',
'features', COALESCE(jsonb_agg(
jsonb_build_object(
'type', 'Feature',
'properties', jsonb_build_object(
'ticket_id', raw->>'ticket_id',
'service_type', service_type,
'status', raw->>'normalized_status',
'raw_status', raw->>'raw_status',
'cluster', raw->>'cluster',
'region', raw->>'region',
'location_name', raw->>'location_name',
'department', raw->>'department',
'owner', raw->>'owner',
'assigned_team', raw->>'assigned_team',
'sla_status', raw->>'sla_status',
'is_actionable', (raw->>'is_actionable')::boolean,
'geo_source', geo_source,
'created_at', raw->>'created_at_service',
'scheduled_at', raw->>'scheduled_at'
),
'geometry', ST_AsGeoJSON(geom)::jsonb
)
), '[]'::jsonb)
)
) INTO v_result FROM filtered;
RETURN v_result;
END $fn$;
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON, with '
'summary.freshness from tickets.import_meta. fleettickets 02.';
-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
DO $grants$
BEGIN
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.import_meta TO tracksolid_owner;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
GRANT SELECT ON tickets.import_meta TO dashboard_ro;
END IF;
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
GRANT SELECT ON tickets.import_meta TO grafana_ro;
END IF;
END $grants$;


@ -0,0 +1,156 @@
# n8n Hourly S3 Full-Data Exports
Updated on June 15, 2026.
## Overview
Two active n8n workflows export complete datasets to S3 every hour:
1. `FTTH Automation Ticket S3 Export`
2. `Fuel Records S3 Export`
Each execution creates CSV files only. Filenames use the actual execution time
in the `Africa/Nairobi` timezone.
No delta files, JSON files, `latest` files, `changes/` directories, `full/`
directories, or midnight-specific exports are created.
## Hourly Output
Together, the two workflows create exactly three files during their hourly
executions:
```text
automations/crq/YYYY-MM-DDTHH-mm-ss.csv
automations/inc/YYYY-MM-DDTHH-mm-ss.csv
fuel_records/YYYY-MM-DDTHH-mm-ss.csv
```
The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
uploaded to the `fuel` bucket.
## FTTH Automation Ticket S3 Export
Workflow ID: `JI3QkcJeHk9eYRsY`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Calls the existing authenticated Scoreboard export endpoint with
`export_type: full`.
4. Reads all CRQ and INC rows returned by the endpoint.
5. Converts each complete dataset to CSV.
6. Uploads exactly two files:
- `automations/crq/<execution-timestamp>.csv`
- `automations/inc/<execution-timestamp>.csv`
7. Fails the execution unless exactly two successful upload results are
returned.
The workflow still has its existing manual webhook for operational testing.
## Fuel Records S3 Export
Workflow ID: `IP2KNAfFazAjTesh`
The workflow:
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
2. Creates one execution timestamp.
3. Reads the complete `logistics_department.fuel_records` table.
4. Converts all returned rows to one CSV.
5. Uploads exactly one file:
- `fuel_records/<execution-timestamp>.csv`
6. Fails the execution if the S3 upload reports an error.
The workflow still has its existing manual webhook for operational testing.
## Timestamp Format
The timestamp format is:
```text
YYYY-MM-DDTHH-mm-ss
```
Example:
```text
2026-06-15T14-39-53
```
The timestamp is generated once at the start of each workflow execution and is
formatted in `Africa/Nairobi`.
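
For consumers of these files, the timestamp can be produced and parsed as in the minimal sketch below. It assumes a fixed UTC+3 offset for `Africa/Nairobi` (Kenya observes no daylight saving time); the function names are illustrative and not part of the n8n workflows.

```python
from datetime import datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))   # Africa/Nairobi as a fixed offset (assumption: no DST)
TS_FORMAT = "%Y-%m-%dT%H-%M-%S"      # YYYY-MM-DDTHH-mm-ss


def format_export_timestamp(moment: datetime) -> str:
    """Render an aware datetime as an export timestamp in EAT."""
    return moment.astimezone(EAT).strftime(TS_FORMAT)


def parse_export_timestamp(stamp: str) -> datetime:
    """Parse an export timestamp back into an aware EAT datetime."""
    return datetime.strptime(stamp, TS_FORMAT).replace(tzinfo=EAT)


utc_moment = datetime(2026, 6, 15, 11, 39, 53, tzinfo=timezone.utc)
stamp = format_export_timestamp(utc_moment)
print(stamp)
```

Because the format is zero-padded and ordered from year to second, sorting the filenames lexicographically also sorts them chronologically.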
## Credentials and Safety
- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
reused.
- No S3 credentials or API secrets are hard-coded in workflow code.
- Secrets are not included in workflow result messages.
- Source database queries are read-only.
- The workflows do not delete or update source database rows.
- S3 upload nodes retain retry handling. A failed hourly execution can also be
recovered naturally by the next full-data run.
## Removed Behavior
The workflows no longer contain:
- Delta export logic or stored delta pointers
- Midnight full-export schedules
- `latest.json` or `latest.csv`
- JSON output
- `changes/` keys
- `full/` keys
- Multipart or additional export files
- FTTH mark-sent state handling
## Deployment Status
Both workflows were saved, published, and activated on June 15, 2026.
Active versions:
```text
Fuel Records S3 Export:
60cf5824-9345-45bb-a2eb-3b20b877fd32
FTTH Automation Ticket S3 Export:
68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
```
## Manual Test Evidence
### Fuel Records S3 Export
Execution ID: `404079`
Rows exported: `2001`
Exact S3 key:
```text
fuel_records/2026-06-15T14-39-50.csv
```
### FTTH Automation Ticket S3 Export
Execution ID: `404080`
Rows exported:
```text
CRQ: 12680
INC: 31434
```
Exact S3 keys:
```text
automations/crq/2026-06-15T14-39-53.csv
automations/inc/2026-06-15T14-39-53.csv
```
Both manual tests completed successfully. Their upload builders generated one
Fuel item and exactly two FTTH items, matching the required three output files.

n8n-s3-export-workflows.md Normal file

@ -0,0 +1,256 @@
# n8n S3 Export Workflows
## Overview
Both workflows run in the `Africa/Nairobi` timezone and use the
`Rahamafresh Tickets S3` credential.
| Workflow | n8n ID | Source | Bucket |
| --- | --- | --- | --- |
| Fuel Records S3 Export | `IP2KNAfFazAjTesh` | `logistics_department.fuel_records` | `fuel` |
| FTTH Automation Ticket S3 Export | `JI3QkcJeHk9eYRsY` | `isp_department_crq.automations` and `isp_department_osp.automations` | `tickets` |
## Schedules
- Hourly delta export: `10` minutes after each hour, from `01:10` through
`23:10` (`0 10 1-23 * * *`).
- Daily full export: `00:05` (`0 5 0 * * *`).
- The `00:05` run exports rows up to the end of the previous local day. For
example, Wednesday's run exports the snapshot through Tuesday `23:59:59`.
- The full export has its own state and does not read or advance the hourly
delta pointer.
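
The cutoff rule for the `00:05` full export can be sketched as follows. This is an illustration only, assuming a fixed UTC+3 offset for `Africa/Nairobi`; `full_export_window` is a hypothetical helper, not a function in the workflow.

```python
from datetime import date, datetime, timedelta, timezone

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi as a fixed offset (assumption: no DST)


def full_export_window(run_at: datetime) -> tuple[date, datetime]:
    """Return (snapshot_date, exclusive_cutoff) for a full export run.

    The snapshot covers everything before local midnight of the run day,
    so snapshot_date is the previous local calendar day.
    """
    local = run_at.astimezone(EAT)
    midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return (midnight - timedelta(days=1)).date(), midnight


# A Wednesday 00:05 run covers the snapshot through Tuesday 23:59:59.
snapshot_date, cutoff = full_export_window(datetime(2026, 6, 17, 0, 5, tzinfo=EAT))
print(snapshot_date, cutoff.isoformat())
```

Using an exclusive midnight cutoff avoids having to reason about sub-second timestamps near `23:59:59`.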
## Fuel Records S3 Export
### Source and change tracking
The source is `logistics_department.fuel_records`. The table has an indexed
`updated_at` column and an update trigger that refreshes it whenever a row is
changed.
Hourly runs select rows where:
```sql
updated_at > last_successful_delta_export_at
AND updated_at <= requested_at
```
The delta pointer advances to the maximum exported `updated_at` only after all
S3 uploads complete and the downloaded `latest.json` passes validation. A
failed or empty run does not move the pointer incorrectly.
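
The delta window and pointer rule above can be sketched in a few lines. The function names and the in-memory rows are hypothetical; the real selection runs as SQL against `logistics_department.fuel_records`.

```python
from datetime import datetime, timezone


def select_delta(rows, last_pointer, requested_at):
    """Rows with last_pointer < updated_at <= requested_at."""
    return [r for r in rows if last_pointer < r["updated_at"] <= requested_at]


def advance_pointer(last_pointer, exported, uploads_validated):
    """Move the pointer to max(updated_at) only after a validated, non-empty run."""
    if not uploads_validated or not exported:
        return last_pointer
    return max(r["updated_at"] for r in exported)


def at(hour):
    """Helper for sample data: an aware UTC datetime on a fixed day."""
    return datetime(2026, 6, 15, hour, tzinfo=timezone.utc)


rows = [
    {"id": 1, "updated_at": at(8)},   # already exported (equal to the pointer)
    {"id": 2, "updated_at": at(9)},   # inside the window
    {"id": 3, "updated_at": at(11)},  # after requested_at, left for the next run
]
delta = select_delta(rows, last_pointer=at(8), requested_at=at(10))
print([r["id"] for r in delta])
```

The half-open lower bound means a row whose `updated_at` equals the pointer is not exported twice, and advancing to the maximum exported `updated_at` (rather than to `requested_at`) keeps the pointer tied to data that was actually uploaded.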
The full run selects all rows created before local midnight, independently of
the delta pointer.
### Fuel object keys
Every successful run updates:
- `fuel_records/latest.json`
- `fuel_records/latest.csv`
Delta runs with changed rows also write:
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.json`
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.csv`
Full runs write:
- `fuel_records/full/YYYY-MM-DD.json`
- `fuel_records/full/YYYY-MM-DD.csv`
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
parts such as `-part-0001`.
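
A rough sketch of the part-splitting rule follows. The exact key layout is an assumption: the text above only confirms a `-part-0001` style suffix and the 5,000-row threshold, so the placement of the suffix before the extension and the helper name `part_keys` are hypothetical.

```python
def part_keys(base_key: str, row_count: int, part_size: int = 5000) -> list[str]:
    """Return numbered part keys for an export, or [] if no split is needed."""
    if row_count <= part_size:
        return []  # only exports larger than part_size are split
    stem, _, ext = base_key.rpartition(".")
    part_count = -(-row_count // part_size)  # ceiling division
    # Assumed layout: suffix inserted before the file extension.
    return [f"{stem}-part-{i:04d}.{ext}" for i in range(1, part_count + 1)]


keys = part_keys("fuel_records/full/2026-06-15.json", 12000)
print(keys)
```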
### Fuel state
Fuel state is stored in n8n workflow static data and is updated only after S3
read-back validation:
- `last_successful_delta_export_at`
- `last_successful_full_export_date`
- `rows_exported`
- `destination_key` and `destination_keys`
- `n8n_execution_id`
- `success`
- `error_message`
- `completed_at`
## FTTH Automation Ticket S3 Export
### Source and change tracking
The workflow requests an export package from the active SCOREBOARD service:
```text
POST /api/v1/ftth/automation-export/package
```
Datasets:
- CRQ: `isp_department_crq.automations`
- INC: `isp_department_osp.automations`
Hourly delta packages select records changed after the last successful delta
export. The SCOREBOARD service creates an export run before returning the
package. n8n uploads every returned object and then calls:
```text
POST /api/v1/ftth/automation-export/mark-sent
```
Only a `SUCCESS` acknowledgement advances the delta pointer. Upload failure
marks the run `FAILED` and preserves the previous pointer.
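The package, upload, and acknowledge sequence can be sketched with injected callables. Everything about the payload here is hypothetical: the `run_id`, `objects`, `key`, and `body` fields are illustrative, and reporting a failed upload through the same acknowledgement call is an assumption. The real request and response shapes of the SCOREBOARD endpoints are not documented in this section.

```python
def run_delta_export(request_package, upload, mark_sent):
    """Upload every object in a package, then acknowledge the run.

    request_package() stands in for POST .../automation-export/package,
    mark_sent(run_id, status, error) for POST .../automation-export/mark-sent.
    """
    package = request_package()  # the service creates the export run here
    try:
        for obj in package["objects"]:  # hypothetical package shape
            upload(obj["key"], obj["body"])
    except Exception as exc:
        # A failed upload marks the run FAILED; the pointer stays where it was.
        mark_sent(package["run_id"], "FAILED", str(exc))
        return "FAILED"
    # Only this SUCCESS acknowledgement lets the service advance the pointer.
    mark_sent(package["run_id"], "SUCCESS", None)
    return "SUCCESS"
```

Because the acknowledgement is sent only after the loop finishes, a partial upload can never advance the delta pointer.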
Full packages use the previous local date as `snapshot_date`, select the
complete current-state dataset through the previous day, and update only the
full-export date after successful upload.
### FTTH object keys
CRQ:
- `automations/crq/latest.json`
- `automations/crq/latest.csv`
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.json`
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv`
- `automations/crq/full/YYYY-MM-DD.json`
- `automations/crq/full/YYYY-MM-DD.csv`
INC:
- `automations/inc/latest.json`
- `automations/inc/latest.csv`
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.json`
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv`
- `automations/inc/full/YYYY-MM-DD.json`
- `automations/inc/full/YYYY-MM-DD.csv`
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
parts.
### FTTH state
State and audit history are stored in
`ftth_automation.automation_export_runs`. Each run records:
- export type and requested timestamp
- last successful delta timestamp
- last successful full export date
- snapshot date
- row count
- destination object keys
- n8n execution ID
- status (`PENDING`, `SUCCESS`, or `FAILED`)
- completion timestamp and error summary
## File Contents
JSON files contain:
```json
{
"metadata": {
"exported_at": "...",
"export_type": "delta or full",
"source_schema": "...",
"source_table": "...",
"dataset": "crq, inc, or omitted for fuel",
"row_count": 0,
"last_successful_delta_export_at": "...",
"last_successful_full_export_date": "...",
"snapshot_date": "...",
"n8n_execution_id": "..."
},
"records": []
}
```
Fuel metadata names the previous and candidate delta pointers explicitly.
CSV files contain the same exported records as tabular rows with a header
line. CSV files do not contain the JSON metadata envelope.
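A consumer might sanity-check a downloaded JSON file along these lines. This is a hedged sketch: the helper name `check_envelope` and the choice of required fields are assumptions, and so is the rule that `row_count` equals the length of `records`, which may not hold for individual numbered parts.

```python
import json

# Assumed minimum set of metadata fields; the real validation may differ.
REQUIRED = ("exported_at", "export_type", "source_schema", "source_table", "row_count")


def check_envelope(text):
    """Return a list of problems found in an export JSON document."""
    doc = json.loads(text)
    meta = doc.get("metadata", {})
    records = doc.get("records")
    problems = [f"missing metadata.{key}" for key in REQUIRED if key not in meta]
    if not isinstance(records, list):
        problems.append("records is not a list")
        return problems
    if meta.get("export_type") not in ("delta", "full"):
        problems.append("export_type is not delta or full")
    if meta.get("row_count") != len(records):
        problems.append("row_count does not match records length")
    return problems
```

An empty list means the envelope passed these checks; it says nothing about the content of individual records.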
## Manual Tests
Both workflows expose production POST webhooks.
Fuel:
```bash
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"export_type":"delta"}' \
https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"export_type":"full"}' \
https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
```
FTTH:
```bash
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"export_type":"delta"}' \
https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
curl -X POST \
-H 'Content-Type: application/json' \
-d '{"export_type":"full","force":true}' \
https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
```
After a test:
1. Confirm the webhook response has `success: true`.
2. Open the execution ID in n8n and confirm every upload succeeded.
3. Confirm the response lists the expected bucket and destination keys.
4. Check the S3 object timestamps and inspect the JSON metadata and row count.
5. For FTTH, confirm the matching export run is `SUCCESS`.
### Production test record
Tests run on June 15, 2026:
| Workflow | Type | Execution | Result |
| --- | --- | --- | --- |
| Fuel Records S3 Export | Delta | `402524` | Success; 0 changed rows; latest JSON and CSV validated |
| Fuel Records S3 Export | Full | `402527` | Success; 1,965 rows; snapshot date `2026-06-14` |
| FTTH Automation Ticket S3 Export | Delta | `402530` | Success; CRQ and INC latest/change JSON and CSV written |
| FTTH Automation Ticket S3 Export | Full | `402536` | Success; 44,114 rows; snapshot date `2026-06-14`; 28 objects including batch parts |
## Troubleshooting
1. Check the n8n execution and identify whether the source query/package,
upload, read-back validation, or mark-sent step failed.
2. Confirm the `Rahamafresh Tickets S3` credential can write to the configured
bucket.
3. For fuel, inspect workflow static data. Do not manually advance
`last_successful_delta_export_at` after a failed run.
4. Verify `fuel_records.updated_at` is populated and its update trigger exists
if fuel changes are missing.
5. For FTTH, inspect `ftth_automation.automation_export_runs`, including
`status`, `destination_object_keys`, `n8n_execution_id`, and
`error_summary`.
6. Confirm the SCOREBOARD health endpoint is healthy and that the configured
export token and base URL are correct.
7. Re-run the appropriate manual webhook after fixing the failure. A failed
run leaves the last successful pointer unchanged, so the rows are retried.
## Published Version Check
In n8n, open each workflow and confirm:
- `Active` is enabled.
- The saved `versionId` equals `activeVersionId`.
- The trigger list contains the hourly schedule, daily `00:05` schedule, and
manual webhook.
- A new production webhook execution uses the same active version and returns
the expected destination keys.
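The first two checks can also be applied to a workflow's exported JSON. This sketch assumes the workflow is available as a dict carrying `active`, `versionId`, and `activeVersionId` keys; the JSON field name `active` and the helper name `is_published` are assumptions, not confirmed n8n API details.

```python
def is_published(workflow: dict) -> bool:
    """True when the workflow is active and the saved version is the live one."""
    version = workflow.get("versionId")
    return (
        bool(workflow.get("active"))
        and version is not None
        and version == workflow.get("activeVersionId")
    )
```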
Current published versions as of June 15, 2026:
- Fuel Records S3 Export: `6833e5e5-97a0-41be-8f82-9ec612de92ce`
- FTTH Automation Ticket S3 Export: `b2171088-eac2-439b-97e8-83dfa8117783`

27
run_ingest.sh Executable file

@ -0,0 +1,27 @@
#!/usr/bin/env bash
# run_ingest.sh — fleettickets · hourly INC ingest wrapper for cron.
#
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
#
# Install on the instance (ingest at :15, 07:00–19:00 EAT):
# 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
# the host/container TZ), since the export filenames and the schedule are EAT.
set -euo pipefail
cd "$(dirname "$0")"
# Load .env if present (KEY=VALUE lines); never commit the real .env.
if [ -f .env ]; then
set -a
# shellcheck disable=SC1091
. ./.env
set +a
fi
# Prefer the project venv if it exists, else the python on PATH (e.g. in-container).
PY="python"
[ -x ".venv/bin/python" ] && PY=".venv/bin/python"
exec "$PY" import_tickets.py --from-bucket --apply