tracksolid_timescale_grafan.../run_migrations.py
David Kiania 5d47eece6b Fix: seed pre-tracking migrations to skip already-applied 02 and 03
Migration 02 and 03 were applied before the schema_migrations tracking
table existed, so they had no record and the runner tried to re-run them,
hitting non-idempotent TimescaleDB policy/trigger/cagg statements.

seed_pre_tracking_migrations() checks for sentinel schema objects and
inserts records for any migration that was clearly already applied:
  - 02: tracksolid.devices table exists
  - 03: position_history.altitude column exists

Called immediately after ensure_tracking_table() on every startup.
Safe on fresh databases (objects absent → nothing seeded → runs normally).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 23:43:44 +03:00

202 lines
6.4 KiB
Python

"""
run_migrations.py — Idempotent SQL migration runner for Docker init.
Runs automatically on every container startup via docker-compose command:
sh -c "python run_migrations.py && python <service>.py"
How it works:
1. Creates tracksolid.schema_migrations table on first run.
2. Skips any migration already recorded in that table.
3. Applies pending migrations in filename order.
4. Records each successful migration so it never runs twice.
5. Verifies critical tables exist before allowing the service to start.
To add a new migration: create NN_description.sql in the repo and add
the filename to MIGRATIONS below. Coolify will apply it on next deploy.
"""
import os
import subprocess
import sys
import psycopg2
# Connection string shared by psycopg2 (checks/records) and psql (applies SQL).
# Required: a missing variable fails fast here with KeyError at import time.
DATABASE_URL = os.environ["DATABASE_URL"]
# ── Add new migration filenames here in order ─────────────────────────────────
# Applied in list order. 02 and 03 may be seeded as already-applied on
# databases that were migrated before the tracking table existed
# (see seed_pre_tracking_migrations).
MIGRATIONS = [
    "02_tracksolid_full_schema_rev.sql",
    "03_webhook_schema_migration.sql",
    "04_bug_fix_migration.sql",  # distance_m → distance_km rename + correction
    "05_enhancement_migration.sql",  # new tables, OBD columns, dwh_gold expansion
]
# ── Tables that must exist before the service is allowed to start ─────────────
# Checked by verify_schema() after migrations run; each entry is
# "schema.table" and is looked up in information_schema.tables.
# Any missing table aborts startup with exit code 1.
CRITICAL_TABLES = [
    "tracksolid.devices",
    "tracksolid.api_token_cache",
    "tracksolid.ingestion_log",
    "tracksolid.live_positions",
    "tracksolid.position_history",
    "tracksolid.trips",
    "tracksolid.alarms",
    "tracksolid.obd_readings",
    "tracksolid.device_events",
    "tracksolid.fuel_readings",
    "tracksolid.temperature_readings",
    "tracksolid.lbs_readings",
    "tracksolid.geofences",
]
def get_conn():
    """Open and return a new psycopg2 connection built from DATABASE_URL."""
    connection = psycopg2.connect(DATABASE_URL)
    return connection
def ensure_tracking_table(conn):
    """Bootstrap the tracksolid schema and the schema_migrations table.

    Both statements use IF NOT EXISTS, so calling this on every startup is
    safe. The schema is created here (and not only in migration 02) because
    a fresh database needs it before the tracking table can be created.
    """
    create_schema = "CREATE SCHEMA IF NOT EXISTS tracksolid"
    create_table = """
        CREATE TABLE IF NOT EXISTS tracksolid.schema_migrations (
            filename TEXT PRIMARY KEY,
            applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
    """
    with conn.cursor() as cur:
        cur.execute(create_schema)
        cur.execute(create_table)
    conn.commit()
def seed_pre_tracking_migrations(conn):
    """
    Retroactively record migrations whose schema objects already exist.

    Migrations 02 and 03 may have been applied before the tracking table
    was introduced, leaving them unrecorded; re-running them would hit
    non-idempotent TimescaleDB statements (policies, triggers, continuous
    aggregates). Each migration has a sentinel object that proves it ran:
    02 → the tracksolid.devices table, 03 → position_history.altitude.
    On a fresh database no sentinel exists and nothing is seeded.
    """
    # (migration filename, probe query returning a row iff it already ran)
    sentinels = [
        (
            "02_tracksolid_full_schema_rev.sql",
            """
            SELECT 1 FROM information_schema.tables
            WHERE table_schema = 'tracksolid' AND table_name = 'devices'
            """,
        ),
        (
            "03_webhook_schema_migration.sql",
            """
            SELECT 1 FROM information_schema.columns
            WHERE table_schema = 'tracksolid'
              AND table_name = 'position_history'
              AND column_name = 'altitude'
            """,
        ),
    ]
    seeded = []
    with conn.cursor() as cur:
        for migration, probe in sentinels:
            cur.execute(probe)
            if cur.fetchone() is None:
                continue
            cur.execute(
                """
                INSERT INTO tracksolid.schema_migrations (filename)
                VALUES (%s) ON CONFLICT DO NOTHING
                """,
                (migration,),
            )
            seeded.append(migration)
    conn.commit()
    if seeded:
        print(f" Seeded pre-tracking migrations as applied: {', '.join(seeded)}")
def already_applied(conn, filename):
    """Return True if *filename* has a record in tracksolid.schema_migrations."""
    query = "SELECT 1 FROM tracksolid.schema_migrations WHERE filename = %s"
    with conn.cursor() as cur:
        cur.execute(query, (filename,))
        row = cur.fetchone()
    return row is not None
def record_applied(conn, filename):
    """Record *filename* as applied and commit; idempotent via ON CONFLICT."""
    stmt = (
        "INSERT INTO tracksolid.schema_migrations (filename) "
        "VALUES (%s) ON CONFLICT DO NOTHING"
    )
    with conn.cursor() as cur:
        cur.execute(stmt, (filename,))
    conn.commit()
def run_file(path, filename):
    """Execute a SQL migration file via psql. Returns True on success.

    Args:
        path: Absolute path to the .sql file to execute.
        filename: Migration name, used for log output only.

    Returns:
        True if psql exited 0 and emitted no ERROR: lines; False otherwise.
        Both checks are needed: psql keeps executing after statement errors
        by default (exit code stays 0), while connection/file failures set a
        non-zero exit code without necessarily printing "ERROR:".
    """
    # Bug fix: the original printed the literal "(unknown)" and never used
    # the filename parameter — the f-strings had lost their placeholders.
    print(f" APPLY {filename} ...")
    result = subprocess.run(
        ["psql", DATABASE_URL, "-f", path],
        capture_output=True, text=True,
    )
    errors = [l for l in result.stderr.splitlines() if "ERROR:" in l]
    if errors or result.returncode != 0:
        for e in errors:
            print(f" ERROR: {e.strip()}")
        if result.returncode != 0 and not errors:
            # psql failed without an ERROR: line (e.g. unreachable database,
            # missing file) — surface its stderr so the failure is visible.
            print(f" ERROR: psql exited {result.returncode}: {result.stderr.strip()}")
        return False
    print(f" OK {filename}")
    return True
def verify_schema(conn):
    """Verify critical tables exist. Exit 1 if missing — blocks service start.

    Each CRITICAL_TABLES entry ("schema.table") is probed against
    information_schema.tables; any absence aborts the process so the
    service never starts against an incomplete schema.
    """
    print("Verifying schema...")
    probe = (
        "SELECT 1 FROM information_schema.tables "
        "WHERE table_schema=%s AND table_name=%s"
    )
    missing = []
    with conn.cursor() as cur:
        for qualified in CRITICAL_TABLES:
            schema_name, table_name = qualified.split(".")
            cur.execute(probe, (schema_name, table_name))
            if cur.fetchone() is None:
                missing.append(qualified)
    if missing:
        print(f"FATAL: missing tables after migrations: {', '.join(missing)}")
        sys.exit(1)
    print(f" All {len(CRITICAL_TABLES)} critical tables verified.")
def main():
    """Apply pending migrations in MIGRATIONS order, then verify the schema.

    Missing files and already-applied migrations are skipped; the first
    failed migration aborts the process with exit 1 so the service does
    not start on a partially-migrated database.
    """
    print("=== Database Migration Runner ===")
    conn = get_conn()
    ensure_tracking_table(conn)
    seed_pre_tracking_migrations(conn)

    applied_count = 0
    skipped_count = 0
    for sql_file in MIGRATIONS:
        path = os.path.join("/app", sql_file)
        if not os.path.exists(path):
            print(f" SKIP {sql_file} (file not found in /app)")
            skipped_count += 1
        elif already_applied(conn, sql_file):
            print(f" SKIP {sql_file} (already applied)")
            skipped_count += 1
        elif run_file(path, sql_file):
            record_applied(conn, sql_file)
            applied_count += 1
        else:
            print(f"FATAL: migration {sql_file} failed — aborting.")
            conn.close()
            sys.exit(1)

    print(f"\nMigrations: {applied_count} applied, {skipped_count} skipped.")
    verify_schema(conn)
    conn.close()
    print("Startup checks passed.\n")


if __name__ == "__main__":
    main()