David Kiania 34f5fa1b9c feat(dwh): bronze pipeline migrations, runbook, and execution manual
DWH pipeline (new):
  - dwh/261001_dwh_control.sql — watermarks + per-run audit log schema
  - dwh/261002_bronze_constraints_audit.sql — ON CONFLICT key assertion
  - dwh/261003_dwh_roles.sql — dwh_owner / grafana_ro contract assertion
  - dwh/261004_dwh_observability_views.sql — v_table_freshness,
    v_recent_failures, v_watermark_lag (readable by grafana_ro)
  - docs/DWH_PIPELINE.md — operations runbook (setup, troubleshooting,
    manual re-run, back-fill, rotation)
  - DWH_Execution_Manual.md — reusable playbook for future data
    projects (extract → blob → load pattern, 7 design principles,
    snapshot-vs-incremental matrix, verification gates)
  - docs/superpowers/{specs,plans}/2026-04-24-n8n-dwh-bronze-pipeline-*
    — design spec + 27-task implementation plan

Security:
  - dwh/260423_dwh_ddl_v1.sql — redacted plaintext role passwords to
    'CHANGE_ME_BEFORE_APPLY' placeholders; added SECURITY header
    documenting generation + rotation flow

Docs:
  - CLAUDE.md — §3 adds tracksolid_dwh@31.97.44.246:5888 target,
    §4 adds dwh/ + docs/DWH_PIPELINE.md to codebase map, §5 adds
    bronze + dwh_control schema roll-up, §10 adds deploy task +
    password rotation follow-up

Also includes miscellaneous in-progress files accumulated on this
branch (workspace, analytics notes, vehicle CSVs, extract helpers,
renamed markdown archives).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-25 01:07:53 +03:00

# n8n DWH Bronze Layer Pipeline Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Stand up an n8n-driven DWH extract→blob→bronze pipeline that mirrors `tracksolid_db` tables into `tracksolid_dwh.bronze` via rustfs-hosted CSV files 7× per day.

**Architecture:** Two n8n workflows — `dwh_extract` (scheduled, reads source → writes CSV to rustfs) and `dwh_load_bronze` (triggered per table → reads CSV → upserts into `bronze` schema). A `dwh_control` schema on the target DB holds per-table watermarks and a run log for observability and idempotent retry.

**Tech Stack:** n8n (workflow orchestration) · PostgreSQL 16 + TimescaleDB 2.15 + PostGIS 3 (source `tracksolid_db` on Coolify, target `tracksolid_dwh` at `31.97.44.246:5888`) · rustfs S3-compatible storage (bucket `fleet-db`) · psql for migrations.

**Reference spec:** `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md`
---
## Adaptation Note for This Plan
Classic TDD (write failing unit test → implement → watch pass) doesn't cleanly apply to n8n JSON workflows. For every task in this plan:
- **SQL migrations / bash / scripts** — use TDD: write a verification query that SHOULD fail now, run it, apply the change, re-run, expect success.
- **n8n workflow nodes** — build each node, then run the workflow in n8n's "Execute Workflow" mode and inspect the output at that node before moving on. Export JSON to repo after each stable checkpoint.
- **End-to-end** — row-count parity between source and bronze is the integration test.
Every task below includes an explicit verification step with expected output.
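The verify-first loop used throughout these tasks can be wrapped in a pair of tiny helpers — a hypothetical sketch, not existing project scripts: a check command must FAIL before the change is applied and PASS after it.

```bash
# Hypothetical helpers for the verify-first loop used in each task.
expect_fail() {
  # Succeeds only when the wrapped command fails (the pre-change state).
  if "$@" >/dev/null 2>&1; then
    echo "expected failure but command succeeded: $*" >&2
    return 1
  fi
  return 0
}
expect_ok() {
  # Succeeds only when the wrapped command succeeds (the post-change state).
  "$@" >/dev/null 2>&1 || { echo "expected success but command failed: $*" >&2; return 1; }
}
# Example shape (Task 2): before the migration, selecting from
# dwh_control should fail; after applying it, the same query succeeds.
# expect_fail psql ... -c "SELECT count(*) FROM dwh_control.extract_watermarks;"
```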
---
## File Structure
| Path | Created | Purpose |
|---|---|---|
| `dwh/260423_dwh_ddl_v1.sql` | existing | Bronze/silver/gold schemas + 16 bronze tables (already authored) |
| `dwh/261001_dwh_control.sql` | new | `dwh_control` schema — watermarks + run log + role setup |
| `dwh/261002_bronze_constraints_audit.sql` | new | Patches any missing UNIQUE constraints on bronze tables needed for `ON CONFLICT` |
| `n8n-workflows/dwh_extract.json` | new | Workflow 1 export (scheduled extract → CSV → rustfs) |
| `n8n-workflows/dwh_load_bronze.json` | new | Workflow 2 export (rustfs CSV → bronze upsert) |
| `n8n-workflows/dwh_error_notifier.json` | new | Shared error-workflow wired to both pipeline workflows |
| `docs/DWH_PIPELINE.md` | new | Operations runbook (setup, manual trigger, troubleshooting) |
| `CLAUDE.md` | modify §3,§4,§5,§10 | Add `tracksolid_dwh` connection to §3; bronze pipeline to §4 map, §5 table list, and remove DWH from §10 open items |
---
## Task Sequence Overview
**Phase A — Target DB setup** (Tasks 1–5): apply bronze DDL, control schema, roles, constraint audit. One-time.
**Phase B — Rustfs setup** (Task 6): create prefixes.
**Phase C — n8n credential hardening** (Tasks 7–8): switch to scoped users, enforce SSL.
**Phase D — Workflow 2 (load) built first** (Tasks 9–13): the load workflow is simpler and Workflow 1 calls it, so we build the callee first and test it with a hand-crafted CSV.
**Phase E — Workflow 1 (extract) built per table** (Tasks 14–23): add tables one at a time, starting with the smallest (`devices` snapshot), end-to-end verifying each before moving on.
**Phase F — Observability & go-live** (Tasks 24–28): error workflow, cron enable, 24h steady-state check, runbook, docs.
---
## Phase A — Target DB Setup
### Task 1: Apply existing bronze DDL to `tracksolid_dwh`
**Files:**
- Apply: `dwh/260423_dwh_ddl_v1.sql` (existing file, no modification)
**Purpose:** Ensure all 16 bronze tables exist on the target DB before anything else touches it.
- [ ] **Step 1: Confirm target DB is reachable and empty (verification-first)**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 \
-U postgres -d tracksolid_dwh \
-c "\dt bronze.*"
```
Expected (before change): `Did not find any relation named "bronze.*".`
If a connection error occurs, confirm `sslmode=require` is appended to the URI or that SSL isn't enforced on the server yet — document which.
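If an explicit connection URI is easier to pass around, a hedged example of its form (host, port, and database from this plan; `CHANGE_ME` is a placeholder — substitute the real password at run time, never in the repo):

```bash
# Hypothetical URI form for the target DB with SSL made explicit.
uri="postgresql://postgres:CHANGE_ME@31.97.44.246:5888/tracksolid_dwh?sslmode=require"
echo "$uri"
```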
- [ ] **Step 2: Apply the DDL**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 \
-U postgres -d tracksolid_dwh \
-f dwh/260423_dwh_ddl_v1.sql
```
- [ ] **Step 3: Verify 16 bronze tables exist**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 \
-U postgres -d tracksolid_dwh \
-c "SELECT count(*) FROM pg_tables WHERE schemaname='bronze';"
```
Expected: `16` (per the DDL: devices, position_history, trips, alarms, live_positions, parking_events, device_events, fault_codes, fuel_readings, obd_readings, heartbeats, ingestion_log, dispatch_log, geofences, lbs_readings, temperature_readings).
- [ ] **Step 4: Verify `silver` and `gold` schemas exist (empty OK)**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 \
-U postgres -d tracksolid_dwh \
-c "SELECT schema_name FROM information_schema.schemata WHERE schema_name IN ('bronze','silver','gold') ORDER BY schema_name;"
```
Expected: three rows — `bronze`, `gold`, `silver`.
- [ ] **Step 5: Commit nothing (no repo change yet — just a deploy step)**
No commit. This is a stateful one-time operation on the remote DB; record the date/time applied in the runbook (Task 27).
---
### Task 2: Author and apply `dwh/261001_dwh_control.sql` (watermarks + run log)
**Files:**
- Create: `dwh/261001_dwh_control.sql`
- Apply to: `tracksolid_dwh` at `31.97.44.246:5888`
- [ ] **Step 1: Write `dwh/261001_dwh_control.sql`**
```sql
-- dwh/261001_dwh_control.sql
-- Control schema: per-table watermarks and run-level audit log.
-- Applied once to tracksolid_dwh.
BEGIN;
CREATE SCHEMA IF NOT EXISTS dwh_control;
CREATE TABLE IF NOT EXISTS dwh_control.extract_watermarks (
table_name TEXT PRIMARY KEY,
last_extracted_at TIMESTAMPTZ NOT NULL DEFAULT '2026-01-01T00:00:00Z',
last_loaded_at TIMESTAMPTZ,
rows_loaded_last_run INT,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS dwh_control.extract_runs (
run_id BIGSERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
run_started_at TIMESTAMPTZ NOT NULL,
run_finished_at TIMESTAMPTZ,
rows_extracted INT,
rows_loaded INT,
csv_path TEXT,
status TEXT CHECK (status IN ('extracting','uploaded','loading','loaded','failed')),
error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_extract_runs_table_time
ON dwh_control.extract_runs (table_name, run_started_at DESC);
CREATE INDEX IF NOT EXISTS idx_extract_runs_status_time
ON dwh_control.extract_runs (status, run_finished_at DESC);
-- Seed one row per incremental table so first extract runs use the default bound.
INSERT INTO dwh_control.extract_watermarks (table_name) VALUES
('position_history'), ('trips'), ('alarms'),
('parking_events'), ('device_events'), ('ingestion_log')
ON CONFLICT (table_name) DO NOTHING;
COMMIT;
```
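The `status` CHECK above implies a run lifecycle. A hypothetical sketch of the assumed happy-path ordering (`extracting → uploaded → loading → loaded`; this ordering is inferred from the pipeline shape, with `failed` writable from any in-flight state by the error workflow):

```bash
# Hypothetical happy-path transition map for dwh_control.extract_runs.status.
next_status() {
  case "$1" in
    extracting) echo uploaded ;;   # CSV written to rustfs
    uploaded)   echo loading ;;    # Workflow 2 triggered
    loading)    echo loaded ;;     # upsert committed
    loaded)     echo loaded ;;     # terminal success state
    *)          echo failed ;;     # anything else ends the run as failed
  esac
}
```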
- [ ] **Step 2: Verify the migration fails "cleanly" pre-apply (schema doesn't exist)**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-c "SELECT count(*) FROM dwh_control.extract_watermarks;"
```
Expected: error like `relation "dwh_control.extract_watermarks" does not exist`.
- [ ] **Step 3: Apply migration**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-f dwh/261001_dwh_control.sql
```
- [ ] **Step 4: Verify watermark seeds**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-c "SELECT table_name, last_extracted_at FROM dwh_control.extract_watermarks ORDER BY table_name;"
```
Expected: 6 rows, all with `last_extracted_at = 2026-01-01 00:00:00+00`:
```
    table_name    |   last_extracted_at
------------------+------------------------
 alarms           | 2026-01-01 00:00:00+00
 device_events    | 2026-01-01 00:00:00+00
 ingestion_log    | 2026-01-01 00:00:00+00
 parking_events   | 2026-01-01 00:00:00+00
 position_history | 2026-01-01 00:00:00+00
 trips            | 2026-01-01 00:00:00+00
```
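A sketch of the incremental window each watermark defines — assumed here to be the half-open interval `(last_extracted_at, run_started_at]`, matching the `$3` upper bound that Task 12 writes back. ISO-8601 UTC timestamps compare correctly as plain strings, so the membership test is two string comparisons:

```bash
# Hypothetical window test: a row qualifies when
# row_ts > last_extracted_at AND row_ts <= run_started_at.
in_window() { # usage: in_window ROW_TS LAST_EXTRACTED_AT RUN_STARTED_AT
  [ "$(expr "$1" ">" "$2")" = 1 ] && [ "$(expr "$1" ">" "$3")" = 0 ]
}
```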
- [ ] **Step 5: Commit**
```bash
git add dwh/261001_dwh_control.sql
git commit -m "feat(dwh): add dwh_control schema with watermarks and run log"
```
---
### Task 3: Create scoped `dwh_owner` and `dwh_ro` roles
**Files:**
- Create: `dwh/261003_dwh_roles.sql`
- Apply to: `tracksolid_dwh`
- [ ] **Step 1: Write `dwh/261003_dwh_roles.sql`**
```sql
-- dwh/261003_dwh_roles.sql
-- Role hardening: n8n writes as dwh_owner (not postgres), Grafana reads as dwh_ro.
-- Passwords are templated; replace <DWH_OWNER_PASSWORD> and <DWH_RO_PASSWORD>
-- at apply time (do NOT commit the real values).
BEGIN;
-- Writer role: owns bronze + dwh_control only.
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname='dwh_owner') THEN
CREATE ROLE dwh_owner LOGIN PASSWORD '<DWH_OWNER_PASSWORD>';
END IF;
END$$;
GRANT USAGE, CREATE ON SCHEMA bronze TO dwh_owner;
GRANT USAGE, CREATE ON SCHEMA dwh_control TO dwh_owner;
GRANT ALL ON ALL TABLES IN SCHEMA bronze TO dwh_owner;
GRANT ALL ON ALL TABLES IN SCHEMA dwh_control TO dwh_owner;
GRANT ALL ON ALL SEQUENCES IN SCHEMA bronze TO dwh_owner;
GRANT ALL ON ALL SEQUENCES IN SCHEMA dwh_control TO dwh_owner;
ALTER DEFAULT PRIVILEGES IN SCHEMA bronze GRANT ALL ON TABLES TO dwh_owner;
ALTER DEFAULT PRIVILEGES IN SCHEMA dwh_control GRANT ALL ON TABLES TO dwh_owner;
ALTER DEFAULT PRIVILEGES IN SCHEMA bronze GRANT ALL ON SEQUENCES TO dwh_owner;
ALTER DEFAULT PRIVILEGES IN SCHEMA dwh_control GRANT ALL ON SEQUENCES TO dwh_owner;
-- Reader role: read-only across bronze + dwh_control (for Grafana dashboards).
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname='dwh_ro') THEN
CREATE ROLE dwh_ro LOGIN PASSWORD '<DWH_RO_PASSWORD>';
END IF;
END$$;
GRANT USAGE ON SCHEMA bronze TO dwh_ro;
GRANT USAGE ON SCHEMA dwh_control TO dwh_ro;
GRANT SELECT ON ALL TABLES IN SCHEMA bronze TO dwh_ro;
GRANT SELECT ON ALL TABLES IN SCHEMA dwh_control TO dwh_ro;
ALTER DEFAULT PRIVILEGES IN SCHEMA bronze GRANT SELECT ON TABLES TO dwh_ro;
ALTER DEFAULT PRIVILEGES IN SCHEMA dwh_control GRANT SELECT ON TABLES TO dwh_ro;
COMMIT;
```
- [ ] **Step 2: Apply with real passwords (templated, not committed)**
Generate two random passwords, export them as shell vars, substitute with `sed`, apply, then clear:
```bash
export DWH_OWNER_PW=$(openssl rand -hex 24)
export DWH_RO_PW=$(openssl rand -hex 24)
sed "s/<DWH_OWNER_PASSWORD>/$DWH_OWNER_PW/; s/<DWH_RO_PASSWORD>/$DWH_RO_PW/" \
dwh/261003_dwh_roles.sql \
| PGPASSWORD=<postgres_password> psql -h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh
echo "Store these in 1Password / Coolify secrets:"
echo " dwh_owner: $DWH_OWNER_PW"
echo " dwh_ro: $DWH_RO_PW"
```
Copy both passwords into Coolify secret manager (or 1Password vault, per team convention) BEFORE closing the terminal. Keep `DWH_OWNER_PW` and `DWH_RO_PW` exported for now — Steps 4 and 5 log in with them — and run `unset DWH_OWNER_PW DWH_RO_PW` only after Step 5 passes.
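A side note on why the `sed` substitution above is safe: `openssl rand -hex` emits only `[0-9a-f]`, so the generated passwords can never contain `sed` metacharacters such as `/` or `&`. A quick self-check:

```bash
# Verify a generated password is hex-only and the expected length
# (24 random bytes -> 48 hex characters), so sed substitution is safe.
pw=$(openssl rand -hex 24)
case "$pw" in
  *[!0-9a-f]*) echo "not sed-safe: $pw" >&2; exit 1 ;;
esac
echo "${#pw}"
```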
- [ ] **Step 3: Verify roles and grants**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-c "\du dwh_owner" \
-c "\du dwh_ro" \
-c "SELECT grantee, privilege_type, table_schema FROM information_schema.table_privileges WHERE grantee IN ('dwh_owner','dwh_ro') AND table_schema IN ('bronze','dwh_control') GROUP BY 1,2,3 ORDER BY 1,3,2;"
```
Expected: `dwh_owner` has ALL on both schemas; `dwh_ro` has only SELECT.
- [ ] **Step 4: Verify `dwh_owner` can log in and write**
Run:
```bash
PGPASSWORD=$DWH_OWNER_PW psql \
-h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh \
-c "INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES ('_smoke_test_', NOW(), 'extracting') RETURNING run_id;" \
-c "DELETE FROM dwh_control.extract_runs WHERE table_name='_smoke_test_';"
```
Expected: one run_id returned, then `DELETE 1`.
- [ ] **Step 5: Verify `dwh_ro` cannot write**
Run:
```bash
PGPASSWORD=$DWH_RO_PW psql \
-h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh \
-c "INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status) VALUES ('_should_fail_', NOW(), 'extracting');"
```
Expected: `ERROR: permission denied for table extract_runs`.
- [ ] **Step 6: Commit**
```bash
git add dwh/261003_dwh_roles.sql
git commit -m "feat(dwh): add dwh_owner and dwh_ro scoped roles"
```
---
### Task 4: Audit bronze tables for UNIQUE constraints needed by `ON CONFLICT`
**Files:**
- Create: `dwh/261002_bronze_constraints_audit.sql`
- Apply to: `tracksolid_dwh`
**Purpose:** The design spec uses `ON CONFLICT (id) DO NOTHING` (for tables with a serial id) and `ON CONFLICT (imei, gps_time) DO NOTHING` (for `position_history`). Verify these constraints exist in the bronze DDL; patch anything missing.
- [ ] **Step 1: Inspect existing bronze constraints**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-c "SELECT conrelid::regclass AS table, conname, contype, pg_get_constraintdef(oid) FROM pg_constraint WHERE connamespace = 'bronze'::regnamespace AND contype IN ('p','u') ORDER BY 1;"
```
Review output. For each table below, confirm the listed conflict target exists as PK or UNIQUE:
| Bronze table | Required conflict target |
|---|---|
| `bronze.devices` | `(imei)` as PK |
| `bronze.live_positions` | `(imei)` as PK |
| `bronze.position_history` | `(imei, gps_time)` |
| `bronze.trips` | `(id)` as PK |
| `bronze.alarms` | `(id)` as PK |
| `bronze.parking_events` | `(id)` as PK |
| `bronze.device_events` | `(id)` as PK |
| `bronze.ingestion_log` | `(id)` as PK |
- [ ] **Step 2: Write `dwh/261002_bronze_constraints_audit.sql`**
This file is authored based on the output of Step 1. If all constraints are present, the file is a no-op audit with a comment documenting the check. If any are missing, add the appropriate `ALTER TABLE ... ADD CONSTRAINT` statements.
Template (fill the ADD CONSTRAINT block with only the statements that are actually needed):
```sql
-- dwh/261002_bronze_constraints_audit.sql
-- Audit: ensure bronze tables have unique keys matching the ON CONFLICT
-- targets used by the dwh_load_bronze workflow.
-- Run after 260423_dwh_ddl_v1.sql on tracksolid_dwh.
BEGIN;
-- PASTE any ALTER TABLE ... ADD CONSTRAINT statements identified in Step 1 here.
-- Example shape (only include if pg_constraint did not already list it):
-- ALTER TABLE bronze.position_history
-- ADD CONSTRAINT position_history_dedup UNIQUE (imei, gps_time);
-- Assert every target exists. If any assert fails, the migration aborts.
DO $$
DECLARE
  -- ':' separates table from column list so the list itself may contain commas
  -- (split_part on ',' would truncate 'imei,gps_time').
  checks TEXT[] := ARRAY[
    'bronze.devices:imei',
    'bronze.live_positions:imei',
    'bronze.position_history:imei,gps_time',
    'bronze.trips:id',
    'bronze.alarms:id',
    'bronze.parking_events:id',
    'bronze.device_events:id',
    'bronze.ingestion_log:id'
  ];
  chk TEXT;
  tbl TEXT;
  cols TEXT;
BEGIN
  FOREACH chk IN ARRAY checks LOOP
    tbl := split_part(chk, ':', 1);
    cols := split_part(chk, ':', 2);
    IF NOT EXISTS (
      SELECT 1 FROM pg_constraint c
      JOIN pg_class t ON t.oid = c.conrelid
      JOIN pg_namespace n ON n.oid = t.relnamespace
      WHERE n.nspname || '.' || t.relname = tbl
        AND c.contype IN ('p','u')
        -- Strip spaces so 'UNIQUE (imei, gps_time)' matches '(imei,gps_time)'.
        AND replace(pg_get_constraintdef(c.oid), ' ', '') ILIKE '%(' || cols || ')%'
    ) THEN
      RAISE EXCEPTION 'Missing unique/primary constraint: (%) on %', cols, tbl;
    END IF;
  END LOOP;
END$$;
COMMIT;
```
- [ ] **Step 3: Apply and verify**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-f dwh/261002_bronze_constraints_audit.sql
```
Expected: `COMMIT`. If any constraint is missing, the `DO` block raises and aborts — iterate on Step 2 until all assertions pass.
- [ ] **Step 4: Commit**
```bash
git add dwh/261002_bronze_constraints_audit.sql
git commit -m "feat(dwh): assert bronze ON CONFLICT targets exist"
```
---
### Task 5: Verify end-to-end target-DB state
- [ ] **Step 1: Check-list query**
Run:
```bash
PGPASSWORD=<postgres_password> psql \
-h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh <<'SQL'
\echo '== bronze tables =='
SELECT count(*) AS bronze_tables FROM pg_tables WHERE schemaname='bronze';
\echo '== dwh_control tables =='
SELECT count(*) AS control_tables FROM pg_tables WHERE schemaname='dwh_control';
\echo '== watermark seeds =='
SELECT count(*) AS seeded FROM dwh_control.extract_watermarks;
\echo '== roles =='
SELECT rolname FROM pg_roles WHERE rolname IN ('dwh_owner','dwh_ro') ORDER BY 1;
SQL
```
Expected:
```
bronze_tables: 16
control_tables: 2
seeded: 6
roles: dwh_owner, dwh_ro
```
- [ ] **Step 2: No commit (pure verification)**
---
## Phase B — Rustfs Setup
### Task 6: Create `dwh/exports/` and `dwh/processed/` prefixes in `fleet-db` bucket
**Files:**
- Remote-only: `s3://fleet-db/dwh/exports/` and `s3://fleet-db/dwh/processed/`
- [ ] **Step 1: Verify rustfs bucket reachable**
Export secrets from Coolify `.env` (do not print):
```bash
export AWS_ACCESS_KEY_ID=$RUSTFS_ACCESS_KEY
export AWS_SECRET_ACCESS_KEY=$RUSTFS_SECRET_KEY
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 ls s3://fleet-db/
```
Expected: listing includes `daily/` (pg_dump backups) — this confirms credentials and endpoint.
- [ ] **Step 2: Create placeholder marker files to establish prefixes**
S3-compatible stores create "folders" lazily — a zero-byte marker makes them visible immediately:
```bash
printf '' | aws --endpoint-url "$RUSTFS_ENDPOINT" s3 cp - s3://fleet-db/dwh/exports/.keep
printf '' | aws --endpoint-url "$RUSTFS_ENDPOINT" s3 cp - s3://fleet-db/dwh/processed/.keep
```
- [ ] **Step 3: Verify prefixes visible**
Run:
```bash
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 ls s3://fleet-db/dwh/
```
Expected:
```
PRE exports/
PRE processed/
```
- [ ] **Step 4: No commit (remote-only state)**
---
## Phase C — n8n Credential Hardening
### Task 7: Update `tracksolid_dwh_target` credential in n8n
**Files:** n8n credential store only (not in repo).
- [ ] **Step 1: Edit credential in n8n UI**
Open n8n → Credentials → `tracksolid_dwh_target` (or create if not present). Set:
- Host: `31.97.44.246`
- Port: `5888`
- Database: `tracksolid_dwh`
- User: `dwh_owner`
- Password: (the `DWH_OWNER_PW` from Task 3, now in Coolify secrets)
- SSL: `require`
- [ ] **Step 2: Test connection**
Click "Test" in n8n credential dialog. Expected: `Connection tested successfully`.
- [ ] **Step 3: Paper trail — record the change in runbook draft**
No commit yet. Note in a scratch file that credential was updated; the runbook (Task 27) will document the final state.
---
### Task 8: Update `tracksolid_source` credential to use `grafana_ro`
**Files:** n8n credential store only.
- [ ] **Step 1: Confirm `grafana_ro` exists on source DB**
The source already has `grafana_ro` per CLAUDE.md. Verify:
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec "$DB" psql -U postgres -d tracksolid_db -c "\du grafana_ro"
```
Expected: role exists with LOGIN. If missing, create with SELECT-only grants across `tracksolid` schema.
- [ ] **Step 2: Update n8n credential**
In n8n UI edit `tracksolid_source`:
- User: `grafana_ro`
- Password: (from `.env` `GRAFANA_DB_RO_PASSWORD`)
Test connection — expected success.
- [ ] **Step 3: Smoke-test read access from n8n**
Create a throwaway n8n Postgres node with `SELECT count(*) FROM tracksolid.devices;` → execute once. Expected: `63` (or current count). Delete the throwaway node.
---
## Phase D — Workflow 2 (`dwh_load_bronze`)
### Task 9: Create Workflow 2 skeleton with Execute Workflow trigger
**Files:**
- Create in n8n UI: workflow `dwh_load_bronze`
- Export to: `n8n-workflows/dwh_load_bronze.json` (after each task step that changes it)
**Purpose:** The load workflow is the callee. Building it first means Workflow 1 can be tested incrementally against a working load target.
- [ ] **Step 1: Create new workflow in n8n UI**
n8n → New Workflow → Name: `dwh_load_bronze`. Add an "Execute Workflow Trigger" node as the starting node.
Configure the trigger's input schema (n8n auto-detects; set these as documentation):
```
{
"table": "string (required) — one of: devices, live_positions, position_history, trips, alarms, parking_events, device_events, ingestion_log",
"csv_path": "string (required) — rustfs key, e.g. dwh/exports/devices/20260424_0800_EAT.csv",
"run_id": "integer (required) — dwh_control.extract_runs.run_id produced by Workflow 1",
"run_started_at": "string ISO-8601 — used as the upper watermark bound"
}
```
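A hypothetical sketch of how Workflow 1 might derive `csv_path` — the `_EAT` suffix assumes East Africa Time (`Africa/Nairobi`), matching the `20260424_0800_EAT.csv` example above:

```bash
# Build the rustfs object key for one extract run of one table.
table="devices"
stamp=$(TZ=Africa/Nairobi date +%Y%m%d_%H%M)
csv_path="dwh/exports/${table}/${stamp}_EAT.csv"
echo "$csv_path"
```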
- [ ] **Step 2: Export to repo as initial skeleton**
Click ⋯ → Download → save as `n8n-workflows/dwh_load_bronze.json`.
- [ ] **Step 3: Commit**
```bash
git add n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): scaffold dwh_load_bronze workflow"
```
---
### Task 10: Add rustfs download node to Workflow 2
**Files:**
- Modify: `n8n-workflows/dwh_load_bronze.json` (via n8n UI → Download)
- [ ] **Step 1: Add node `Download CSV from rustfs`**
Node type: `S3`
Operation: `Download`
Credential: `rustfs_s3`
Parameters:
- Bucket Name: `fleet-db`
- File Key: `={{ $json.csv_path }}`
- Binary Property: `data`
Wire: `Execute Workflow Trigger → Download CSV from rustfs`.
- [ ] **Step 2: Manually test with a hand-crafted CSV**
Create a tiny test CSV locally and upload:
```bash
cat > /tmp/test_devices.csv <<'CSV'
imei,vehicle_number,driver_name
862798000000001,TEST-01,Test Driver
CSV
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 cp /tmp/test_devices.csv s3://fleet-db/dwh/exports/devices/_smoke_test.csv
```
In n8n UI click "Execute Workflow" on `dwh_load_bronze` → supply test input:
```json
{
"table": "devices",
"csv_path": "dwh/exports/devices/_smoke_test.csv",
"run_id": 0,
"run_started_at": "2026-04-24T12:00:00Z"
}
```
Expected: Download node output shows a binary item with the CSV content.
- [ ] **Step 3: Export + commit**
```bash
# After exporting the updated JSON from n8n
git add n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): add rustfs download step to dwh_load_bronze"
```
---
### Task 11: Add CSV parse + bronze-upsert node for `devices` (snapshot pattern)
**Files:**
- Modify: `n8n-workflows/dwh_load_bronze.json`
**Purpose:** Get ONE table end-to-end before parameterising. `devices` is the simplest — no geometry, small row count, TRUNCATE+INSERT pattern.
- [ ] **Step 1: Add node `Parse CSV`**
Node type: `Extract From File` → Operation: `Extract From CSV`
Parameters:
- Binary Property: `data`
- Options → Header Row: enabled
- Options → Delimiter: `,`
Wire: `Download CSV → Parse CSV`.
- [ ] **Step 2: Add Switch node `Route by table`**
Node type: `Switch`
Rules: one output per `{{$node["Execute Workflow Trigger"].json.table}}` value. For this task only wire the `devices` branch; others will be added in later tasks.
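The routing the Switch node ends up implementing can be summarised as a table-to-pattern map — a sketch derived from Task 2's watermark seeds (the six seeded tables load incrementally; the two unseeded ones are snapshot-reloaded):

```bash
# Map each bronze table to its load pattern.
load_pattern() {
  case "$1" in
    devices|live_positions) echo snapshot ;;      # TRUNCATE + INSERT
    position_history|trips|alarms|parking_events|device_events|ingestion_log)
      echo incremental ;;                         # INSERT ... ON CONFLICT
    *) echo unknown ;;
  esac
}
```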
- [ ] **Step 3: Add node `Load bronze.devices (snapshot)`**
Node type: `Postgres`
Operation: `Execute Query`
Credential: `tracksolid_dwh_target`
Query (parameterised):
```sql
BEGIN;
TRUNCATE bronze.devices;
INSERT INTO bronze.devices (imei, vehicle_number, driver_name /* + all other devices columns */)
SELECT imei, vehicle_number, driver_name /* ... */
FROM json_populate_recordset(NULL::bronze.devices, $1::json);
UPDATE dwh_control.extract_watermarks
SET last_loaded_at = NOW(),
rows_loaded_last_run = (SELECT count(*) FROM bronze.devices),
updated_at = NOW()
WHERE table_name = 'devices';
UPDATE dwh_control.extract_runs
SET status = 'loaded',
run_finished_at = NOW(),
rows_loaded = (SELECT count(*) FROM bronze.devices)
WHERE run_id = $2;
COMMIT;
```
Query Parameters:
- `$1`: `={{ JSON.stringify($('Parse CSV').all().map(item => item.json)) }}` — all parsed rows as one JSON array; `$node["Parse CSV"].json` would serialise only the first item.
- `$2`: `={{ $node["Execute Workflow Trigger"].json.run_id }}`
**Note on `json_populate_recordset`:** this is the cleanest way to bulk-load n8n's per-row items into a target table when schemas align. If column names in the CSV exactly match `bronze.devices` column names, this works with no per-column mapping. If the CSV has extra or renamed columns, use an explicit `SELECT col1, col2, ...` instead.
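One shape detail worth pinning down: `$1` must be a JSON *array* of row objects (one element per CSV row), because `json_populate_recordset` iterates array elements — a single object would fail. A minimal illustration of the expected payload, using the hypothetical smoke-test row:

```bash
# Hypothetical $1 payload for a one-row devices CSV: an array, not an object.
param='[{"imei":"862798000000001","vehicle_number":"TEST-01","driver_name":"Test Driver"}]'
echo "$param"
```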
- [ ] **Step 4: Seed a run_id row for the smoke test**
Before testing, insert a row the workflow will update:
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c \
"INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status, csv_path) VALUES ('devices', NOW(), 'uploaded', 'dwh/exports/devices/_smoke_test.csv') RETURNING run_id;"
```
Record the returned `run_id` (e.g. `1`).
- [ ] **Step 5: Execute workflow against smoke-test CSV**
Input to Execute Workflow Trigger:
```json
{
"table": "devices",
"csv_path": "dwh/exports/devices/_smoke_test.csv",
"run_id": <the run_id from Step 4>,
"run_started_at": "2026-04-24T12:00:00Z"
}
```
Expected: all nodes green.
- [ ] **Step 6: Verify bronze and control state**
Run:
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh <<'SQL'
SELECT count(*) AS devices_rows FROM bronze.devices;
SELECT run_id, status, rows_loaded, run_finished_at
FROM dwh_control.extract_runs WHERE table_name='devices' ORDER BY run_id DESC LIMIT 1;
SELECT rows_loaded_last_run, last_loaded_at
FROM dwh_control.extract_watermarks WHERE table_name='devices';
SQL
```
Expected:
- `devices_rows = 1` (just the smoke-test row)
- `status = 'loaded'`, `rows_loaded = 1`, `run_finished_at` populated
- `rows_loaded_last_run = 1`, `last_loaded_at` populated
- [ ] **Step 7: Clean up the smoke-test row**
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c \
"TRUNCATE bronze.devices; UPDATE dwh_control.extract_watermarks SET last_loaded_at=NULL, rows_loaded_last_run=NULL WHERE table_name='devices';"
```
- [ ] **Step 8: Export + commit**
```bash
# After export
git add n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): add devices snapshot load path to dwh_load_bronze"
```
---
### Task 12: Add incremental-load path for `position_history` (with geometry)
**Files:**
- Modify: `n8n-workflows/dwh_load_bronze.json`
**Purpose:** This proves the hardest case — composite conflict target + PostGIS geometry round-trip.
- [ ] **Step 1: Add node `Load bronze.position_history (incremental)`**
Node type: `Postgres`
Credential: `tracksolid_dwh_target`
Query:
```sql
BEGIN;
INSERT INTO bronze.position_history
(imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage, recorded_at)
SELECT
imei,
gps_time::timestamptz,
CASE WHEN geom_ewkt IS NULL OR geom_ewkt = '' THEN NULL ELSE ST_GeomFromEWKT(geom_ewkt) END,
lat::double precision,
lng::double precision,
speed::numeric,
direction::numeric,
acc_status,
satellite::smallint,
current_mileage::numeric,
recorded_at::timestamptz
FROM json_populate_recordset(NULL::record, $1::json) AS r(
imei text, gps_time text, geom_ewkt text, lat text, lng text,
speed text, direction text, acc_status text, satellite text,
current_mileage text, recorded_at text
)
ON CONFLICT (imei, gps_time) DO NOTHING;
WITH counts AS (SELECT count(*) AS c FROM json_populate_recordset(NULL::record, $1::json) AS r(imei text))
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = $3::timestamptz,
last_loaded_at = NOW(),
rows_loaded_last_run = (SELECT c FROM counts),
updated_at = NOW()
WHERE table_name = 'position_history';
UPDATE dwh_control.extract_runs
SET status = 'loaded',
run_finished_at = NOW(),
rows_loaded = (SELECT c FROM (SELECT count(*) AS c FROM json_populate_recordset(NULL::record, $1::json) AS r(imei text)) AS s)
WHERE run_id = $2;
COMMIT;
```
Query Parameters:
- `$1`: `={{ JSON.stringify($('Parse CSV').all().map(item => item.json)) }}` — all rows as one JSON array, not just the first item.
- `$2`: `={{ $node["Execute Workflow Trigger"].json.run_id }}`
- `$3`: `={{ $node["Execute Workflow Trigger"].json.run_started_at }}`
Wire the `position_history` branch of the Switch node to this node.
- [ ] **Step 2: Prepare a smoke-test CSV with one geometry row**
```bash
cat > /tmp/test_ph.csv <<'CSV'
imei,gps_time,geom_ewkt,lat,lng,speed,direction,acc_status,satellite,current_mileage,recorded_at
862798000000001,2026-04-24T10:00:00Z,SRID=4326;POINT(36.82 -1.29),-1.29,36.82,42.5,180,on,12,123456.78,2026-04-24T10:00:05Z
CSV
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 cp /tmp/test_ph.csv s3://fleet-db/dwh/exports/position_history/_smoke_test.csv
```
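Before pointing the workflow at real extracts, it can help to sanity-check the `geom_ewkt` column shape. A hypothetical validator (assumes all geometries are SRID-4326 points, as in the smoke row above):

```bash
# Succeed only for values shaped like 'SRID=4326;POINT(x y)'.
is_ewkt_point() {
  case "$1" in
    "SRID=4326;POINT("*" "*")") return 0 ;;
    *) return 1 ;;
  esac
}
```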
- [ ] **Step 3: Seed devices row (FK) and run_id**
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh <<'SQL'
INSERT INTO bronze.devices (imei) VALUES ('862798000000001') ON CONFLICT DO NOTHING;
INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status, csv_path)
VALUES ('position_history', NOW(), 'uploaded', 'dwh/exports/position_history/_smoke_test.csv')
RETURNING run_id;
SQL
```
Record the `run_id`.
- [ ] **Step 4: Execute workflow**
Input:
```json
{
"table": "position_history",
"csv_path": "dwh/exports/position_history/_smoke_test.csv",
"run_id": <run_id>,
"run_started_at": "2026-04-24T10:30:00Z"
}
```
- [ ] **Step 5: Verify geometry round-trip**
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c \
"SELECT imei, gps_time, ST_AsText(geom) AS geom_wkt, lat, lng FROM bronze.position_history WHERE imei='862798000000001';"
```
Expected:
```
      imei       |        gps_time        |      geom_wkt      |  lat  |  lng
-----------------+------------------------+--------------------+-------+-------
 862798000000001 | 2026-04-24 10:00:00+00 | POINT(36.82 -1.29) | -1.29 | 36.82
```
- [ ] **Step 6: Verify idempotency by re-running**
Execute the workflow a second time with identical input. Expected: no new row in bronze.position_history (ON CONFLICT DO NOTHING), but `rows_loaded_last_run` in watermarks still reports 1 (rows received, not rows new — this is expected behaviour and documented in the runbook).
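The property verified here can be stated compactly: replaying a batch is a no-op on the unique key. Modeled outside the database with a sort-based dedup over `(imei, gps_time)` keys:

```bash
# Two identical replays of a one-row batch still yield one distinct key,
# mirroring ON CONFLICT (imei, gps_time) DO NOTHING.
key="862798000000001,2026-04-24T10:00:00Z"
distinct=$(printf '%s\n%s\n' "$key" "$key" | sort -u | wc -l)
echo "$distinct"
```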
- [ ] **Step 7: Clean up**
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c \
"TRUNCATE bronze.position_history; DELETE FROM bronze.devices WHERE imei='862798000000001';"
```
- [ ] **Step 8: Export + commit**
```bash
git add n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): add position_history incremental load with PostGIS round-trip"
```
---
### Task 13: Add remaining 6 load paths (`live_positions`, `trips`, `alarms`, `parking_events`, `device_events`, `ingestion_log`)
**Files:**
- Modify: `n8n-workflows/dwh_load_bronze.json`
**Purpose:** Copy the pattern from Task 12 (incremental) or Task 11 (snapshot) for each remaining table. One table per commit so regressions are bisectable.
Each sub-task follows this shape:
- [ ] **Sub-task 13a: `live_positions` (snapshot, has geometry)**
Query pattern:
```sql
BEGIN;
TRUNCATE bronze.live_positions;
INSERT INTO bronze.live_positions
(imei, geom, lat, lng, /* ...all cols... */)
SELECT
imei,
CASE WHEN geom_ewkt IS NULL OR geom_ewkt='' THEN NULL ELSE ST_GeomFromEWKT(geom_ewkt) END,
lat::double precision, lng::double precision,
/* ...casts per column... */
FROM json_populate_recordset(NULL::record, $1::json) AS r(
imei text, geom_ewkt text, lat text, lng text /* ... */
);
UPDATE dwh_control.extract_watermarks
SET last_loaded_at = NOW(),
rows_loaded_last_run = (SELECT count(*) FROM bronze.live_positions),
updated_at = NOW()
WHERE table_name = 'live_positions';
UPDATE dwh_control.extract_runs
SET status='loaded', run_finished_at = NOW(),
rows_loaded = (SELECT count(*) FROM bronze.live_positions)
WHERE run_id = $2;
COMMIT;
```
Smoke test + commit: follow the shape of Task 11 steps 4–8.
- [ ] **Sub-task 13b: `trips` (incremental, has geometry ×2)**
Conflict target: `(id)`. Geometry columns: `start_geom`, `end_geom` (both optional). Watermark column: `updated_at`.
Query shape — key diff from Task 12: two `ST_GeomFromEWKT` calls, one per geometry column, and conflict target is `(id)`:
```sql
INSERT INTO bronze.trips (id, imei, start_time, end_time, start_geom, end_geom, distance_m, avg_speed_kmh, max_speed_kmh, updated_at)
SELECT
id::bigint, imei, start_time::timestamptz, end_time::timestamptz,
CASE WHEN start_geom_ewkt IS NULL OR start_geom_ewkt='' THEN NULL ELSE ST_GeomFromEWKT(start_geom_ewkt) END,
CASE WHEN end_geom_ewkt IS NULL OR end_geom_ewkt='' THEN NULL ELSE ST_GeomFromEWKT(end_geom_ewkt) END,
distance_m::numeric, avg_speed_kmh::numeric, max_speed_kmh::numeric, updated_at::timestamptz
FROM json_populate_recordset(NULL::record, $1::json) AS r(
id text, imei text, start_time text, end_time text,
start_geom_ewkt text, end_geom_ewkt text,
distance_m text, avg_speed_kmh text, max_speed_kmh text, updated_at text
)
ON CONFLICT (id) DO NOTHING;
```
Then the matching watermarks + extract_runs updates (same shape as Task 12 Step 1).
Smoke test + commit.
- [ ] **Sub-task 13c: `alarms` (incremental, has geometry)** — conflict on `(id)`, watermark `updated_at`, one `geom`. Smoke test + commit.
- [ ] **Sub-task 13d: `parking_events` (incremental, has geometry)** — conflict on `(id)`, watermark `updated_at`, one `geom`. Smoke test + commit.
- [ ] **Sub-task 13e: `device_events` (incremental, no geometry)** — conflict on `(id)`, watermark `created_at` (source column name — in the CSV we'll preserve whatever header Workflow 1 emits; match here). Smoke test + commit.
- [ ] **Sub-task 13f: `ingestion_log` (incremental, no geometry)** — conflict on `(id)`, watermark `run_at`. Smoke test + commit.
After all six sub-tasks, Workflow 2 has 8 parallel load paths from the Switch node.
---
## Phase E — Workflow 1 (`dwh_extract`)
### Task 14: Create Workflow 1 skeleton with Schedule Trigger (disabled)
**Files:**
- Create in n8n UI: workflow `dwh_extract`
- Export to: `n8n-workflows/dwh_extract.json`
- [ ] **Step 1: New workflow in n8n UI**
Name: `dwh_extract`. Status: Disabled (we'll enable only after go-live in Task 26).
- [ ] **Step 2: Add Schedule Trigger node**
Node type: `Schedule Trigger`
Cron expression: `0 5,8,11,14,17,20,23 * * *`
Timezone: `Africa/Nairobi`
- [ ] **Step 3: Add Set node `init_run_context`**
Node type: `Set` (Edit Fields)
Mode: `Manual mapping` → Keep Only Set fields
Add:
- `run_started_at` = `={{ $now.toISO() }}`
Wire: `Schedule Trigger → init_run_context`.
- [ ] **Step 4: Export + commit**
```bash
git add n8n-workflows/dwh_extract.json
git commit -m "feat(n8n): scaffold dwh_extract workflow (disabled)"
```
---
### Task 15: Build the `devices` extract branch (snapshot)
**Files:**
- Modify: `n8n-workflows/dwh_extract.json`
- [ ] **Step 1: Add Postgres node `Extract: devices`**
Node type: `Postgres`
Credential: `tracksolid_source`
Operation: `Execute Query`
Query:
```sql
-- devices is a snapshot table: no watermark, just full dump.
SELECT imei, vehicle_number, driver_name, driver_phone, sim, /* + remaining 22 columns */
assigned_city, device_model, created_at, updated_at
FROM tracksolid.devices
ORDER BY imei;
```
- [ ] **Step 2: Add Postgres node `Insert extract_runs (devices)`**
Credential: `tracksolid_dwh_target`
Query:
```sql
INSERT INTO dwh_control.extract_runs (table_name, run_started_at, status, rows_extracted)
VALUES ('devices', $1::timestamptz, 'extracting', $2::int)
RETURNING run_id;
```
Parameters:
- `$1`: `={{ $node["init_run_context"].json.run_started_at }}`
- `$2`: `={{ $items("Extract: devices").length }}` (item count; `$node[...].json` refers to a single item, so `.length` there would be undefined)
- [ ] **Step 3: Add node `Format as CSV`**
Node type: `Convert to File` → Operation: `Convert to CSV`
Parameters:
- Binary Property: `data`
- Input: all items emitted by `Extract: devices` (wire `Extract: devices → Format as CSV`; the node turns each incoming item into one CSV row)
- [ ] **Step 4: Add node `Upload CSV to rustfs`**
Node type: `S3`
Operation: `Upload`
Credential: `rustfs_s3`
Parameters:
- Bucket: `fleet-db`
- File Key: `=dwh/exports/devices/{{ $now.setZone('Africa/Nairobi').toFormat('yyyyLLdd_HHmm') }}_EAT.csv`
- Binary Property: `data`
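The Luxon file-key expression above can be sanity-checked locally. A rough shell equivalent of the format string (assumes a `date` that honours `TZ`; the path segment is the one from this step):

```shell
# Reproduce the key pattern dwh/exports/devices/yyyyLLdd_HHmm_EAT.csv
# with the clock pinned to Africa/Nairobi, matching the n8n expression.
stamp=$(TZ=Africa/Nairobi date +%Y%m%d_%H%M)
key="dwh/exports/devices/${stamp}_EAT.csv"
echo "$key"
```

Useful when debugging a "file not found" in Workflow 2: run this at roughly the same time and compare against the key actually written to `extract_runs.csv_path`.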
- [ ] **Step 5: Add Postgres node `Update extract_runs status='uploaded'`**
Query:
```sql
UPDATE dwh_control.extract_runs
SET status = 'uploaded', csv_path = $1
WHERE run_id = $2;
```
Parameters:
- `$1`: the file key from Step 4 (capture it in a Set node immediately after the upload rather than re-computing; a second `$now` call can cross a minute boundary and yield a different key)
- `$2`: the `run_id` from Step 2
- [ ] **Step 6: Add node `Trigger Workflow 2 for devices`**
Node type: `Execute Workflow`
Workflow: `dwh_load_bronze`
Input:
```json
{
"table": "devices",
"csv_path": "={{ file_key from Step 4 }}",
"run_id": "={{ $node['Insert extract_runs (devices)'].json.run_id }}",
"run_started_at": "={{ $node['init_run_context'].json.run_started_at }}"
}
```
- [ ] **Step 7: Manually execute `dwh_extract` workflow (single-table mode)**
Use n8n's "Execute Workflow" button. Monitor: every node green, Workflow 2 completes, bronze.devices populated with real row count.
- [ ] **Step 8: Verify end-to-end**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT (SELECT count(*) FROM bronze.devices) AS bronze_count,
(SELECT rows_loaded FROM dwh_control.extract_runs WHERE table_name='devices' ORDER BY run_id DESC LIMIT 1) AS last_rows_loaded;"
```
Cross-check against source:
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec "$DB" psql -U grafana_ro -d tracksolid_db -c "SELECT count(*) FROM tracksolid.devices;"
```
Expected: both counts match (current source = 63).
- [ ] **Step 9: Export + commit**
```bash
git add n8n-workflows/dwh_extract.json
git commit -m "feat(n8n): add devices extract branch to dwh_extract"
```
---
### Task 16: Build the `position_history` extract branch (incremental with watermark)
**Files:**
- Modify: `n8n-workflows/dwh_extract.json`
**Purpose:** Prove the incremental pattern end-to-end for the hardest table (geometry + large row counts + watermark).
- [ ] **Step 1: Add Postgres node `Read watermark: position_history`**
Credential: `tracksolid_dwh_target`
Query:
```sql
SELECT last_extracted_at FROM dwh_control.extract_watermarks WHERE table_name = 'position_history';
```
- [ ] **Step 2: Add Postgres node `Extract: position_history`**
Credential: `tracksolid_source`
Query:
```sql
SELECT
imei,
gps_time,
CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END AS geom_ewkt,
lat, lng, speed, direction, acc_status, satellite, current_mileage, recorded_at
FROM tracksolid.position_history
WHERE recorded_at > $1::timestamptz
AND recorded_at <= $2::timestamptz
ORDER BY recorded_at;
```
Parameters:
- `$1`: `={{ $node['Read watermark: position_history'].json.last_extracted_at }}`
- `$2`: `={{ $node['init_run_context'].json.run_started_at }}`
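The `> $1 AND <= $2` bounds form a half-open window `(last_extracted_at, run_started_at]`, so consecutive runs neither skip nor double-extract a row. A toy shell sketch with integer timestamps (illustrative only):

```shell
# Rows carry insert timestamps 1..5. Run 1 covers (0,3]; after the
# watermark advances to 3, run 2 covers (3,5]. No gaps, no overlaps.
rows="1 2 3 4 5"
extract() {          # $1 = lower bound (exclusive), $2 = upper bound (inclusive)
  out=""
  for t in $rows; do
    [ "$t" -gt "$1" ] && [ "$t" -le "$2" ] && out="$out$t "
  done
  printf '%s' "$out"
}
batch1=$(extract 0 3)    # first run
batch2=$(extract 3 5)    # watermark advanced to 3
echo "run1=[$batch1] run2=[$batch2]"   # → run1=[1 2 3 ] run2=[4 5 ]
```

This is why the upper bound must be the captured `run_started_at` and not a fresh `NOW()`: the value written to the watermark has to equal the bound actually used in the extract.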
- [ ] **Step 3: Add `Insert extract_runs`, `Format as CSV`, `Upload CSV`, `Update extract_runs`, `Trigger Workflow 2`**
Follow the shape of Task 15 Steps 2–6 with these changes:
- `table` = `position_history`
- Extract SQL uses watermark bounds from Steps 1–2
- CSV key: `dwh/exports/position_history/YYYYMMDD_HHMM_EAT.csv`
- Workflow 2 input `table` = `position_history`
- [ ] **Step 4: Execute and verify end-to-end**
Execute `dwh_extract` workflow manually. Expected (first run with seeded 2026-01-01 watermark): backlog of all position_history rows pulled in one CSV, loaded into bronze.
Verify row-count parity:
```bash
# Source
docker exec "$DB" psql -U grafana_ro -d tracksolid_db -c \
"SELECT count(*) FROM tracksolid.position_history;"
# Bronze
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT count(*) FROM bronze.position_history;"
```
Expected: counts match (current source ≈ 519).
Verify geometry round-trip on a sample:
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT imei, gps_time, ST_AsText(geom) FROM bronze.position_history ORDER BY gps_time DESC LIMIT 3;"
```
Expected: valid `POINT(lng lat)` values.
- [ ] **Step 5: Verify watermark advanced**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT last_extracted_at, last_loaded_at, rows_loaded_last_run FROM dwh_control.extract_watermarks WHERE table_name='position_history';"
```
Expected: `last_extracted_at` ≈ the `run_started_at` from the execution (not 2026-01-01 anymore).
- [ ] **Step 6: Second execution — verify incremental behaviour**
Execute `dwh_extract` again immediately. Expected: `rows_extracted ≈ 0` (nothing has changed in the seconds between runs), the uploaded CSV is nearly empty, and the bronze row count is unchanged.
- [ ] **Step 7: Export + commit**
```bash
git add n8n-workflows/dwh_extract.json
git commit -m "feat(n8n): add position_history incremental extract with watermark"
```
---
### Task 17: Build the remaining 6 extract branches
**Files:**
- Modify: `n8n-workflows/dwh_extract.json`
Follow Task 15 (snapshot pattern) or Task 16 (incremental pattern) per table. One branch per commit.
- [ ] **Sub-task 17a: `live_positions` (snapshot, has geometry)** — Follow Task 15 shape; include `ST_AsEWKT(geom) AS geom_ewkt` in SELECT.
- [ ] **Sub-task 17b: `trips` (incremental, geometry ×2, watermark `updated_at`)** — Two `ST_AsEWKT` calls (`start_geom`, `end_geom`).
- [ ] **Sub-task 17c: `alarms` (incremental, has geometry, watermark `updated_at`)**
- [ ] **Sub-task 17d: `parking_events` (incremental, has geometry, watermark `updated_at`)**
- [ ] **Sub-task 17e: `device_events` (incremental, no geometry, watermark `created_at`)**
- [ ] **Sub-task 17f: `ingestion_log` (incremental, no geometry, watermark `run_at`)**
After all six, `dwh_extract` has 8 parallel extract branches, each ending in a `Trigger Workflow 2` node.
Commit after each.
---
### Task 18: Add per-branch error handling and `status='failed'` marker
**Files:**
- Modify: `n8n-workflows/dwh_extract.json`
- Modify: `n8n-workflows/dwh_load_bronze.json`
**Purpose:** If any node in a branch throws, mark the corresponding `extract_runs` row as `failed` with the error, so the observability queries surface it.
- [ ] **Step 1: On each branch in Workflow 1, set node `On Error` → `Continue` for the failure path**
For each extract branch: after the Upload or Trigger Workflow 2 node, wire an "error output" to a new Postgres node:
```sql
UPDATE dwh_control.extract_runs
SET status = 'failed',
run_finished_at = NOW(),
error_message = $1
WHERE run_id = $2;
```
Parameters:
- `$1`: `={{ $json.error?.message || 'unknown error' }}`
- `$2`: the `run_id` captured earlier in the branch
- [ ] **Step 2: Same pattern on Workflow 2**
If the load transaction fails, the trigger Postgres node throws; wire its error output to a marker node with the same shape.
- [ ] **Step 3: Intentional failure test**
On Workflow 1, temporarily break the `trips` branch's upload node (e.g. wrong bucket name). Execute the workflow. Expected:
- Other branches succeed.
- `trips` branch's `extract_runs` row transitions to `status='failed'` with the error message populated.
Verify:
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT status, error_message FROM dwh_control.extract_runs WHERE table_name='trips' ORDER BY run_id DESC LIMIT 1;"
```
Expected: `failed`, error message visible.
Restore the correct bucket name.
- [ ] **Step 4: Export + commit**
```bash
git add n8n-workflows/dwh_extract.json n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): add failure-state marking to dwh workflows"
```
---
### Task 19: Add CSV-move step at end of Workflow 2
**Files:**
- Modify: `n8n-workflows/dwh_load_bronze.json`
- [ ] **Step 1: Add node `Move CSV to processed/`**
Node type: `S3`
Operation: `Copy` (or "Move" if the n8n S3 node supports native move; otherwise Copy then Delete)
Parameters:
- Source bucket: `fleet-db`, source key: `={{ $node['Execute Workflow Trigger'].json.csv_path }}`
- Destination bucket: `fleet-db`, destination key: `={{ $node['Execute Workflow Trigger'].json.csv_path.replace('dwh/exports/','dwh/processed/') }}`
Wire AFTER the successful branch of the load Postgres node (so failed loads leave the CSV in `exports/` for natural retry).
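The `.replace('dwh/exports/','dwh/processed/')` expression maps each key onto its audit-trail twin. The equivalent shell prefix swap (illustrative, using a hypothetical key):

```shell
# Map an exports/ key to its processed/ destination, as the n8n
# expression in the Move step does.
src="dwh/exports/devices/20260424_1330_EAT.csv"
dst="dwh/processed/${src#dwh/exports/}"    # POSIX prefix strip + re-prefix
echo "$dst"   # → dwh/processed/devices/20260424_1330_EAT.csv
```

Note the table sub-directory survives the rewrite, so `dwh/processed/` keeps the same per-table layout as `dwh/exports/`.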
- [ ] **Step 2: Add node `Delete source CSV`**
Node type: `S3`
Operation: `Delete`
Parameters:
- Bucket: `fleet-db`
- Key: `={{ $node['Execute Workflow Trigger'].json.csv_path }}`
Wire: after Copy.
- [ ] **Step 3: Verify move behaviour**
Execute the full pipeline for `devices` once. Expected after run:
```bash
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 ls s3://fleet-db/dwh/exports/devices/
# should NOT show the new CSV
aws --endpoint-url "$RUSTFS_ENDPOINT" s3 ls s3://fleet-db/dwh/processed/devices/
# should show the new CSV
```
- [ ] **Step 4: Export + commit**
```bash
git add n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): move loaded CSVs to dwh/processed/ audit trail"
```
---
### Task 20: End-to-end full-workflow smoke test
- [ ] **Step 1: Truncate bronze + reset watermarks**
```bash
PGPASSWORD=$DWH_OWNER_PW psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh <<'SQL'
TRUNCATE bronze.devices, bronze.live_positions, bronze.position_history, bronze.trips,
bronze.alarms, bronze.parking_events, bronze.device_events, bronze.ingestion_log
RESTART IDENTITY CASCADE;
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = '2026-01-01', last_loaded_at = NULL, rows_loaded_last_run = NULL;
DELETE FROM dwh_control.extract_runs;
SQL
```
- [ ] **Step 2: Manually execute `dwh_extract`**
Click "Execute Workflow" in n8n. All 8 branches should run in parallel.
- [ ] **Step 3: Row-count parity across all 8 tables**
Script:
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
for TBL in devices live_positions position_history trips alarms parking_events device_events ingestion_log; do
SRC=$(docker exec "$DB" psql -U grafana_ro -d tracksolid_db -tAc "SELECT count(*) FROM tracksolid.$TBL;")
TGT=$(PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -tAc "SELECT count(*) FROM bronze.$TBL;")
echo "$TBL source=$SRC bronze=$TGT"
done
```
Expected: every row shows matching counts (within the run window — position_history and ingestion_log may differ by a handful if the source ingested during the run).
- [ ] **Step 4: All runs marked `loaded`**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT table_name, status, rows_loaded FROM dwh_control.extract_runs ORDER BY table_name;"
```
Expected: 8 rows, all `status='loaded'`, `rows_loaded` non-null.
- [ ] **Step 5: No commit (verification only)**
If any table fails parity, pause here and debug. Do not move to Phase F until all 8 tables pass.
---
## Phase F — Observability & Go-live
### Task 21: Create error-notification workflow
**Files:**
- Create: `n8n-workflows/dwh_error_notifier.json`
- [ ] **Step 1: New workflow `dwh_error_notifier`**
Trigger: `Error Trigger` node (n8n's built-in error-workflow trigger).
- [ ] **Step 2: Format + send notification**
Add HTTP Request node pointing to the team's Slack/webhook endpoint (read URL from env var `TEAM_ALERT_WEBHOOK`). Message body template:
```
DWH pipeline failure
Workflow: {{ $json.workflow.name }}
Node: {{ $json.execution.lastNodeExecuted }}
Error: {{ $json.execution.error.message }}
Time: {{ $now.toISO() }}
```
- [ ] **Step 3: Wire as Error Workflow on both pipeline workflows**
In `dwh_extract` and `dwh_load_bronze` → Settings → Error Workflow → select `dwh_error_notifier`.
- [ ] **Step 4: Verify with an intentional failure**
Break one node temporarily; execute the workflow; confirm the notification lands in Slack. Restore.
- [ ] **Step 5: Export + commit**
```bash
git add n8n-workflows/dwh_error_notifier.json n8n-workflows/dwh_extract.json n8n-workflows/dwh_load_bronze.json
git commit -m "feat(n8n): add dwh_error_notifier wired to both pipeline workflows"
```
---
### Task 22: Add freshness + failure SQL views to `dwh_control`
**Files:**
- Create: `dwh/261004_dwh_observability_views.sql`
- [ ] **Step 1: Write the migration**
```sql
-- dwh/261004_dwh_observability_views.sql
-- Convenience views for Grafana panels and manual health checks.
BEGIN;
CREATE OR REPLACE VIEW dwh_control.v_table_freshness AS
SELECT
table_name,
MAX(run_finished_at) AS last_loaded_at,
NOW() - MAX(run_finished_at) AS lag,
CASE WHEN MAX(run_finished_at) < NOW() - INTERVAL '4 hours' THEN TRUE ELSE FALSE END AS is_stale
FROM dwh_control.extract_runs
WHERE status = 'loaded'
GROUP BY table_name;
CREATE OR REPLACE VIEW dwh_control.v_recent_failures AS
SELECT run_id, table_name, run_started_at, error_message
FROM dwh_control.extract_runs
WHERE status = 'failed'
AND run_started_at > NOW() - INTERVAL '24 hours'
ORDER BY run_started_at DESC;
GRANT SELECT ON dwh_control.v_table_freshness TO dwh_ro;
GRANT SELECT ON dwh_control.v_recent_failures TO dwh_ro;
COMMIT;
```
- [ ] **Step 2: Apply and verify**
```bash
PGPASSWORD=<postgres_password> psql -h 31.97.44.246 -p 5888 -U postgres -d tracksolid_dwh \
-f dwh/261004_dwh_observability_views.sql
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT * FROM dwh_control.v_table_freshness;"
```
Expected: 8 rows (one per table), `is_stale` should be FALSE for all tables right after Task 20.
- [ ] **Step 3: Commit**
```bash
git add dwh/261004_dwh_observability_views.sql
git commit -m "feat(dwh): add observability views v_table_freshness and v_recent_failures"
```
---
### Task 23: Enable the cron schedule on `dwh_extract`
- [ ] **Step 1: Pre-enable check**
Confirm:
- Task 20 passed (full parity across 8 tables)
- Task 21 error-workflow wired
- Task 22 freshness view shows all 8 tables fresh
- [ ] **Step 2: Toggle `dwh_extract` workflow to Active in n8n UI**
Flip the toggle. First scheduled run will fire at the next cron tick (one of 05,08,11,14,17,20,23 EAT).
- [ ] **Step 3: Watch the first scheduled run**
Wait for the next cron tick. Monitor n8n Executions page — expect all 8 branches green within ~1 minute of the trigger.
- [ ] **Step 4: Verify run was recorded**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT table_name, status, run_started_at FROM dwh_control.extract_runs WHERE run_started_at > NOW() - INTERVAL '10 minutes' ORDER BY table_name;"
```
Expected: 8 rows, all `loaded`, recent `run_started_at`.
- [ ] **Step 5: Export + commit**
```bash
# Export dwh_extract.json after toggling Active (this state persists in the JSON)
git add n8n-workflows/dwh_extract.json
git commit -m "feat(n8n): enable cron schedule on dwh_extract (7x daily, EAT)"
```
---
### Task 24: 24-hour steady-state verification
- [ ] **Step 1: Wait 24 hours after Task 23 go-live**
This is a gate, not an action.
- [ ] **Step 2: Verify all 7 scheduled runs completed**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh <<'SQL'
SELECT
date_trunc('hour', run_started_at) AS hr,
count(*) FILTER (WHERE status='loaded') AS loaded,
count(*) FILTER (WHERE status='failed') AS failed
FROM dwh_control.extract_runs
WHERE run_started_at > NOW() - INTERVAL '24 hours'
GROUP BY 1 ORDER BY 1;
SQL
```
Expected: 7 hourly groups (05, 08, 11, 14, 17, 20, 23 EAT), each with 8 loaded, 0 failed.
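The expected seven hourly buckets follow directly from the cron expression set in Task 14. A quick shell check of the tick list (illustrative):

```shell
# Pull the hour field out of the Schedule Trigger cron and count the ticks.
cron='0 5,8,11,14,17,20,23 * * *'
hours=$(printf '%s\n' "$cron" | awk '{print $2}' | tr ',' ' ')
n=$(printf '%s\n' "$hours" | wc -w | tr -d ' ')
echo "ticks=$hours count=$n"   # → ticks=5 8 11 14 17 20 23 count=7
```

If the verification query returns fewer than seven groups, cross-check which tick is missing against the n8n Executions page before suspecting the pipeline itself.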
- [ ] **Step 3: Check staleness view**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT * FROM dwh_control.v_table_freshness;"
```
Expected: no table has `is_stale = true`.
- [ ] **Step 4: Check failures view**
```bash
PGPASSWORD=$DWH_RO_PW psql -h 31.97.44.246 -p 5888 -U dwh_ro -d tracksolid_dwh -c \
"SELECT * FROM dwh_control.v_recent_failures;"
```
Expected: 0 rows.
- [ ] **Step 5: No commit**
---
### Task 25: Write operations runbook `docs/DWH_PIPELINE.md`
**Files:**
- Create: `docs/DWH_PIPELINE.md`
- [ ] **Step 1: Write the runbook**
Sections to include (expand each, no placeholders):
```markdown
# DWH Pipeline Runbook
## What this pipeline does
Moves 8 tables from tracksolid_db (Coolify source) → CSV in rustfs → bronze schema in tracksolid_dwh (31.97.44.246:5888). Runs 7x/day (05,08,11,14,17,20,23 EAT).
## Topology
[reproduce the architecture diagram from the spec]
## Table list and patterns
[8-row table with name + pattern + watermark column + conflict key, copied from spec]
## Where things live
- Source DB: timescale_db:5432 / tracksolid_db (Coolify internal)
- Target DB: 31.97.44.246:5888 / tracksolid_dwh
- Blob storage: rustfs bucket fleet-db, prefixes dwh/exports/ and dwh/processed/
- Workflows: n8n instance on Coolify, names dwh_extract and dwh_load_bronze
- Error workflow: dwh_error_notifier
- Migrations applied (record with date): 260423, 261001, 261002, 261003, 261004
## Credentials
[table of credential names + where password lives — 1Password/Coolify secrets]
## Daily health check (1 minute)
SELECT * FROM dwh_control.v_table_freshness;
SELECT * FROM dwh_control.v_recent_failures;
## Common tasks
### Re-run a failed load
The CSV will still be in dwh/exports/ (move-to-processed only runs on success).
Find the extract_runs row, then manually trigger dwh_load_bronze with its csv_path/run_id.
### Backfill from a specific date
UPDATE dwh_control.extract_watermarks SET last_extracted_at = '<date>' WHERE table_name='<table>';
Then trigger dwh_extract manually. The next run will pull everything since that date.
### Add a new table
1. Copy extract branch in dwh_extract (snapshot or incremental template).
2. Copy matching load path in dwh_load_bronze.
3. Seed watermark row if incremental.
4. Smoke test end-to-end.
### Resolve a persistent failure
1. Check dwh_control.v_recent_failures for error_message.
2. Fix the underlying issue (credentials, schema drift, etc.).
3. Manually trigger dwh_extract — retries pick up from the unchanged watermark.
## What NOT to do
- Do not TRUNCATE bronze.* in production without resetting watermarks first — extract will miss the gap.
- Do not delete CSVs from dwh/processed/ — that's the audit trail (30-day retention window is configured).
- Do not grant direct write access to bronze.* to anyone other than dwh_owner.
```
- [ ] **Step 2: Commit**
```bash
git add docs/DWH_PIPELINE.md
git commit -m "docs: add DWH pipeline operations runbook"
```
---
### Task 26: Update `CLAUDE.md`
**Files:**
- Modify: `CLAUDE.md` §3, §4, §5, §10
- [ ] **Step 1: §3 Instance & Connection Parameters — append the target DB**
Add after the existing DB name/user/schemas lines:
```
- **DWH target DB:** `tracksolid_dwh` at `31.97.44.246:5888` (separate PostGIS server). Writes by `dwh_owner`, reads by `dwh_ro`. Schemas: `bronze`, `silver`, `gold`, `dwh_control`. See `docs/DWH_PIPELINE.md`.
```
- [ ] **Step 2: §4 Codebase Map — add new files**
Insert under the existing listing:
```
dwh/261001_dwh_control.sql # Watermark + run log schema (261002 constraints audit, 261003 roles, 261004 obs views)
n8n-workflows/dwh_extract.json # Workflow 1: scheduled extract → CSV → rustfs
n8n-workflows/dwh_load_bronze.json # Workflow 2: rustfs CSV → bronze upsert
n8n-workflows/dwh_error_notifier.json # Shared error-workflow for the DWH pipeline
docs/DWH_PIPELINE.md # Operations runbook
```
- [ ] **Step 3: §5 Database Schema — add bronze + dwh_control tables**
Append:
```
bronze.devices, bronze.position_history, bronze.trips, bronze.alarms,
bronze.live_positions, bronze.parking_events, bronze.device_events,
bronze.ingestion_log -- Replicated from tracksolid.* via n8n DWH pipeline (7x/day)
dwh_control.extract_watermarks -- Per-table high-water mark for incremental extracts
dwh_control.extract_runs -- Per-run audit log (status, row counts, errors)
dwh_control.v_table_freshness -- Grafana: per-table lag
dwh_control.v_recent_failures -- Grafana: 24h failure list
```
- [ ] **Step 4: §10 Open Items — remove the DWH bronze item**
Strike/delete any line referencing the unpopulated DWH (the "run nightly ETL" line stays; that is a separate gold-layer concern).
- [ ] **Step 5: Commit**
```bash
git add CLAUDE.md
git commit -m "docs(CLAUDE): add DWH pipeline to connections, codebase map, schema, and open items"
```
---
### Task 27: Final PR
- [ ] **Step 1: Push branch**
```bash
git push -u origin quality-program-2026-04-12
```
- [ ] **Step 2: Open PR against `main`**
```bash
gh pr create --title "feat(dwh): n8n-based bronze layer extract pipeline" --body "$(cat <<'EOF'
## Summary
- Adds the first layer of the medallion-architecture DWH: 8 tables replicated from `tracksolid_db` to `tracksolid_dwh.bronze` via rustfs CSV.
- Two n8n workflows (`dwh_extract` scheduled 7x/day, `dwh_load_bronze` triggered per table) plus a shared error-notifier.
- Control schema `dwh_control` tracks watermarks and per-run audit log; observability views expose freshness and failures to Grafana.
- Hardened credentials: scoped `dwh_owner` (write) and `dwh_ro` (read) roles replace the superuser-over-public-IP trial.
## Test plan
- [x] Phase A: bronze DDL + control schema + roles applied and verified
- [x] Phase D: Workflow 2 load paths tested end-to-end per table with smoke CSVs
- [x] Phase E: Workflow 1 extract branches tested end-to-end per table
- [x] Task 20: full-pipeline parity check across all 8 tables
- [x] Task 23: cron enabled and first scheduled run succeeded
- [x] Task 24: 24h steady-state (7 runs × 8 tables = 56 successful loads, 0 failures)
Design spec: `docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md`
Runbook: `docs/DWH_PIPELINE.md`
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
EOF
)"
```
- [ ] **Step 3: Return PR URL**
---
## Self-Review Summary
**Spec coverage check:**
- ✅ Architecture (two workflows + rustfs transit) → Phases D + E
- ✅ 8 tables (2 snapshot + 6 incremental) → Tasks 11, 13 (load) + 15, 17 (extract)
- ✅ PostGIS round-trip → Task 12 (load side proved), Task 16 (extract side proved)
- ✅ Watermark discipline (DB insert ts, closed upper bound) → Task 16 Step 2
- ✅ Idempotent retry (ON CONFLICT DO NOTHING) → Tasks 12, 13
- ✅ `dwh_control` schema → Task 2
- ✅ Scoped roles (dwh_owner + dwh_ro) + SSL → Tasks 3, 7
- ✅ 7x/day cron → Task 14
- ✅ Error handling (failed status + notifier) → Tasks 18, 21
- ✅ CSV audit trail (exports → processed) → Task 19
- ✅ Observability views → Task 22
- ✅ 24h steady-state gate → Task 24
- ✅ Runbook → Task 25
- ✅ CLAUDE.md updates → Task 26
**Placeholder scan:** no "TBD", no "add error handling" without code, no "similar to earlier" — each sub-task in Task 13/17 includes the key query shape plus an explicit test+commit step.
**Type consistency:** `run_id` BIGSERIAL throughout; `table_name` TEXT; watermark column names match the source schema verified in the design spec. CSV column names (`geom_ewkt`) consistent between extract SELECT and load INSERT.
No gaps found.
---
## Execution Handoff
Plan complete and saved to `docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md`. Two execution options:
**1. Subagent-Driven (recommended)** — I dispatch a fresh subagent per task, review between tasks, fast iteration. Well-suited here because many tasks involve live DB operations that benefit from a clean review gate.
**2. Inline Execution** — Execute tasks in this session using executing-plans, batch execution with checkpoints.
**Which approach?**