tracksolid_timescale_grafan.../docs/DWH_PIPELINE.md

253 lines
10 KiB
Markdown
Raw Permalink Normal View History

# DWH Pipeline — Operations Runbook
**Pipeline:** n8n extract + load into `tracksolid_dwh` bronze schema
**Design spec:** `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md`
**Implementation plan:** `docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md`
---
## 1. What This Pipeline Does
Every ~3 hours during active hours (7 runs/day, 05:0023:00 EAT), n8n extracts 8 tables from the production `tracksolid_db` (Coolify internal network), writes each as a timestamped CSV to rustfs, then loads each CSV into the `bronze` schema on `tracksolid_dwh` (31.97.44.246:5888). Rustfs CSVs are moved to `dwh/processed/` after a successful load — never deleted.
Two n8n workflows:
- **`dwh_extract`** — cron-triggered, iterates tables in sequence, writes CSVs, calls `dwh_load_bronze` per table.
- **`dwh_load_bronze`** — triggered per-table by `dwh_extract`, loads one CSV inside a single transaction (insert → update watermark → update run log → move CSV).
---
## 2. Key Locations
| What | Where |
|---|---|
| Source DB | `tracksolid_db` on Coolify (internal Docker network, `timescale_db:5432`) |
| Target DB | `tracksolid_dwh` at `31.97.44.246:5888` |
| Rustfs bucket | `fleet-db` (same bucket used by pg_dump backups) |
| Active CSVs | `s3://fleet-db/dwh/exports/{table}/{YYYYMMDD_HHMM}_EAT.csv` |
| Processed CSVs | `s3://fleet-db/dwh/processed/{table}/{YYYYMMDD_HHMM}_EAT.csv` |
| Control schema | `dwh_control` in `tracksolid_dwh` |
| Migrations | `dwh/26*.sql` applied in numeric order |
---
## 3. First-Time Setup
Apply migrations to `tracksolid_dwh` in numeric order, as a superuser (not `dwh_owner`):
```bash
PSQL="psql 'postgres://postgres:***@31.97.44.246:5888/tracksolid_dwh?sslmode=require'"
$PSQL -f dwh/260423_dwh_ddl_v1.sql # Bronze DDL, roles, schemas
$PSQL -f dwh/261001_dwh_control.sql # Watermarks + run log
$PSQL -f dwh/261002_bronze_constraints_audit.sql # Assertion: ON CONFLICT keys exist
$PSQL -f dwh/261003_dwh_roles.sql # Assertion: roles + grants present
$PSQL -f dwh/261004_dwh_observability_views.sql # Freshness/failure views
```
Each migration is idempotent. Audit files (261002, 261003) raise an exception with a bullet list of what is missing if the contract is broken — re-apply the relevant predecessor file and try again.
### Rustfs prefixes
```bash
aws --endpoint ${RUSTFS_ENDPOINT} s3api put-object \
--bucket fleet-db --key dwh/exports/
aws --endpoint ${RUSTFS_ENDPOINT} s3api put-object \
--bucket fleet-db --key dwh/processed/
```
### n8n credentials
Three credentials, all configured in the n8n UI before importing workflows:
| Credential | Target | User | Notes |
|---|---|---|---|
| `tracksolid_source` | Coolify internal → `tracksolid_db` | `grafana_ro` | Read-only; no `sslmode` needed on internal network |
| `tracksolid_dwh_target` | `31.97.44.246:5888``tracksolid_dwh` | `dwh_owner` | **Must set `sslmode=require`** — public IP |
| `rustfs_s3` | `${RUSTFS_ENDPOINT}` | `${RUSTFS_ACCESS_KEY}` | Same creds as pg_dump backup sidecar |
Test each credential via the n8n "Test connection" button before enabling the cron schedule.
---
## 4. Schedule
n8n Schedule node, Africa/Nairobi TZ: `0 5,8,11,14,17,20,23 * * *`
- 7 runs/day at 05:00, 08:00, 11:00, 14:00, 17:00, 20:00, 23:00 EAT
- Overnight gap (23:00 → 05:00 = 6h) by design — device traffic minimal
- First-of-day run carries the overnight backlog (watermark picks up where 23:00 left off)
---
## 5. What Each Table Does on Every Run
### Snapshot tables (TRUNCATE + full reload)
`bronze.devices`, `bronze.live_positions` — small state tables, "current state" semantics. Full replace every run.
### Incremental tables (watermark + append-with-dedup)
| Bronze table | Source watermark column | ON CONFLICT target |
|---|---|---|
| `position_history` | `recorded_at` (DB insertion time) | `(imei, gps_time)` |
| `trips` | `updated_at` | `id` |
| `alarms` | `updated_at` | `id` |
| `parking_events` | `updated_at` | `id` |
| `device_events` | `created_at` | `id` |
| `ingestion_log` | `run_at` | `id` |
Watermark bounds are closed upper: `WHERE <col> > last_extracted_at AND <col> <= :run_started_at`.
### Schema drift to handle in extract SQL
- **`trips.distance_m` → `bronze.trips.distance_km`**: source stores metres, bronze expects km. Extract SQL: `SELECT ..., distance_m/1000.0 AS distance_km, ...`. Cross-reference: FIX-M16 in `CLAUDE.md`.
### PostGIS geometry round-trip
All six geometry columns (`live_positions`, `position_history`, `trips.start_geom`, `trips.end_geom`, `parking_events`, `alarms`) use EWKT serialisation:
```sql
-- Extract
SELECT ..., CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END AS geom_ewkt FROM ...;
-- Load
INSERT INTO bronze... (..., geom) VALUES (..., ST_GeomFromEWKT(:geom_ewkt));
```
SRID 4326 is preserved inline; no separate SRID step required.
---
## 6. Verifying a Healthy Run
### Immediate sanity checks (after any scheduled run)
```sql
-- Any failures in the last hour?
SELECT * FROM dwh_control.v_recent_failures WHERE run_started_at > NOW() - INTERVAL '1 hour';
-- All tables loaded in last 4h?
SELECT * FROM dwh_control.v_table_freshness WHERE lag > INTERVAL '4 hours';
-- Watermarks advancing?
SELECT * FROM dwh_control.v_watermark_lag ORDER BY extract_lag DESC;
```
### Row-count parity (spot check weekly)
```sql
-- Source
SELECT COUNT(*) FROM tracksolid.position_history;
-- Target
SELECT COUNT(*) FROM bronze.position_history;
```
Numbers should match ± rows inserted between the two queries. Persistent gap > 1% → investigate CSV parse errors or a dropped batch.
### Geometry round-trip
```sql
SELECT ST_AsText(geom) FROM bronze.position_history WHERE geom IS NOT NULL LIMIT 5;
-- Should return valid POINT(lng lat), not NULL or garbage.
```
---
## 7. Troubleshooting
### "A table is stale (`v_table_freshness` shows lag > 4h)"
1. Check `v_recent_failures` for that table. If a row exists, read `error_message`.
2. If `status='loading'` in `extract_runs` for that table, a load is in progress — wait for the next cron tick. If it stays `loading` across two ticks, the n8n executor crashed mid-transaction; the DB rolled back, and the next run will retry naturally.
3. If no failure row and no in-progress row, the extract workflow never fired — check n8n execution logs for the cron trigger.
### "A CSV is stuck in `dwh/exports/`"
The load failed after upload. The next scheduled run will re-process it (the watermark did not advance, so the extract SQL returns the same window). Safe to leave. If multiple days of CSVs pile up, the load workflow has a persistent bug — open n8n execution logs for the specific `run_id` in `extract_runs`.
### "Row counts diverge more than ~1%"
Usually one of:
- A retry window overlapped the PK range and some rows lost the race with a concurrent source-side write. Re-trigger the extract for that window manually (see §8).
- CSV parse error silently dropped a row. Check `extract_runs.rows_extracted` vs. `rows_loaded` — if they differ, the loader found malformed CSV.
### "Geometry loaded as NULL"
EWKT serialisation broke on the extract side. Verify the source query has the `CASE WHEN geom IS NULL` guard — without it, `ST_AsEWKT(NULL)` returns `NULL` correctly but an empty geometry returns `'GEOMETRYCOLLECTION EMPTY'` which `ST_GeomFromEWKT` rejects.
### "`bronze.trips.distance_km` values are 1000× too large"
The extract query is missing `/1000.0` on the source `distance_m` column. See §5 "Schema drift".
---
## 8. Manual Re-Run
### Re-run a single table for the current window
1. Open n8n, go to `dwh_extract` workflow.
2. Locate the branch for that table.
3. Click **Execute Workflow** → selects that table only.
4. Confirm a new row appears in `dwh_control.extract_runs` with `status='loaded'`.
### Back-fill a historical window
The extract workflow respects the watermark; to re-extract a window, rewind the watermark:
```sql
-- Rewind position_history to 24h ago
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = 'position_history';
```
Next scheduled run will re-extract the gap. Loads are idempotent (`ON CONFLICT DO NOTHING` on the PK), so duplicate rows are filtered at the bronze boundary.
### Full reseed (nuclear option)
```sql
-- Restart position_history from the beginning
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = '2026-01-01T00:00:00Z'
WHERE table_name = 'position_history';
```
The first post-reseed run will produce one very large CSV (~all history). The rustfs `exports/` prefix will briefly hold a multi-GB object. Expected; it moves to `processed/` on success.
---
## 9. Credential Rotation
`260423_dwh_ddl_v1.sql` commits role passwords in plaintext — a pre-existing flaw to be cleaned up separately.
To rotate:
```sql
-- As superuser on tracksolid_dwh:
ALTER ROLE dwh_owner PASSWORD '<new secret>';
ALTER ROLE grafana_ro PASSWORD '<new secret>';
```
Then update the matching n8n credential and Grafana datasource. Never commit the new password — store in `.env` if needed for scripts, or keep exclusively inside n8n/Grafana credential stores.
---
## 10. Known Quirks
| Quirk | Source | Handling |
|---|---|---|
| `trips.distance_m``bronze.trips.distance_km` | Schema drift between source and bronze | Divide in extract SQL (§5) |
| Hypertable row counts read 0 in `pg_stat_user_tables` | TimescaleDB quirk | Always `SELECT COUNT(*)` directly |
| `parking_events` can be empty for days | Endpoint returns empty; not a failure | Zero rows loaded is a valid run outcome |
| First run of each day larger | Overnight backlog | Expected; plan watermark design |
| `last_extracted_at` default `2026-01-01` | Seed value from 261001 | First run on a new table back-fills all history |
---
## 11. Out of Scope (follow-up)
- Silver/gold transformations — `silver` and `gold` schemas exist but contain no views.
- Grafana dashboard panels — these views are the data source; panels TBD.
- OBD / fault codes / fuel / temperature / LBS / heartbeats — webhooks not yet registered; add a watermark row + extract branch when they start reporting.
- Bronze schema evolution tooling — additive changes via numbered migrations is fine for one pipeline; revisit if scope grows.