feat: INC hourly-CSV ingestion (newest-file, ETag dedup, clean + archive)
Rework import_tickets.py from the retired JSON `latest.json` model to the new hourly full-snapshot CSV export. Strictly INC (CRQ out of scope). - Ingest the newest automations/inc/<EAT-timestamp>.csv; skip-if-unchanged by comparing S3 ETag to tickets.import_meta.metadata.source_etag. - Upsert on ticket_id (PK; no dups, never delete -> closure history accrues). No truncate. On success, move processed files to automations/inc/processed/. - Clean at ingest: drop is_alarm=true + the "EXPORT STOPPED..." sentinel; drop week_*, source_s3_*/source_snapshot_id, department/source_type; lowercase region, uppercase raw_status; keep service_type + bucket. - Force path-style S3 addressing; --inc-csv for local dev; --from-bucket for cron. - Add migrations/02 (import_meta + freshness); refresh README/.env.example/docs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
4631cc6382
commit
df054c92be
7 changed files with 771 additions and 65 deletions
|
|
@ -3,7 +3,7 @@
|
|||
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
||||
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
||||
|
||||
# rustfs / S3 — source ticket snapshots (automations/{inc,crq}/latest.json)
|
||||
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv)
|
||||
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
||||
RUSTFS_ACCESS_KEY=<key>
|
||||
RUSTFS_SECRET_KEY=<secret>
|
||||
|
|
|
|||
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -5,4 +5,5 @@ __pycache__/
|
|||
uv.lock
|
||||
*.json
|
||||
!.*.json
|
||||
*.csv
|
||||
.DS_Store
|
||||
|
|
|
|||
35
README.md
35
README.md
|
|
@ -1,18 +1,19 @@
|
|||
# fleettickets
|
||||
|
||||
Field-ops **INC / CRQ ticket** ingestion, geocoding, and read-schema that powers the
|
||||
Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
||||
**Tickets** map in FleetOps. Extracted from the `tracksolid` repo into its own module
|
||||
(it previously lived there as migrations 21–23 + `tools/import_tickets.py`).
|
||||
|
||||
- **INC** — incident / customer-fault tickets
|
||||
- **CRQ** — new-installation requests
|
||||
- **INC** — incident / customer-fault tickets *(this pipeline is **strictly INC**)*
|
||||
- **CRQ** — new-installation requests *(schema kept, but **out of scope** — not ingested here)*
|
||||
|
||||
## What this owns
|
||||
|
||||
| Piece | What |
|
||||
|---|---|
|
||||
| `migrations/01_tickets_schema.sql` | The `tickets` schema: `tickets.inc` / `tickets.crq` (raw-jsonb-first), `tickets.geo_clusters` + `tickets.geo_locations` gazetteers, geom-resolution trigger, and `reporting.fn_tickets_for_map` (the GeoJSON read function) |
|
||||
| `import_tickets.py` | Pulls ticket snapshots from the rustfs `tickets` bucket and upserts them; geocodes clusters + INC locations |
|
||||
| `migrations/02_import_meta.sql` | `tickets.import_meta` (per-dataset snapshot envelope metadata) + `fn_tickets_for_map` re-defined to expose it as `summary.freshness` (same signature — dashboard_api unchanged) |
|
||||
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
|
||||
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
||||
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
||||
|
||||
|
|
@ -49,23 +50,37 @@ python run_migrations.py # apply the schema (idempotent)
|
|||
## Run
|
||||
|
||||
```bash
|
||||
# ingest the latest snapshots from the bucket
|
||||
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
|
||||
python import_tickets.py --from-bucket --apply
|
||||
|
||||
# geocode (needs GEOCODER_API_KEY)
|
||||
python import_tickets.py --geocode-clusters --apply # coarse, once
|
||||
python import_tickets.py --geocode-locations --apply # precise, actionable INC
|
||||
|
||||
# from local files instead of the bucket
|
||||
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
|
||||
# from a local CSV instead of the bucket (dev)
|
||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||
```
|
||||
|
||||
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to
|
||||
the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency).
|
||||
the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency). Hourly on the instance via
|
||||
cron `5 * * * *` (a few minutes after each export).
|
||||
|
||||
## Notes
|
||||
|
||||
- The `changes/` subdirectory in the bucket holds **full timestamped snapshots** (not
|
||||
deltas) — ingest `latest.json` only; don't process `changes/`.
|
||||
- The n8n export writes a **full current-state CSV per hour** to
|
||||
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no
|
||||
deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
|
||||
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
|
||||
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
|
||||
is skipped (the export re-emits byte-identical content most hours).
|
||||
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
|
||||
deleted, so closed-ticket history accumulates. On success the file is **moved** to
|
||||
`automations/inc/processed/`.
|
||||
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
||||
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
|
||||
normalize `region` → lowercase and `raw_status` → UPPERCASE. `service_type` and `bucket`
|
||||
(a `closed`/`pending` flag) are kept.
|
||||
- `tickets.import_meta` captures snapshot freshness (surfaced as `summary.freshness` by
|
||||
`fn_tickets_for_map`).
|
||||
- The curated/geocoded coordinates are written `verified = false` — review
|
||||
`tickets.geo_clusters` / `tickets.geo_locations` and flip `verified` once checked.
|
||||
|
|
|
|||
|
|
@ -1,18 +1,31 @@
|
|||
"""
|
||||
import_tickets.py — Fireside Communications · INC/CRQ ticket ingestion (raw-first)
|
||||
import_tickets.py — Fireside Communications · INC ticket ingestion (raw-first)
|
||||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
Loads the client's field-ops ticket snapshots into the `tickets` schema — the
|
||||
source of the FleetOps "Tickets" map. Two categories, one table each:
|
||||
Loads the client's field-ops INC ticket snapshots into the `tickets` schema — the
|
||||
source of the FleetOps "Tickets" map.
|
||||
tickets.inc — incidents / customer faults
|
||||
tickets.crq — new-installation requests
|
||||
|
||||
RAW-FIRST: each row stores only `ticket_id` + `raw` (the full source record as
|
||||
jsonb). Everything downstream reads from `raw` (resilient to source schema drift).
|
||||
The DB derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||
STRICTLY INC: CRQ (new-installation) exports are out of scope and not processed
|
||||
here. `tickets.crq` stays in the schema but is not fed by this pipeline.
|
||||
|
||||
RAW-FIRST: each row stores only `ticket_id` + `raw` (the source record as jsonb).
|
||||
Everything downstream reads from `raw` (resilient to source schema drift). The DB
|
||||
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
||||
|
||||
Source data: rustfs `tickets` bucket, full snapshots from the client's email
|
||||
automation — automations/{inc,crq}/latest.json (array of 32-key objects).
|
||||
Source data: the n8n hourly S3 export (see n8n-hourly-s3-full-data-exports.md)
|
||||
writes a full current-state snapshot CSV per hour to the `tickets` bucket at
|
||||
automations/inc/<EAT-timestamp>.csv (e.g. 2026-06-15T17-00-00.csv)
|
||||
There is NO latest pointer, NO metadata envelope, and NO deltas — each file is a
|
||||
flat CSV (header + rows). We ingest the NEWEST file:
|
||||
- skip-if-unchanged: if its S3 ETag matches the last processed file's ETag we
|
||||
skip the DB write (the export re-emits byte-identical content most hours);
|
||||
- drop is_alarm=true rows + the "EXPORT STOPPED…" truncation-sentinel row;
|
||||
- drop derivable / provenance / zero-info columns (see DROP_FIELDS);
|
||||
- normalize region -> lowercase, raw_status -> UPPERCASE;
|
||||
- upsert on ticket_id (PRIMARY KEY → no duplication; never delete, so closure
|
||||
history accumulates), and record snapshot freshness in tickets.import_meta;
|
||||
- on success, MOVE the file to automations/inc/processed/ (copy + delete).
|
||||
|
||||
Geocoding (two layers, both via a KEYED provider — public Nominatim rate-limits):
|
||||
--geocode-clusters one coordinate per cluster (coarse fallback; ~50 lookups)
|
||||
|
|
@ -24,7 +37,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
|
|||
|
||||
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
||||
python import_tickets.py --from-bucket --apply
|
||||
python import_tickets.py --inc-json inc.json --crq-json crq.json --apply
|
||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||
python import_tickets.py --geocode-clusters --apply
|
||||
python import_tickets.py --geocode-locations --apply
|
||||
|
||||
|
|
@ -36,12 +49,15 @@ geo_clusters + geo_locations + reporting.fn_tickets_for_map.
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
import requests
|
||||
import psycopg2.extras
|
||||
|
|
@ -50,9 +66,31 @@ from shared import clean, get_conn, get_logger
|
|||
|
||||
log = get_logger("import_tickets")
|
||||
|
||||
TABLE_FOR = {"inc": "tickets.inc", "crq": "tickets.crq"}
|
||||
# ── INC ingestion config ──────────────────────────────────────────────────────
|
||||
_TABLE = "tickets.inc"
|
||||
_DATASET = "inc"
|
||||
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
|
||||
_BUCKET_KEYS = {"inc": "automations/inc/latest.json", "crq": "automations/crq/latest.json"}
|
||||
_INC_PREFIX = "automations/inc/"
|
||||
_PROCESSED_PREFIX = "automations/inc/processed/"
|
||||
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
||||
|
||||
# Garbage row the source leaks (commonly the first data line): its ticket_id is the
|
||||
# message itself. Matched by prefix so position/exact-tail don't matter.
|
||||
_SENTINEL_PREFIX = "EXPORT STOPPED"
|
||||
|
||||
# Columns dropped before building `raw`: derivable (week_*), the client's row-level
|
||||
# export provenance (source_s3_*, source_snapshot_id), and zero-information columns
|
||||
# (department=always FTTH, source_type=duplicate of service_type). We KEEP
|
||||
# service_type and `bucket` (the latter is a closed/pending lifecycle flag).
|
||||
DROP_FIELDS = frozenset({
|
||||
"week_start", "week_end",
|
||||
"source_s3_bucket", "source_s3_key", "source_snapshot_id",
|
||||
"department", "source_type",
|
||||
})
|
||||
|
||||
# Only files matching automations/inc/<EAT-timestamp>.csv (NOT processed/, NOT the
|
||||
# leftover latest.csv/, latest.json/, full/ prefixes).
|
||||
_CSV_KEY_RE = re.compile(r"^automations/inc/(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})\.csv$")
|
||||
|
||||
# Geocoder (keyed) — public Nominatim rate-limits bulk, so we use LocationIQ/OpenCage.
|
||||
_PROVIDER = os.getenv("GEOCODER_PROVIDER", "locationiq").lower()
|
||||
|
|
@ -61,68 +99,190 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
|||
_last_geocode_at = 0.0
|
||||
|
||||
|
||||
# ── data loading ──────────────────────────────────────────────────────────────
|
||||
def _load_local(path: str) -> list[dict]:
|
||||
with open(path, encoding="utf-8") as f:
|
||||
data = json.load(f) # json.loads accepts NaN by default
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
|
||||
def _load_bucket(kind: str) -> list[dict]:
|
||||
env = {
|
||||
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
|
||||
# The n8n hourly export writes a full current-state CSV per hour to
|
||||
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
|
||||
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
|
||||
# we skip the DB write (the export re-emits byte-identical content most hours).
|
||||
def _s3_env() -> dict:
|
||||
return {
|
||||
**os.environ,
|
||||
"AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"],
|
||||
"AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"],
|
||||
"AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"),
|
||||
"AWS_S3_ADDRESSING_STYLE": "path", # force path-style to match the rustfs endpoint
|
||||
}
|
||||
uri = f"s3://{_BUCKET}/{_BUCKET_KEYS[kind]}"
|
||||
log.info("fetching %s", uri)
|
||||
out = subprocess.run(
|
||||
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], "s3", "cp", uri, "-"],
|
||||
|
||||
|
||||
def _aws(args: list[str], env: dict) -> bytes:
|
||||
return subprocess.run(
|
||||
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], *args],
|
||||
env=env, capture_output=True, timeout=180, check=True,
|
||||
).stdout
|
||||
data = json.loads(out.decode("utf-8"))
|
||||
return data if isinstance(data, list) else []
|
||||
|
||||
|
||||
def _ts_from_key(key: str) -> datetime | None:
|
||||
"""EAT timestamp embedded in an automations/inc/<ts>.csv key (or None)."""
|
||||
m = _CSV_KEY_RE.match(key)
|
||||
if not m:
|
||||
return None
|
||||
return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
|
||||
|
||||
|
||||
def _list_inc_csvs(env: dict) -> list[tuple[str, str]]:
|
||||
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs)."""
|
||||
out = _aws(
|
||||
["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX,
|
||||
"--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"],
|
||||
env,
|
||||
).decode("utf-8").strip()
|
||||
items = json.loads(out) if out and out != "None" else []
|
||||
return [
|
||||
(it["Key"], (it.get("ETag") or "").strip('"'))
|
||||
for it in (items or []) if _CSV_KEY_RE.match(it.get("Key", ""))
|
||||
]
|
||||
|
||||
|
||||
def _last_processed_etag() -> str | None:
|
||||
"""ETag of the most recently ingested INC file (from tickets.import_meta)."""
|
||||
with get_conn() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT metadata->>'source_etag' FROM tickets.import_meta WHERE dataset = %s",
|
||||
(_DATASET,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
|
||||
def _parse_csv(text: str) -> list[dict]:
|
||||
return list(csv.DictReader(io.StringIO(text)))
|
||||
|
||||
|
||||
def _load_csv_local(path: str) -> list[dict]:
|
||||
with open(path, encoding="utf-8", newline="") as f:
|
||||
return list(csv.DictReader(f))
|
||||
|
||||
|
||||
def _move_processed(keys: list[str], env: dict) -> None:
|
||||
"""Archive listed INC csv objects to automations/inc/processed/ (S3 mv = copy+delete)."""
|
||||
for key in keys:
|
||||
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
|
||||
_aws(["s3", "mv", f"s3://{_BUCKET}/{key}", f"s3://{_BUCKET}/{dst}"], env)
|
||||
log.info("archived %s -> %s", key, dst)
|
||||
|
||||
|
||||
# ── row preparation (filter · drop columns · normalize) ─────────────────────────
|
||||
def _keep_row(row: dict) -> bool:
|
||||
"""Drop alarm rows + the truncation-sentinel; require a real ticket_id."""
|
||||
tid = clean(row.get("ticket_id"))
|
||||
if not tid or tid.startswith(_SENTINEL_PREFIX):
|
||||
return False
|
||||
return clean(row.get("is_alarm")) != "true"
|
||||
|
||||
|
||||
def _prepare(row: dict) -> dict:
|
||||
"""Strip DROP_FIELDS and normalize region/raw_status — returns the `raw` payload."""
|
||||
r = {k: v for k, v in row.items() if k not in DROP_FIELDS}
|
||||
if r.get("region"):
|
||||
r["region"] = r["region"].lower()
|
||||
if r.get("raw_status"):
|
||||
r["raw_status"] = r["raw_status"].upper()
|
||||
return r
|
||||
|
||||
|
||||
# ── upsert (raw-first) ────────────────────────────────────────────────────────
|
||||
def _scrub_nan(row: dict) -> dict:
|
||||
# Postgres jsonb rejects the JSON `NaN` token (e.g. mttr) — scrub to null.
|
||||
return {k: (None if isinstance(v, float) and math.isnan(v) else v) for k, v in row.items()}
|
||||
def _record_meta(meta: dict, records_ingested: int) -> None:
|
||||
"""Upsert the INC snapshot metadata (powers map freshness + holds source_etag)."""
|
||||
with get_conn() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""INSERT INTO tickets.import_meta
|
||||
(dataset, export_type, exported_at, snapshot_date, source_schema,
|
||||
source_table, row_count, records_ingested, n8n_execution_id, metadata,
|
||||
ingested_at)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, now())
|
||||
ON CONFLICT (dataset) DO UPDATE
|
||||
SET export_type = EXCLUDED.export_type, exported_at = EXCLUDED.exported_at,
|
||||
snapshot_date = EXCLUDED.snapshot_date, source_schema = EXCLUDED.source_schema,
|
||||
source_table = EXCLUDED.source_table, row_count = EXCLUDED.row_count,
|
||||
records_ingested = EXCLUDED.records_ingested,
|
||||
n8n_execution_id = EXCLUDED.n8n_execution_id, metadata = EXCLUDED.metadata,
|
||||
ingested_at = now()""",
|
||||
(_DATASET, clean(meta.get("export_type")), clean(meta.get("exported_at")),
|
||||
clean(meta.get("snapshot_date")), clean(meta.get("source_schema")),
|
||||
clean(meta.get("source_table")), meta.get("row_count"), records_ingested,
|
||||
clean(meta.get("n8n_execution_id")), psycopg2.extras.Json(meta)),
|
||||
)
|
||||
|
||||
|
||||
def upsert(rows: list[dict], table: str, apply: bool) -> int:
|
||||
payload = [
|
||||
(tid, psycopg2.extras.Json(_scrub_nan(r)))
|
||||
for r in rows
|
||||
if (tid := clean(r.get("ticket_id")))
|
||||
]
|
||||
log.info("%s: %d valid rows (skipped %d without ticket_id)",
|
||||
table, len(payload), len(rows) - len(payload))
|
||||
def upsert(rows: list[dict], apply: bool, meta: dict | None = None) -> int:
|
||||
meta = meta or {}
|
||||
kept = [r for r in rows if _keep_row(r)]
|
||||
payload = [(clean(r["ticket_id"]), psycopg2.extras.Json(_prepare(r))) for r in kept]
|
||||
log.info("%s: %d rows read, %d kept, %d dropped (alarm/sentinel/no-id)",
|
||||
_TABLE, len(rows), len(payload), len(rows) - len(payload))
|
||||
if not apply:
|
||||
log.info("DRY-RUN — nothing written to %s. Use --apply.", table)
|
||||
log.info("DRY-RUN — nothing written to %s. Use --apply.", _TABLE)
|
||||
return len(payload)
|
||||
with get_conn() as conn:
|
||||
with conn.cursor() as cur:
|
||||
psycopg2.extras.execute_values(
|
||||
cur,
|
||||
f"INSERT INTO {table} (ticket_id, raw) VALUES %s "
|
||||
f"INSERT INTO {_TABLE} (ticket_id, raw) VALUES %s "
|
||||
"ON CONFLICT (ticket_id) DO UPDATE SET raw = EXCLUDED.raw, ingested_at = now()",
|
||||
payload, page_size=500,
|
||||
)
|
||||
log.info("upserted %d rows into %s", len(payload), table)
|
||||
_record_meta(meta, len(payload))
|
||||
log.info("upserted %d rows into %s", len(payload), _TABLE)
|
||||
return len(payload)
|
||||
|
||||
|
||||
def ingest(args) -> None:
|
||||
if args.from_bucket:
|
||||
for kind in ("inc", "crq"):
|
||||
upsert(_load_bucket(kind), TABLE_FOR[kind], args.apply)
|
||||
# Local-file path (dev): ingest a single CSV, no bucket / no archive.
|
||||
if args.inc_csv:
|
||||
rows = _load_csv_local(args.inc_csv)
|
||||
name = os.path.basename(args.inc_csv)
|
||||
ts = _ts_from_key(_INC_PREFIX + name)
|
||||
meta = {"export_type": "full", "source_s3_key": name, "row_count": len(rows)}
|
||||
if ts:
|
||||
meta["exported_at"] = ts.isoformat()
|
||||
upsert(rows, args.apply, meta=meta)
|
||||
return
|
||||
|
||||
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
|
||||
env = _s3_env()
|
||||
listing = _list_inc_csvs(env)
|
||||
if not listing:
|
||||
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
|
||||
return
|
||||
listing.sort(key=lambda ke: _ts_from_key(ke[0]) or datetime.min.replace(tzinfo=_EAT))
|
||||
all_keys = [k for k, _ in listing]
|
||||
newest_key, newest_etag = listing[-1]
|
||||
log.info("newest INC file: %s (etag=%s; %d file(s) present)",
|
||||
newest_key, newest_etag, len(listing))
|
||||
|
||||
last_etag = _last_processed_etag()
|
||||
if newest_etag and newest_etag == last_etag:
|
||||
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
|
||||
if args.apply:
|
||||
_move_processed(all_keys, env)
|
||||
else:
|
||||
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
|
||||
return
|
||||
|
||||
text = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env).decode("utf-8")
|
||||
rows = _parse_csv(text)
|
||||
ts = _ts_from_key(newest_key)
|
||||
meta = {"export_type": "full", "source_s3_key": newest_key,
|
||||
"source_etag": newest_etag, "row_count": len(rows)}
|
||||
if ts:
|
||||
meta["exported_at"] = ts.isoformat()
|
||||
upsert(rows, args.apply, meta=meta)
|
||||
if args.apply:
|
||||
_move_processed(all_keys, env)
|
||||
else:
|
||||
if args.inc_json:
|
||||
upsert(_load_local(args.inc_json), TABLE_FOR["inc"], args.apply)
|
||||
if args.crq_json:
|
||||
upsert(_load_local(args.crq_json), TABLE_FOR["crq"], args.apply)
|
||||
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
|
||||
|
||||
|
||||
# ── place extraction (strip network codes, keep the real place) ───────────────
|
||||
|
|
@ -348,12 +508,12 @@ def _resolve() -> int:
|
|||
|
||||
# ── entrypoint ────────────────────────────────────────────────────────────────
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser(description="Ingest INC/CRQ tickets (raw-first) + geocode")
|
||||
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
||||
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
||||
ap.add_argument("--from-bucket", action="store_true",
|
||||
help="Fetch latest.json for inc+crq from the rustfs tickets bucket (aws CLI)")
|
||||
ap.add_argument("--inc-json", default=None, help="Local INC tickets JSON file")
|
||||
ap.add_argument("--crq-json", default=None, help="Local CRQ tickets JSON file")
|
||||
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
|
||||
"skips if unchanged (ETag) and archives processed files")
|
||||
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
||||
ap.add_argument("--geocode-clusters", action="store_true",
|
||||
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
||||
ap.add_argument("--geocode-locations", action="store_true",
|
||||
|
|
@ -366,8 +526,8 @@ def main() -> None:
|
|||
if args.geocode_locations:
|
||||
geocode_locations(apply=args.apply)
|
||||
return
|
||||
if not (args.from_bucket or args.inc_json or args.crq_json):
|
||||
ap.error("provide --from-bucket, --inc-json/--crq-json, --geocode-clusters, or --geocode-locations")
|
||||
if not (args.from_bucket or args.inc_csv):
|
||||
ap.error("provide --from-bucket, --inc-csv, --geocode-clusters, or --geocode-locations")
|
||||
ingest(args)
|
||||
|
||||
|
||||
|
|
|
|||
118
migrations/02_import_meta.sql
Normal file
118
migrations/02_import_meta.sql
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
-- 02_import_meta.sql — fleettickets · snapshot metadata + map freshness
|
||||
-- ─────────────────────────────────────────────────────────────────────────────
|
||||
-- The n8n S3 export now wraps each dataset in a metadata envelope
|
||||
-- ({ "metadata": {...}, "records": [...] }; see n8n-s3-export-workflows.md).
|
||||
-- We capture that envelope per dataset at ingest so the map can show how fresh
|
||||
-- the snapshot is, and re-define reporting.fn_tickets_for_map (same signature —
|
||||
-- dashboard_api unchanged) to expose it under summary.freshness.
|
||||
--
|
||||
-- Idempotent: safe on a fresh DB and re-appliable on the live DB.
|
||||
-- ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
SET search_path = tickets, public;
|
||||
|
||||
-- ── per-dataset snapshot metadata (one row per dataset; upserted each ingest) ─
|
||||
CREATE TABLE IF NOT EXISTS tickets.import_meta (
|
||||
dataset text PRIMARY KEY, -- 'inc' | 'crq'
|
||||
export_type text, -- 'delta' | 'full'
|
||||
exported_at timestamptz, -- metadata.exported_at (source)
|
||||
snapshot_date date, -- metadata.snapshot_date (full runs)
|
||||
source_schema text,
|
||||
source_table text,
|
||||
row_count integer, -- metadata.row_count (source count)
|
||||
records_ingested integer, -- rows we actually read/upserted
|
||||
n8n_execution_id text,
|
||||
metadata jsonb, -- full envelope metadata (audit)
|
||||
ingested_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
-- ── read function — add summary.freshness (signature unchanged) ──────────────
|
||||
CREATE OR REPLACE FUNCTION reporting.fn_tickets_for_map(
|
||||
p_service_type text DEFAULT NULL,
|
||||
p_status text DEFAULT NULL,
|
||||
p_open_only boolean DEFAULT true
|
||||
)
|
||||
RETURNS jsonb LANGUAGE plpgsql STABLE AS $fn$
|
||||
DECLARE v_result jsonb;
|
||||
BEGIN
|
||||
p_service_type := lower(NULLIF(p_service_type, ''));
|
||||
p_status := NULLIF(p_status, '');
|
||||
WITH filtered AS (
|
||||
SELECT 'inc'::text AS service_type, raw, geom, geo_source FROM tickets.inc
|
||||
WHERE geom IS NOT NULL
|
||||
AND (p_service_type IS NULL OR p_service_type = 'inc')
|
||||
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
||||
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
||||
UNION ALL
|
||||
SELECT 'crq'::text AS service_type, raw, geom, geo_source FROM tickets.crq
|
||||
WHERE geom IS NOT NULL
|
||||
AND (p_service_type IS NULL OR p_service_type = 'crq')
|
||||
AND (p_status IS NULL OR raw->>'normalized_status' = p_status)
|
||||
AND (NOT p_open_only OR (raw->>'is_actionable')::boolean IS TRUE)
|
||||
)
|
||||
SELECT jsonb_build_object(
|
||||
'summary', jsonb_build_object(
|
||||
'ticket_count', COUNT(*),
|
||||
'inc', COUNT(*) FILTER (WHERE service_type = 'inc'),
|
||||
'crq', COUNT(*) FILTER (WHERE service_type = 'crq'),
|
||||
'open', COUNT(*) FILTER (WHERE (raw->>'is_actionable')::boolean IS TRUE),
|
||||
'by_status', (SELECT jsonb_object_agg(s, c)
|
||||
FROM (SELECT raw->>'normalized_status' AS s, COUNT(*) AS c
|
||||
FROM filtered GROUP BY raw->>'normalized_status') z),
|
||||
'freshness', (SELECT jsonb_object_agg(dataset, jsonb_build_object(
|
||||
'export_type', export_type,
|
||||
'exported_at', exported_at,
|
||||
'snapshot_date', snapshot_date,
|
||||
'row_count', row_count,
|
||||
'records_ingested', records_ingested,
|
||||
'ingested_at', ingested_at))
|
||||
FROM tickets.import_meta
|
||||
WHERE p_service_type IS NULL OR dataset = p_service_type)
|
||||
),
|
||||
'geojson', jsonb_build_object(
|
||||
'type', 'FeatureCollection',
|
||||
'features', COALESCE(jsonb_agg(
|
||||
jsonb_build_object(
|
||||
'type', 'Feature',
|
||||
'properties', jsonb_build_object(
|
||||
'ticket_id', raw->>'ticket_id',
|
||||
'service_type', service_type,
|
||||
'status', raw->>'normalized_status',
|
||||
'raw_status', raw->>'raw_status',
|
||||
'cluster', raw->>'cluster',
|
||||
'region', raw->>'region',
|
||||
'location_name', raw->>'location_name',
|
||||
'department', raw->>'department',
|
||||
'owner', raw->>'owner',
|
||||
'assigned_team', raw->>'assigned_team',
|
||||
'sla_status', raw->>'sla_status',
|
||||
'is_actionable', (raw->>'is_actionable')::boolean,
|
||||
'geo_source', geo_source,
|
||||
'created_at', raw->>'created_at_service',
|
||||
'scheduled_at', raw->>'scheduled_at'
|
||||
),
|
||||
'geometry', ST_AsGeoJSON(geom)::jsonb
|
||||
)
|
||||
), '[]'::jsonb)
|
||||
)
|
||||
) INTO v_result FROM filtered;
|
||||
RETURN v_result;
|
||||
END $fn$;
|
||||
|
||||
COMMENT ON FUNCTION reporting.fn_tickets_for_map(text, text, boolean) IS
|
||||
'INC/CRQ tickets (tickets.inc + tickets.crq, raw-jsonb-first) as GeoJSON, with '
|
||||
'summary.freshness from tickets.import_meta. fleettickets 02.';
|
||||
|
||||
-- ── grants (guarded: roles may not exist on a fresh DB) ───────────────────────
|
||||
DO $grants$
|
||||
BEGIN
|
||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN
|
||||
GRANT SELECT, INSERT, UPDATE, DELETE ON tickets.import_meta TO tracksolid_owner;
|
||||
END IF;
|
||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'dashboard_ro') THEN
|
||||
GRANT SELECT ON tickets.import_meta TO dashboard_ro;
|
||||
END IF;
|
||||
IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'grafana_ro') THEN
|
||||
GRANT SELECT ON tickets.import_meta TO grafana_ro;
|
||||
END IF;
|
||||
END $grants$;
|
||||
156
n8n-hourly-s3-full-data-exports.md
Normal file
156
n8n-hourly-s3-full-data-exports.md
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
# n8n Hourly S3 Full-Data Exports
|
||||
|
||||
Updated on June 15, 2026.
|
||||
|
||||
## Overview
|
||||
|
||||
Two active n8n workflows export complete datasets to S3 every hour:
|
||||
|
||||
1. `FTTH Automation Ticket S3 Export`
|
||||
2. `Fuel Records S3 Export`
|
||||
|
||||
Each execution creates CSV files only. Filenames use the actual execution time
|
||||
in the `Africa/Nairobi` timezone.
|
||||
|
||||
No delta files, JSON files, `latest` files, `changes/` directories, `full/`
|
||||
directories, or midnight-specific exports are created.
|
||||
|
||||
## Hourly Output
|
||||
|
||||
Together, the two workflows create exactly three files during their hourly
|
||||
executions:
|
||||
|
||||
```text
|
||||
automations/crq/YYYY-MM-DDTHH-mm-ss.csv
|
||||
automations/inc/YYYY-MM-DDTHH-mm-ss.csv
|
||||
fuel_records/YYYY-MM-DDTHH-mm-ss.csv
|
||||
```
|
||||
|
||||
The CRQ and INC files are uploaded to the `tickets` bucket. The Fuel file is
|
||||
uploaded to the `fuel` bucket.
|
||||
|
||||
## FTTH Automation Ticket S3 Export
|
||||
|
||||
Workflow ID: `JI3QkcJeHk9eYRsY`
|
||||
|
||||
The workflow:
|
||||
|
||||
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
|
||||
2. Creates one execution timestamp.
|
||||
3. Calls the existing authenticated Scoreboard export endpoint with
|
||||
`export_type: full`.
|
||||
4. Reads all CRQ and INC rows returned by the endpoint.
|
||||
5. Converts each complete dataset to CSV.
|
||||
6. Uploads exactly two files:
|
||||
- `automations/crq/<execution-timestamp>.csv`
|
||||
- `automations/inc/<execution-timestamp>.csv`
|
||||
7. Fails the execution if exactly two successful upload results are not
|
||||
returned.
|
||||
|
||||
The workflow still has its existing manual webhook for operational testing.
|
||||
|
||||
## Fuel Records S3 Export
|
||||
|
||||
Workflow ID: `IP2KNAfFazAjTesh`
|
||||
|
||||
The workflow:
|
||||
|
||||
1. Runs at the start of every hour using the `Africa/Nairobi` workflow timezone.
|
||||
2. Creates one execution timestamp.
|
||||
3. Reads the complete `logistics_department.fuel_records` table.
|
||||
4. Converts all returned rows to one CSV.
|
||||
5. Uploads exactly one file:
|
||||
- `fuel_records/<execution-timestamp>.csv`
|
||||
6. Fails the execution if the S3 upload reports an error.
|
||||
|
||||
The workflow still has its existing manual webhook for operational testing.
|
||||
|
||||
## Timestamp Format
|
||||
|
||||
The timestamp format is:
|
||||
|
||||
```text
|
||||
YYYY-MM-DDTHH-mm-ss
|
||||
```
|
||||
|
||||
Example:
|
||||
|
||||
```text
|
||||
2026-06-15T14-39-53
|
||||
```
|
||||
|
||||
The timestamp is generated once at the start of each workflow execution and is
|
||||
formatted in `Africa/Nairobi`.
|
||||
|
||||
## Credentials and Safety
|
||||
|
||||
- Existing n8n PostgreSQL, S3, workflow-variable, and API token configuration is
|
||||
reused.
|
||||
- No S3 credentials or API secrets are hard-coded in workflow code.
|
||||
- Secrets are not included in workflow result messages.
|
||||
- Source database queries are read-only.
|
||||
- The workflows do not delete or update source database rows.
|
||||
- S3 upload nodes retain retry handling. A failed hourly execution can also be
|
||||
recovered naturally by the next full-data run.
|
||||
|
||||
## Removed Behavior
|
||||
|
||||
The workflows no longer contain:
|
||||
|
||||
- Delta export logic or stored delta pointers
|
||||
- Midnight full-export schedules
|
||||
- `latest.json` or `latest.csv`
|
||||
- JSON output
|
||||
- `changes/` keys
|
||||
- `full/` keys
|
||||
- Multipart or additional export files
|
||||
- FTTH mark-sent state handling
|
||||
|
||||
## Deployment Status
|
||||
|
||||
Both workflows were saved, published, and activated on June 15, 2026.
|
||||
|
||||
Active versions:
|
||||
|
||||
```text
|
||||
Fuel Records S3 Export:
|
||||
60cf5824-9345-45bb-a2eb-3b20b877fd32
|
||||
|
||||
FTTH Automation Ticket S3 Export:
|
||||
68b7be10-ac3a-43d8-8c17-b46a2cbb48d2
|
||||
```
|
||||
|
||||
## Manual Test Evidence
|
||||
|
||||
### Fuel Records S3 Export
|
||||
|
||||
Execution ID: `404079`
|
||||
|
||||
Rows exported: `2001`
|
||||
|
||||
Exact S3 key:
|
||||
|
||||
```text
|
||||
fuel_records/2026-06-15T14-39-50.csv
|
||||
```
|
||||
|
||||
### FTTH Automation Ticket S3 Export
|
||||
|
||||
Execution ID: `404080`
|
||||
|
||||
Rows exported:
|
||||
|
||||
```text
|
||||
CRQ: 12680
|
||||
INC: 31434
|
||||
```
|
||||
|
||||
Exact S3 keys:
|
||||
|
||||
```text
|
||||
automations/crq/2026-06-15T14-39-53.csv
|
||||
automations/inc/2026-06-15T14-39-53.csv
|
||||
```
|
||||
|
||||
Both manual tests completed successfully. Their upload builders generated one
|
||||
Fuel item and exactly two FTTH items, matching the required three output files.
|
||||
256
n8n-s3-export-workflows.md
Normal file
256
n8n-s3-export-workflows.md
Normal file
|
|
@ -0,0 +1,256 @@
|
|||
# n8n S3 Export Workflows
|
||||
|
||||
## Overview
|
||||
|
||||
Both workflows run in the `Africa/Nairobi` timezone and use the
|
||||
`Rahamafresh Tickets S3` credential.
|
||||
|
||||
| Workflow | n8n ID | Source | Bucket |
|
||||
| --- | --- | --- | --- |
|
||||
| Fuel Records S3 Export | `IP2KNAfFazAjTesh` | `logistics_department.fuel_records` | `fuel` |
|
||||
| FTTH Automation Ticket S3 Export | `JI3QkcJeHk9eYRsY` | `isp_department_crq.automations` and `isp_department_osp.automations` | `tickets` |
|
||||
|
||||
## Schedules
|
||||
|
||||
- Hourly delta export: `10` minutes after each hour, from `01:10` through
|
||||
`23:10` (`0 10 1-23 * * *`).
|
||||
- Daily full export: `00:05` (`0 5 0 * * *`).
|
||||
- The `00:05` run exports rows up to the end of the previous local day. For
|
||||
example, Wednesday's run exports the snapshot through Tuesday `23:59:59`.
|
||||
- The full export has its own state and does not read or advance the hourly
|
||||
delta pointer.
|
||||
|
||||
## Fuel Records S3 Export
|
||||
|
||||
### Source and change tracking
|
||||
|
||||
The source is `logistics_department.fuel_records`. The table has an indexed
|
||||
`updated_at` column and an update trigger that refreshes it whenever a row is
|
||||
changed.
|
||||
|
||||
Hourly runs select rows where:
|
||||
|
||||
```sql
|
||||
updated_at > last_successful_delta_export_at
|
||||
AND updated_at <= requested_at
|
||||
```
|
||||
|
||||
The delta pointer advances to the maximum exported `updated_at` only after all
|
||||
S3 uploads complete and the downloaded `latest.json` passes validation. A
|
||||
failed or empty run does not move the pointer incorrectly.
|
||||
|
||||
The full run selects all rows created before local midnight, independently of
|
||||
the delta pointer.
|
||||
|
||||
### Fuel object keys
|
||||
|
||||
Every successful run updates:
|
||||
|
||||
- `fuel_records/latest.json`
|
||||
- `fuel_records/latest.csv`
|
||||
|
||||
Delta runs with changed rows also write:
|
||||
|
||||
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.json`
|
||||
- `fuel_records/changes/YYYY-MM-DDTHH-mm-ss.csv`
|
||||
|
||||
Full runs write:
|
||||
|
||||
- `fuel_records/full/YYYY-MM-DD.json`
|
||||
- `fuel_records/full/YYYY-MM-DD.csv`
|
||||
|
||||
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
|
||||
parts such as `-part-0001`.
|
||||
|
||||
### Fuel state
|
||||
|
||||
Fuel state is stored in n8n workflow static data and is updated only after S3
|
||||
read-back validation:
|
||||
|
||||
- `last_successful_delta_export_at`
|
||||
- `last_successful_full_export_date`
|
||||
- `rows_exported`
|
||||
- `destination_key` and `destination_keys`
|
||||
- `n8n_execution_id`
|
||||
- `success`
|
||||
- `error_message`
|
||||
- `completed_at`
|
||||
|
||||
## FTTH Automation Ticket S3 Export
|
||||
|
||||
### Source and change tracking
|
||||
|
||||
The workflow requests an export package from the active SCOREBOARD service:
|
||||
|
||||
```text
|
||||
POST /api/v1/ftth/automation-export/package
|
||||
```
|
||||
|
||||
Datasets:
|
||||
|
||||
- CRQ: `isp_department_crq.automations`
|
||||
- INC: `isp_department_osp.automations`
|
||||
|
||||
Hourly delta packages select records changed after the last successful delta
|
||||
export. The SCOREBOARD service creates an export run before returning the
|
||||
package. n8n uploads every returned object and then calls:
|
||||
|
||||
```text
|
||||
POST /api/v1/ftth/automation-export/mark-sent
|
||||
```
|
||||
|
||||
Only a `SUCCESS` acknowledgement advances the delta pointer. Upload failure
|
||||
marks the run `FAILED` and preserves the previous pointer.
|
||||
|
||||
Full packages use the previous local date as `snapshot_date`, select the
|
||||
complete current-state dataset through the previous day, and update only the
|
||||
full-export date after successful upload.
|
||||
|
||||
### FTTH object keys
|
||||
|
||||
CRQ:
|
||||
|
||||
- `automations/crq/latest.json`
|
||||
- `automations/crq/latest.csv`
|
||||
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.json`
|
||||
- `automations/crq/changes/YYYY-MM-DDTHH-mm-ss.csv`
|
||||
- `automations/crq/full/YYYY-MM-DD.json`
|
||||
- `automations/crq/full/YYYY-MM-DD.csv`
|
||||
|
||||
INC:
|
||||
|
||||
- `automations/inc/latest.json`
|
||||
- `automations/inc/latest.csv`
|
||||
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.json`
|
||||
- `automations/inc/changes/YYYY-MM-DDTHH-mm-ss.csv`
|
||||
- `automations/inc/full/YYYY-MM-DD.json`
|
||||
- `automations/inc/full/YYYY-MM-DD.csv`
|
||||
|
||||
Exports larger than 5,000 rows additionally produce numbered JSON and CSV
|
||||
parts.
|
||||
|
||||
### FTTH state
|
||||
|
||||
State and audit history are stored in
|
||||
`ftth_automation.automation_export_runs`. Each run records:
|
||||
|
||||
- export type and requested timestamp
|
||||
- last successful delta timestamp
|
||||
- last successful full export date
|
||||
- snapshot date
|
||||
- row count
|
||||
- destination object keys
|
||||
- n8n execution ID
|
||||
- status (`PENDING`, `SUCCESS`, or `FAILED`)
|
||||
- completion timestamp and error summary
|
||||
|
||||
## File Contents
|
||||
|
||||
JSON files contain:
|
||||
|
||||
```json
|
||||
{
|
||||
"metadata": {
|
||||
"exported_at": "...",
|
||||
"export_type": "delta or full",
|
||||
"source_schema": "...",
|
||||
"source_table": "...",
|
||||
"dataset": "crq, inc, or omitted for fuel",
|
||||
"row_count": 0,
|
||||
"last_successful_delta_export_at": "...",
|
||||
"last_successful_full_export_date": "...",
|
||||
"snapshot_date": "...",
|
||||
"n8n_execution_id": "..."
|
||||
},
|
||||
"records": []
|
||||
}
|
||||
```
|
||||
|
||||
Fuel metadata names the previous and candidate delta pointers explicitly.
|
||||
CSV files contain the same exported records as tabular rows with a header
|
||||
line. CSV files do not contain the JSON metadata envelope.
|
||||
|
||||
## Manual Tests
|
||||
|
||||
Both workflows expose production POST webhooks.
|
||||
|
||||
Fuel:
|
||||
|
||||
```bash
|
||||
curl -X POST \
|
||||
-H 'Content-Type: application/json' \
|
||||
-d '{"export_type":"delta"}' \
|
||||
https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
|
||||
|
||||
curl -X POST \
|
||||
-H 'Content-Type: application/json' \
|
||||
-d '{"export_type":"full"}' \
|
||||
https://n8n.firesideafrica.cloud/webhook/fuel-records-s3-export
|
||||
```
|
||||
|
||||
FTTH:
|
||||
|
||||
```bash
|
||||
curl -X POST \
|
||||
-H 'Content-Type: application/json' \
|
||||
-d '{"export_type":"delta"}' \
|
||||
https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
|
||||
|
||||
curl -X POST \
|
||||
-H 'Content-Type: application/json' \
|
||||
-d '{"export_type":"full","force":true}' \
|
||||
https://n8n.firesideafrica.cloud/webhook/ftth-automation-s3-export
|
||||
```
|
||||
|
||||
After a test:
|
||||
|
||||
1. Confirm the webhook response has `success: true`.
|
||||
2. Open the execution ID in n8n and confirm every upload succeeded.
|
||||
3. Confirm the response lists the expected bucket and destination keys.
|
||||
4. Check the S3 object timestamps and inspect the JSON metadata and row count.
|
||||
5. For FTTH, confirm the matching export run is `SUCCESS`.
|
||||
|
||||
### Production test record
|
||||
|
||||
Tests run on June 15, 2026:
|
||||
|
||||
| Workflow | Type | Execution | Result |
|
||||
| --- | --- | --- | --- |
|
||||
| Fuel Records S3 Export | Delta | `402524` | Success; 0 changed rows; latest JSON and CSV validated |
|
||||
| Fuel Records S3 Export | Full | `402527` | Success; 1,965 rows; snapshot date `2026-06-14` |
|
||||
| FTTH Automation Ticket S3 Export | Delta | `402530` | Success; CRQ and INC latest/change JSON and CSV written |
|
||||
| FTTH Automation Ticket S3 Export | Full | `402536` | Success; 44,114 rows; snapshot date `2026-06-14`; 28 objects including batch parts |
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
1. Check the n8n execution and identify whether the source query/package,
|
||||
upload, read-back validation, or mark-sent step failed.
|
||||
2. Confirm the `Rahamafresh Tickets S3` credential can write to the configured
|
||||
bucket.
|
||||
3. For fuel, inspect workflow static data. Do not manually advance
|
||||
`last_successful_delta_export_at` after a failed run.
|
||||
4. Verify `fuel_records.updated_at` is populated and its update trigger exists
|
||||
if fuel changes are missing.
|
||||
5. For FTTH, inspect `ftth_automation.automation_export_runs`, including
|
||||
`status`, `destination_object_keys`, `n8n_execution_id`, and
|
||||
`error_summary`.
|
||||
6. Confirm the SCOREBOARD health endpoint is healthy and that the configured
|
||||
export token and base URL are correct.
|
||||
7. Re-run the appropriate manual webhook after fixing the failure. A failed
|
||||
run leaves the last successful pointer unchanged, so the rows are retried.
|
||||
|
||||
## Published Version Check
|
||||
|
||||
In n8n, open each workflow and confirm:
|
||||
|
||||
- `Active` is enabled.
|
||||
- The saved `versionId` equals `activeVersionId`.
|
||||
- The trigger list contains the hourly schedule, daily `00:05` schedule, and
|
||||
manual webhook.
|
||||
- A new production webhook execution uses the same active version and returns
|
||||
the expected destination keys.
|
||||
|
||||
Current published versions as of June 15, 2026:
|
||||
|
||||
- Fuel Records S3 Export: `6833e5e5-97a0-41be-8f82-9ec612de92ce`
|
||||
- FTTH Automation Ticket S3 Export: `b2171088-eac2-439b-97e8-83dfa8117783`
|
||||
Loading…
Reference in a new issue