tracksolid_timescale_grafan.../run_migrations.py


"""
run_migrations.py

Idempotent SQL migration runner for Docker init. Runs automatically on every
container startup via the docker-compose command:

    sh -c "python run_migrations.py && python <service>.py"

How it works:
1. Creates tracksolid.schema_migrations table on first run.
2. Skips any migration already recorded in that table.
3. Applies pending migrations in filename order.
4. Records each successful migration so it never runs twice.
5. Verifies critical tables exist before allowing the service to start.
To add a new migration: create NN_description.sql in the repo and add
the filename to MIGRATIONS below. Coolify will apply it on next deploy.
"""
import os
import subprocess
import sys
import psycopg2
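
# Fail fast: a missing DATABASE_URL raises KeyError and aborts startup.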
DATABASE_URL = os.environ["DATABASE_URL"]
# ── Add new migration filenames here in order ─────────────────────────────────
MIGRATIONS = [
"02_tracksolid_full_schema_rev.sql",
"03_webhook_schema_migration.sql",
"04_bug_fix_migration.sql", # distance_m → distance_km rename + correction
"05_enhancement_migration.sql", # new tables, OBD columns, dwh_gold expansion
"06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city
"07_analytics_views.sql", # Grafana-facing views in tracksolid.*
"08_analytics_config.sql", # ops.cost_rates, ops.kpi_targets + seed data
"09_trips_enrichment.sql", # trips.route_geom + addresses + plate + v_trips_enriched
]
# ── Tables that must exist before the service is allowed to start ─────────────
CRITICAL_TABLES = [
"tracksolid.devices",
"tracksolid.api_token_cache",
"tracksolid.ingestion_log",
"tracksolid.live_positions",
"tracksolid.position_history",
"tracksolid.trips",
"tracksolid.alarms",
"tracksolid.obd_readings",
"tracksolid.device_events",
"tracksolid.fuel_readings",
"tracksolid.temperature_readings",
"tracksolid.lbs_readings",
"tracksolid.geofences",
]
def get_conn():
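    """Open a new psycopg2 connection to DATABASE_URL."""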
    return psycopg2.connect(DATABASE_URL)

def ensure_tracking_table(conn):
"""Create schema and schema_migrations tracking table if they don't exist."""
with conn.cursor() as cur:
# Schema may not exist yet on a fresh DB (migration 02 creates it,
# but we need it before we can create the tracking table).
cur.execute("CREATE SCHEMA IF NOT EXISTS tracksolid")
cur.execute("""
CREATE TABLE IF NOT EXISTS tracksolid.schema_migrations (
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
""")
conn.commit()
def seed_pre_tracking_migrations(conn):
"""
Retroactively mark migrations as applied if their schema objects already
exist. Checked on every startup safe to run repeatedly (ON CONFLICT DO
NOTHING). Prevents re-running non-idempotent statements when a second
container starts after another has already applied the migration, or when
the tracking table is introduced to a database migrated before it existed.
Sentinel objects per migration:
02 tracksolid.devices table exists
03 position_history.altitude column exists
04 trips.distance_km column exists (renamed from distance_m)
05 tracksolid.device_events table exists (new in 05)
"""
    checks = [
        (
            "02_tracksolid_full_schema_rev.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='tracksolid' AND table_name='devices'",
        ),
        (
            "03_webhook_schema_migration.sql",
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_schema='tracksolid' AND table_name='position_history' "
            "AND column_name='altitude'",
        ),
        (
            "04_bug_fix_migration.sql",
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_schema='tracksolid' AND table_name='trips' "
            "AND column_name='distance_km'",
        ),
        (
            "05_enhancement_migration.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='tracksolid' AND table_name='device_events'",
        ),
        (
            "06_business_analytics_migration.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='ops' AND table_name='tickets'",
        ),
        (
            "07_analytics_views.sql",
            "SELECT 1 FROM information_schema.views "
            "WHERE table_schema='tracksolid' AND table_name='v_fleet_today'",
        ),
    ]
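    # No sentinel checks for 08+: those migrations postdate the tracking
    # table, so their first apply is recorded normally.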
    seeds = []
    with conn.cursor() as cur:
        for filename, query in checks:
            cur.execute(query)
            if cur.fetchone():
                cur.execute(
                    "INSERT INTO tracksolid.schema_migrations (filename) "
                    "VALUES (%s) ON CONFLICT DO NOTHING",
                    (filename,),
                )
                seeds.append(filename)
    conn.commit()
    if seeds:
        print(f" Seeded as applied: {', '.join(seeds)}")

def already_applied(conn, filename):
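    """Return True if this migration is already recorded in schema_migrations."""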
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM tracksolid.schema_migrations WHERE filename = %s",
            (filename,),
        )
        return cur.fetchone() is not None

def record_applied(conn, filename):
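    """Record a migration as applied; ON CONFLICT keeps a racing container harmless."""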
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO tracksolid.schema_migrations (filename) "
            "VALUES (%s) ON CONFLICT DO NOTHING",
            (filename,),
        )
    conn.commit()

def run_file(path, filename):
"""Execute a SQL file via psql. Returns True on success."""
print(f" APPLY {filename} ...")
result = subprocess.run(
["psql", DATABASE_URL, "-f", path],
capture_output=True, text=True,
)
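    # psql exits 0 even when statements fail (ON_ERROR_STOP is not set), so
    # success is judged by scanning stderr for ERROR lines, not the exit code.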
    errors = [line for line in result.stderr.splitlines() if "ERROR:" in line]
    if errors:
        for e in errors:
            print(f" ERROR: {e.strip()}")
        return False
    print(f" OK {filename}")
    return True

def sync_role_passwords(conn):
"""
Keep DB role passwords in sync with env vars on every startup.
Safe to run repeatedly ALTER ROLE is idempotent.
This fixes roles created with the placeholder 'SET_PASSWORD_IN_ENV'.
"""
    roles = {
        "grafana_ro": os.getenv("GRAFANA_DB_RO_PASSWORD"),
    }
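    # The role name is interpolated via f-string but comes only from the
    # hardcoded dict above; the password goes through psycopg2 parameter
    # binding, which is applied client-side and so works in ALTER ROLE.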
    with conn.cursor() as cur:
        for role, password in roles.items():
            if password:
                cur.execute(f"ALTER ROLE {role} WITH PASSWORD %s", (password,))
                print(f" Password synced for role: {role}")
    conn.commit()

def verify_schema(conn):
"""Verify critical tables exist. Exit 1 if missing — blocks service start."""
print("Verifying schema...")
with conn.cursor() as cur:
missing = []
for table in CRITICAL_TABLES:
schema, name = table.split(".")
cur.execute(
"SELECT 1 FROM information_schema.tables "
"WHERE table_schema=%s AND table_name=%s",
(schema, name),
)
if not cur.fetchone():
missing.append(table)
if missing:
print(f"FATAL: missing tables after migrations: {', '.join(missing)}")
sys.exit(1)
print(f" All {len(CRITICAL_TABLES)} critical tables verified.")
def main():
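    """Apply pending migrations, sync passwords, verify schema; exit 1 on failure."""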
print("=== Database Migration Runner ===")
conn = get_conn()
ensure_tracking_table(conn)
seed_pre_tracking_migrations(conn)
applied = skipped = 0
for sql_file in MIGRATIONS:
path = os.path.join("/app", sql_file)
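        # Files listed in MIGRATIONS but absent from /app are skipped, not fatal.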
        if not os.path.exists(path):
            print(f" SKIP {sql_file} (file not found in /app)")
            skipped += 1
            continue
        if already_applied(conn, sql_file):
            print(f" SKIP {sql_file} (already applied)")
            skipped += 1
            continue
        if run_file(path, sql_file):
            record_applied(conn, sql_file)
            applied += 1
        else:
            print(f"FATAL: migration {sql_file} failed — aborting.")
            conn.close()
            sys.exit(1)

    print(f"\nMigrations: {applied} applied, {skipped} skipped.")
    sync_role_passwords(conn)
    verify_schema(conn)
    conn.close()
    print("Startup checks passed.\n")

if __name__ == "__main__":
    main()