
# DWH Execution Manual
> **Purpose:** A reusable playbook for building an `extract → blob → load` data warehouse bronze layer using n8n (or any equivalent orchestrator) + object storage + PostgreSQL/PostGIS. Generalised from the Fireside Tracksolid DWH pipeline (2026-04-24). Apply this pattern to future data projects to skip re-deriving the same decisions.
>
> **Reference implementation:** `dwh/26100{1..4}.sql` + `docs/DWH_PIPELINE.md` + the two `n8n-workflows/dwh_*.json` files. Treat those as the copy-paste template for the next project.
---
## 1. When to Use This Pattern
**Use it when all of these are true:**
- You need an analytical replica of a production OLTP DB without letting analytical load hit the OLTP.
- Source and target are separate PostgreSQL instances (possibly separate networks).
- Data volumes are moderate: millions of rows per day, not billions.
- You already have an orchestrator (n8n, Airflow, Prefect) and object storage (rustfs, S3, MinIO) in the stack.
- Latency tolerance is hours, not minutes.
**Don't use it when:**
- Sub-minute latency is required → use logical replication or CDC (Debezium, pg_logical, AWS DMS).
- Volumes exceed ~100 GB/day → step up to Spark/Flink + columnar store (Iceberg, Delta).
- Source and target are the same DB → just use materialized views or scheduled SQL.
- You need exactly-once streaming semantics → this pattern is at-least-once + idempotent load.
---
## 2. The Core Pattern
```
┌──────────────┐      ┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│  Source DB   │─(a)─▶│ Orchestrator │─(b)─▶│ Object Store │─(c)─▶│  Target DB   │
│   (OLTP)     │      │  (extract)   │      │   (CSVs)     │      │   (bronze)   │
└──────────────┘      └──────────────┘      └──────┬───────┘      └──────▲───────┘
                                                   │                     │
                                                   └─────────(d)─────────┘
                                                  (load workflow, per CSV)
(a) Watermarked SELECT, closed upper bound
(b) Atomic CSV upload with timestamped path
(c) CSV stays until load confirms success
(d) Load = BEGIN → INSERT ON CONFLICT → UPDATE watermark → UPDATE run log → COMMIT → move CSV
```
**Why three stages and not two:**
- **Audit trail** — every extracted CSV is a point-in-time snapshot you can replay.
- **Decoupling** — target DB downtime doesn't lose data; the CSV waits.
- **Observability** — blob listings are a second source of truth independent of the DB.
---
## 3. Pre-flight Checklist
Before writing any SQL or workflow JSON, confirm in writing:
- [ ] Source DB reachable from orchestrator (internal network preferred, VPN/public IP with `sslmode=require` otherwise).
- [ ] Target DB reachable; you hold a superuser credential for one-time DDL.
- [ ] Object storage bucket exists; credentials are configured in the orchestrator.
- [ ] For each source table to extract, you have identified:
- A **DB-insertion timestamp column** (not device/user-reported time), or "it's a snapshot table".
- A **natural unique key** that already has a `PRIMARY KEY` or `UNIQUE` constraint on source (for the `ON CONFLICT` target on bronze).
- Any **unit/column drift** between source and target (e.g., `distance_m` vs. `distance_km`).
- [ ] Acceptable end-to-end latency (to calibrate cron cadence).
- [ ] Security baseline: who writes bronze, who reads it, SSL requirement, password rotation cadence.
If any row is unchecked, pause and resolve it. Skipping this step is the #1 cause of pipelines that "worked in test but lose data in prod."
---
## 4. Phase-by-Phase Execution
Execute in order. Steps within a phase are independent of each other, but the phases themselves have strict dependencies.
### Phase A — Target DB preparation
Apply three types of migrations, in numeric order:
1. **Bronze DDL** — one table per source table. Use `IF NOT EXISTS`; make it idempotent.
2. **Control schema** — `dwh_control.extract_watermarks` + `dwh_control.extract_runs`.
3. **Assertion migrations** — verify roles exist, verify every `ON CONFLICT` target is backed by a PK/UNIQUE (fail loudly if not).
Template files: `dwh/260423_dwh_ddl_v1.sql`, `dwh/261001_dwh_control.sql`, `dwh/261002_bronze_constraints_audit.sql`, `dwh/261003_dwh_roles.sql`.
**Role model:**
- `<proj>_owner` — owns schemas, writes bronze + control tables.
- `<proj>_ro` (or `grafana_ro`) — reads everything, writes nothing.
- Never use `postgres` or another superuser from the orchestrator.
**Watermark seed:** set `last_extracted_at` to a date before any real data (`'2000-01-01T00:00:00Z'` is safe) so the first run back-fills all history in a single CSV per table.
### Phase B — Object storage
Create two prefixes per table:
```
s3://<bucket>/<project>/exports/{table}/ # active CSVs, in-flight
s3://<bucket>/<project>/processed/{table}/ # loaded CSVs, never deleted (audit)
```
Naming convention: `{YYYYMMDD_HHMM}_{TZ}.csv` (e.g., `20260424_1400_EAT.csv`). Timezone in the filename because "08:00" means nothing a year from now without it.
Retention: match whatever backup retention is already in the stack (e.g., 30 days). `processed/` should outlive `exports/`.
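The naming convention above can be sketched as a small helper. The bucket/project names and the fixed +03:00 `EAT` offset are illustrative assumptions, not part of the spec:

```python
from datetime import datetime, timezone, timedelta

# Hypothetical example offset; substitute your deployment's timezone.
EAT = timezone(timedelta(hours=3), "EAT")

def csv_path(bucket: str, project: str, table: str, now: datetime) -> str:
    """Build the exports/ key: {YYYYMMDD_HHMM}_{TZ}.csv under the table prefix."""
    local = now.astimezone(EAT)
    return (f"s3://{bucket}/{project}/exports/{table}/"
            f"{local.strftime('%Y%m%d_%H%M')}_{local.tzname()}.csv")

# 11:00 UTC renders as 14:00 EAT in the filename.
path = csv_path("bucket", "project", "position_history",
                datetime(2026, 4, 24, 11, 0, tzinfo=timezone.utc))
```

Embedding the timezone label in the filename (rather than in a sidecar or the bucket docs) keeps each CSV self-describing when it is replayed later.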
### Phase C — Orchestrator credentials
Three credentials:
| Credential | Role | Purpose |
|---|---|---|
| `<proj>_source` | Read-only role on source DB | Extract queries |
| `<proj>_dwh_target` | `<proj>_owner` on target DB | Bronze writes + control updates |
| `<proj>_s3` | IAM user with `s3:PutObject`, `s3:GetObject`, `s3:ListBucket`, `s3:DeleteObject` on the prefix | CSV upload/download/move |
**Always** `sslmode=require` on any public-IP DB connection. Test each credential with the orchestrator's "Test connection" button before proceeding.
### Phase D — Load workflow (build this BEFORE the extract workflow)
Building load first lets you iterate with hand-crafted CSVs in blob storage before wiring up extract. Much faster feedback loop.
Load workflow input (parameters):
```json
{
  "table": "position_history",
  "csv_path": "s3://bucket/project/exports/position_history/20260424_1400_EAT.csv",
  "run_id": 12345,
  "run_started_at": "2026-04-24T11:00:00Z"
}
```
Load workflow steps:
1. Download CSV from blob storage.
2. Parse CSV into rows.
3. **Open transaction.**
4. `INSERT INTO bronze.<table> (...) VALUES (...) ON CONFLICT (<natural_key>) DO NOTHING;`
5. `UPDATE dwh_control.extract_watermarks SET last_extracted_at = :run_started_at, last_loaded_at = NOW(), rows_loaded_last_run = <count> WHERE table_name = :table;`
6. `UPDATE dwh_control.extract_runs SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count> WHERE run_id = :run_id;`
7. **Commit.**
8. Move CSV from `exports/` to `processed/` (copy-then-delete; never delete before copy confirms).
**Non-negotiable invariants:**
- Steps 3–7 are one transaction. If any step fails, they all roll back.
- Step 8 only runs after commit. If step 8 fails, the next run will re-load the CSV (idempotent via ON CONFLICT) — not a data loss event.
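The transactional boundary (steps 3–7) and the idempotent re-load can be sketched with an in-memory SQLite stand-in for the target DB. Table and column names are simplified from the manual; SQLite happens to share the `ON CONFLICT ... DO NOTHING` syntax used here, but this is an illustration, not the production load node:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE bronze_trips (trip_id INTEGER PRIMARY KEY, distance_km REAL);
CREATE TABLE extract_watermarks (table_name TEXT PRIMARY KEY,
    last_extracted_at TEXT, rows_loaded_last_run INTEGER);
INSERT INTO extract_watermarks VALUES ('trips', '2000-01-01T00:00:00Z', 0);
""")

rows = [(1, 12.5), (2, 3.1)]              # parsed CSV rows
run_started_at = "2026-04-24T11:00:00Z"

with db:  # one transaction: inserts + watermark update commit (or roll back) together
    db.executemany(
        "INSERT INTO bronze_trips VALUES (?, ?) ON CONFLICT (trip_id) DO NOTHING",
        rows)
    db.execute(
        "UPDATE extract_watermarks SET last_extracted_at = ?, "
        "rows_loaded_last_run = ? WHERE table_name = 'trips'",
        (run_started_at, len(rows)))

# Re-running the same CSV (e.g. after a failed move to processed/) is harmless:
with db:
    db.executemany(
        "INSERT INTO bronze_trips VALUES (?, ?) ON CONFLICT (trip_id) DO NOTHING",
        rows)

count = db.execute("SELECT COUNT(*) FROM bronze_trips").fetchone()[0]  # still 2
```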
### Phase E — Extract workflow
Extract workflow steps, per table:
1. Read current watermark: `SELECT last_extracted_at FROM dwh_control.extract_watermarks WHERE table_name = :table;`
2. Capture `run_started_at = NOW()` (in the target DB's clock, not the orchestrator's — reduces clock-skew bugs).
3. `INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES (:table, :run_started_at, 'extracting') RETURNING run_id;`
4. Query source with closed upper bound:
```sql
SELECT <cols>
FROM <source_schema>.<table>
WHERE <watermark_col> > :last_extracted_at
AND <watermark_col> <= :run_started_at
ORDER BY <watermark_col>;
```
5. Render rows as CSV. For geometry columns: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
6. Upload CSV to `s3://bucket/project/exports/{table}/{YYYYMMDD_HHMM}_{TZ}.csv`.
7. `UPDATE dwh_control.extract_runs SET status = 'uploaded', rows_extracted = <count>, csv_path = :path WHERE run_id = :run_id;`
8. Call load workflow with `{table, csv_path, run_id, run_started_at}`.
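Step 5 (CSV rendering) can be sketched with the standard `csv` module. The column names are hypothetical; the geometry column arrives already serialised as EWKT text by the SQL `CASE` guard, so a `NULL` simply becomes an empty field:

```python
import csv, io

# Rows as they might come back from the extract query (step 4).
rows = [
    {"trip_id": 1, "geom_ewkt": "SRID=4326;POINT(36.82 -1.29)"},
    {"trip_id": 2, "geom_ewkt": None},  # NULL geometry survives the guard
]

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=["trip_id", "geom_ewkt"])
writer.writeheader()
writer.writerows(rows)        # None renders as an empty CSV field
csv_text = buf.getvalue()
```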
### Phase F — Schedule + observability
**Cron cadence:** start with 6–8 runs/day during active hours, folding out the overnight window where traffic is low. Example: `0 5,8,11,14,17,20,23 * * *` TZ `Africa/Nairobi`.
**Three observability views** (readable by the RO role):
- `v_table_freshness` — per-table lag from last successful load. Drives the freshness alert.
- `v_recent_failures` — failed runs in last 24h. Zero rows = healthy.
- `v_watermark_lag` — extract vs. load lag per table. Distinguishes "nothing to extract" from "stuck".
Template file: `dwh/261004_dwh_observability_views.sql`.
**Grafana panels** (add at minimum):
1. Freshness panel — red if any row in `v_table_freshness` has `lag > 4h`.
2. Failures panel — red if `v_recent_failures` has any row.
3. Row counts panel — daily bar chart from `extract_runs`.
---
## 5. Design Principles (Do Not Skip)
### 5.1 Watermark on DB insertion time, not source-reported time
The watermark column must be "when the target DB got the row", not "when the device/user said it happened". Device clocks skew, webhooks arrive late, and batch imports backdate records. A source-reported watermark will silently drop rows that arrive out of order. Use `recorded_at`, `created_at`, `updated_at` (with `DEFAULT NOW()`), or `ingested_at` — never `gps_time` / `event_time` / `timestamp`.
### 5.2 Closed upper bound
Extract uses `> last_extracted_at AND <= run_started_at`. The closed upper bound prevents "row committed at `NOW()` during the extract query" from appearing in two adjacent runs. Without it, some rows are double-extracted (wasteful) or missed (data loss).
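A toy check of the invariant: two adjacent extract windows `(w0, t1]` and `(t1, t2]` together see every row exactly once (rows here are keyed by integer insertion timestamps for brevity):

```python
rows = [1, 2, 3, 4, 5, 6]      # rows keyed by DB-insertion timestamp
w0, t1, t2 = 0, 3, 6           # initial watermark, then two run_started_at values

run1 = [r for r in rows if w0 < r <= t1]   # window (0, 3]
run2 = [r for r in rows if t1 < r <= t2]   # window (3, 6]
```

A row committed exactly at `t1` lands in `run1` and only `run1`; with an open upper bound (`<`) it would be extracted twice, and with `>=` on the lower bound it would be missed.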
### 5.3 Idempotent load via natural unique keys
Every incremental bronze table needs a PRIMARY KEY or UNIQUE that matches the source's natural unique key. `ON CONFLICT DO NOTHING` makes re-running a CSV harmless. **Do not invent surrogate keys on bronze** — they defeat the ON CONFLICT guarantee. If the source has no natural key, fix the source or accept the table as a snapshot.
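A counter-example for the surrogate-key warning, using an in-memory SQLite stand-in (hypothetical table): with an autoincrement id as the only key, re-loading the same CSV silently duplicates every row instead of being filtered by `ON CONFLICT`:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE bad_bronze "
           "(id INTEGER PRIMARY KEY AUTOINCREMENT, trip_no INT)")

csv_rows = [(101,), (102,)]
for _ in range(2):  # same CSV loaded twice, e.g. after a failed move to processed/
    db.executemany("INSERT INTO bad_bronze (trip_no) VALUES (?)", csv_rows)

count = db.execute("SELECT COUNT(*) FROM bad_bronze").fetchone()[0]  # 4, not 2
```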
### 5.4 Transactional load boundary
Insert + watermark update + run-log update are one transaction. Splitting them creates "ghost" states where watermark advanced but rows didn't load, causing silent holes.
### 5.5 CSV audit trail — never delete
Moved-to-`processed/` CSVs are cheap ($0.023/GB/month on S3-class storage). They pay for themselves the first time you need to replay a window or debug a row-count mismatch.
### 5.6 PostGIS round-trip via EWKT
`ST_AsEWKT(geom)` on extract, `ST_GeomFromEWKT(ewkt)` on load. Preserves SRID inline. Do NOT store `ST_AsText` + separate SRID column — it doubles the chance of mismatch. Guard NULLs: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
### 5.7 Fail loud, fail early
Audit migrations (roles, constraints) should `RAISE EXCEPTION` with a bullet list of what's missing. Silent success is worse than noisy failure — a missing PK surfaces three months later as "why are there duplicate trips?".
---
## 6. Snapshot vs. Incremental Decision Matrix
| Signal | Snapshot (TRUNCATE + reload) | Incremental (watermark + append) |
|---|---|---|
| Row count | < ~10k | > ~10k |
| Meaning of "current state" | Matters | Doesn't matter; history matters |
| Deletes in source | Common | Rare |
| Update frequency per row | High | Low (append-mostly) |
| Natural unique key | May not exist | Must exist |
| Example | `devices`, `live_positions`, `geofences` | `position_history`, `trips`, `alarms`, event logs |
When in doubt: **snapshot is simpler**. Only escalate to incremental when the snapshot CSV would exceed a few MB per run.
---
## 7. Observability Contract
Every pipeline adds these three views to its control schema — no exceptions:
```sql
CREATE OR REPLACE VIEW <control>.v_table_freshness AS
SELECT table_name,
       MAX(run_finished_at) AS last_loaded_at,
       NOW() - MAX(run_finished_at) AS lag,
       COUNT(*) FILTER (WHERE run_started_at > NOW() - INTERVAL '24 hours') AS loads_last_24h
FROM <control>.extract_runs
WHERE status = 'loaded'
GROUP BY table_name;

CREATE OR REPLACE VIEW <control>.v_recent_failures AS
SELECT run_id, table_name, run_started_at, run_finished_at, csv_path, error_message
FROM <control>.extract_runs
WHERE status = 'failed' AND run_started_at > NOW() - INTERVAL '24 hours'
ORDER BY run_started_at DESC;

CREATE OR REPLACE VIEW <control>.v_watermark_lag AS
SELECT table_name, last_extracted_at, last_loaded_at, rows_loaded_last_run,
       NOW() - last_loaded_at AS load_lag,
       NOW() - last_extracted_at AS extract_lag
FROM <control>.extract_watermarks;
```
Wire a Grafana alert on each view. Test the alert by manually failing a run before go-live.
---
## 8. Schema Drift Handling
Schema drift between source and bronze is inevitable. Two rules:
1. **Detect at design time.** Diff source DDL against bronze DDL before writing any extract SQL. Unit changes (metres vs. km), renamed columns, and added nullable columns are the usual suspects.
2. **Fix in the extract query, not the load.** Put all transformations in the SELECT so the CSV on disk already matches the bronze column names and units. The load workflow should be dumb — CSV column N goes to bronze column N.
Document every drift in the runbook (§5 of the operations runbook). Future developers WILL hit them.
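A minimal sketch of rule 2, with hypothetical column names (`distance_m` on source, `distance_km` on bronze): the conversion lives in the extract SELECT, so the CSV on disk already carries bronze units and the load stays dumb:

```python
# Illustrative extract SQL — the drift fix happens here, not in the load.
extract_sql = """
SELECT trip_id,
       distance_m / 1000.0 AS distance_km   -- unit drift fixed at extract time
FROM source.trips
WHERE recorded_at > :last_extracted_at
  AND recorded_at <= :run_started_at
"""

def to_km(distance_m: float) -> float:
    """The same conversion, as the extract query applies it per row."""
    return distance_m / 1000.0
```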
---
## 9. Verification Gates
### Pre-deploy (before first cron tick)
- [ ] Every migration applied successfully.
- [ ] Control tables seeded (one watermark row per incremental table).
- [ ] Every credential's "Test connection" passes.
- [ ] Blob storage prefixes exist.
- [ ] Manual workflow trigger succeeds end-to-end for one table.
### First run (manual trigger of extract workflow)
- [ ] Every processed table has a row in `extract_runs` with `status='loaded'`.
- [ ] Row-count parity with source (± in-flight writes): `SELECT COUNT(*) FROM <source>` vs. `SELECT COUNT(*) FROM bronze.<table>`.
- [ ] Geometry columns round-trip cleanly: `SELECT ST_AsText(geom) FROM bronze.<table> LIMIT 5` returns valid POINTs.
- [ ] All CSVs moved from `exports/` to `processed/`.
### Steady-state (after 24h / first full schedule cycle)
- [ ] `v_table_freshness` shows lag < cadence × 2 for every table.
- [ ] `v_recent_failures` is empty.
- [ ] Row counts in bronze growing at expected rate.
Only declare "done" after all three gates pass.
---
## 10. Scheduling Calibration
Tradeoffs:
| Cadence | Pros | Cons |
|---|---|---|
| Every 15 min | Low lag, small CSVs | High orchestrator churn, noisy alerts |
| Every 3 h (recommended) | Predictable, fits ops windows, tolerable lag | Overnight backlog carries to morning |
| Nightly (once/day) | Cheap, simple | Unacceptable for real-time panels |
Rule of thumb: cadence = 25–50% of your latency tolerance. A 4 h latency budget → a 1–2 h cadence.
Fold cadence around traffic patterns. Don't run 24× at 1-hour intervals if the source generates zero rows between midnight and 05:00.
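The 25–50% rule of thumb as a hypothetical helper (the band is the one stated above, nothing more):

```python
from datetime import timedelta

def cadence_range(latency_budget: timedelta) -> tuple[timedelta, timedelta]:
    """Recommended cron cadence band: 25-50% of the latency budget."""
    return (latency_budget * 0.25, latency_budget * 0.5)

lo, hi = cadence_range(timedelta(hours=4))  # 4 h budget -> 1 h to 2 h cadence
```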
---
## 11. Common Failure Modes & Recovery
| Failure | Symptom | Fix |
|---|---|---|
| CSV stuck in `exports/` | `v_recent_failures` has a row; CSV never moved | Next scheduled run retries automatically (idempotent). If persistent, open orchestrator logs by `run_id`. |
| Table marked `loading` for >1 cadence | n8n executor crashed mid-transaction | DB rolled back. Next run retries. If stuck >2 cadences, manually re-trigger the extract. |
| Row counts diverge > 1% | CSV parse error silently dropped rows | `rows_extracted != rows_loaded` in `extract_runs` — inspect the CSV for malformed rows. |
| Geometry loads as NULL | EWKT serialisation broke | Check for missing `CASE WHEN geom IS NULL` guard in extract SQL. |
| Distance/units 1000× wrong | Schema drift not caught | Check extract SQL for the unit conversion (see §8). |
**Back-fill a window:**
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = '<table>';
```
Next run re-extracts the gap. `ON CONFLICT DO NOTHING` filters duplicates.
**Full reseed (nuclear):**
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = '2000-01-01T00:00:00Z'
WHERE table_name = '<table>';
```
Next run back-fills all history in one very large CSV. This is expected; the CSV moves to `processed/` on success.
---
## 12. Security Baseline
- Two roles minimum: owner (writes) and RO (reads). Never use superuser from the orchestrator.
- `sslmode=require` on every public-IP DB connection.
- Passwords never in committed SQL — use placeholder tokens (`CHANGE_ME_BEFORE_APPLY`) and swap in-session during apply. Document rotation in the runbook.
- Blob storage credentials scoped to the project's prefix, not the whole bucket.
- Rotate all credentials before go-live (don't reuse the ones that were flying around in design conversations).
---
## 13. Reusability Checklist (Applying to a New Project)
When starting a new data project, copy the Tracksolid DWH layout and edit these points:
- [ ] Rename schemas: `<proj>_control` instead of `dwh_control` if multiple DWHs share a DB.
- [ ] Adjust `<proj>_owner` / `<proj>_ro` role names.
- [ ] Update bucket prefix: `s3://<bucket>/<project>/exports|processed/`.
- [ ] Re-do the snapshot/incremental decision for every source table (§6).
- [ ] Identify watermark columns and natural unique keys for every incremental table (§5.1, §5.3).
- [ ] Map schema drift before writing extract SQL (§8).
- [ ] Calibrate cadence to the new project's latency budget (§10).
- [ ] Ship the three observability views (§7) — even if nobody will look at them in week one.
- [ ] Write the runbook from the template: follow `docs/DWH_PIPELINE.md` section-for-section.
- [ ] Run the verification gates (§9) before declaring done.
---
## 14. Reference Implementation (Tracksolid DWH)
These files are the copy-paste template:
| File | Purpose |
|---|---|
| `dwh/260423_dwh_ddl_v1.sql` | Bronze DDL + roles + schemas |
| `dwh/261001_dwh_control.sql` | Control schema (watermarks + run log) |
| `dwh/261002_bronze_constraints_audit.sql` | ON CONFLICT key assertion |
| `dwh/261003_dwh_roles.sql` | Role contract assertion |
| `dwh/261004_dwh_observability_views.sql` | Freshness/failure/watermark views |
| `docs/DWH_PIPELINE.md` | Operations runbook (troubleshooting, manual re-run, rotation) |
| `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md` | Design spec (why each decision) |
| `docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md` | Task-by-task implementation plan |
| `n8n-workflows/dwh_extract.json` | Extract workflow (reference) |
| `n8n-workflows/dwh_load_bronze.json` | Load workflow (reference) |
**For the next project, fork this manual first, then adapt.** Do not re-design from scratch — the seven design principles in §5 are the parts people keep getting wrong.