Compare commits
3 commits
a4b90a33d8
...
f06c11fd11
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f06c11fd11 | ||
|
|
7d3bba8d78 | ||
|
|
509338c076 |
6 changed files with 63 additions and 33 deletions
|
|
@ -3,12 +3,12 @@
|
||||||
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
# Shared database (the `tickets` schema lives in tracksolid_db; internal Docker host)
|
||||||
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
|
||||||
|
|
||||||
# rustfs / S3 — source ticket snapshots (automations/inc/<EAT-timestamp>.csv)
|
# S3 — source INC ticket CDC stream (isptickets bucket, automations/inc/changes/<EAT-ts>.csv)
|
||||||
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
RUSTFS_ENDPOINT=https://s3.rahamafresh.com
|
||||||
RUSTFS_ACCESS_KEY=<key>
|
RUSTFS_ACCESS_KEY=isptickets
|
||||||
RUSTFS_SECRET_KEY=<secret>
|
RUSTFS_SECRET_KEY=<secret>
|
||||||
RUSTFS_REGION=us-east-1
|
RUSTFS_REGION=us-east-1
|
||||||
TICKETS_BUCKET=tickets
|
TICKETS_BUCKET=isptickets
|
||||||
|
|
||||||
# Geocoder (keyed — public Nominatim rate-limits bulk)
|
# Geocoder (keyed — public Nominatim rate-limits bulk)
|
||||||
GEOCODER_PROVIDER=locationiq # locationiq | opencage
|
GEOCODER_PROVIDER=locationiq # locationiq | opencage
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
# fleettickets — INC ingestion image (Coolify-deployable).
|
# fleettickets — INC ingestion image (Coolify-deployable).
|
||||||
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
# A small batch/cron worker: it has no web server. Coolify keeps the container
|
||||||
# running (CMD below) and fires the ingest via a Scheduled Task:
|
# running (CMD below) and fires the ingest via a Scheduled Task:
|
||||||
# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *)
|
# python import_tickets.py --from-bucket --apply (cron: */20 6-20 * * *)
|
||||||
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
|
||||||
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
|
||||||
FROM python:3.12-slim
|
FROM python:3.12-slim
|
||||||
|
|
|
||||||
48
README.md
48
README.md
|
|
@ -21,7 +21,7 @@ Field-ops **INC ticket** ingestion, geocoding, and read-schema that powers the
|
||||||
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
| `migrations/08_inc_open_sla_view.sql` | `tickets.inc_open_sla` view — open (`is_actionable`) tickets with **derived SLA** (`hours_open`, `sla_state` vs 48h; clock = `created_at_service` ∥ `first_seen_at`), plus team/cluster/`geog` for dispatch |
|
||||||
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
| `migrations/09_inc_dashboard_fn.sql` | `reporting.fn_inc_dashboard(cluster, status, window, from, to)` — one JSON payload (`window` / `open` GeoJSON / `closed` GeoJSON / `metrics` / `freshness`) powering the FleetOps live INC map. Open=live, closed=windowed (EAT calendar / custom); filters AND |
|
||||||
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
| `migrations/10_inc_history_capture.sql` | History for time-series: `tickets.closure_events` (append-only observed closures) + `tickets.inc_daily_snapshot` (per-EAT-day open backlog + flow), populated by `tickets.capture_history()` each ingest. Unlocks **backlog-over-time** |
|
||||||
| `import_tickets.py` | Ingests the **newest INC CSV** from the rustfs `tickets` bucket (`automations/inc/<EAT-timestamp>.csv`) and upserts on `ticket_id`; geocodes clusters + INC locations |
|
| `import_tickets.py` | Drains the **incremental INC change stream** from the `isptickets` bucket (`automations/inc/changes/<EAT-timestamp>.csv`), upserting on `ticket_id` oldest→newest; geocodes clusters + INC locations |
|
||||||
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
| `run_migrations.py` | Applies `migrations/*.sql` in order (ledger: `tickets.schema_migrations`) |
|
||||||
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
| `shared.py` | Minimal DB/logging helpers (self-contained — no tracksolid dependency) |
|
||||||
|
|
||||||
|
|
@ -87,7 +87,7 @@ python run_migrations.py # apply the schema (idempotent)
|
||||||
## Run
|
## Run
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# ingest the newest INC CSV from the bucket (skip-if-unchanged, then archive)
|
# drain the incremental INC change stream (every new file oldest→newest, then archive)
|
||||||
python import_tickets.py --from-bucket --apply
|
python import_tickets.py --from-bucket --apply
|
||||||
|
|
||||||
# geocode (needs GEOCODER_API_KEY)
|
# geocode (needs GEOCODER_API_KEY)
|
||||||
|
|
@ -108,28 +108,48 @@ Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the i
|
||||||
runs as a **Scheduled Task**, not a system crontab:
|
runs as a **Scheduled Task**, not a system crontab:
|
||||||
|
|
||||||
- **Command:** `python import_tickets.py --from-bucket --apply`
|
- **Command:** `python import_tickets.py --from-bucket --apply`
|
||||||
- **Frequency:** `15 7-19 * * *` (`:15` past each hour, **07:15–19:15 EAT**). This
|
- **Frequency:** `*/20 6-20 * * *` (every 20 min, **06:00–20:40 EAT**). This
|
||||||
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
Coolify instance runs scheduled tasks in **EAT (Africa/Nairobi)**, so no UTC
|
||||||
conversion is needed.
|
conversion is needed.
|
||||||
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
|
||||||
`RUSTFS_*`, `GEOCODER_*`.
|
`RUSTFS_*` (now the `isptickets` bucket credentials), `GEOCODER_*`.
|
||||||
|
|
||||||
Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op.
|
The watermark makes a run with no new change files a cheap no-op.
|
||||||
|
|
||||||
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
|
||||||
and runs the ingest; schedule it with a crontab line
|
and runs the ingest; schedule it with a crontab line
|
||||||
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`).
|
(`CRON_TZ=Africa/Nairobi` / `*/20 6-20 * * *`).
|
||||||
|
|
||||||
|
### Bucket cutover (one-time reseed)
|
||||||
|
|
||||||
|
When the source provider moves the feed to a new bucket (e.g. `tickets` → `isptickets`),
|
||||||
|
the stored watermark holds a key from the *old* bucket's stream, whose timestamp may be
|
||||||
|
newer than the new bucket's first file — which would otherwise be skipped. Point the
|
||||||
|
`RUSTFS_*` creds + `TICKETS_BUCKET` at the new bucket, then drain it once with `--reseed`,
|
||||||
|
which ignores the stored watermark and ingests **every** file in `changes/` oldest→newest:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python import_tickets.py --from-bucket --reseed # dry-run first
|
||||||
|
python import_tickets.py --from-bucket --reseed --apply # commit + archive
|
||||||
|
```
|
||||||
|
|
||||||
|
Upserts are idempotent (`ticket_id` PK, rows never deleted) and the new stream's periodic
|
||||||
|
full-state re-emissions re-assert current state, so this is non-destructive and converges
|
||||||
|
even across the cutover gap. After it, the watermark is current — resume normal
|
||||||
|
`--from-bucket --apply` runs (no `--reseed`). The old bucket is left untouched.
|
||||||
|
|
||||||
## Notes
|
## Notes
|
||||||
|
|
||||||
- The n8n export writes a **full current-state CSV per hour** to
|
- The n8n export writes an **incremental CDC change stream** to
|
||||||
`automations/inc/<EAT-timestamp>.csv` — no `latest` pointer, no metadata envelope, no
|
`automations/inc/changes/<EAT-timestamp>.csv`: a full-state baseline followed by files
|
||||||
deltas. The loader lists the prefix, takes the **newest** file, and ingests it.
|
holding only the rows that changed (with periodic full-state re-emissions). No `latest`
|
||||||
- **Skip-if-unchanged:** the newest file's S3 **ETag** is compared to the last processed
|
pointer, no metadata envelope. The loader drains **every not-yet-processed file
|
||||||
file's ETag (stored in `tickets.import_meta.metadata.source_etag`); if equal, the DB write
|
oldest→newest** — taking only the newest would drop intermediate deltas.
|
||||||
is skipped (the export re-emits byte-identical content most hours).
|
- **Watermark:** the newest file already applied is recorded in
|
||||||
|
`tickets.import_meta.metadata.source_max_key`; runs skip anything at/older than it, so
|
||||||
|
reruns are cheap no-ops. `--reseed` ignores it for a one-time bucket cutover.
|
||||||
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
|
- **Upsert on `ticket_id`** (PRIMARY KEY) — duplication is impossible; rows are never
|
||||||
deleted, so closed-ticket history accumulates. On success the file is **moved** to
|
deleted, so closed-ticket history accumulates. On success each file is **moved** to
|
||||||
`automations/inc/processed/`.
|
`automations/inc/processed/`.
|
||||||
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
- **Cleaning at ingest:** drop `is_alarm=true` rows + the `EXPORT STOPPED…` sentinel; drop
|
||||||
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
|
`week_start`/`week_end`, `source_s3_*`/`source_snapshot_id`, `department`/`source_type`;
|
||||||
|
|
@ -192,7 +212,7 @@ Findings to keep in mind (see the PRD for detail):
|
||||||
|
|
||||||
## Status / roadmap
|
## Status / roadmap
|
||||||
|
|
||||||
Live: INC ingestion deployed on Coolify (hourly `15 7-19 * * *` EAT), schema +
|
Live: INC ingestion deployed on Coolify (every 20 min `*/20 6-20 * * *` EAT), schema +
|
||||||
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
generated columns + geocoding + the `inc_open_sla` view in `tracksolid_db`.
|
||||||
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
|
Next (Phase 2): time-series analytics (closure rate, MTTR/SLA trends), then FleetNow
|
||||||
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
|
vehicle **dispatch** off `geog`, and **team closure attribution**. **CRQ** is a
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ What is actually built and deployed, as of the Phase-1 completion. Companion to
|
||||||
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
`TZ=Africa/Nairobi`, keep-alive `tail -f /dev/null`). Separate from the FleetOps
|
||||||
web app (`fleet-ops-staging`).
|
web app (`fleet-ops-staging`).
|
||||||
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
|
- **Scheduled Task:** `python import_tickets.py --from-bucket --apply`, cron
|
||||||
`15 7-19 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
`*/20 6-20 * * *` in **EAT** (Coolify runs tasks in EAT — no UTC conversion).
|
||||||
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
|
- **Env vars** (Coolify): `DATABASE_URL` (internal DB host), `RUSTFS_*`, `GEOCODER_*`.
|
||||||
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
- For a plain host/VM, `run_ingest.sh` + a crontab line is the alternative.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,8 @@ Everything downstream reads from `raw` (resilient to source schema drift). The D
|
||||||
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
derives `geom` (see migrations): feed coords (raw lat/lng) -> location geocode
|
||||||
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
(tickets.geo_locations) -> cluster centroid (tickets.geo_clusters) -> none.
|
||||||
|
|
||||||
Source data: the n8n S3 export writes CSV files to the `tickets` bucket under
|
Source data: the n8n S3 export writes CSV files to the `isptickets` bucket under
|
||||||
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-22T15-50-39.csv)
|
automations/inc/changes/<EAT-timestamp>.csv (e.g. 2026-06-24T09-55-44.csv)
|
||||||
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
This is an INCREMENTAL (CDC) stream: the first file is a full current-state
|
||||||
baseline, and every later file holds only the rows that CHANGED since the prior
|
baseline, and every later file holds only the rows that CHANGED since the prior
|
||||||
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
|
export (new + updated tickets, keyed by ticket_id; deletions are never emitted).
|
||||||
|
|
@ -39,6 +39,7 @@ Provider/key from env: GEOCODER_PROVIDER (locationiq|opencage), GEOCODER_API_KEY
|
||||||
|
|
||||||
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
Usage (needs DATABASE_URL + RUSTFS_* + GEOCODER_* env; see .env.example):
|
||||||
python import_tickets.py --from-bucket --apply
|
python import_tickets.py --from-bucket --apply
|
||||||
|
python import_tickets.py --from-bucket --reseed --apply # one-time bucket cutover
|
||||||
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
|
||||||
python import_tickets.py --geocode-clusters --apply
|
python import_tickets.py --geocode-clusters --apply
|
||||||
python import_tickets.py --geocode-locations --apply
|
python import_tickets.py --geocode-locations --apply
|
||||||
|
|
@ -71,7 +72,7 @@ log = get_logger("import_tickets")
|
||||||
# ── INC ingestion config ──────────────────────────────────────────────────────
|
# ── INC ingestion config ──────────────────────────────────────────────────────
|
||||||
_TABLE = "tickets.inc"
|
_TABLE = "tickets.inc"
|
||||||
_DATASET = "inc"
|
_DATASET = "inc"
|
||||||
_BUCKET = os.getenv("TICKETS_BUCKET", "tickets")
|
_BUCKET = os.getenv("TICKETS_BUCKET", "isptickets")
|
||||||
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
|
_INC_PREFIX = "automations/inc/changes/" # the incremental (CDC) change stream
|
||||||
_PROCESSED_PREFIX = "automations/inc/processed/"
|
_PROCESSED_PREFIX = "automations/inc/processed/"
|
||||||
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
_EAT = timezone(timedelta(hours=3)) # Africa/Nairobi — filenames + data are EAT
|
||||||
|
|
@ -102,14 +103,15 @@ _GEOCODE_INTERVAL_S = float(os.getenv("GEOCODER_MIN_INTERVAL_S", "1.1"))
|
||||||
_last_geocode_at = 0.0
|
_last_geocode_at = 0.0
|
||||||
|
|
||||||
|
|
||||||
# ── data loading (CSV · newest-file · ETag skip-if-unchanged) ───────────────────
|
# ── data loading (CSV · incremental CDC change stream · per-file watermark) ─────
|
||||||
# The n8n hourly export writes a full current-state CSV per hour to
|
# The n8n export writes an INCREMENTAL change stream to the `isptickets` bucket under
|
||||||
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
|
# automations/inc/changes/<EAT-timestamp>.csv: a first full-state baseline, then files
|
||||||
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
|
# holding only the rows that CHANGED (with periodic full-state re-emissions). We drain
|
||||||
# we skip the DB write (the export re-emits byte-identical content most hours).
|
# EVERY not-yet-processed file oldest→newest, upsert on ticket_id, advance the watermark
|
||||||
|
# (tickets.import_meta.metadata->>'source_max_key') per file, and archive it to processed/.
|
||||||
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
|
||||||
def _s3_client():
|
def _s3_client():
|
||||||
"""boto3 S3 client for the rustfs endpoint (force path-style addressing)."""
|
"""boto3 S3 client for the S3 endpoint (force path-style addressing)."""
|
||||||
return boto3.client(
|
return boto3.client(
|
||||||
"s3",
|
"s3",
|
||||||
endpoint_url=os.environ["RUSTFS_ENDPOINT"],
|
endpoint_url=os.environ["RUSTFS_ENDPOINT"],
|
||||||
|
|
@ -286,7 +288,11 @@ def ingest(args) -> None:
|
||||||
|
|
||||||
# watermark: skip anything at/older than the newest file already applied. Archiving
|
# watermark: skip anything at/older than the newest file already applied. Archiving
|
||||||
# normally empties changes/, but this guards a failed archive from re-applying.
|
# normally empties changes/, but this guards a failed archive from re-applying.
|
||||||
last_ts = _last_processed_ts()
|
# --reseed ignores the stored watermark and drains EVERY file in changes/ once — used
|
||||||
|
# for a one-time bucket cutover, where the stored key points at the old bucket's stream
|
||||||
|
# and its timestamp may be newer than the new bucket's first file. Crash-safe: each file
|
||||||
|
# still advances source_max_key + archives per file, so a plain rerun resumes cleanly.
|
||||||
|
last_ts = None if args.reseed else _last_processed_ts()
|
||||||
_floor = datetime.min.replace(tzinfo=_EAT)
|
_floor = datetime.min.replace(tzinfo=_EAT)
|
||||||
pending = [(k, e) for k, e in listing
|
pending = [(k, e) for k, e in listing
|
||||||
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
|
if last_ts is None or (_ts_from_key(k) or _floor) > last_ts]
|
||||||
|
|
@ -610,8 +616,12 @@ def main() -> None:
|
||||||
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
ap = argparse.ArgumentParser(description="Ingest INC tickets from CSV (raw-first) + geocode")
|
||||||
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
ap.add_argument("--apply", action="store_true", help="Write to DB (default: dry-run)")
|
||||||
ap.add_argument("--from-bucket", action="store_true",
|
ap.add_argument("--from-bucket", action="store_true",
|
||||||
help="Ingest the newest INC csv from the rustfs tickets bucket (aws CLI); "
|
help="Drain the incremental INC change stream (automations/inc/changes/) "
|
||||||
"skips if unchanged (ETag) and archives processed files")
|
"from the isptickets S3 bucket: every not-yet-processed file "
|
||||||
|
"oldest→newest, upsert on ticket_id, advance the watermark, archive")
|
||||||
|
ap.add_argument("--reseed", action="store_true",
|
||||||
|
help="Ignore the stored watermark and drain every file in changes/ once "
|
||||||
|
"(one-time bucket cutover / reseed). Use with --from-bucket --apply")
|
||||||
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
ap.add_argument("--inc-csv", default=None, help="Local INC tickets CSV file (dev)")
|
||||||
ap.add_argument("--geocode-clusters", action="store_true",
|
ap.add_argument("--geocode-clusters", action="store_true",
|
||||||
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
help="Geocode distinct clusters into the gazetteer, then re-resolve geoms")
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,8 @@
|
||||||
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
|
# Loads env from the local .env (DATABASE_URL + RUSTFS_* + GEOCODER_*) and runs the
|
||||||
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
|
# newest-INC-CSV ingest with --apply (skip-if-unchanged + archive are built in).
|
||||||
#
|
#
|
||||||
# Install on the instance (ingest at :15, 07:00–19:00 EAT):
|
# Install on the instance (ingest every 20 min, 06:00–20:40 EAT):
|
||||||
# 15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
|
# */20 6-20 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1
|
||||||
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
# Ensure the crontab runs in the Africa/Nairobi timezone (CRON_TZ=Africa/Nairobi or
|
||||||
# the host/container TZ), since the export filenames and the schedule are EAT.
|
# the host/container TZ), since the export filenames and the schedule are EAT.
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue