DWH pipeline (new):
- dwh/261001_dwh_control.sql — watermarks + per-run audit log schema
- dwh/261002_bronze_constraints_audit.sql — ON CONFLICT key assertion
- dwh/261003_dwh_roles.sql — dwh_owner / grafana_ro contract assertion
- dwh/261004_dwh_observability_views.sql — v_table_freshness,
v_recent_failures, v_watermark_lag (readable by grafana_ro)
- docs/DWH_PIPELINE.md — operations runbook (setup, troubleshooting,
manual re-run, back-fill, rotation)
- DWH_Execution_Manual.md — reusable playbook for future data
projects (extract → blob → load pattern, 7 design principles,
snapshot-vs-incremental matrix, verification gates)
- docs/superpowers/{specs,plans}/2026-04-24-n8n-dwh-bronze-pipeline-*
— design spec + 27-task implementation plan
Security:
- dwh/260423_dwh_ddl_v1.sql — redacted plaintext role passwords to
'CHANGE_ME_BEFORE_APPLY' placeholders; added SECURITY header
documenting generation + rotation flow
Docs:
- CLAUDE.md — §3 adds tracksolid_dwh@31.97.44.246:5888 target,
§4 adds dwh/ + docs/DWH_PIPELINE.md to codebase map, §5 adds
bronze + dwh_control schema roll-up, §10 adds deploy task +
password rotation follow-up
Also includes miscellaneous in-progress files accumulated on this
branch (workspace, analytics notes, vehicle CSVs, extract helpers,
renamed markdown archives).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
# n8n DWH Bronze Layer Pipeline — Design & Plan

**Date:** 2026-04-24
**Status:** Awaiting approval
**Repo:** `/Users/davidkiania/Downloads/55_ts_coolify_gemini_prod`

---

## Context

Fireside's Tracksolid fleet pipeline currently ingests telemetry into a single production DB (`tracksolid_db`, TimescaleDB/PostGIS on Coolify at `stage.rahamafresh.com`). There is no downstream data warehouse, so every analytical query hits the live operational DB — risking contention as Grafana panels and ad-hoc analysis scale. A full medallion-architecture bronze DDL exists on disk (`dwh/260423_dwh_ddl_v1.sql`) but has never been populated.

The user wants to build the **first layer of that DWH** using n8n (already running on the same Coolify instance, already connected to both source and target DBs). The design has two n8n workflows:

1. **Workflow 1 — Extract**: pull tables from the source `tracksolid_db` (Coolify-hosted TimescaleDB, reached via the same internal Docker network n8n is on), write CSVs to rustfs blob storage.
2. **Workflow 2 — Load**: pick up those CSVs and upsert into the bronze schema inside `tracksolid_dwh` (PostGIS) on the separate server `31.97.44.246:5888`.

**Confirmed connection targets:**

- **Source:** `tracksolid_db` on the Coolify stack — n8n connects via internal Docker network (trial confirmed working).
- **Target:** `tracksolid_dwh` at `31.97.44.246:5888` — a separate PostGIS instance. Schemas `bronze`, `silver`, `gold`, plus `dwh_control` all live in this one database.

The intermediate rustfs CSV layer (a) gives a durable audit trail of every extract, (b) decouples source-DB availability from target-DB availability (a remote-DB outage doesn't lose data — the CSV waits in `exports/`), and (c) matches how rustfs is already used in the stack (pg_dump backups).

---

## Architecture

```
┌──────────────────────────────────────────────────┐
│ n8n (Coolify instance)                           │
│                                                  │
│ Workflow 1: dwh_extract                          │
│   Schedule: cron 0 5,8,11,14,17,20,23 * * *      │
│   (Africa/Nairobi, 7 runs/day)                   │
│   Steps per table:                               │
│     1. Read watermark from target control table  │
│     2. Query source with watermark bounds        │
│     3. Render rows as CSV                        │
│     4. Upload CSV to rustfs                      │
│     5. Insert row into dwh_control.extract_runs  │
│        (status='uploaded')                       │
│     6. Execute Workflow 2 for this CSV           │
│                                                  │
│ Workflow 2: dwh_load_bronze                      │
│   Trigger: Execute Workflow (from Workflow 1)    │
│   Input: { table, csv_path, run_id,              │
│            run_started_at }                      │
│   Steps:                                         │
│     1. Download CSV from rustfs                  │
│     2. Parse CSV                                 │
│     3. BEGIN                                     │
│        INSERT ... ON CONFLICT DO NOTHING         │
│        UPDATE extract_watermarks                 │
│        UPDATE extract_runs SET status='loaded'   │
│        COMMIT                                    │
│     4. Move CSV: dwh/exports/ → dwh/processed/   │
└──────────────────────────────────────────────────┘
         │                  │                   │
         ▼                  ▼                   ▼
  tracksolid_db      rustfs (fleet-db)   tracksolid_dwh (PostGIS)
  (Coolify internal) /dwh/exports/       31.97.44.246:5888
                     /dwh/processed/     dwh_control.extract_watermarks
                                         dwh_control.extract_runs
                                         bronze.devices
                                         bronze.position_history
                                         bronze.trips
                                         bronze.alarms
                                         bronze.parking_events
                                         bronze.device_events
                                         bronze.live_positions
                                         bronze.ingestion_log
```

**Rustfs path convention:**

- Active export: `s3://fleet-db/dwh/exports/{table}/{YYYYMMDD_HHMM}_EAT.csv`
- After successful load: moved to `s3://fleet-db/dwh/processed/{table}/{YYYYMMDD_HHMM}_EAT.csv`
- Never deleted — this is the audit trail.

---

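The path convention above can be sketched as a small helper. This is an illustrative assumption, not code from the n8n workflows — `build_export_key` and the EAT constant are hypothetical names:

```python
from datetime import datetime, timezone, timedelta

EAT = timezone(timedelta(hours=3))  # Africa/Nairobi, no DST

def build_export_key(table: str, run_started_at: datetime, processed: bool = False) -> str:
    """Build the rustfs object key for one table's extract CSV, following
    s3://fleet-db/dwh/{exports|processed}/{table}/{YYYYMMDD_HHMM}_EAT.csv."""
    stage = "processed" if processed else "exports"
    stamp = run_started_at.astimezone(EAT).strftime("%Y%m%d_%H%M")
    return f"dwh/{stage}/{table}/{stamp}_EAT.csv"

run = datetime(2026, 4, 24, 5, 0, tzinfo=EAT)
print(build_export_key("position_history", run))
# dwh/exports/position_history/20260424_0500_EAT.csv
```

Because the "move" after a successful load only changes the stage segment, the same helper with `processed=True` yields the destination key.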
## Table-by-Table Extraction Strategy

### Snapshot tables (TRUNCATE + full reload every run)

Small state-based tables where "current state" matters, not history.

| Source table | Rows | Bronze target |
|---|---|---|
| `tracksolid.devices` | 63 | `bronze.devices` |
| `tracksolid.live_positions` | 19 | `bronze.live_positions` |

**Load pattern:**

```sql
BEGIN;
TRUNCATE bronze.devices;
INSERT INTO bronze.devices (...) VALUES (...);
UPDATE dwh_control.extract_watermarks SET last_loaded_at = NOW() WHERE table_name='devices';
COMMIT;
```

### Incremental tables (watermark + append-with-dedup)

Append-only event/history tables. Watermark is the **DB insertion timestamp**, not the device-reported timestamp, so out-of-order device clocks / delayed pushes can't cause silent data loss.

| Source table | Watermark column | Natural unique key (exists in source) | Bronze conflict target |
|---|---|---|---|
| `tracksolid.position_history` | `recorded_at` | `(imei, gps_time)` | `(imei, gps_time)` |
| `tracksolid.trips` | `updated_at` | `(imei, start_time)` | `id` |
| `tracksolid.alarms` | `updated_at` | `(imei, alarm_type, alarm_time)` | `id` |
| `tracksolid.parking_events` | `updated_at` | `(imei, start_time, event_type)` | `id` |
| `tracksolid.device_events` | `created_at` | `(imei, event_type, event_time)` | `id` |
| `tracksolid.ingestion_log` | `run_at` | PK `id` | `id` |

**Extract pattern (closed upper bound to avoid boundary drift):**

```sql
SELECT <cols>, ST_AsEWKT(geom) AS geom_ewkt
FROM tracksolid.position_history
WHERE recorded_at > :last_extracted_at
  AND recorded_at <= :run_started_at
ORDER BY recorded_at;
```

**Load pattern (idempotent):**

```sql
BEGIN;
INSERT INTO bronze.position_history (imei, gps_time, geom, lat, lng, ...)
SELECT imei, gps_time, ST_GeomFromEWKT(geom_ewkt), lat, lng, ...
FROM csv_stage
ON CONFLICT (imei, gps_time) DO NOTHING;

UPDATE dwh_control.extract_watermarks
SET last_extracted_at = :run_started_at,
    last_loaded_at = NOW(),
    rows_loaded_last_run = <count>
WHERE table_name = 'position_history';

UPDATE dwh_control.extract_runs
SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count>
WHERE run_id = :run_id;
COMMIT;
```

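Why this load pattern is safely re-runnable can be shown outside SQL: replaying the same batch against a store keyed on the conflict target inserts nothing new. A toy sketch (the dict stands in for `bronze.position_history`; the helper name is illustrative):

```python
def load_batch(store: dict, rows: list) -> int:
    """Insert rows keyed on (imei, gps_time); duplicates are skipped,
    mimicking INSERT ... ON CONFLICT (imei, gps_time) DO NOTHING."""
    loaded = 0
    for row in rows:
        key = (row["imei"], row["gps_time"])
        if key not in store:
            store[key] = row
            loaded += 1
    return loaded

bronze = {}
batch = [
    {"imei": "864030000000001", "gps_time": "2026-04-24T05:01:00Z", "lat": -1.29, "lng": 36.82},
    {"imei": "864030000000001", "gps_time": "2026-04-24T05:02:00Z", "lat": -1.30, "lng": 36.83},
]
assert load_batch(bronze, batch) == 2  # first load inserts both rows
assert load_batch(bronze, batch) == 0  # replaying the same CSV is a no-op
```

This is exactly the property the retry design relies on: a CSV left in `exports/` after a failed load can be processed again without creating duplicates.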
### First-run behaviour

`extract_watermarks` seeded with `last_extracted_at = '2026-01-01T00:00:00Z'` so the first run back-fills all historical data in a single CSV per table.

### Skipped for now (no data, webhooks pending)

`obd_readings`, `fault_codes`, `fuel_readings`, `temperature_readings`, `lbs_readings`, `heartbeats` — add later by copying the incremental pattern and seeding a watermark row.

---

## PostGIS Geometry Handling

Five source tables carry six `geometry(Point, 4326)` columns: `live_positions`, `position_history`, `trips` (start+end), `parking_events`, `alarms`.

- **Extract:** `ST_AsEWKT(geom) AS geom_ewkt` — preserves SRID inline (`SRID=4326;POINT(...)`)
- **Load:** `ST_GeomFromEWKT(csv.geom_ewkt)` — no separate SRID step, no loss on round-trip
- **NULL safety:** `CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END`

---

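A language-agnostic sketch of what passes through the CSV (function names are illustrative; the real serialisation is done by `ST_AsEWKT`/`ST_GeomFromEWKT` in the SQL patterns above):

```python
def point_to_ewkt(lng, lat, srid=4326):
    """NULL-safe serialisation, mirroring CASE WHEN geom IS NULL THEN NULL END."""
    if lng is None or lat is None:
        return None
    return f"SRID={srid};POINT({lng} {lat})"

def ewkt_to_point(ewkt):
    """Parse 'SRID=4326;POINT(lng lat)' back to (srid, lng, lat)."""
    if ewkt is None:
        return None
    srid_part, wkt = ewkt.split(";", 1)
    srid = int(srid_part.removeprefix("SRID="))
    lng, lat = wkt.removeprefix("POINT(").removesuffix(")").split()
    return srid, float(lng), float(lat)

assert point_to_ewkt(36.8219, -1.2921) == "SRID=4326;POINT(36.8219 -1.2921)"
assert ewkt_to_point(point_to_ewkt(36.8219, -1.2921)) == (4326, 36.8219, -1.2921)
assert point_to_ewkt(None, None) is None  # NULL geometry survives the round-trip
```

The point of carrying EWKT rather than bare WKT is visible in the first assertion: the SRID travels inside the value, so no separate SRID column or `ST_SetSRID` step is needed on load.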
## Control Tables (to add to `tracksolid_dwh`)

New migration file: `dwh/261001_dwh_control.sql` — applied once to `tracksolid_dwh@31.97.44.246:5888`.

```sql
CREATE SCHEMA IF NOT EXISTS dwh_control;

CREATE TABLE dwh_control.extract_watermarks (
    table_name           TEXT PRIMARY KEY,
    last_extracted_at    TIMESTAMPTZ NOT NULL DEFAULT '2026-01-01T00:00:00Z',
    last_loaded_at       TIMESTAMPTZ,
    rows_loaded_last_run INT,
    updated_at           TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE dwh_control.extract_runs (
    run_id          BIGSERIAL PRIMARY KEY,
    table_name      TEXT NOT NULL,
    run_started_at  TIMESTAMPTZ NOT NULL,
    run_finished_at TIMESTAMPTZ,
    rows_extracted  INT,
    rows_loaded     INT,
    csv_path        TEXT,
    status          TEXT CHECK (status IN ('extracting','uploaded','loading','loaded','failed')),
    error_message   TEXT
);

CREATE INDEX idx_extract_runs_table_time  ON dwh_control.extract_runs (table_name, run_started_at DESC);
CREATE INDEX idx_extract_runs_status_time ON dwh_control.extract_runs (status, run_finished_at DESC);

-- Seed one row per incremental table
INSERT INTO dwh_control.extract_watermarks (table_name) VALUES
    ('position_history'), ('trips'), ('alarms'),
    ('parking_events'), ('device_events'), ('ingestion_log');
```

---

## Scheduling

- **Cron:** `0 5,8,11,14,17,20,23 * * *` with TZ `Africa/Nairobi` (set in n8n schedule node).
- **7 runs/day:** 05:00, 08:00, 11:00, 14:00, 17:00, 20:00, 23:00 EAT.
- **Fits the 6–8/day requirement** with even 3-hour gaps in daytime and a silent overnight window (23:00 → 05:00 = 6h), which is fine because device traffic is minimal after hours.
- First run of each day (05:00) will carry the overnight backlog — this is the expected behaviour of the watermark design.

---

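The cadence claims above can be sanity-checked mechanically from the hour list in the cron expression:

```python
HOURS = [5, 8, 11, 14, 17, 20, 23]  # cron: 0 5,8,11,14,17,20,23 * * * (EAT)

# Gap from each run to the next, wrapping past midnight
gaps = [(HOURS[(i + 1) % len(HOURS)] - h) % 24 for i, h in enumerate(HOURS)]

assert len(HOURS) == 7       # 7 runs/day, inside the 6-8/day requirement
assert gaps[:-1] == [3] * 6  # even 3-hour gaps through the day
assert gaps[-1] == 6         # 23:00 -> 05:00 overnight window
assert sum(gaps) == 24       # the schedule tiles the full day
```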
## Error Handling & Observability

### Per-table isolation

Workflow 1 iterates tables in sequence; a failure on one table does not block others. Every table's result (success or failure) is logged to `dwh_control.extract_runs`.

### Retryable failures

If Workflow 2 fails mid-load: transaction rolls back → watermark stays → CSV stays in `exports/` → next scheduled run re-processes it (natural retry).

### Alerting (Grafana panels on `tracksolid_dwh`, read via `dwh_ro` role — see below)

- **Freshness:** `SELECT table_name, NOW() - MAX(run_finished_at) AS lag FROM dwh_control.extract_runs WHERE status='loaded' GROUP BY 1 HAVING NOW() - MAX(run_finished_at) > INTERVAL '4 hours';`
- **Failures in last hour:** `SELECT * FROM dwh_control.extract_runs WHERE status='failed' AND run_started_at > NOW() - INTERVAL '1 hour';`
- **Row count sanity:** `rows_extracted != rows_loaded` flags CSV parse or load issues.

### n8n-level error workflow

Attach an "Error Workflow" in both n8n workflows that posts to a webhook (existing pattern in `n8n-workflows/`) for immediate notification.

---

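The per-table isolation rule can be sketched as a driver loop. All names here are hypothetical — in practice the loop lives in Workflow 1's node graph, not in code like this:

```python
def run_extracts(tables, extract_one):
    """Run each table's extract independently; one failure never blocks
    the others. Every outcome is recorded, success or failure, mirroring
    the rows written to dwh_control.extract_runs."""
    results = []
    for table in tables:
        try:
            rows = extract_one(table)
            results.append({"table": table, "status": "loaded", "rows": rows})
        except Exception as exc:
            results.append({"table": table, "status": "failed", "error": str(exc)})
    return results

def fake_extract(table):
    # Stand-in for the real extract; simulates one table failing.
    if table == "alarms":
        raise RuntimeError("source timeout")
    return 10

out = run_extracts(["devices", "alarms", "trips"], fake_extract)
assert [r["status"] for r in out] == ["loaded", "failed", "loaded"]
```

Note that `trips` still loads even though `alarms` failed before it — that is the isolation property the design requires.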
## Security & Credentials

Both DB credentials already exist in n8n (connections trialled and working). The required credential shapes are:

| n8n credential | Host / Port / DB | Recommended user | Usage |
|---|---|---|---|
| `tracksolid_source` | Coolify internal `timescale_db:5432` → DB `tracksolid_db` | `grafana_ro` (read-only) | Source extract queries |
| `tracksolid_dwh_target` | `31.97.44.246:5888` → DB `tracksolid_dwh` | `dwh_owner` (scoped) | Bronze writes + control-table updates |
| `rustfs_s3` | `${RUSTFS_ENDPOINT}` | `${RUSTFS_ACCESS_KEY}` | CSV upload/download/move |

### Credential-hardening recommendations (current state vs target state)

The trial connection string uses `postgres` (superuser) over a public IP. Four hardening steps to take before production:

1. **Create a scoped `dwh_owner` role** on `tracksolid_dwh` — owns only `bronze` + `dwh_control` schemas, cannot touch other DBs or cluster roles. n8n's `tracksolid_dwh_target` credential switches to this user.
2. **Create a `dwh_ro` role** for Grafana panels — read-only across `bronze` + `dwh_control`. This is what the freshness/failure dashboards in §Error Handling use.
3. **Enforce `sslmode=require`** on the `tracksolid_dwh_target` connection string (public-IP hop, cleartext otherwise).
4. **Rotate the `postgres` password** that was shared in chat history — one-off cleanup, not a plan blocker.

All four are one-migration-file tasks and fit naturally into the `dwh/261001_dwh_control.sql` setup step.

---

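As an illustration of step 3, the hardened target DSN would carry `sslmode=require`. Everything below is a placeholder shape, not a real credential:

```python
# Hypothetical connection string for the n8n tracksolid_dwh_target credential;
# sslmode=require forces TLS on the public-IP hop to 31.97.44.246:5888.
DWH_TARGET_DSN = (
    "postgresql://dwh_owner:CHANGE_ME_BEFORE_APPLY"
    "@31.97.44.246:5888/tracksolid_dwh?sslmode=require"
)

assert "sslmode=require" in DWH_TARGET_DSN
```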
## Files to Create / Modify

| Path | Action | Purpose |
|---|---|---|
| `dwh/261001_dwh_control.sql` | **new** | Control-schema migration (watermarks + run log) |
| `dwh/260423_dwh_ddl_v1.sql` | **review** | Confirm bronze tables have matching unique constraints; patch if missing |
| `n8n-workflows/dwh_extract.json` | **new** | Workflow 1 export |
| `n8n-workflows/dwh_load_bronze.json` | **new** | Workflow 2 export |
| `docs/DWH_PIPELINE.md` | **new** | Operations runbook (see verification section) |
| `CLAUDE.md` §3, §4, §5, §10 | **update** | Add `tracksolid_dwh@31.97.44.246:5888` to §3 Connection Params; add bronze schema + n8n DWH workflows to codebase map; remove DWH item from Open Items |

**Existing utilities to reuse (do NOT reinvent):**

- Rustfs env vars already wired in `docker-compose.yaml` (`RUSTFS_ENDPOINT`, `RUSTFS_ACCESS_KEY`, `RUSTFS_SECRET_KEY`, `RUSTFS_BUCKET`) — Workflow nodes read from the same `.env`.
- Backup rustfs client logic in `backup/backup_db.sh` is the reference pattern for S3 auth shape.
- Existing n8n workflow pattern in `n8n-workflows/jimi_pushgps.json` et al. for webhook trigger + HTTP-forward shape.

---

## Verification

### Pre-deployment checks (before first cron trigger)

1. **Bronze DDL applied:** `psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c "\dt bronze.*"` lists 16 tables.
2. **Control schema applied:** same connection, `\dt dwh_control.*` lists `extract_watermarks`, `extract_runs`.
3. **Watermarks seeded:** `SELECT * FROM dwh_control.extract_watermarks;` returns 6 rows, all with `last_extracted_at = 2026-01-01`.
4. **Roles created:** `\du` lists `dwh_owner` and `dwh_ro`; `postgres` superuser no longer used for n8n.
5. **n8n credentials:** Test each credential individually in n8n UI — all three connect successfully (source via internal network, target via `31.97.44.246:5888` with `sslmode=require`).
6. **Rustfs path exists:** `aws --endpoint ${RUSTFS_ENDPOINT} s3 ls s3://fleet-db/dwh/` — if missing, create `exports/` and `processed/` prefixes.

### First-run verification (manually trigger Workflow 1)

1. `SELECT * FROM dwh_control.extract_runs ORDER BY run_id DESC LIMIT 20;` — 8 rows (one per table processed), all `status='loaded'`.
2. `SELECT table_name, rows_loaded_last_run FROM dwh_control.extract_watermarks;` — non-zero for all incremental tables that have source data.
3. Row-count parity:

   ```sql
   -- on source (tracksolid_db, Coolify internal)
   SELECT COUNT(*) FROM tracksolid.position_history;
   -- on target (tracksolid_dwh @ 31.97.44.246:5888)
   SELECT COUNT(*) FROM bronze.position_history;
   ```

   Numbers should match ± rows inserted in the narrow window between the two queries.

4. **Geometry round-trip check:**

   ```sql
   SELECT ST_AsText(geom) FROM bronze.position_history LIMIT 5;
   -- should return valid POINT(lng lat) values, not NULL or garbage
   ```

5. **Rustfs audit:** `aws s3 ls s3://fleet-db/dwh/processed/` — 8 CSV files present (one per table), originals no longer in `exports/`.

### Steady-state verification (after 24h / 7 runs)

1. `SELECT table_name, NOW() - MAX(run_finished_at) FROM dwh_control.extract_runs WHERE status='loaded' GROUP BY 1;` — max lag < 3h 15min for every table.
2. `SELECT COUNT(*) FROM dwh_control.extract_runs WHERE status='failed';` — zero.
3. Grafana dashboard (to be added in a follow-up plan) shows freshness and row counts per table.

---

## Out of Scope (follow-up work)

- Silver/gold layer transformations (the DWH DDL defines schemas but no queries yet).
- Bronze schema evolution tooling (manual migrations are acceptable for one pipeline).
- Backfill of tables where webhooks aren't yet registered (OBD, fuel, temperature, LBS).
- Grafana dashboard panels for the DWH — worth its own spec once we have a week of data to design around.

---

## Open Questions (none blocking)

All design decisions resolved in the brainstorming session. Confirmed:

- Source: `tracksolid_db` on Coolify, reached via internal Docker network.
- Target: `tracksolid_dwh` at `31.97.44.246:5888` (public IP), schemas `bronze`/`silver`/`gold` + `dwh_control`.
- Trial connections already working in n8n.

If any endpoint/credential changes during implementation, those are n8n-credential updates only — no design change.