# DWH Execution Manual
> **Purpose:** A reusable playbook for building an `extract → blob → load` data warehouse bronze layer using n8n (or any equivalent orchestrator) + object storage + PostgreSQL/PostGIS. Generalised from the Fireside Tracksolid DWH pipeline (2026-04-24). Apply this pattern to future data projects to skip re-deriving the same decisions.
>
> **Reference implementation:** `dwh/26100{1..4}.sql` + `docs/DWH_PIPELINE.md` + the two `n8n-workflows/dwh_*.json` files. Treat those as the copy-paste template for the next project.
---
## 1. When to Use This Pattern
**Use it when all of these are true:**
- You need an analytical replica of a production OLTP DB without letting analytical load hit the OLTP.
- Source and target are separate PostgreSQL instances (possibly separate networks).
- Data volumes are moderate: millions of rows per day, not billions.
- You already have an orchestrator (n8n, Airflow, Prefect) and object storage (rustfs, S3, MinIO) in the stack.
- Latency tolerance is hours, not minutes.
**Don't use it when:**
- Sub-minute latency is required → use logical replication or CDC (Debezium, pg_logical, AWS DMS).
- Volumes exceed ~100 GB/day → step up to Spark/Flink + columnar store (Iceberg, Delta).
- Source and target are the same DB → just use materialized views or scheduled SQL.
- You need exactly-once streaming semantics → this pattern is at-least-once + idempotent load.
---
## 2. The Core Pattern
```
┌──────────────┐         ┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│  Source DB   │──(a)───▶│ Orchestrator │──(b)───▶│ Object Store │──(c)───▶│  Target DB   │
│    (OLTP)    │         │  (extract)   │         │    (CSVs)    │         │   (bronze)   │
└──────────────┘         └──────────────┘         └──────────────┘         └──────────────┘
                                │                                                 ▲
                                └───────────────────────(d)───────────────────────┘
                                           (load workflow, per CSV)
(a) Watermarked SELECT, closed upper bound
(b) Atomic CSV upload with timestamped path
(c) CSV stays until load confirms success
(d) Load = BEGIN → INSERT ON CONFLICT → UPDATE watermark → UPDATE run log → COMMIT → move CSV
```
**Why three stages and not two:**
- **Audit trail** — every extracted CSV is a point-in-time snapshot you can replay.
- **Decoupling** — target DB downtime doesn't lose data; the CSV waits.
- **Observability** — blob listings are a second source of truth independent of the DB.
---
## 3. Pre-flight Checklist
Before writing any SQL or workflow JSON, confirm in writing:
- [ ] Source DB reachable from orchestrator (internal network preferred, VPN/public IP with `sslmode=require` otherwise).
- [ ] Target DB reachable; you hold a superuser credential for one-time DDL.
- [ ] Object storage bucket exists; credentials are configured in the orchestrator.
- [ ] For each source table to extract, you have identified:
- A **DB-insertion timestamp column** (not device/user-reported time), or "it's a snapshot table".
- A **natural unique key** that already has a `PRIMARY KEY` or `UNIQUE` constraint on source (for the `ON CONFLICT` target on bronze).
- Any **unit/column drift** between source and target (e.g., `distance_m` vs. `distance_km`).
- [ ] Acceptable end-to-end latency (to calibrate cron cadence).
- [ ] Security baseline: who writes bronze, who reads it, SSL requirement, password rotation cadence.
If any row is unchecked, pause and resolve it. Skipping this step is the #1 cause of pipelines that "worked in test but lose data in prod."
---
## 4. Phase-by-Phase Execution
Execute in order. Steps within a phase are independent of each other, but the phases themselves have strict dependencies.
### Phase A — Target DB preparation
Apply three types of migrations, in numeric order:
1. **Bronze DDL** — one table per source table. Use `IF NOT EXISTS`; make it idempotent.
2. **Control schema** — `dwh_control.extract_watermarks` + `dwh_control.extract_runs`.
3. **Assertion migrations** — verify roles exist, verify every `ON CONFLICT` target is backed by a PK/UNIQUE (fail loudly if not).
Template files: `dwh/260423_dwh_ddl_v1.sql`, `dwh/261001_dwh_control.sql`, `dwh/261002_bronze_constraints_audit.sql`, `dwh/261003_dwh_roles.sql`.
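A minimal sketch of the assertion migration in item 3, assuming the `bronze` schema and `position_history` table names from the reference implementation (adapt per project); it queries the PostgreSQL catalogs and fails the apply loudly if the `ON CONFLICT` target has no backing constraint:

```sql
-- Fail the migration if bronze.position_history has no PK/UNIQUE
-- constraint — without one, the idempotent ON CONFLICT load cannot work.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_constraint c
        JOIN pg_class t      ON t.oid = c.conrelid
        JOIN pg_namespace n  ON n.oid = t.relnamespace
        WHERE n.nspname = 'bronze'
          AND t.relname = 'position_history'
          AND c.contype IN ('p', 'u')   -- primary key or unique
    ) THEN
        RAISE EXCEPTION 'bronze.position_history has no PK/UNIQUE — ON CONFLICT load will fail';
    END IF;
END $$;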
**Role model:**
- `<proj>_owner` — owns schemas, writes bronze + control tables.
- `<proj>_ro` (or `grafana_ro`) — reads everything, writes nothing.
- Never use `postgres` or another superuser from the orchestrator.
**Watermark seed:** set `last_extracted_at` to a date before any real data (`'2000-01-01T00:00:00Z'` is safe) so the first run back-fills all history in a single CSV per table.
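For example, assuming `extract_watermarks` keys on `table_name` as in the reference control schema (table name illustrative):

```sql
-- Seed the watermark before any real data so the first run back-fills history.
INSERT INTO dwh_control.extract_watermarks (table_name, last_extracted_at)
VALUES ('position_history', '2000-01-01T00:00:00Z')
ON CONFLICT (table_name) DO NOTHING;   -- safe to re-run
```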
### Phase B — Object storage
Create two prefixes per table:
```
s3://<bucket>/<project>/exports/{table}/ # active CSVs, in-flight
s3://<bucket>/<project>/processed/{table}/ # loaded CSVs, never deleted (audit)
```
Naming convention: `{YYYYMMDD_HHMM}_{TZ}.csv` (e.g., `20260424_1400_EAT.csv`). Timezone in the filename because "08:00" means nothing a year from now without it.
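The naming convention can be rendered with a small helper; the function name and the `EAT`/`Africa/Nairobi` defaults are illustrative, not part of the reference implementation:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def csv_name(abbrev: str = "EAT", tz: str = "Africa/Nairobi", now=None) -> str:
    """Render the {YYYYMMDD_HHMM}_{TZ}.csv filename in the pipeline's local timezone."""
    now = now or datetime.now(ZoneInfo(tz))
    return f"{now:%Y%m%d_%H%M}_{abbrev}.csv"
```

Pinning the timezone inside the helper (rather than relying on the orchestrator host's clock) is what keeps the filenames interpretable a year later.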
Retention: match whatever backup retention is already in the stack (e.g., 30 days). `processed/` should outlive `exports/`.
### Phase C — Orchestrator credentials
Three credentials:
| Credential | Role | Purpose |
|---|---|---|
| `<proj>_source` | Read-only role on source DB | Extract queries |
| `<proj>_dwh_target` | `<proj>_owner` on target DB | Bronze writes + control updates |
| `<proj>_s3` | IAM user with `s3:PutObject`, `s3:GetObject`, `s3:ListBucket`, `s3:DeleteObject` on the prefix | CSV upload/download/move |
**Always** `sslmode=require` on any public-IP DB connection. Test each credential with the orchestrator's "Test connection" button before proceeding.
### Phase D — Load workflow (build this BEFORE the extract workflow)
Building load first lets you iterate with hand-crafted CSVs in blob storage before wiring up extract. Much faster feedback loop.
Load workflow input (parameters):
```json
{
  "table": "position_history",
  "csv_path": "s3://bucket/project/exports/position_history/20260424_1400_EAT.csv",
  "run_id": 12345,
  "run_started_at": "2026-04-24T11:00:00Z"
}
```
Load workflow steps:
1. Download CSV from blob storage.
2. Parse CSV into rows.
3. **Open transaction.**
4. `INSERT INTO bronze.<table> (...) VALUES (...) ON CONFLICT (<natural_key>) DO NOTHING;`
5. `UPDATE dwh_control.extract_watermarks SET last_extracted_at = :run_started_at, last_loaded_at = NOW(), rows_loaded_last_run = <count> WHERE table_name = :table;`
6. `UPDATE dwh_control.extract_runs SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count> WHERE run_id = :run_id;`
7. **Commit.**
8. Move CSV from `exports/` to `processed/` (copy-then-delete; never delete before copy confirms).
**Non-negotiable invariants:**
- Steps 3–7 are one transaction. If any step fails, all of them roll back.
- Step 8 only runs after commit. If step 8 fails, the next run will re-load the CSV (idempotent via ON CONFLICT) — not a data loss event.
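Step 8's copy-then-delete discipline, sketched with local files for illustration (a blob-store SDK's copy/delete calls would replace `shutil`/`os` in practice; the function name is hypothetical):

```python
import os
import shutil

def move_to_processed(src: str, dst: str) -> None:
    """Copy-then-delete move: the source CSV is removed only after the copy
    is verified, so a crash mid-move can leave a duplicate but never lose data."""
    shutil.copyfile(src, dst)
    # Cheap verification that the copy landed before deleting the original.
    if os.path.getsize(dst) != os.path.getsize(src):
        raise IOError(f"copy verification failed: {src} -> {dst}")
    os.remove(src)
```

If the process dies between the copy and the delete, the CSV is simply picked up again on the next run, and the `ON CONFLICT` load makes the replay harmless.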
### Phase E — Extract workflow
Extract workflow steps, per table:
1. Read current watermark: `SELECT last_extracted_at FROM dwh_control.extract_watermarks WHERE table_name = :table;`
2. Capture `run_started_at = NOW()` (in the target DB's clock, not the orchestrator's — reduces clock-skew bugs).
3. `INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES (:table, :run_started_at, 'extracting') RETURNING run_id;`
4. Query source with closed upper bound:
```sql
SELECT <cols>
FROM <source_schema>.<table>
WHERE <watermark_col> >  :last_extracted_at
  AND <watermark_col> <= :run_started_at
ORDER BY <watermark_col>;
```
5. Render rows as CSV. For geometry columns: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
6. Upload CSV to `s3://bucket/project/exports/{table}/{YYYYMMDD_HHMM}_{TZ}.csv`.
7. `UPDATE dwh_control.extract_runs SET status = 'uploaded', rows_extracted = <count>, csv_path = :path WHERE run_id = :run_id;`
8. Call load workflow with `{table, csv_path, run_id, run_started_at}`.
### Phase F — Schedule + observability
**Cron cadence:** start with 6–8 runs/day during active hours, leaving a gap overnight when traffic is low. Example: `0 5,8,11,14,17,20,23 * * *` TZ `Africa/Nairobi`.
**Three observability views** (readable by the RO role):
- `v_table_freshness` — per-table lag from last successful load. Drives the freshness alert.
- `v_recent_failures` — failed runs in last 24h. Zero rows = healthy.
- `v_watermark_lag` — extract vs. load lag per table. Distinguishes "nothing to extract" from "stuck".
Template file: `dwh/261004_dwh_observability_views.sql`.
**Grafana panels** (add at minimum):
1. Freshness panel — red if any row in `v_table_freshness` has `lag > 4h`.
2. Failures panel — red if `v_recent_failures` has any row.
3. Row counts panel — daily bar chart from `extract_runs`.
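For example, the freshness panel can be driven by a query like this (the `dwh_control` schema name and 4 h threshold are per-project choices; any row returned means a stale table):

```sql
SELECT table_name, lag
FROM dwh_control.v_table_freshness
WHERE lag > INTERVAL '4 hours';
```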
---
## 5. Design Principles (Do Not Skip)
### 5.1 Watermark on DB insertion time, not source-reported time
The watermark column must be "when the target DB got the row", not "when the device/user said it happened". Device clocks skew, webhooks arrive late, and batch imports backdate records. A source-reported watermark will silently drop rows that arrive out of order. Use `recorded_at`, `created_at`, `updated_at` (with `DEFAULT NOW()`), or `ingested_at` — never `gps_time` / `event_time` / `timestamp`.
### 5.2 Closed upper bound
Extract uses `> last_extracted_at AND <= run_started_at`. The closed upper bound prevents "row committed at `NOW()` during the extract query" from appearing in two adjacent runs. Without it, some rows are double-extracted (wasteful) or missed (data loss).
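The non-overlap guarantee is easy to see in a toy model of the predicate (pure-Python stand-in, not pipeline code):

```python
from datetime import datetime, timedelta

def extract_window(rows, lower, upper):
    """Mirror the SQL predicate: <watermark_col> > :lower AND <watermark_col> <= :upper."""
    return [r for r in rows if lower < r["recorded_at"] <= upper]

# Two adjacent runs share the boundary timestamp: a row committed at exactly
# that instant lands in the earlier run only — never in both, never in neither.
```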
### 5.3 Idempotent load via natural unique keys
Every incremental bronze table needs a PRIMARY KEY or UNIQUE that matches the source's natural unique key. `ON CONFLICT DO NOTHING` makes re-running a CSV harmless. **Do not invent surrogate keys on bronze** — they defeat the ON CONFLICT guarantee. If the source has no natural key, fix the source or accept the table as a snapshot.
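The idempotence guarantee can be sketched with an in-memory stand-in for `INSERT ... ON CONFLICT (<natural_key>) DO NOTHING` (function, table, and key names are illustrative):

```python
def load_idempotent(bronze: dict, rows: list, natural_key: tuple) -> int:
    """Insert rows keyed by the natural unique key; rows whose key already
    exists are skipped, so replaying the same CSV is a no-op."""
    inserted = 0
    for row in rows:
        key = tuple(row[k] for k in natural_key)
        if key not in bronze:      # ON CONFLICT ... DO NOTHING
            bronze[key] = row
            inserted += 1
    return inserted
```

A surrogate key on bronze would make every replayed row look "new", which is exactly why bronze must keep the source's natural key.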
### 5.4 Transactional load boundary
Insert + watermark update + run-log update are one transaction. Splitting them creates "ghost" states where watermark advanced but rows didn't load, causing silent holes.
### 5.5 CSV audit trail — never delete
Moved-to-`processed/` CSVs are cheap ($0.023/GB/month on S3-class storage). They pay for themselves the first time you need to replay a window or debug a row-count mismatch.
### 5.6 PostGIS round-trip via EWKT
`ST_AsEWKT(geom)` on extract, `ST_GeomFromEWKT(ewkt)` on load. Preserves SRID inline. Do NOT store `ST_AsText` + separate SRID column — it doubles the chance of mismatch. Guard NULLs: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
### 5.7 Fail loud, fail early
Audit migrations (roles, constraints) should `RAISE EXCEPTION` with a bullet list of what's missing. Silent success is worse than noisy failure — a missing PK surfaces three months later as "why are there duplicate trips?".
---
## 6. Snapshot vs. Incremental Decision Matrix
| Signal | Snapshot (TRUNCATE + reload) | Incremental (watermark + append) |
|---|---|---|
| Row count | < ~10k | > ~10k |
| Meaning of "current state" | Matters | Doesn't matter; history matters |
| Deletes in source | Common | Rare |
| Update frequency per row | High | Low (append-mostly) |
| Natural unique key | May not exist | Must exist |
| Example | `devices`, `live_positions`, `geofences` | `position_history`, `trips`, `alarms`, event logs |
When in doubt: **snapshot is simpler**. Only escalate to incremental when the snapshot CSV would exceed a few MB per run.
---
## 7. Observability Contract
Every pipeline adds these three views to its control schema — no exceptions:
```sql
CREATE OR REPLACE VIEW <control>.v_table_freshness AS
SELECT table_name,
       MAX(run_finished_at) AS last_loaded_at,
       NOW() - MAX(run_finished_at) AS lag,
       COUNT(*) FILTER (WHERE run_started_at > NOW() - INTERVAL '24 hours') AS loads_last_24h
FROM <control>.extract_runs
WHERE status = 'loaded'
GROUP BY table_name;

CREATE OR REPLACE VIEW <control>.v_recent_failures AS
SELECT run_id, table_name, run_started_at, run_finished_at, csv_path, error_message
FROM <control>.extract_runs
WHERE status = 'failed' AND run_started_at > NOW() - INTERVAL '24 hours'
ORDER BY run_started_at DESC;

CREATE OR REPLACE VIEW <control>.v_watermark_lag AS
SELECT table_name, last_extracted_at, last_loaded_at, rows_loaded_last_run,
       NOW() - last_loaded_at AS load_lag,
       NOW() - last_extracted_at AS extract_lag
FROM <control>.extract_watermarks;
```
Wire a Grafana alert on each view. Test the alert by manually failing a run before go-live.
---
## 8. Schema Drift Handling
Schema drift between source and bronze is inevitable. Two rules:
1. **Detect at design time.** Diff source DDL against bronze DDL before writing any extract SQL. Unit changes (metres vs. km), renamed columns, and added nullable columns are the usual suspects.
2. **Fix in the extract query, not the load.** Put all transformations in the SELECT so the CSV on disk already matches the bronze column names and units. The load workflow should be dumb — CSV column N goes to bronze column N.
Document every drift in the runbook (§5 of the operations runbook). Future developers WILL hit them.
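A hypothetical example of rule 2, assuming a source `trips` table that stores `distance_m` while bronze expects `distance_km` (all names illustrative):

```sql
-- Drift is fixed in the extract SELECT, so the CSV on disk already
-- matches bronze column names and units; the load stays dumb.
SELECT device_id,
       recorded_at,
       distance_m / 1000.0 AS distance_km   -- unit conversion happens here
FROM source_schema.trips
WHERE recorded_at >  :last_extracted_at
  AND recorded_at <= :run_started_at;
```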
---
## 9. Verification Gates
### Pre-deploy (before first cron tick)
- [ ] Every migration applied successfully.
- [ ] Control tables seeded (one watermark row per incremental table).
- [ ] Every credential's "Test connection" passes.
- [ ] Blob storage prefixes exist.
- [ ] Manual workflow trigger succeeds end-to-end for one table.
### First run (manual trigger of extract workflow)
- [ ] Every processed table has a row in `extract_runs` with `status='loaded'`.
- [ ] Row-count parity with source (± in-flight writes): `SELECT COUNT(*) FROM <source>` vs. `SELECT COUNT(*) FROM bronze.<table>`.
- [ ] Geometry columns round-trip cleanly: `SELECT ST_AsText(geom) FROM bronze.<table> LIMIT 5` returns valid POINTs.
- [ ] All CSVs moved from `exports/` to `processed/`.
### Steady-state (after 24h / first full schedule cycle)
- [ ] `v_table_freshness` shows lag < cadence × 2 for every table.
- [ ] `v_recent_failures` is empty.
- [ ] Row counts in bronze growing at expected rate.
Only declare "done" after all three gates pass.
---
## 10. Scheduling Calibration
Tradeoffs:
| Cadence | Pros | Cons |
|---|---|---|
| Every 15 min | Low lag, small CSVs | High orchestrator churn, noisy alerts |
| Every 3 h (recommended) | Predictable, fits ops windows, tolerable lag | Overnight backlog carries to morning |
| Nightly (once/day) | Cheap, simple | Unacceptable for real-time panels |
Rule of thumb: cadence = 25–50% of your latency tolerance. A 4 h latency budget → 1–2 h cadence.
Fold cadence around traffic patterns. Don't run 24× at 1-hour intervals if the source generates zero rows between midnight and 05:00.
---
## 11. Common Failure Modes & Recovery
| Failure | Symptom | Fix |
|---|---|---|
| CSV stuck in `exports/` | `v_recent_failures` has a row; CSV never moved | Next scheduled run retries automatically (idempotent). If persistent, open orchestrator logs by `run_id`. |
| Table marked `loading` for >1 cadence | n8n executor crashed mid-transaction | DB rolled back. Next run retries. If stuck >2 cadences, manually re-trigger the extract. |
| Row counts diverge > 1% | CSV parse error silently dropped rows | `rows_extracted != rows_loaded` in `extract_runs` — inspect the CSV for malformed rows. |
| Geometry loads as NULL | EWKT serialisation broke | Check for missing `CASE WHEN geom IS NULL` guard in extract SQL. |
| Distance/units 1000× wrong | Schema drift not caught | Check extract SQL for the unit conversion (see §8). |
**Back-fill a window:**
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = '<table>';
```
Next run re-extracts the gap. `ON CONFLICT DO NOTHING` filters duplicates.
**Full reseed (nuclear):**
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = '2000-01-01T00:00:00Z'
WHERE table_name = '<table>';
```
Next run back-fills all history in one very large CSV. Expected; it moves to `processed/` on success.
---
## 12. Security Baseline
- Two roles minimum: owner (writes) and RO (reads). Never use superuser from the orchestrator.
- `sslmode=require` on every public-IP DB connection.
- Passwords never in committed SQL — use placeholder tokens (`CHANGE_ME_BEFORE_APPLY`) and swap in-session during apply. Document rotation in the runbook.
- Blob storage credentials scoped to the project's prefix, not the whole bucket.
- Rotate all credentials before go-live (don't reuse the ones that were flying around in design conversations).
---
## 13. Reusability Checklist (Applying to a New Project)
When starting a new data project, copy the Tracksolid DWH layout and edit these points:
- [ ] Rename schemas: `<proj>_control` instead of `dwh_control` if multiple DWHs share a DB.
- [ ] Adjust `<proj>_owner` / `<proj>_ro` role names.
- [ ] Update bucket prefix: `s3://<bucket>/<project>/exports|processed/`.
- [ ] Re-do the snapshot/incremental decision for every source table (§6).
- [ ] Identify watermark columns and natural unique keys for every incremental table (§5.1, §5.3).
- [ ] Map schema drift before writing extract SQL (§8).
- [ ] Calibrate cadence to the new project's latency budget (§10).
- [ ] Ship the three observability views (§7) — even if nobody will look at them in week one.
- [ ] Write the runbook from the template: follow `docs/DWH_PIPELINE.md` section-for-section.
- [ ] Run the verification gates (§9) before declaring done.
---
## 14. Reference Implementation (Tracksolid DWH)
These files are the copy-paste template:
| File | Purpose |
|---|---|
| `dwh/260423_dwh_ddl_v1.sql` | Bronze DDL + roles + schemas |
| `dwh/261001_dwh_control.sql` | Control schema (watermarks + run log) |
| `dwh/261002_bronze_constraints_audit.sql` | ON CONFLICT key assertion |
| `dwh/261003_dwh_roles.sql` | Role contract assertion |
| `dwh/261004_dwh_observability_views.sql` | Freshness/failure/watermark views |
| `docs/DWH_PIPELINE.md` | Operations runbook (troubleshooting, manual re-run, rotation) |
| `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md` | Design spec (why each decision) |
| `docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md` | Task-by-task implementation plan |
| `n8n-workflows/dwh_extract.json` | Extract workflow (reference) |
| `n8n-workflows/dwh_load_bronze.json` | Load workflow (reference) |
**For the next project, fork this manual first, then adapt.** Do not re-design from scratch — the seven design principles in §5 are the parts people keep getting wrong.