DWH pipeline (new):
- dwh/261001_dwh_control.sql — watermarks + per-run audit log schema
- dwh/261002_bronze_constraints_audit.sql — ON CONFLICT key assertion
- dwh/261003_dwh_roles.sql — dwh_owner / grafana_ro contract assertion
- dwh/261004_dwh_observability_views.sql — v_table_freshness,
v_recent_failures, v_watermark_lag (readable by grafana_ro)
- docs/DWH_PIPELINE.md — operations runbook (setup, troubleshooting,
manual re-run, back-fill, rotation)
- DWH_Execution_Manual.md — reusable playbook for future data
projects (extract → blob → load pattern, 7 design principles,
snapshot-vs-incremental matrix, verification gates)
- docs/superpowers/{specs,plans}/2026-04-24-n8n-dwh-bronze-pipeline-*
— design spec + 27-task implementation plan
Security:
- dwh/260423_dwh_ddl_v1.sql — redacted plaintext role passwords to
'CHANGE_ME_BEFORE_APPLY' placeholders; added SECURITY header
documenting generation + rotation flow
Docs:
- CLAUDE.md — §3 adds tracksolid_dwh@31.97.44.246:5888 target,
§4 adds dwh/ + docs/DWH_PIPELINE.md to codebase map, §5 adds
bronze + dwh_control schema roll-up, §10 adds deploy task +
password rotation follow-up
Also includes miscellaneous in-progress files accumulated on this
branch (workspace, analytics notes, vehicle CSVs, extract helpers,
renamed markdown archives).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
DWH Execution Manual
Purpose: a reusable playbook for building an extract → blob → load data warehouse bronze layer using n8n (or any equivalent orchestrator) + object storage + PostgreSQL/PostGIS. Generalised from the Fireside Tracksolid DWH pipeline (2026-04-24). Apply this pattern to future data projects to skip re-deriving the same decisions.

Reference implementation: `dwh/26100{1..4}.sql` + `docs/DWH_PIPELINE.md` + the two `n8n-workflows/dwh_*.json` files. Treat those as the copy-paste template for the next project.
1. When to Use This Pattern
Use it when all of these are true:
- You need an analytical replica of a production OLTP DB without letting analytical load hit the OLTP.
- Source and target are separate PostgreSQL instances (possibly separate networks).
- Data volumes are moderate: millions of rows per day, not billions.
- You already have an orchestrator (n8n, Airflow, Prefect) and object storage (rustfs, S3, MinIO) in the stack.
- Latency tolerance is hours, not minutes.
Don't use it when:
- Sub-minute latency is required → use logical replication or CDC (Debezium, pg_logical, AWS DMS).
- Volumes exceed ~100 GB/day → step up to Spark/Flink + columnar store (Iceberg, Delta).
- Source and target are the same DB → just use materialized views or scheduled SQL.
- You need exactly-once streaming semantics → this pattern is at-least-once + idempotent load.
2. The Core Pattern
```
┌────────────┐       ┌────────────┐       ┌──────────────┐       ┌────────────┐
│ Source DB  │──(a)─▶│ Orchestr.  │──(b)─▶│ Object Store │──(c)─▶│ Target DB  │
│  (OLTP)    │       │ (extract)  │       │   (CSVs)     │       │  (bronze)  │
└────────────┘       └────────────┘       └──────────────┘       └────────────┘
                           │                                           ▲
                           └────────────────────(d)────────────────────┘
                                     (load workflow, per CSV)
```
(a) Watermarked SELECT, closed upper bound
(b) Atomic CSV upload with timestamped path
(c) CSV stays until load confirms success
(d) Load = BEGIN → INSERT ON CONFLICT → UPDATE watermark → UPDATE run log → COMMIT → move CSV
Why three stages and not two:
- Audit trail — every extracted CSV is a point-in-time snapshot you can replay.
- Decoupling — target DB downtime doesn't lose data; the CSV waits.
- Observability — blob listings are a second source of truth independent of the DB.
3. Pre-flight Checklist
Before writing any SQL or workflow JSON, confirm in writing:
- Source DB reachable from orchestrator (internal network preferred, VPN/public IP with `sslmode=require` otherwise).
- Target DB reachable; you hold a superuser credential for one-time DDL.
- Object storage bucket exists; credentials are configured in the orchestrator.
- For each source table to extract, you have identified:
  - A DB-insertion timestamp column (not device/user-reported time), or "it's a snapshot table".
  - A natural unique key that already has a `PRIMARY KEY` or `UNIQUE` constraint on source (for the `ON CONFLICT` target on bronze).
- Any unit/column drift between source and target (e.g., `distance_m` vs. `distance_km`).
- Acceptable end-to-end latency (to calibrate cron cadence).
- Security baseline: who writes bronze, who reads it, SSL requirement, password rotation cadence.
If any row is unchecked, pause and resolve it. Skipping this step is the #1 cause of pipelines that "worked in test but lose data in prod."
4. Phase-by-Phase Execution
Execute in order. Steps within a phase are independent of each other, but phases have strict dependencies on earlier phases.
Phase A — Target DB preparation
Apply three types of migrations, in numeric order:
- Bronze DDL — one table per source table. Use `IF NOT EXISTS`; make it idempotent.
- Control schema — `dwh_control.extract_watermarks` + `dwh_control.extract_runs`.
- Assertion migrations — verify roles exist; verify every `ON CONFLICT` target is backed by a PK/UNIQUE (fail loudly if not).
Template files: `dwh/260423_dwh_ddl_v1.sql`, `dwh/261001_dwh_control.sql`, `dwh/261002_bronze_constraints_audit.sql`, `dwh/261003_dwh_roles.sql`.
Role model:
- `<proj>_owner` — owns schemas, writes bronze + control tables.
- `<proj>_ro` (or `grafana_ro`) — reads everything, writes nothing.
- Never use `postgres` or another superuser from the orchestrator.
Watermark seed: set `last_extracted_at` to a date before any real data (`'2000-01-01T00:00:00Z'` is safe) so the first run back-fills all history in a single CSV per table.
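A minimal sketch of the control schema described above — column types and the identity column are assumptions; the real `dwh/261001_dwh_control.sql` is authoritative:

```sql
CREATE SCHEMA IF NOT EXISTS dwh_control;

-- One row per incremental table: where extraction has reached.
CREATE TABLE IF NOT EXISTS dwh_control.extract_watermarks (
    table_name           text PRIMARY KEY,
    last_extracted_at    timestamptz NOT NULL,
    last_loaded_at       timestamptz,
    rows_loaded_last_run bigint
);

-- One row per run: the audit log the observability views read.
CREATE TABLE IF NOT EXISTS dwh_control.extract_runs (
    run_id          bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    table_name      text NOT NULL,
    run_started_at  timestamptz NOT NULL,
    run_finished_at timestamptz,
    status          text NOT NULL,  -- 'extracting' | 'uploaded' | 'loaded' | 'failed'
    rows_extracted  bigint,
    rows_loaded     bigint,
    csv_path        text,
    error_message   text
);

-- Seed: a watermark older than any real data so run #1 back-fills everything.
INSERT INTO dwh_control.extract_watermarks (table_name, last_extracted_at)
VALUES ('position_history', '2000-01-01T00:00:00Z')
ON CONFLICT (table_name) DO NOTHING;
```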
Phase B — Object storage
Create two prefixes per table:
```
s3://<bucket>/<project>/exports/{table}/     # active CSVs, in-flight
s3://<bucket>/<project>/processed/{table}/   # loaded CSVs, never deleted (audit)
```
Naming convention: `{YYYYMMDD_HHMM}_{TZ}.csv` (e.g., `20260424_1400_EAT.csv`). Timezone in the filename because "08:00" means nothing a year from now without it.

Retention: match whatever backup retention is already in the stack (e.g., 30 days). `processed/` should outlive `exports/`.
Phase C — Orchestrator credentials
Three credentials:
| Credential | Role | Purpose |
|---|---|---|
| `<proj>_source` | Read-only role on source DB | Extract queries |
| `<proj>_dwh_target` | `<proj>_owner` on target DB | Bronze writes + control updates |
| `<proj>_s3` | IAM user with `s3:PutObject`, `s3:GetObject`, `s3:ListBucket`, `s3:DeleteObject` on the prefix | CSV upload/download/move |
Always `sslmode=require` on any public-IP DB connection. Test each credential with the orchestrator's "Test connection" button before proceeding.
Phase D — Load workflow (build this BEFORE the extract workflow)
Building load first lets you iterate with hand-crafted CSVs in blob storage before wiring up extract. Much faster feedback loop.
Load workflow input (parameters):
```json
{
  "table": "position_history",
  "csv_path": "s3://bucket/project/exports/position_history/20260424_1400_EAT.csv",
  "run_id": 12345,
  "run_started_at": "2026-04-24T11:00:00Z"
}
```
Load workflow steps:
1. Download the CSV from blob storage.
2. Parse the CSV into rows.
3. Open a transaction.
4. `INSERT INTO bronze.<table> (...) VALUES (...) ON CONFLICT (<natural_key>) DO NOTHING;`
5. `UPDATE dwh_control.extract_watermarks SET last_extracted_at = :run_started_at, last_loaded_at = NOW(), rows_loaded_last_run = <count> WHERE table_name = :table;`
6. `UPDATE dwh_control.extract_runs SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count> WHERE run_id = :run_id;`
7. Commit.
8. Move the CSV from `exports/` to `processed/` (copy-then-delete; never delete before the copy confirms).
Non-negotiable invariants:
- Steps 3–7 are one transaction. If any step fails, all of them roll back.
- Step 8 only runs after commit. If step 8 fails, the next run will re-load the CSV (idempotent via `ON CONFLICT`) — not a data-loss event.
Phase E — Extract workflow
Extract workflow steps, per table:
1. Read the current watermark: `SELECT last_extracted_at FROM dwh_control.extract_watermarks WHERE table_name = :table;`
2. Capture `run_started_at = NOW()` (on the target DB's clock, not the orchestrator's — reduces clock-skew bugs).
3. `INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES (:table, :run_started_at, 'extracting') RETURNING run_id;`
4. Query the source with a closed upper bound: `SELECT <cols> FROM <source_schema>.<table> WHERE <watermark_col> > :last_extracted_at AND <watermark_col> <= :run_started_at ORDER BY <watermark_col>;`
5. Render rows as CSV. For geometry columns: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
6. Upload the CSV to `s3://bucket/project/exports/{table}/{YYYYMMDD_HHMM}_{TZ}.csv`.
7. `UPDATE dwh_control.extract_runs SET status = 'uploaded', rows_extracted = <count>, csv_path = :path WHERE run_id = :run_id;`
8. Call the load workflow with `{table, csv_path, run_id, run_started_at}`.
Phase F — Schedule + observability
Cron cadence: start with 6–8 runs/day during active hours, skipping the overnight window where traffic is low. Example: `0 5,8,11,14,17,20,23 * * *` in TZ `Africa/Nairobi`.
Three observability views (readable by the RO role):
- `v_table_freshness` — per-table lag from the last successful load. Drives the freshness alert.
- `v_recent_failures` — failed runs in the last 24h. Zero rows = healthy.
- `v_watermark_lag` — extract vs. load lag per table. Distinguishes "nothing to extract" from "stuck".
Template file: `dwh/261004_dwh_observability_views.sql`.
Grafana panels (add at minimum):
- Freshness panel — red if any row in `v_table_freshness` has `lag > 4h`.
- Failures panel — red if `v_recent_failures` has any row.
- Row counts panel — daily bar chart from `extract_runs`.
5. Design Principles (Do Not Skip)
5.1 Watermark on DB insertion time, not source-reported time
The watermark column must be "when the source DB got the row", not "when the device/user said it happened". Device clocks skew, webhooks arrive late, and batch imports backdate records. A source-reported watermark will silently drop rows that arrive out of order. Use `recorded_at`, `created_at`, `updated_at` (with `DEFAULT NOW()`), or `ingested_at` — never `gps_time` / `event_time` / `timestamp`.
5.2 Closed upper bound
Extract uses `> last_extracted_at AND <= run_started_at`. The closed upper bound prevents "row committed at `NOW()` during the extract query" from appearing in two adjacent runs. Without it, some rows are double-extracted (wasteful) or missed (data loss).
5.3 Idempotent load via natural unique keys
Every incremental bronze table needs a `PRIMARY KEY` or `UNIQUE` constraint that matches the source's natural unique key. `ON CONFLICT DO NOTHING` makes re-running a CSV harmless. Do not invent surrogate keys on bronze — they defeat the `ON CONFLICT` guarantee. If the source has no natural key, fix the source or accept the table as a snapshot.
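A sketch of what this looks like in DDL — the columns are illustrative, not from the reference implementation (`dwh/260423_dwh_ddl_v1.sql` is authoritative):

```sql
-- The PK is the source's natural key; no surrogate id column on bronze.
CREATE TABLE IF NOT EXISTS bronze.position_history (
    device_id   text        NOT NULL,
    recorded_at timestamptz NOT NULL,
    geom        geometry(Point, 4326),
    PRIMARY KEY (device_id, recorded_at)
);

-- Re-running the same CSV is a no-op: the natural key backs ON CONFLICT.
INSERT INTO bronze.position_history (device_id, recorded_at, geom)
VALUES ('dev-001', '2026-04-24T11:00:00Z',
        ST_GeomFromEWKT('SRID=4326;POINT(36.8 -1.3)'))
ON CONFLICT (device_id, recorded_at) DO NOTHING;
```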
5.4 Transactional load boundary
Insert + watermark update + run-log update are one transaction. Splitting them creates "ghost" states where watermark advanced but rows didn't load, causing silent holes.
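The boundary as one script, reusing the illustrative `position_history` names from the sketch above (`:params` are bound by the orchestrator; `staging_position_history` is a hypothetical staging of the parsed CSV):

```sql
BEGIN;

INSERT INTO bronze.position_history (device_id, recorded_at, geom)
SELECT device_id, recorded_at, ST_GeomFromEWKT(geom_ewkt)
FROM staging_position_history
ON CONFLICT (device_id, recorded_at) DO NOTHING;

UPDATE dwh_control.extract_watermarks
SET last_extracted_at = :run_started_at,
    last_loaded_at = NOW(),
    rows_loaded_last_run = :row_count
WHERE table_name = 'position_history';

UPDATE dwh_control.extract_runs
SET status = 'loaded', run_finished_at = NOW(), rows_loaded = :row_count
WHERE run_id = :run_id;

COMMIT;  -- only after this does the CSV move to processed/
```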
5.5 CSV audit trail — never delete
CSVs moved to `processed/` are cheap ($0.023/GB/month on S3-class storage). They pay for themselves the first time you need to replay a window or debug a row-count mismatch.
5.6 PostGIS round-trip via EWKT
`ST_AsEWKT(geom)` on extract, `ST_GeomFromEWKT(ewkt)` on load. Preserves the SRID inline. Do NOT store `ST_AsText` output plus a separate SRID column — it doubles the chance of mismatch. Guard NULLs: `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`.
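A one-line check that the round-trip preserves the SRID (requires PostGIS; the point literal is arbitrary):

```sql
SELECT ST_SRID(ST_GeomFromEWKT(ST_AsEWKT(
    ST_SetSRID(ST_MakePoint(36.8, -1.3), 4326)
))) AS srid_preserved;  -- returns 4326
```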
5.7 Fail loud, fail early
Audit migrations (roles, constraints) should `RAISE EXCEPTION` with a bullet list of what's missing. Silent success is worse than noisy failure — a missing PK surfaces three months later as "why are there duplicate trips?".
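A sketch of such an assertion, assuming the bronze schema is literally named `bronze` (the real check is `dwh/261002_bronze_constraints_audit.sql`):

```sql
-- Fail loudly if any bronze table lacks a PK/UNIQUE to back ON CONFLICT.
DO $$
DECLARE
    missing text;
BEGIN
    SELECT string_agg('  - ' || c.relname, E'\n')
    INTO missing
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'bronze'
      AND c.relkind = 'r'
      AND NOT EXISTS (
          SELECT 1 FROM pg_constraint k
          WHERE k.conrelid = c.oid AND k.contype IN ('p', 'u')
      );
    IF missing IS NOT NULL THEN
        RAISE EXCEPTION E'bronze tables missing PK/UNIQUE:\n%', missing;
    END IF;
END $$;
```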
6. Snapshot vs. Incremental Decision Matrix
| Signal | Snapshot (TRUNCATE + reload) | Incremental (watermark + append) |
|---|---|---|
| Row count | < ~10k | > ~10k |
| Meaning of "current state" | Matters | Doesn't matter; history matters |
| Deletes in source | Common | Rare |
| Update frequency per row | High | Low (append-mostly) |
| Natural unique key | May not exist | Must exist |
| Example | `devices`, `live_positions`, `geofences` | `position_history`, `trips`, `alarms`, event logs |
When in doubt: snapshot is simpler. Only escalate to incremental when the snapshot CSV would exceed a few MB per run.
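A minimal snapshot-reload sketch, assuming a small `devices` table (names illustrative; `staging_devices` is a hypothetical staging of the parsed CSV):

```sql
-- Snapshot load: wipe and reload inside one transaction so readers never
-- observe a half-loaded table. Safe only for small tables (see matrix above).
BEGIN;
TRUNCATE bronze.devices;
INSERT INTO bronze.devices (device_id, label, last_seen_at)
SELECT device_id, label, last_seen_at
FROM staging_devices;
COMMIT;
```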
7. Observability Contract
Every pipeline adds these three views to its control schema — no exceptions:
```sql
CREATE OR REPLACE VIEW <control>.v_table_freshness AS
SELECT table_name,
       MAX(run_finished_at) AS last_loaded_at,
       NOW() - MAX(run_finished_at) AS lag,
       COUNT(*) FILTER (WHERE run_started_at > NOW() - INTERVAL '24 hours') AS loads_last_24h
FROM <control>.extract_runs
WHERE status = 'loaded'
GROUP BY table_name;

CREATE OR REPLACE VIEW <control>.v_recent_failures AS
SELECT run_id, table_name, run_started_at, run_finished_at, csv_path, error_message
FROM <control>.extract_runs
WHERE status = 'failed' AND run_started_at > NOW() - INTERVAL '24 hours'
ORDER BY run_started_at DESC;

CREATE OR REPLACE VIEW <control>.v_watermark_lag AS
SELECT table_name, last_extracted_at, last_loaded_at, rows_loaded_last_run,
       NOW() - last_loaded_at AS load_lag,
       NOW() - last_extracted_at AS extract_lag
FROM <control>.extract_watermarks;
```
Wire a Grafana alert on each view. Test the alert by manually failing a run before go-live.
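One way to stage that test, assuming at least one run row already exists (a sketch — revert it or let the next real run supersede it):

```sql
-- Synthetic failure: v_recent_failures should show one row and the alert fire.
UPDATE dwh_control.extract_runs
SET status = 'failed', error_message = 'synthetic alert test'
WHERE run_id = (SELECT max(run_id) FROM dwh_control.extract_runs);
```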
8. Schema Drift Handling
Schema drift between source and bronze is inevitable. Two rules:
- Detect at design time. Diff source DDL against bronze DDL before writing any extract SQL. Unit changes (metres vs. km), renamed columns, and added nullable columns are the usual suspects.
- Fix in the extract query, not the load. Put all transformations in the SELECT so the CSV on disk already matches the bronze column names and units. The load workflow should be dumb — CSV column N goes to bronze column N.
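For example, rule 2 applied to the `distance_m` vs. `distance_km` drift from §3 (table and column names here are illustrative, not from the reference implementation):

```sql
-- Unit drift fixed at extract time: the CSV column already matches bronze.
SELECT trip_id,
       device_id,
       distance_m / 1000.0 AS distance_km,
       created_at
FROM source_schema.trips
WHERE created_at > :last_extracted_at
  AND created_at <= :run_started_at
ORDER BY created_at;
```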
Document every drift in the runbook (§5 of the operations runbook). Future developers WILL hit them.
9. Verification Gates
Pre-deploy (before first cron tick)
- Every migration applied successfully.
- Control tables seeded (one watermark row per incremental table).
- Every credential's "Test connection" passes.
- Blob storage prefixes exist.
- Manual workflow trigger succeeds end-to-end for one table.
First run (manual trigger of extract workflow)
- Every processed table has a row in `extract_runs` with `status='loaded'`.
- Row-count parity with source (± in-flight writes): `SELECT COUNT(*) FROM <source>` vs. `SELECT COUNT(*) FROM bronze.<table>`.
- Geometry columns round-trip cleanly: `SELECT ST_AsText(geom) FROM bronze.<table> LIMIT 5` returns valid POINTs.
- All CSVs moved from `exports/` to `processed/`.
Steady-state (after 24h / first full schedule cycle)
- `v_table_freshness` shows lag < cadence × 2 for every table.
- `v_recent_failures` is empty.
- Row counts in bronze growing at the expected rate.
Only declare "done" after all three gates pass.
10. Scheduling Calibration
Tradeoffs:
| Cadence | Pros | Cons |
|---|---|---|
| Every 15 min | Low lag, small CSVs | High orchestrator churn, noisy alerts |
| Every 3 h (recommended) | Predictable, fits ops windows, tolerable lag | Overnight backlog carries to morning |
| Nightly (once/day) | Cheap, simple | Unacceptable for real-time panels |
Rule of thumb: cadence = 25–50% of your latency tolerance. 4h latency budget → 1-2h cadence.
Fold cadence around traffic patterns. Don't run 24× at 1-hour intervals if the source generates zero rows between midnight and 05:00.
11. Common Failure Modes & Recovery
| Failure | Symptom | Fix |
|---|---|---|
| CSV stuck in `exports/` | `v_recent_failures` has a row; CSV never moved | Next scheduled run retries automatically (idempotent). If persistent, open orchestrator logs by `run_id`. |
| Table marked `loading` for >1 cadence | n8n executor crashed mid-transaction | DB rolled back. Next run retries. If stuck >2 cadences, manually re-trigger the extract. |
| Row counts diverge > 1% | CSV parse error silently dropped rows | `rows_extracted != rows_loaded` in `extract_runs` — inspect the CSV for malformed rows (diagnostic query below). |
| Geometry loads as NULL | EWKT serialisation broke | Check for a missing `CASE WHEN geom IS NULL` guard in the extract SQL. |
| Distance/units 1000× wrong | Schema drift not caught | Check the extract SQL for the unit conversion (see §8). |
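A diagnostic for the row-count-divergence case (sketch; assumes the `extract_runs` columns from §4):

```sql
-- Runs where the load kept fewer rows than the extract produced. Note that
-- duplicates filtered by ON CONFLICT also land here — check the CSV before
-- assuming parse errors.
SELECT run_id, table_name, csv_path, rows_extracted, rows_loaded
FROM dwh_control.extract_runs
WHERE status = 'loaded'
  AND rows_extracted IS DISTINCT FROM rows_loaded
ORDER BY run_started_at DESC;
```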
Back-fill a window:
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = '<table>';
```

Next run re-extracts the gap. `ON CONFLICT DO NOTHING` filters duplicates.
Full reseed (nuclear):
```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = '2000-01-01T00:00:00Z'
WHERE table_name = '<table>';
```

Next run back-fills all history in one very large CSV. Expected; the CSV moves to `processed/` on success.
12. Security Baseline
- Two roles minimum: owner (writes) and RO (reads) — see the sketch after this list. Never use superuser from the orchestrator.
- `sslmode=require` on every public-IP DB connection.
- Passwords never in committed SQL — use placeholder tokens (`CHANGE_ME_BEFORE_APPLY`) and swap them in-session during apply. Document rotation in the runbook.
- Blob storage credentials scoped to the project's prefix, not the whole bucket.
- Rotate all credentials before go-live (don't reuse the ones that were flying around in design conversations).
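A minimal sketch of the two-role contract — role names are the `<proj>_*` placeholders from §13, and the real contract is asserted by `dwh/261003_dwh_roles.sql`:

```sql
-- Owner writes bronze + control; RO reads everything, writes nothing.
CREATE ROLE proj_owner LOGIN PASSWORD 'CHANGE_ME_BEFORE_APPLY';
CREATE ROLE proj_ro    LOGIN PASSWORD 'CHANGE_ME_BEFORE_APPLY';

ALTER SCHEMA bronze      OWNER TO proj_owner;
ALTER SCHEMA dwh_control OWNER TO proj_owner;

GRANT USAGE  ON SCHEMA bronze, dwh_control TO proj_ro;
GRANT SELECT ON ALL TABLES IN SCHEMA bronze, dwh_control TO proj_ro;

-- Keep future tables readable without re-granting.
ALTER DEFAULT PRIVILEGES FOR ROLE proj_owner IN SCHEMA bronze, dwh_control
    GRANT SELECT ON TABLES TO proj_ro;
```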
13. Reusability Checklist (Applying to a New Project)
When starting a new data project, copy the Tracksolid DWH layout and edit these points:
- Rename schemas: `<proj>_control` instead of `dwh_control` if multiple DWHs share a DB.
- Adjust `<proj>_owner` / `<proj>_ro` role names.
- Update the bucket prefix: `s3://<bucket>/<project>/exports|processed/`.
- Re-do the snapshot/incremental decision for every source table (§6).
- Identify watermark columns and natural unique keys for every incremental table (§5.1, §5.3).
- Map schema drift before writing extract SQL (§8).
- Calibrate cadence to the new project's latency budget (§10).
- Ship the three observability views (§7) — even if nobody will look at them in week one.
- Write the runbook from the template: follow `docs/DWH_PIPELINE.md` section-for-section.
- Run the verification gates (§9) before declaring done.
14. Reference Implementation (Tracksolid DWH)
These files are the copy-paste template:
| File | Purpose |
|---|---|
| `dwh/260423_dwh_ddl_v1.sql` | Bronze DDL + roles + schemas |
| `dwh/261001_dwh_control.sql` | Control schema (watermarks + run log) |
| `dwh/261002_bronze_constraints_audit.sql` | ON CONFLICT key assertion |
| `dwh/261003_dwh_roles.sql` | Role contract assertion |
| `dwh/261004_dwh_observability_views.sql` | Freshness/failure/watermark views |
| `docs/DWH_PIPELINE.md` | Operations runbook (troubleshooting, manual re-run, rotation) |
| `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md` | Design spec (why each decision) |
| `docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md` | Task-by-task implementation plan |
| `n8n-workflows/dwh_extract.json` | Extract workflow (reference) |
| `n8n-workflows/dwh_load_bronze.json` | Load workflow (reference) |
For the next project, fork this manual first, then adapt. Do not re-design from scratch — the seven design principles in §5 are the parts people keep getting wrong.