

DWH Execution Manual

Purpose: A reusable playbook for building an extract → blob → load data warehouse bronze layer using n8n (or any equivalent orchestrator) + object storage + PostgreSQL/PostGIS. Generalised from the Fireside Tracksolid DWH pipeline (2026-04-24). Apply this pattern to future data projects to skip re-deriving the same decisions.

Reference implementation: dwh/26100{1..4}.sql + docs/DWH_PIPELINE.md + the two n8n-workflows/dwh_*.json files. Treat those as the copy-paste template for the next project.


1. When to Use This Pattern

Use it when all of these are true:

  • You need an analytical replica of a production OLTP DB without letting analytical load hit the OLTP.
  • Source and target are separate PostgreSQL instances (possibly separate networks).
  • Data volumes are moderate: millions of rows per day, not billions.
  • You already have an orchestrator (n8n, Airflow, Prefect) and object storage (rustfs, S3, MinIO) in the stack.
  • Latency tolerance is hours, not minutes.

Don't use it when:

  • Sub-minute latency is required → use logical replication or CDC (Debezium, pglogical, AWS DMS).
  • Volumes exceed ~100 GB/day → step up to Spark/Flink + columnar store (Iceberg, Delta).
  • Source and target are the same DB → just use materialized views or scheduled SQL.
  • You need exactly-once streaming semantics → this pattern is at-least-once + idempotent load.

2. The Core Pattern

```
┌──────────────┐       ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
│  Source DB   │─(a)──▶│ Orchestrator │─(b)──▶│ Object Store │─(c)──▶│  Target DB   │
│   (OLTP)     │       │  (extract)   │       │   (CSVs)     │       │   (bronze)   │
└──────────────┘       └──────────────┘       └──────────────┘       └──────────────┘
                              │                                              ▲
                              └──────────────(d)─────────────────────────────┘
                                     (load workflow, per CSV)
```

(a) Watermarked SELECT, closed upper bound
(b) Atomic CSV upload with timestamped path
(c) CSV stays until load confirms success
(d) Load = BEGIN → INSERT ON CONFLICT → UPDATE watermark → UPDATE run log → COMMIT → move CSV

Why three stages and not two:

  • Audit trail — every extracted CSV is a point-in-time snapshot you can replay.
  • Decoupling — target DB downtime doesn't lose data; the CSV waits.
  • Observability — blob listings are a second source of truth independent of the DB.

3. Pre-flight Checklist

Before writing any SQL or workflow JSON, confirm in writing:

  • Source DB reachable from orchestrator (internal network preferred, VPN/public IP with sslmode=require otherwise).
  • Target DB reachable; you hold a superuser credential for one-time DDL.
  • Object storage bucket exists; credentials are configured in the orchestrator.
  • For each source table to extract, you have identified:
    • A DB-insertion timestamp column (not device/user-reported time), or "it's a snapshot table".
    • A natural unique key that already has a PRIMARY KEY or UNIQUE constraint on source (for the ON CONFLICT target on bronze).
    • Any unit/column drift between source and target (e.g., distance_m vs. distance_km).
  • Acceptable end-to-end latency (to calibrate cron cadence).
  • Security baseline: who writes bronze, who reads it, SSL requirement, password rotation cadence.

If any item is unchecked, pause and resolve it. Skipping this step is the #1 cause of pipelines that "worked in test but lose data in prod."


4. Phase-by-Phase Execution

Execute the phases in order. Steps within a phase are independent of one another, but the phases themselves have strict dependencies.

Phase A — Target DB preparation

Apply three types of migrations, in numeric order:

  1. Bronze DDL — one table per source table. Use IF NOT EXISTS; make it idempotent.
  2. Control schema — dwh_control.extract_watermarks + dwh_control.extract_runs.
  3. Assertion migrations — verify roles exist, verify every ON CONFLICT target is backed by a PK/UNIQUE (fail loudly if not).

Template files: dwh/260423_dwh_ddl_v1.sql, dwh/261001_dwh_control.sql, dwh/261002_bronze_constraints_audit.sql, dwh/261003_dwh_roles.sql.

Role model:

  • <proj>_owner — owns schemas, writes bronze + control tables.
  • <proj>_ro (or grafana_ro) — reads everything, writes nothing.
  • Never use postgres or another superuser from the orchestrator.

Watermark seed: set last_extracted_at to a date before any real data ('2000-01-01T00:00:00Z' is safe) so the first run back-fills all history in a single CSV per table.

Phase B — Object storage

Create two prefixes per table:

```
s3://<bucket>/<project>/exports/{table}/     # active CSVs, in-flight
s3://<bucket>/<project>/processed/{table}/   # loaded CSVs, never deleted (audit)
```

Naming convention: {YYYYMMDD_HHMM}_{TZ}.csv (e.g., 20260424_1400_EAT.csv). Include the timezone in the filename: "08:00" means nothing a year from now without it.

Retention: match whatever backup retention is already in the stack (e.g., 30 days). processed/ should outlive exports/.
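The naming convention can be sketched as a small helper. Bucket, project, and timezone label here are illustrative assumptions; the real workflow assembles this path inside n8n:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

def csv_object_key(bucket: str, project: str, table: str,
                   now: datetime, tz_label: str) -> str:
    """Build the exports/ key: {YYYYMMDD_HHMM}_{TZ}.csv under the table prefix."""
    stamp = now.strftime("%Y%m%d_%H%M")
    return f"{bucket}/{project}/exports/{table}/{stamp}_{tz_label}.csv"

key = csv_object_key(
    "my-bucket", "tracksolid", "position_history",
    datetime(2026, 4, 24, 14, 0, tzinfo=ZoneInfo("Africa/Nairobi")),
    tz_label="EAT",
)
print(key)  # my-bucket/tracksolid/exports/position_history/20260424_1400_EAT.csv
```

Passing the timezone label explicitly (rather than deriving it from the zone) keeps the filename stable even if the abbreviation lookup changes.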

Phase C — Orchestrator credentials

Three credentials:

| Credential | Role | Purpose |
|---|---|---|
| `<proj>_source` | Read-only role on source DB | Extract queries |
| `<proj>_dwh_target` | `<proj>_owner` on target DB | Bronze writes + control updates |
| `<proj>_s3` | IAM user with s3:PutObject, s3:GetObject, s3:ListBucket, s3:DeleteObject on the prefix | CSV upload/download/move |

Always sslmode=require on any public-IP DB connection. Test each credential with the orchestrator's "Test connection" button before proceeding.

Phase D — Load workflow (build this BEFORE the extract workflow)

Building load first lets you iterate with hand-crafted CSVs in blob storage before wiring up extract. Much faster feedback loop.

Load workflow input (parameters):

```json
{
  "table": "position_history",
  "csv_path": "s3://bucket/project/exports/position_history/20260424_1400_EAT.csv",
  "run_id": 12345,
  "run_started_at": "2026-04-24T11:00:00Z"
}
```

Load workflow steps:

  1. Download CSV from blob storage.
  2. Parse CSV into rows.
  3. Open transaction.
  4. INSERT INTO bronze.<table> (...) VALUES (...) ON CONFLICT (<natural_key>) DO NOTHING;
  5. UPDATE dwh_control.extract_watermarks SET last_extracted_at = :run_started_at, last_loaded_at = NOW(), rows_loaded_last_run = <count> WHERE table_name = :table;
  6. UPDATE dwh_control.extract_runs SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count> WHERE run_id = :run_id;
  7. Commit.
  8. Move CSV from exports/ to processed/ (copy-then-delete; never delete before copy confirms).

Non-negotiable invariants:

  • Steps 3-7 are one transaction. If any step fails, all of them roll back.
  • Step 8 only runs after commit. If step 8 fails, the next run will re-load the CSV (idempotent via ON CONFLICT) — not a data loss event.
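The two invariants can be demonstrated end-to-end with an in-memory SQLite stand-in for PostgreSQL (SQLite ≥ 3.24 accepts the same ON CONFLICT ... DO NOTHING clause; table and column names are illustrative, not the bronze schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE bronze_trips (trip_id INTEGER PRIMARY KEY, distance_km REAL);
    CREATE TABLE extract_watermarks (
        table_name TEXT PRIMARY KEY,
        last_extracted_at TEXT,
        rows_loaded_last_run INTEGER
    );
    INSERT INTO extract_watermarks VALUES ('trips', '2000-01-01T00:00:00Z', 0);
""")

def load_csv_rows(rows, run_started_at):
    """Steps 3-7: insert + watermark update in ONE transaction, all or nothing."""
    with conn:  # sqlite3 context manager: COMMIT on success, ROLLBACK on exception
        cur = conn.executemany(
            "INSERT INTO bronze_trips VALUES (?, ?) "
            "ON CONFLICT (trip_id) DO NOTHING", rows)
        conn.execute(
            "UPDATE extract_watermarks SET last_extracted_at = ?, "
            "rows_loaded_last_run = ? WHERE table_name = 'trips'",
            (run_started_at, cur.rowcount))

rows = [(1, 12.5), (2, 3.2)]
load_csv_rows(rows, "2026-04-24T11:00:00Z")
load_csv_rows(rows, "2026-04-24T14:00:00Z")  # replay the same CSV: harmless
count = conn.execute("SELECT COUNT(*) FROM bronze_trips").fetchone()[0]
print(count)  # 2 — the re-run inserted nothing
```

Replaying the same rows a second time leaves the row count unchanged, which is exactly why a failed step 8 (CSV move) is a retry, not a data-loss event.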

Phase E — Extract workflow

Extract workflow steps, per table:

  1. Read current watermark: SELECT last_extracted_at FROM dwh_control.extract_watermarks WHERE table_name = :table;
  2. Capture run_started_at = NOW() (in the target DB's clock, not the orchestrator's — reduces clock-skew bugs).
  3. INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES (:table, :run_started_at, 'extracting') RETURNING run_id;
  4. Query source with closed upper bound:
    SELECT <cols>
    FROM <source_schema>.<table>
    WHERE <watermark_col> >  :last_extracted_at
      AND <watermark_col> <= :run_started_at
    ORDER BY <watermark_col>;
    
  5. Render rows as CSV. For geometry columns: CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END.
  6. Upload CSV to s3://bucket/project/exports/{table}/{YYYYMMDD_HHMM}_{TZ}.csv.
  7. UPDATE dwh_control.extract_runs SET status = 'uploaded', rows_extracted = <count>, csv_path = :path WHERE run_id = :run_id;
  8. Call load workflow with {table, csv_path, run_id, run_started_at}.
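Step 5's CSV rendering can be sketched in pure Python. In the real pipeline the geometry is already serialised by the CASE WHEN ... ST_AsEWKT guard in the source query, so rows arrive as strings or NULLs; the column names below are illustrative:

```python
import csv
import io

def render_csv(rows, columns):
    """Render extracted rows as CSV; geometry arrives pre-serialised to EWKT or None."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(columns)
    for row in rows:
        # Emit an empty cell for NULL geometry, not the string "None"
        writer.writerow("" if value is None else value for value in row)
    return buf.getvalue()

rows = [
    (1, "2026-04-24T10:15:00Z", "SRID=4326;POINT(36.8 -1.3)"),
    (2, "2026-04-24T10:20:00Z", None),  # device sent no fix -> NULL geometry
]
out = render_csv(rows, ["id", "recorded_at", "geom_ewkt"])
print(out)
```

The empty-cell convention matters: a literal "None" string would fail (or worse, silently NULL out) at ST_GeomFromEWKT on load.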

Phase F — Schedule + observability

Cron cadence: start with 6-8 runs/day during active hours, skipping the overnight gap where traffic is low. Example: 0 5,8,11,14,17,20,23 * * * TZ Africa/Nairobi.

Three observability views (readable by the RO role):

  • v_table_freshness — per-table lag from last successful load. Drives the freshness alert.
  • v_recent_failures — failed runs in last 24h. Zero rows = healthy.
  • v_watermark_lag — extract vs. load lag per table. Distinguishes "nothing to extract" from "stuck".

Template file: dwh/261004_dwh_observability_views.sql.

Grafana panels (add at minimum):

  1. Freshness panel — red if any row in v_table_freshness has lag > 4h.
  2. Failures panel — red if v_recent_failures has any row.
  3. Row counts panel — daily bar chart from extract_runs.

5. Design Principles (Do Not Skip)

5.1 Watermark on DB insertion time, not source-reported time

The watermark column must be "when the target DB got the row", not "when the device/user said it happened". Device clocks skew, webhooks arrive late, and batch imports backdate records. A source-reported watermark will silently drop rows that arrive out of order. Use recorded_at, created_at, updated_at (with DEFAULT NOW()), or ingested_at — never gps_time / event_time / timestamp.

5.2 Closed upper bound

Extract uses > last_extracted_at AND <= run_started_at. The closed upper bound prevents "row committed at NOW() during the extract query" from appearing in two adjacent runs. Without it, some rows are double-extracted (wasteful) or missed (data loss).
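The guarantee can be checked mechanically: with > low AND <= high, two adjacent runs partition the rows exactly, with no gap and no overlap (timestamps simplified to integers for illustration):

```python
rows = [10, 20, 30, 40, 50]  # watermark-column values, simplified to ints

def window(rows, low, high):
    # Closed upper bound: > low AND <= high
    return [r for r in rows if low < r <= high]

run1 = window(rows, 0, 30)   # first run: watermark 0, run_started_at 30
run2 = window(rows, 30, 50)  # next run starts exactly where run1 closed
assert run1 + run2 == rows             # no row missed
assert set(run1) & set(run2) == set()  # no row double-extracted
print(run1, run2)  # [10, 20, 30] [40, 50]
```

The row at exactly 30 lands in run1 and only run1, because run2's lower bound is strict.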

5.3 Idempotent load via natural unique keys

Every incremental bronze table needs a PRIMARY KEY or UNIQUE that matches the source's natural unique key. ON CONFLICT DO NOTHING makes re-running a CSV harmless. Do not invent surrogate keys on bronze — they defeat the ON CONFLICT guarantee. If the source has no natural key, fix the source or accept the table as a snapshot.

5.4 Transactional load boundary

Insert + watermark update + run-log update are one transaction. Splitting them creates "ghost" states where watermark advanced but rows didn't load, causing silent holes.

5.5 CSV audit trail — never delete

Moved-to-processed/ CSVs are cheap ($0.023/GB/month on S3-class storage). They pay for themselves the first time you need to replay a window or debug a row-count mismatch.

5.6 PostGIS round-trip via EWKT

ST_AsEWKT(geom) on extract, ST_GeomFromEWKT(ewkt) on load. Preserves SRID inline. Do NOT store ST_AsText + separate SRID column — it doubles the chance of mismatch. Guard NULLs: CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END.

5.7 Fail loud, fail early

Audit migrations (roles, constraints) should RAISE EXCEPTION with a bullet list of what's missing. Silent success is worse than noisy failure — a missing PK surfaces three months later as "why are there duplicate trips?".


6. Snapshot vs. Incremental Decision Matrix

| Signal | Snapshot (TRUNCATE + reload) | Incremental (watermark + append) |
|---|---|---|
| Row count | < ~10k | > ~10k |
| Meaning of "current state" | Matters | Doesn't matter; history matters |
| Deletes in source | Common | Rare |
| Update frequency per row | High | Low (append-mostly) |
| Natural unique key | May not exist | Must exist |
| Example | devices, live_positions, geofences | position_history, trips, alarms, event logs |

When in doubt: snapshot is simpler. Only escalate to incremental when the snapshot CSV would exceed a few MB per run.
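The matrix can be encoded as a helper for design reviews. The ~10k threshold comes from the table above; treating any missing natural key or common deletes as an automatic "snapshot" is this manual's rule, expressed as code:

```python
def choose_mode(row_count: int, deletes_common: bool,
                has_natural_key: bool) -> str:
    """Encode the snapshot-vs-incremental matrix; default to the simpler snapshot."""
    if not has_natural_key:
        return "snapshot"       # incremental requires a natural unique key
    if deletes_common:
        return "snapshot"       # watermark appends never observe source deletes
    if row_count <= 10_000:
        return "snapshot"       # small enough to TRUNCATE + reload cheaply
    return "incremental"

print(choose_mode(150, deletes_common=True, has_natural_key=False))        # snapshot (e.g. devices)
print(choose_mode(5_000_000, deletes_common=False, has_natural_key=True))  # incremental (e.g. position_history)
```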


7. Observability Contract

Every pipeline adds these three views to its control schema — no exceptions:

```sql
CREATE OR REPLACE VIEW <control>.v_table_freshness AS
SELECT table_name,
       MAX(run_finished_at) AS last_loaded_at,
       NOW() - MAX(run_finished_at) AS lag,
       COUNT(*) FILTER (WHERE run_started_at > NOW() - INTERVAL '24 hours') AS loads_last_24h
FROM <control>.extract_runs
WHERE status = 'loaded'
GROUP BY table_name;

CREATE OR REPLACE VIEW <control>.v_recent_failures AS
SELECT run_id, table_name, run_started_at, run_finished_at, csv_path, error_message
FROM <control>.extract_runs
WHERE status = 'failed' AND run_started_at > NOW() - INTERVAL '24 hours'
ORDER BY run_started_at DESC;

CREATE OR REPLACE VIEW <control>.v_watermark_lag AS
SELECT table_name, last_extracted_at, last_loaded_at, rows_loaded_last_run,
       NOW() - last_loaded_at    AS load_lag,
       NOW() - last_extracted_at AS extract_lag
FROM <control>.extract_watermarks;
```

Wire a Grafana alert on each view. Test the alert by manually failing a run before go-live.


8. Schema Drift Handling

Schema drift between source and bronze is inevitable. Two rules:

  1. Detect at design time. Diff source DDL against bronze DDL before writing any extract SQL. Unit changes (metres vs. km), renamed columns, and added nullable columns are the usual suspects.
  2. Fix in the extract query, not the load. Put all transformations in the SELECT so the CSV on disk already matches the bronze column names and units. The load workflow should be dumb — CSV column N goes to bronze column N.

Document every drift in the runbook (§5 of the operations runbook). Future developers WILL hit them.
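"Fix in the extract query" means each drift lives in exactly one place: the per-table SELECT. A sketch of how the extract workflow might hold those queries (schema, column names, and the metres-to-kilometres drift are illustrative assumptions):

```python
# One SELECT per table; all drift resolved here so the CSV already
# matches bronze column names and units.
EXTRACT_SQL = {
    "trips": """
        SELECT trip_id,
               recorded_at,
               distance_m / 1000.0 AS distance_km   -- unit drift fixed at extract
        FROM fleet.trips
        WHERE recorded_at >  %(last_extracted_at)s
          AND recorded_at <= %(run_started_at)s
        ORDER BY recorded_at
    """,
}
# The load workflow never sees the drift: CSV column N goes to bronze column N.
```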


9. Verification Gates

Pre-deploy (before first cron tick)

  • Every migration applied successfully.
  • Control tables seeded (one watermark row per incremental table).
  • Every credential's "Test connection" passes.
  • Blob storage prefixes exist.
  • Manual workflow trigger succeeds end-to-end for one table.

First run (manual trigger of extract workflow)

  • Every processed table has a row in extract_runs with status='loaded'.
  • Row-count parity with source (± in-flight writes): SELECT COUNT(*) FROM <source> vs. SELECT COUNT(*) FROM bronze.<table>.
  • Geometry columns round-trip cleanly: SELECT ST_AsText(geom) FROM bronze.<table> LIMIT 5 returns valid POINTs.
  • All CSVs moved from exports/ to processed/.

Steady-state (after 24h / first full schedule cycle)

  • v_table_freshness shows lag < cadence × 2 for every table.
  • v_recent_failures is empty.
  • Row counts in bronze growing at expected rate.

Only declare "done" after all three gates pass.


10. Scheduling Calibration

Tradeoffs:

| Cadence | Pros | Cons |
|---|---|---|
| Every 15 min | Low lag, small CSVs | High orchestrator churn, noisy alerts |
| Every 3 h (recommended) | Predictable, fits ops windows, tolerable lag | Overnight backlog carries to morning |
| Nightly (once/day) | Cheap, simple | Unacceptable for real-time panels |

Rule of thumb: cadence = 25-50% of your latency tolerance. 4h latency budget → 1-2h cadence.

Fold cadence around traffic patterns. Don't run 24× at 1-hour intervals if the source generates zero rows between midnight and 05:00.
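Folding can be sketched as deriving the cron hour list from an active window and a cadence; the 05:00-23:00 window and 3-hour step reproduce the example schedule from Phase F:

```python
def folded_hours(start: int, end: int, step: int) -> list[int]:
    """Run every `step` hours inside the active window; skip the quiet overnight gap."""
    return list(range(start, end + 1, step))

hours = folded_hours(5, 23, 3)
cron = f"0 {','.join(map(str, hours))} * * *"
print(cron)  # 0 5,8,11,14,17,20,23 * * *
```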


11. Common Failure Modes & Recovery

| Failure | Symptom | Fix |
|---|---|---|
| CSV stuck in exports/ | v_recent_failures has a row; CSV never moved | Next scheduled run retries automatically (idempotent). If persistent, open orchestrator logs by run_id. |
| Table marked 'loading' for >1 cadence | n8n executor crashed mid-transaction | DB rolled back. Next run retries. If stuck >2 cadences, manually re-trigger the extract. |
| Row counts diverge >1% | CSV parse error silently dropped rows | rows_extracted != rows_loaded in extract_runs — inspect the CSV for malformed rows. |
| Geometry loads as NULL | EWKT serialisation broke | Check for a missing CASE WHEN geom IS NULL guard in extract SQL. |
| Distance/units 1000× wrong | Schema drift not caught | Check extract SQL for the unit conversion (see §8). |

Back-fill a window:

```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = '<table>';
```

Next run re-extracts the gap. ON CONFLICT DO NOTHING filters duplicates.

Full reseed (nuclear):

```sql
UPDATE <control>.extract_watermarks
SET last_extracted_at = '2000-01-01T00:00:00Z'
WHERE table_name = '<table>';
```

Next run back-fills all history in one very large CSV. Expected; it moves to processed/ on success.


12. Security Baseline

  • Two roles minimum: owner (writes) and RO (reads). Never use superuser from the orchestrator.
  • sslmode=require on every public-IP DB connection.
  • Passwords never in committed SQL — use placeholder tokens (CHANGE_ME_BEFORE_APPLY) and swap in-session during apply. Document rotation in the runbook.
  • Blob storage credentials scoped to the project's prefix, not the whole bucket.
  • Rotate all credentials before go-live (don't reuse the ones that were flying around in design conversations).

13. Reusability Checklist (Applying to a New Project)

When starting a new data project, copy the Tracksolid DWH layout and edit these points:

  • Rename schemas: <proj>_control instead of dwh_control if multiple DWHs share a DB.
  • Adjust <proj>_owner / <proj>_ro role names.
  • Update bucket prefix: s3://<bucket>/<project>/exports|processed/.
  • Re-do the snapshot/incremental decision for every source table (§6).
  • Identify watermark columns and natural unique keys for every incremental table (§5.1, §5.3).
  • Map schema drift before writing extract SQL (§8).
  • Calibrate cadence to the new project's latency budget (§10).
  • Ship the three observability views (§7) — even if nobody will look at them in week one.
  • Write the runbook from the template: follow docs/DWH_PIPELINE.md section-for-section.
  • Run the verification gates (§9) before declaring done.

14. Reference Implementation (Tracksolid DWH)

These files are the copy-paste template:

| File | Purpose |
|---|---|
| dwh/260423_dwh_ddl_v1.sql | Bronze DDL + roles + schemas |
| dwh/261001_dwh_control.sql | Control schema (watermarks + run log) |
| dwh/261002_bronze_constraints_audit.sql | ON CONFLICT key assertion |
| dwh/261003_dwh_roles.sql | Role contract assertion |
| dwh/261004_dwh_observability_views.sql | Freshness/failure/watermark views |
| docs/DWH_PIPELINE.md | Operations runbook (troubleshooting, manual re-run, rotation) |
| docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md | Design spec (why each decision) |
| docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md | Task-by-task implementation plan |
| n8n-workflows/dwh_extract.json | Extract workflow (reference) |
| n8n-workflows/dwh_load_bronze.json | Load workflow (reference) |

For the next project, fork this manual first, then adapt. Do not re-design from scratch — the seven design principles in §5 are the parts people keep getting wrong.