Polling jimi.device.track.mileage does not return start/end coordinates,
fuel, idle, or trip sequence — leaving most trip columns NULL. This change
closes those gaps using data we already have in position_history plus a
best-effort Nominatim lookup.
Migration 09_trips_enrichment.sql adds:
• route_geom (LineString), start_address, end_address, vehicle_plate,
waypoints_count on tracksolid.trips
• GIST indexes on the three geometry columns
• view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat
(replaces reliance on the device-supplied trip_seq, which is only
populated when /pushtripreport fires)
ingest_movement_rev.py::poll_trips now:
• extracts idleSecond from the poll response (was previously dropped)
• per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint
count from position_history within (start_time, end_time)
• reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode
helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises)
• caches vehicle_plate from a per-cycle plates dict
• ON CONFLICT preserves webhook-supplied data when /pushtripreport later
delivers native coords/fuel/trip_seq
backfill_trips_enrichment.py is a one-shot script (dry-run by default,
--apply to commit, --imei / --since flags) that runs the same enrichment
against historical NULL rows and COALESCEs only — never overwrites.
DWH bronze mirrors and Grafana panels intentionally not touched (frozen
on this branch until the schema work lands).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
251 lines
8.3 KiB
Python
"""
|
|
run_migrations.py — Idempotent SQL migration runner for Docker init.
|
|
|
|
Runs automatically on every container startup via docker-compose command:
|
|
sh -c "python run_migrations.py && python <service>.py"
|
|
|
|
How it works:
|
|
1. Creates tracksolid.schema_migrations table on first run.
|
|
2. Skips any migration already recorded in that table.
|
|
3. Applies pending migrations in filename order.
|
|
4. Records each successful migration so it never runs twice.
|
|
5. Verifies critical tables exist before allowing the service to start.
|
|
|
|
To add a new migration: create NN_description.sql in the repo and add
|
|
the filename to MIGRATIONS below. Coolify will apply it on next deploy.
|
|
"""
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
import psycopg2
|
|
|
|
# Connection string for both psycopg2 and the psql subprocess.
# Read with [] (not .get) so a missing env var fails fast at import time.
DATABASE_URL = os.environ["DATABASE_URL"]

# ── Add new migration filenames here in order ─────────────────────────────────
# Applied in list order (which matches filename order); each file must live
# in /app inside the container (see main()).
MIGRATIONS = [
    "02_tracksolid_full_schema_rev.sql",
    "03_webhook_schema_migration.sql",
    "04_bug_fix_migration.sql",  # distance_m → distance_km rename + correction
    "05_enhancement_migration.sql",  # new tables, OBD columns, dwh_gold expansion
    "06_business_analytics_migration.sql",  # ops schema, dispatch_log, assigned_city
    "07_analytics_views.sql",  # Grafana-facing views in tracksolid.*
    "08_analytics_config.sql",  # ops.cost_rates, ops.kpi_targets + seed data
    "09_trips_enrichment.sql",  # trips.route_geom + addresses + plate + v_trips_enriched
]

# ── Tables that must exist before the service is allowed to start ─────────────
# Checked by verify_schema(); any missing entry aborts startup with exit 1.
CRITICAL_TABLES = [
    "tracksolid.devices",
    "tracksolid.api_token_cache",
    "tracksolid.ingestion_log",
    "tracksolid.live_positions",
    "tracksolid.position_history",
    "tracksolid.trips",
    "tracksolid.alarms",
    "tracksolid.obd_readings",
    "tracksolid.device_events",
    "tracksolid.fuel_readings",
    "tracksolid.temperature_readings",
    "tracksolid.lbs_readings",
    "tracksolid.geofences",
]
|
|
|
|
|
|
def get_conn():
    """Open and return a fresh psycopg2 connection to DATABASE_URL."""
    connection = psycopg2.connect(DATABASE_URL)
    return connection
|
|
|
|
|
|
def ensure_tracking_table(conn):
    """Create schema and schema_migrations tracking table if they don't exist."""
    # The schema may not exist yet on a fresh DB (migration 02 creates it,
    # but the tracking table has to live somewhere before 02 can be recorded).
    ddl_statements = (
        "CREATE SCHEMA IF NOT EXISTS tracksolid",
        """
            CREATE TABLE IF NOT EXISTS tracksolid.schema_migrations (
                filename TEXT PRIMARY KEY,
                applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
            )
        """,
    )
    with conn.cursor() as cur:
        for ddl in ddl_statements:
            cur.execute(ddl)
    conn.commit()
|
|
|
|
|
|
def seed_pre_tracking_migrations(conn):
    """
    Retroactively mark migrations as applied if their schema objects already
    exist. Checked on every startup — safe to run repeatedly (ON CONFLICT DO
    NOTHING). Prevents re-running non-idempotent statements when a second
    container starts after another has already applied the migration, or when
    the tracking table is introduced to a database migrated before it existed.

    Sentinel objects per migration:
        02 — tracksolid.devices table exists
        03 — position_history.altitude column exists
        04 — trips.distance_km column exists (renamed from distance_m)
        05 — tracksolid.device_events table exists (new in 05)
        06 — ops.tickets table exists
        07 — tracksolid.v_fleet_today view exists

    NOTE(review): migrations 08 and 09 have no sentinel here — presumably they
    are safe to re-run on an already-migrated database; confirm before relying
    on that when adding new entries.
    """
    # (filename, existence probe) pairs — a non-empty probe result means the
    # migration's objects are already present in this database.
    checks = [
        (
            "02_tracksolid_full_schema_rev.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='tracksolid' AND table_name='devices'",
        ),
        (
            "03_webhook_schema_migration.sql",
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_schema='tracksolid' AND table_name='position_history' "
            "AND column_name='altitude'",
        ),
        (
            "04_bug_fix_migration.sql",
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_schema='tracksolid' AND table_name='trips' "
            "AND column_name='distance_km'",
        ),
        (
            "05_enhancement_migration.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='tracksolid' AND table_name='device_events'",
        ),
        (
            "06_business_analytics_migration.sql",
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_schema='ops' AND table_name='tickets'",
        ),
        (
            "07_analytics_views.sql",
            "SELECT 1 FROM information_schema.views "
            "WHERE table_schema='tracksolid' AND table_name='v_fleet_today'",
        ),
    ]

    seeds = []  # filenames newly marked as applied on this run
    with conn.cursor() as cur:
        for filename, query in checks:
            cur.execute(query)
            if cur.fetchone():
                # Sentinel exists: record the migration without running it.
                cur.execute(
                    "INSERT INTO tracksolid.schema_migrations (filename) "
                    "VALUES (%s) ON CONFLICT DO NOTHING",
                    (filename,),
                )
                seeds.append(filename)

    conn.commit()
    if seeds:
        print(f" Seeded as applied: {', '.join(seeds)}")
|
|
|
|
|
|
def already_applied(conn, filename):
    """Return True if *filename* is already recorded in schema_migrations."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM tracksolid.schema_migrations WHERE filename = %s",
            (filename,),
        )
        row = cur.fetchone()
    return row is not None
|
|
|
|
|
|
def record_applied(conn, filename):
    """Persist *filename* in the tracking table; duplicates are silent no-ops."""
    insert_sql = (
        "INSERT INTO tracksolid.schema_migrations (filename) VALUES (%s) ON CONFLICT DO NOTHING"
    )
    with conn.cursor() as cur:
        cur.execute(insert_sql, (filename,))
    conn.commit()
|
|
|
|
|
|
def run_file(path, filename):
    """Execute the SQL file at *path* via psql. Returns True on success.

    A migration counts as failed when psql exits non-zero OR when its stderr
    contains any "ERROR:" line. The returncode check matters because psql can
    fail without emitting "ERROR:" at all — e.g. a refused connection prints
    lowercase "psql: error: ...", which the substring scan alone would miss
    and the migration would be wrongly recorded as applied.
    """
    # BUG FIX: the progress messages previously printed the literal text
    # "(unknown)" instead of the migration's filename.
    print(f" APPLY {filename} ...")
    result = subprocess.run(
        ["psql", DATABASE_URL, "-f", path],
        capture_output=True, text=True,
    )
    errors = [line for line in result.stderr.splitlines() if "ERROR:" in line]
    if errors or result.returncode != 0:
        for e in errors:
            print(f" ERROR: {e.strip()}")
        if not errors:
            # Non-zero exit with no "ERROR:" lines — surface raw stderr so the
            # failure isn't silent.
            print(f" ERROR: psql exited with code {result.returncode}: {result.stderr.strip()}")
        return False
    # NOTE(review): psql runs without ON_ERROR_STOP, so a failing statement
    # does not halt the rest of the file — presumably intentional; confirm.
    print(f" OK {filename}")
    return True
|
|
|
|
|
|
def sync_role_passwords(conn):
    """
    Keep DB role passwords in sync with env vars on every startup.
    Safe to run repeatedly — ALTER ROLE is idempotent.
    This fixes roles created with the placeholder 'SET_PASSWORD_IN_ENV'.
    """
    # Role → env-sourced password; a missing env var leaves the role untouched.
    roles = {
        "grafana_ro": os.getenv("GRAFANA_DB_RO_PASSWORD"),
    }
    with conn.cursor() as cur:
        for role, password in roles.items():
            if not password:
                continue  # env var unset or empty — skip this role
            # Role names come from the hardcoded dict above, so interpolating
            # them into the statement is safe; the password is parameterized.
            cur.execute(f"ALTER ROLE {role} WITH PASSWORD %s", (password,))
            print(f" Password synced for role: {role}")
    conn.commit()
|
|
|
|
|
|
def verify_schema(conn):
    """Verify critical tables exist. Exit 1 if missing — blocks service start."""
    print("Verifying schema...")
    missing = []
    with conn.cursor() as cur:
        for table in CRITICAL_TABLES:
            # Entries are "schema.table"; probe information_schema for each.
            schema, name = table.split(".")
            cur.execute(
                "SELECT 1 FROM information_schema.tables "
                "WHERE table_schema=%s AND table_name=%s",
                (schema, name),
            )
            if cur.fetchone() is None:
                missing.append(table)

    if missing:
        print(f"FATAL: missing tables after migrations: {', '.join(missing)}")
        sys.exit(1)
    print(f" All {len(CRITICAL_TABLES)} critical tables verified.")
|
|
|
|
|
|
def main():
    """Apply pending migrations in order, then run startup sanity checks."""
    print("=== Database Migration Runner ===")

    conn = get_conn()
    ensure_tracking_table(conn)
    seed_pre_tracking_migrations(conn)

    applied = 0
    skipped = 0
    for sql_file in MIGRATIONS:
        path = os.path.join("/app", sql_file)

        if not os.path.exists(path):
            print(f" SKIP {sql_file} (file not found in /app)")
            skipped += 1
        elif already_applied(conn, sql_file):
            print(f" SKIP {sql_file} (already applied)")
            skipped += 1
        elif run_file(path, sql_file):
            record_applied(conn, sql_file)
            applied += 1
        else:
            # A failed migration blocks the service entirely — better a dead
            # container than one running against a half-migrated schema.
            print(f"FATAL: migration {sql_file} failed — aborting.")
            conn.close()
            sys.exit(1)

    print(f"\nMigrations: {applied} applied, {skipped} skipped.")

    sync_role_passwords(conn)
    verify_schema(conn)  # exits 1 itself if anything critical is missing
    conn.close()
    print("Startup checks passed.\n")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|