DWH pipeline (new):
- dwh/261001_dwh_control.sql — watermarks + per-run audit log schema
- dwh/261002_bronze_constraints_audit.sql — ON CONFLICT key assertion
- dwh/261003_dwh_roles.sql — dwh_owner / grafana_ro contract assertion
- dwh/261004_dwh_observability_views.sql — v_table_freshness,
v_recent_failures, v_watermark_lag (readable by grafana_ro)
- docs/DWH_PIPELINE.md — operations runbook (setup, troubleshooting,
manual re-run, back-fill, rotation)
- DWH_Execution_Manual.md — reusable playbook for future data
projects (extract → blob → load pattern, 7 design principles,
snapshot-vs-incremental matrix, verification gates)
- docs/superpowers/{specs,plans}/2026-04-24-n8n-dwh-bronze-pipeline-*
— design spec + 27-task implementation plan
Security:
- dwh/260423_dwh_ddl_v1.sql — redacted plaintext role passwords to
'CHANGE_ME_BEFORE_APPLY' placeholders; added SECURITY header
documenting generation + rotation flow
Docs:
- CLAUDE.md — §3 adds tracksolid_dwh@31.97.44.246:5888 target,
§4 adds dwh/ + docs/DWH_PIPELINE.md to codebase map, §5 adds
bronze + dwh_control schema roll-up, §10 adds deploy task +
password rotation follow-up
Also includes miscellaneous in-progress files accumulated on this
branch (workspace, analytics notes, vehicle CSVs, extract helpers,
renamed markdown archives).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
10 KiB
DWH Pipeline — Operations Runbook
Pipeline: n8n extract + load into tracksolid_dwh bronze schema
Design spec: docs/superpowers/specs/2026-04-24-n8n-dwh-bronze-pipeline-design.md
Implementation plan: docs/superpowers/plans/2026-04-24-n8n-dwh-bronze-pipeline.md
1. What This Pipeline Does
Every ~3 hours during active hours (7 runs/day, 05:00–23:00 EAT), n8n extracts 8 tables from the production tracksolid_db (Coolify internal network), writes each as a timestamped CSV to rustfs, then loads each CSV into the bronze schema on tracksolid_dwh (31.97.44.246:5888). Rustfs CSVs are moved to dwh/processed/ after a successful load — never deleted.
Two n8n workflows:
dwh_extract— cron-triggered, iterates tables in sequence, writes CSVs, callsdwh_load_bronzeper table.dwh_load_bronze— triggered per-table bydwh_extract, loads one CSV inside a single transaction (insert → update watermark → update run log → move CSV).
2. Key Locations
| What | Where |
|---|---|
| Source DB | tracksolid_db on Coolify (internal Docker network, timescale_db:5432) |
| Target DB | tracksolid_dwh at 31.97.44.246:5888 |
| Rustfs bucket | fleet-db (same bucket used by pg_dump backups) |
| Active CSVs | s3://fleet-db/dwh/exports/{table}/{YYYYMMDD_HHMM}_EAT.csv |
| Processed CSVs | s3://fleet-db/dwh/processed/{table}/{YYYYMMDD_HHMM}_EAT.csv |
| Control schema | dwh_control in tracksolid_dwh |
| Migrations | dwh/26*.sql applied in numeric order |
3. First-Time Setup
Apply migrations to tracksolid_dwh in numeric order, as a superuser (not dwh_owner):
PSQL="psql 'postgres://postgres:***@31.97.44.246:5888/tracksolid_dwh?sslmode=require'"
$PSQL -f dwh/260423_dwh_ddl_v1.sql # Bronze DDL, roles, schemas
$PSQL -f dwh/261001_dwh_control.sql # Watermarks + run log
$PSQL -f dwh/261002_bronze_constraints_audit.sql # Assertion: ON CONFLICT keys exist
$PSQL -f dwh/261003_dwh_roles.sql # Assertion: roles + grants present
$PSQL -f dwh/261004_dwh_observability_views.sql # Freshness/failure views
Each migration is idempotent. Audit files (261002, 261003) raise an exception with a bullet list of what is missing if the contract is broken — re-apply the relevant predecessor file and try again.
Rustfs prefixes
aws --endpoint ${RUSTFS_ENDPOINT} s3api put-object \
--bucket fleet-db --key dwh/exports/
aws --endpoint ${RUSTFS_ENDPOINT} s3api put-object \
--bucket fleet-db --key dwh/processed/
n8n credentials
Three credentials, all configured in the n8n UI before importing workflows:
| Credential | Target | User | Notes |
|---|---|---|---|
tracksolid_source |
Coolify internal → tracksolid_db |
grafana_ro |
Read-only; no sslmode needed on internal network |
tracksolid_dwh_target |
31.97.44.246:5888 → tracksolid_dwh |
dwh_owner |
Must set sslmode=require — public IP |
rustfs_s3 |
${RUSTFS_ENDPOINT} |
${RUSTFS_ACCESS_KEY} |
Same creds as pg_dump backup sidecar |
Test each credential via the n8n "Test connection" button before enabling the cron schedule.
4. Schedule
n8n Schedule node, Africa/Nairobi TZ: 0 5,8,11,14,17,20,23 * * *
- 7 runs/day at 05:00, 08:00, 11:00, 14:00, 17:00, 20:00, 23:00 EAT
- Overnight gap (23:00 → 05:00 = 6h) by design — device traffic minimal
- First-of-day run carries the overnight backlog (watermark picks up where 23:00 left off)
5. What Each Table Does on Every Run
Snapshot tables (TRUNCATE + full reload)
bronze.devices, bronze.live_positions — small state tables, "current state" semantics. Full replace every run.
Incremental tables (watermark + append-with-dedup)
| Bronze table | Source watermark column | ON CONFLICT target |
|---|---|---|
position_history |
recorded_at (DB insertion time) |
(imei, gps_time) |
trips |
updated_at |
id |
alarms |
updated_at |
id |
parking_events |
updated_at |
id |
device_events |
created_at |
id |
ingestion_log |
run_at |
id |
Watermark bounds are closed upper: WHERE <col> > last_extracted_at AND <col> <= :run_started_at.
Schema drift to handle in extract SQL
trips.distance_m→bronze.trips.distance_km: source stores metres, bronze expects km. Extract SQL:SELECT ..., distance_m/1000.0 AS distance_km, .... Cross-reference: FIX-M16 inCLAUDE.md.
PostGIS geometry round-trip
All six geometry columns (live_positions, position_history, trips.start_geom, trips.end_geom, parking_events, alarms) use EWKT serialisation:
-- Extract
SELECT ..., CASE WHEN geom IS NULL THEN NULL ELSE ST_AsEWKT(geom) END AS geom_ewkt FROM ...;
-- Load
INSERT INTO bronze... (..., geom) VALUES (..., ST_GeomFromEWKT(:geom_ewkt));
SRID 4326 is preserved inline; no separate SRID step required.
6. Verifying a Healthy Run
Immediate sanity checks (after any scheduled run)
-- Any failures in the last hour?
SELECT * FROM dwh_control.v_recent_failures WHERE run_started_at > NOW() - INTERVAL '1 hour';
-- All tables loaded in last 4h?
SELECT * FROM dwh_control.v_table_freshness WHERE lag > INTERVAL '4 hours';
-- Watermarks advancing?
SELECT * FROM dwh_control.v_watermark_lag ORDER BY extract_lag DESC;
Row-count parity (spot check weekly)
-- Source
SELECT COUNT(*) FROM tracksolid.position_history;
-- Target
SELECT COUNT(*) FROM bronze.position_history;
Numbers should match ± rows inserted between the two queries. Persistent gap > 1% → investigate CSV parse errors or a dropped batch.
Geometry round-trip
SELECT ST_AsText(geom) FROM bronze.position_history WHERE geom IS NOT NULL LIMIT 5;
-- Should return valid POINT(lng lat), not NULL or garbage.
7. Troubleshooting
"A table is stale (v_table_freshness shows lag > 4h)"
- Check
v_recent_failuresfor that table. If a row exists, readerror_message. - If
status='loading'inextract_runsfor that table, a load is in progress — wait for the next cron tick. If it staysloadingacross two ticks, the n8n executor crashed mid-transaction; the DB rolled back, and the next run will retry naturally. - If no failure row and no in-progress row, the extract workflow never fired — check n8n execution logs for the cron trigger.
"A CSV is stuck in dwh/exports/"
The load failed after upload. The next scheduled run will re-process it (the watermark did not advance, so the extract SQL returns the same window). Safe to leave. If multiple days of CSVs pile up, the load workflow has a persistent bug — open n8n execution logs for the specific run_id in extract_runs.
"Row counts diverge more than ~1%"
Usually one of:
- A retry window overlapped the PK range and some rows lost the race with a concurrent source-side write. Re-trigger the extract for that window manually (see §8).
- CSV parse error silently dropped a row. Check
extract_runs.rows_extractedvs.rows_loaded— if they differ, the loader found malformed CSV.
"Geometry loaded as NULL"
EWKT serialisation broke on the extract side. Verify the source query has the CASE WHEN geom IS NULL guard — without it, ST_AsEWKT(NULL) returns NULL correctly but an empty geometry returns 'GEOMETRYCOLLECTION EMPTY' which ST_GeomFromEWKT rejects.
"bronze.trips.distance_km values are 1000× too large"
The extract query is missing /1000.0 on the source distance_m column. See §5 "Schema drift".
8. Manual Re-Run
Re-run a single table for the current window
- Open n8n, go to
dwh_extractworkflow. - Locate the branch for that table.
- Click Execute Workflow → selects that table only.
- Confirm a new row appears in
dwh_control.extract_runswithstatus='loaded'.
Back-fill a historical window
The extract workflow respects the watermark; to re-extract a window, rewind the watermark:
-- Rewind position_history to 24h ago
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = NOW() - INTERVAL '24 hours'
WHERE table_name = 'position_history';
Next scheduled run will re-extract the gap. Loads are idempotent (ON CONFLICT DO NOTHING on the PK), so duplicate rows are filtered at the bronze boundary.
Full reseed (nuclear option)
-- Restart position_history from the beginning
UPDATE dwh_control.extract_watermarks
SET last_extracted_at = '2026-01-01T00:00:00Z'
WHERE table_name = 'position_history';
The first post-reseed run will produce one very large CSV (~all history). The rustfs exports/ prefix will briefly hold a multi-GB object. Expected; it moves to processed/ on success.
9. Credential Rotation
260423_dwh_ddl_v1.sql commits role passwords in plaintext — a pre-existing flaw to be cleaned up separately.
To rotate:
-- As superuser on tracksolid_dwh:
ALTER ROLE dwh_owner PASSWORD '<new secret>';
ALTER ROLE grafana_ro PASSWORD '<new secret>';
Then update the matching n8n credential and Grafana datasource. Never commit the new password — store in .env if needed for scripts, or keep exclusively inside n8n/Grafana credential stores.
10. Known Quirks
| Quirk | Source | Handling |
|---|---|---|
trips.distance_m → bronze.trips.distance_km |
Schema drift between source and bronze | Divide in extract SQL (§5) |
Hypertable row counts read 0 in pg_stat_user_tables |
TimescaleDB quirk | Always SELECT COUNT(*) directly |
parking_events can be empty for days |
Endpoint returns empty; not a failure | Zero rows loaded is a valid run outcome |
| First run of each day larger | Overnight backlog | Expected; plan watermark design |
last_extracted_at default 2026-01-01 |
Seed value from 261001 | First run on a new table back-fills all history |
11. Out of Scope (follow-up)
- Silver/gold transformations —
silverandgoldschemas exist but contain no views. - Grafana dashboard panels — these views are the data source; panels TBD.
- OBD / fault codes / fuel / temperature / LBS / heartbeats — webhooks not yet registered; add a watermark row + extract branch when they start reporting.
- Bronze schema evolution tooling — additive changes via numbered migrations is fine for one pipeline; revisit if scope grows.