

n8n DWH Bronze Layer Pipeline — Design & Plan

Date: 2026-04-24
Status: Awaiting approval
Repo: /Users/davidkiania/Downloads/55_ts_coolify_gemini_prod


Context

Fireside's Tracksolid fleet pipeline currently ingests telemetry into a single production DB (tracksolid_db, TimescaleDB/PostGIS on Coolify at stage.rahamafresh.com). There is no downstream data warehouse, so every analytical query hits the live operational DB — risking contention as Grafana panels and ad-hoc analysis scale. A full medallion-architecture bronze DDL exists on disk (dwh/260423_dwh_ddl_v1.sql) but has never been populated.

The user wants to build the first layer of that DWH using n8n (already running on the same Coolify instance, already connected to both source and target DBs). The design has two n8n workflows:

  1. Workflow 1 — Extract: pull tables from the source tracksolid_db (Coolify-hosted TimescaleDB, reached via the same internal Docker network n8n is on), write CSVs to rustfs blob storage.
  2. Workflow 2 — Load: pick up those CSVs and upsert into the bronze schema inside tracksolid_dwh (PostGIS) on the separate server 31.97.44.246:5888.

Confirmed connection targets:

  • Source: tracksolid_db on the Coolify stack — n8n connects via internal Docker network (trial confirmed working).
  • Target: tracksolid_dwh at 31.97.44.246:5888 — a separate PostGIS instance. Schemas bronze, silver, gold, plus dwh_control all live in this one database.

The intermediate rustfs CSV layer (a) gives a durable audit trail of every extract, (b) decouples source-DB availability from target-DB availability (a remote-DB outage doesn't lose data — the CSV waits in exports/), and (c) matches how rustfs is already used in the stack (pg_dump backups).


Architecture

  ┌──────────────────────────────────────────────────┐
  │             n8n  (Coolify instance)              │
  │                                                  │
  │   Workflow 1: dwh_extract                        │
  │   Schedule: cron 0 5,8,11,14,17,20,23 * * *      │
  │             (Africa/Nairobi, 7 runs/day)         │
  │   Steps per table:                               │
  │     1. Read watermark from target control table  │
  │     2. Query source with watermark bounds        │
  │     3. Render rows as CSV                        │
  │     4. Upload CSV to rustfs                      │
  │     5. Insert row into dwh_control.extract_runs  │
  │        (status='uploaded')                       │
  │     6. Execute Workflow 2 for this CSV           │
  │                                                  │
  │   Workflow 2: dwh_load_bronze                    │
  │   Trigger: Execute Workflow (from Workflow 1)    │
  │   Input: { table, csv_path, run_id,              │
  │            run_started_at }                      │
  │   Steps:                                         │
  │     1. Download CSV from rustfs                  │
  │     2. Parse CSV                                 │
  │     3. BEGIN                                     │
  │          INSERT ... ON CONFLICT DO NOTHING       │
  │          UPDATE extract_watermarks               │
  │          UPDATE extract_runs SET status='loaded' │
  │        COMMIT                                    │
  │     4. Move CSV: dwh/exports/ → dwh/processed/   │
  └──────────────────────────────────────────────────┘
         │                    │                    │
         ▼                    ▼                    ▼
   tracksolid_db       rustfs (fleet-db)    tracksolid_dwh (PostGIS)
   (Coolify internal)  /dwh/exports/        31.97.44.246:5888
                       /dwh/processed/      dwh_control.extract_watermarks
                                            dwh_control.extract_runs
                                            bronze.devices
                                            bronze.position_history
                                            bronze.trips
                                            bronze.alarms
                                            bronze.parking_events
                                            bronze.device_events
                                            bronze.live_positions
                                            bronze.ingestion_log

Rustfs path convention:

  • Active export: s3://fleet-db/dwh/exports/{table}/{YYYYMMDD_HHMM}_EAT.csv
  • After successful load: moved to s3://fleet-db/dwh/processed/{table}/{YYYYMMDD_HHMM}_EAT.csv
  • Never deleted — this is the audit trail.
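For example, the 11:00 EAT run on 2026-04-24 for position_history would land at (hypothetical timestamp, following the convention above):

  s3://fleet-db/dwh/exports/position_history/20260424_1100_EAT.csv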

Table-by-Table Extraction Strategy

Snapshot tables (TRUNCATE + full reload every run)

Small state-based tables where "current state" matters, not history.

| Source table | Rows | Bronze target |
|---|---|---|
| tracksolid.devices | 63 | bronze.devices |
| tracksolid.live_positions | 19 | bronze.live_positions |

Load pattern:

BEGIN;
  TRUNCATE bronze.devices;
  INSERT INTO bronze.devices (...) VALUES (...);
  UPDATE dwh_control.extract_watermarks SET last_loaded_at = NOW() WHERE table_name='devices';
COMMIT;

Incremental tables (watermark + append-with-dedup)

Append-only event/history tables. Watermark is the DB insertion timestamp, not the device-reported timestamp, so out-of-order device clocks / delayed pushes can't cause silent data loss.

| Source table | Watermark column | Natural unique key (exists in source) | Bronze conflict target |
|---|---|---|---|
| tracksolid.position_history | recorded_at | (imei, gps_time) | (imei, gps_time) |
| tracksolid.trips | updated_at | (imei, start_time) | id |
| tracksolid.alarms | updated_at | (imei, alarm_type, alarm_time) | id |
| tracksolid.parking_events | updated_at | (imei, start_time, event_type) | id |
| tracksolid.device_events | created_at | (imei, event_type, event_time) | id |
| tracksolid.ingestion_log | run_at | id (PK) | id |

Extract pattern (closed upper bound to avoid boundary drift):

SELECT <cols>, ST_AsEWKT(geom) AS geom_ewkt
FROM tracksolid.position_history
WHERE recorded_at >  :last_extracted_at
  AND recorded_at <= :run_started_at
ORDER BY recorded_at;
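The two bind parameters come from Workflow 1's own bookkeeping. A minimal sketch, assuming the dwh_control tables defined below (the RETURNING run_id is what gets handed to Workflow 2):

-- Step 1: read the lower bound for this table.
SELECT last_extracted_at
FROM dwh_control.extract_watermarks
WHERE table_name = 'position_history';

-- Step 5, after the CSV upload: open the audit row. :run_started_at is captured
-- once at cron fire time and reused as the closed upper bound for every table.
INSERT INTO dwh_control.extract_runs
  (table_name, run_started_at, rows_extracted, csv_path, status)
VALUES
  ('position_history', :run_started_at, :row_count, :csv_path, 'uploaded')
RETURNING run_id;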

Load pattern (idempotent):

BEGIN;
  INSERT INTO bronze.position_history (imei, gps_time, geom, lat, lng, ...)
  SELECT imei, gps_time, ST_GeomFromEWKT(geom_ewkt), lat, lng, ...
  FROM csv_stage
  ON CONFLICT (imei, gps_time) DO NOTHING;

  UPDATE dwh_control.extract_watermarks
     SET last_extracted_at = :run_started_at,
         last_loaded_at    = NOW(),
         rows_loaded_last_run = <count>
   WHERE table_name = 'position_history';

  UPDATE dwh_control.extract_runs
     SET status = 'loaded', run_finished_at = NOW(), rows_loaded = <count>
   WHERE run_id = :run_id;
COMMIT;

First-run behaviour

extract_watermarks seeded with last_extracted_at = '2026-01-01T00:00:00Z' so the first run back-fills all historical data in a single CSV per table.

Skipped for now (no data, webhooks pending)

obd_readings, fault_codes, fuel_readings, temperature_readings, lbs_readings, heartbeats — add later by copying the incremental pattern and seeding a watermark row.
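When one of these comes online, enabling it is a single seed statement; a sketch using obd_readings as the example (the column DEFAULT supplies the 2026-01-01 back-fill bound):

INSERT INTO dwh_control.extract_watermarks (table_name)
VALUES ('obd_readings')
ON CONFLICT (table_name) DO NOTHING;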


PostGIS Geometry Handling

Five source tables carry geometry(Point, 4326) columns (six geometry columns in total, since trips stores both start and end points): live_positions, position_history, trips, parking_events, alarms.

  • Extract: ST_AsEWKT(geom) AS geom_ewkt — preserves SRID inline (SRID=4326;POINT(...))
  • Load: ST_GeomFromEWKT(csv.geom_ewkt) — no separate SRID step, no loss on round-trip
  • NULL safety: CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END
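Putting the three rules together, a minimal round-trip sketch (coordinates illustrative):

-- Extract side (source DB): NULL-safe EWKT serialisation.
SELECT imei, gps_time,
       CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END AS geom_ewkt
FROM tracksolid.position_history;

-- Load side (target DB): the EWKT string carries SRID=4326 inline,
-- so no separate ST_SetSRID step is needed.
SELECT ST_GeomFromEWKT('SRID=4326;POINT(36.8219 -1.2921)');  -- POINT(lng lat)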

Control Tables (to add to tracksolid_dwh)

New migration file: dwh/261001_dwh_control.sql — applied once to tracksolid_dwh@31.97.44.246:5888.

CREATE SCHEMA IF NOT EXISTS dwh_control;

CREATE TABLE dwh_control.extract_watermarks (
  table_name           TEXT PRIMARY KEY,
  last_extracted_at    TIMESTAMPTZ NOT NULL DEFAULT '2026-01-01T00:00:00Z',
  last_loaded_at       TIMESTAMPTZ,
  rows_loaded_last_run INT,
  updated_at           TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE dwh_control.extract_runs (
  run_id           BIGSERIAL PRIMARY KEY,
  table_name       TEXT NOT NULL,
  run_started_at   TIMESTAMPTZ NOT NULL,
  run_finished_at  TIMESTAMPTZ,
  rows_extracted   INT,
  rows_loaded      INT,
  csv_path         TEXT,
  status           TEXT CHECK (status IN ('extracting','uploaded','loading','loaded','failed')),
  error_message    TEXT
);

CREATE INDEX idx_extract_runs_table_time ON dwh_control.extract_runs (table_name, run_started_at DESC);
CREATE INDEX idx_extract_runs_status_time ON dwh_control.extract_runs (status, run_finished_at DESC);

-- Seed one row per incremental table
INSERT INTO dwh_control.extract_watermarks (table_name) VALUES
  ('position_history'), ('trips'), ('alarms'),
  ('parking_events'),  ('device_events'), ('ingestion_log');

Scheduling

  • Cron: 0 5,8,11,14,17,20,23 * * * with TZ Africa/Nairobi (set in n8n schedule node).
  • 7 runs/day: 05:00, 08:00, 11:00, 14:00, 17:00, 20:00, 23:00 EAT.
  • Fits the 6–8 runs/day requirement with even 3-hour gaps in the daytime and a silent overnight window (23:00 → 05:00 = 6h), which is fine because device traffic is minimal after hours.
  • First run of each day (05:00) will carry the overnight backlog — this is the expected behaviour of the watermark design.
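Once live, the cadence is easy to confirm from the run log; a sketch (expects a count of 7 per table for any full day):

SELECT table_name, COUNT(*) AS runs
FROM dwh_control.extract_runs
WHERE run_started_at >= CURRENT_DATE - 1
  AND run_started_at <  CURRENT_DATE
GROUP BY table_name;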

Error Handling & Observability

Per-table isolation

Workflow 1 iterates tables in sequence; a failure on one table does not block others. Every table's result (success or failure) is logged to dwh_control.extract_runs.

Retryable failures

If Workflow 2 fails mid-load: transaction rolls back → watermark stays → CSV stays in exports/ → next scheduled run re-processes it (natural retry).
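The error branch only has to record the fact; a sketch of the UPDATE Workflow 2 would issue (parameter names assumed):

UPDATE dwh_control.extract_runs
   SET status = 'failed',
       run_finished_at = NOW(),
       error_message = :error_message
 WHERE run_id = :run_id;
-- Watermark and CSV deliberately untouched: the next scheduled run retries.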

Alerting (Grafana panels on tracksolid_dwh, read via dwh_ro role — see below)

  • Freshness: SELECT table_name, NOW() - MAX(run_finished_at) AS lag FROM dwh_control.extract_runs WHERE status='loaded' GROUP BY 1 HAVING NOW() - MAX(run_finished_at) > INTERVAL '4 hours';
  • Failures in last hour: SELECT * FROM dwh_control.extract_runs WHERE status='failed' AND run_started_at > NOW() - INTERVAL '1 hour';
  • Row count sanity: rows_extracted != rows_loaded flags CSV parse or load issues.
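The row-count sanity panel, sketched as a query (IS DISTINCT FROM also catches NULL counts left by a crashed parse):

SELECT run_id, table_name, rows_extracted, rows_loaded, csv_path
FROM dwh_control.extract_runs
WHERE status = 'loaded'
  AND rows_extracted IS DISTINCT FROM rows_loaded
  AND run_started_at > NOW() - INTERVAL '24 hours';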

n8n-level error workflow

Attach an "Error Workflow" in both n8n workflows that posts to a webhook (existing pattern in n8n-workflows/) for immediate notification.


Security & Credentials

Both DB credentials already exist in n8n (connections trialled and working). The required credential shapes are:

| n8n credential | Host / Port / DB | Recommended user | Usage |
|---|---|---|---|
| tracksolid_source | Coolify internal timescale_db:5432 → DB tracksolid_db | grafana_ro (read-only) | Source extract queries |
| tracksolid_dwh_target | 31.97.44.246:5888 → DB tracksolid_dwh | dwh_owner (scoped) | Bronze writes + control-table updates |
| rustfs_s3 | ${RUSTFS_ENDPOINT} | ${RUSTFS_ACCESS_KEY} | CSV upload/download/move |

Credential-hardening recommendations (current state vs target state)

The trial connection string uses postgres (superuser) over a public IP. Four hardening steps to take before production:

  1. Create a scoped dwh_owner role on tracksolid_dwh — owns only bronze + dwh_control schemas, cannot touch other DBs or cluster roles. n8n's tracksolid_dwh_target credential switches to this user.
  2. Create a dwh_ro role for Grafana panels — read-only across bronze + dwh_control. This is what the freshness/failure dashboards in §Error Handling use.
  3. Enforce sslmode=require on the tracksolid_dwh_target connection string (public-IP hop, cleartext otherwise).
  4. Rotate the postgres password that was shared in chat history — one-off cleanup, not a plan blocker.

Items 1 and 2 are one-migration-file tasks that fit naturally alongside the dwh/261001_dwh_control.sql setup step; items 3 and 4 are a connection-string change and a one-off credential rotation respectively.
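A sketch of the role pair (placeholder passwords set out-of-band; the grant list is illustrative, not the final migration):

CREATE ROLE dwh_owner LOGIN PASSWORD 'CHANGE_ME_BEFORE_APPLY';
GRANT USAGE, CREATE ON SCHEMA bronze, dwh_control TO dwh_owner;
GRANT ALL ON ALL TABLES    IN SCHEMA bronze, dwh_control TO dwh_owner;
GRANT ALL ON ALL SEQUENCES IN SCHEMA bronze, dwh_control TO dwh_owner;

CREATE ROLE dwh_ro LOGIN PASSWORD 'CHANGE_ME_BEFORE_APPLY';
GRANT USAGE ON SCHEMA bronze, dwh_control TO dwh_ro;
GRANT SELECT ON ALL TABLES IN SCHEMA bronze, dwh_control TO dwh_ro;
ALTER DEFAULT PRIVILEGES FOR ROLE dwh_owner IN SCHEMA bronze, dwh_control
  GRANT SELECT ON TABLES TO dwh_ro;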


Files to Create / Modify

| Path | Action | Purpose |
|---|---|---|
| dwh/261001_dwh_control.sql | new | Control-schema migration (watermarks + run log) |
| dwh/260423_dwh_ddl_v1.sql | review | Confirm bronze tables have matching unique constraints; patch if missing |
| n8n-workflows/dwh_extract.json | new | Workflow 1 export |
| n8n-workflows/dwh_load_bronze.json | new | Workflow 2 export |
| docs/DWH_PIPELINE.md | new | Operations runbook (see verification section) |
| CLAUDE.md §3, §4, §5, §10 | update | Add tracksolid_dwh@31.97.44.246:5888 to §3 Connection Params; add bronze schema + n8n DWH workflows to codebase map; remove DWH item from Open Items |

Existing utilities to reuse (do NOT reinvent):

  • Rustfs env vars already wired in docker-compose.yaml (RUSTFS_ENDPOINT, RUSTFS_ACCESS_KEY, RUSTFS_SECRET_KEY, RUSTFS_BUCKET) — Workflow nodes read from the same .env.
  • Backup rustfs client logic in backup/backup_db.sh is the reference pattern for S3 auth shape.
  • Existing n8n workflow pattern in n8n-workflows/jimi_pushgps.json et al. for webhook trigger + HTTP-forward shape.

Verification

Pre-deployment checks (before first cron trigger)

  1. Bronze DDL applied: psql -h 31.97.44.246 -p 5888 -U dwh_owner -d tracksolid_dwh -c "\dt bronze.*" lists 16 tables.
  2. Control schema applied: same connection, \dt dwh_control.* lists extract_watermarks, extract_runs.
  3. Watermarks seeded: SELECT * FROM dwh_control.extract_watermarks; returns 6 rows, all with last_extracted_at = 2026-01-01.
  4. Roles created: \du lists dwh_owner and dwh_ro; postgres superuser no longer used for n8n.
  5. n8n credentials: Test each credential individually in n8n UI — all three connect successfully (source via internal network, target via 31.97.44.246:5888 with sslmode=require).
  6. Rustfs path exists: aws --endpoint ${RUSTFS_ENDPOINT} s3 ls s3://fleet-db/dwh/ — if missing, create exports/ and processed/ prefixes.

First-run verification (manually trigger Workflow 1)

  1. SELECT * FROM dwh_control.extract_runs ORDER BY run_id DESC LIMIT 20; — 8 rows (one per table processed), all status='loaded'.
  2. SELECT table_name, rows_loaded_last_run FROM dwh_control.extract_watermarks; — non-zero for all incremental tables that have source data.
  3. Row-count parity:
    -- on source (tracksolid_db, Coolify internal)
    SELECT COUNT(*) FROM tracksolid.position_history;
    -- on target (tracksolid_dwh @ 31.97.44.246:5888)
    SELECT COUNT(*) FROM bronze.position_history;
    
    Numbers should match ± rows inserted in the narrow window between the two queries.
  4. Geometry round-trip check:
    SELECT ST_AsText(geom) FROM bronze.position_history LIMIT 5;
    -- should return valid POINT(lng lat) values, not NULL or garbage
    
  5. Rustfs audit: aws --endpoint ${RUSTFS_ENDPOINT} s3 ls s3://fleet-db/dwh/processed/ — 8 CSV files present (one per table), originals no longer in exports/.

Steady-state verification (after 24h / 7 runs)

  1. SELECT table_name, NOW() - MAX(run_finished_at) FROM dwh_control.extract_runs WHERE status='loaded' GROUP BY 1; — max lag < 3h 15min for every table when checked during the daytime cadence (across the 23:00 → 05:00 window the gap legitimately reaches 6h).
  2. SELECT COUNT(*) FROM dwh_control.extract_runs WHERE status='failed'; — zero.
  3. Grafana dashboard (to be added in a follow-up plan) shows freshness and row counts per table.

Out of Scope (follow-up work)

  • Silver/gold layer transformations (the DWH DDL defines schemas but no queries yet).
  • Bronze schema evolution tooling (manual migrations are acceptable for one pipeline).
  • Backfill of tables where webhooks aren't yet registered (OBD, fuel, temperature, LBS).
  • Grafana dashboard panels for the DWH — worth its own spec once we have a week of data to design around.

Open Questions (none blocking)

All design decisions resolved in the brainstorming session. Confirmed:

  • Source: tracksolid_db on Coolify, reached via internal Docker network.
  • Target: tracksolid_dwh at 31.97.44.246:5888 (public IP), schemas bronze/silver/gold + dwh_control.
  • Trial connections already working in n8n.

If any endpoint/credential changes during implementation, those are n8n-credential updates only — no design change.