-- =============================================================
-- TRACKSOLID DWH CONTROL SCHEMA
-- Target Database: tracksolid_dwh
-- Purpose: Watermarks + per-run audit log for the n8n DWH pipeline
-- Applies after: 260423_dwh_ddl_v1.sql (requires dwh_owner role + grafana_ro role)
--
-- Re-runnable: objects are created with IF NOT EXISTS, the seed uses
-- ON CONFLICT DO NOTHING, and the GRANTs can be repeated. Note that
-- IF NOT EXISTS also means column/constraint changes made in this file are
-- NOT applied to tables that already exist; those need a separate ALTER TABLE.
-- =============================================================

BEGIN;

-- 1. CONTROL SCHEMA
-- Owned by dwh_owner to match bronze/silver/gold ownership convention from 260423.
CREATE SCHEMA IF NOT EXISTS dwh_control AUTHORIZATION dwh_owner;

GRANT USAGE ON SCHEMA dwh_control TO grafana_ro;

-- 2. PERMISSIONS (dwh_owner writes, grafana_ro reads)
-- Existing default privileges from 260423 only cover bronze/silver/gold; extend to dwh_control.
-- Must run before the tables below are created: default privileges apply only
-- to objects created afterwards by the named role.
ALTER DEFAULT PRIVILEGES FOR ROLE dwh_owner IN SCHEMA dwh_control
    GRANT SELECT ON TABLES TO grafana_ro;

-- Create everything below as dwh_owner so that role owns the objects and the
-- default privileges above take effect.
SET ROLE dwh_owner;

-- 3. EXTRACT WATERMARKS
-- One row per incremental table. Updated by Workflow 2 after a successful load commit.
-- last_extracted_at is the UPPER bound used in the most recent successful extract,
-- so the next run uses
--   `WHERE <ts_col> > last_extracted_at AND <ts_col> <= :run_started_at`.
-- The column default is therefore the lower bound of the very first extract for
-- a freshly seeded table.
-- NOTE(review): confirm 2026-01-01 UTC is the intended backfill start.
CREATE TABLE IF NOT EXISTS dwh_control.extract_watermarks (
    table_name           TEXT        PRIMARY KEY,
    last_extracted_at    TIMESTAMPTZ NOT NULL DEFAULT '2026-01-01T00:00:00Z',
    last_loaded_at       TIMESTAMPTZ,
    rows_loaded_last_run INT,
    updated_at           TIMESTAMPTZ DEFAULT NOW()
);

-- 4. EXTRACT RUN AUDIT LOG
-- One row per table per cron tick. Lifecycle: extracting → uploaded → loading → loaded (or failed).
-- Failures retain error_message; the CSV stays in dwh/exports/ for the next run to pick up.
--
-- status is NOT NULL: a CHECK constraint evaluates to NULL (and therefore
-- passes) for a NULL value, so without NOT NULL a row could exist outside the
-- lifecycle. It defaults to the first lifecycle state so inserts that omit
-- status keep working.
CREATE TABLE IF NOT EXISTS dwh_control.extract_runs (
    run_id          BIGSERIAL   PRIMARY KEY,
    table_name      TEXT        NOT NULL,
    run_started_at  TIMESTAMPTZ NOT NULL,
    run_finished_at TIMESTAMPTZ,
    rows_extracted  INT,
    rows_loaded     INT,
    csv_path        TEXT,
    status          TEXT        NOT NULL DEFAULT 'extracting'
        CONSTRAINT extract_runs_status_check
        CHECK (status IN ('extracting', 'uploaded', 'loading', 'loaded', 'failed')),
    error_message   TEXT
);

-- Run history per table, newest first.
CREATE INDEX IF NOT EXISTS idx_extract_runs_table_time
    ON dwh_control.extract_runs (table_name, run_started_at DESC);

-- Runs by status, most recently finished first.
CREATE INDEX IF NOT EXISTS idx_extract_runs_status_time
    ON dwh_control.extract_runs (status, run_finished_at DESC);

-- Default privileges do not touch tables that already existed before they
-- were set, so grant read access explicitly as well. Repeating a GRANT is
-- harmless, which keeps the script safe to re-run.
GRANT SELECT
    ON dwh_control.extract_watermarks, dwh_control.extract_runs
    TO grafana_ro;

-- 5. SEED WATERMARKS
-- One row per incremental table. Snapshot tables (devices, live_positions) do not need
-- watermarks and are intentionally omitted.
-- Only table_name is supplied; the remaining columns take their defaults.
-- ON CONFLICT DO NOTHING leaves existing watermarks untouched on a re-run.
INSERT INTO dwh_control.extract_watermarks (table_name)
VALUES
    ('position_history'),
    ('trips'),
    ('alarms'),
    ('parking_events'),
    ('device_events'),
    ('ingestion_log')
ON CONFLICT (table_name) DO NOTHING;

RESET ROLE;

COMMIT;