"""
backfill_trips_enrichment.py — One-shot enrichment of historical tracksolid.trips rows
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Migration 09 added route_geom, start/end_address, vehicle_plate,
waypoints_count. poll_trips() fills these for new trips going forward; this
script backfills existing rows where the new columns are NULL by
reconstructing data from position_history (the GPS trail is already there)
and Nominatim.

Behaviour:
  • Selects finished trips (end_time IS NOT NULL) where route_geom IS NULL
    OR start_geom IS NULL (covers the original 8 historical poll-ingested
    trips and any future rows that landed before position_history caught up).
  • Per row: runs the same position_history enrichment poll_trips uses
    (see _ENRICH_QUERY), then reverse-geocodes start/end via Nominatim.
  • Trips with fewer than 2 GPS fixes get endpoint geometry/addresses only;
    route_geom is left NULL for them.
  • Writes only via COALESCE — never overwrites webhook-supplied data.
  • With --apply, logs the run to tracksolid.ingestion_log with
    endpoint='backfill_trips_enrichment'.

Usage:
    # Dry-run — shows counts only, writes nothing to the DB.
    # (Reverse-geocoding still runs unless --skip-geocode is given.)
    python backfill_trips_enrichment.py

    # Apply changes
    python backfill_trips_enrichment.py --apply

    # Scope to a single device
    python backfill_trips_enrichment.py --imei 862798052707896 --apply

    # Limit to trips since a date (UTC)
    python backfill_trips_enrichment.py --since 2026-04-01 --apply

    # Skip Nominatim reverse-geocoding (geometry/plate/idle only — runs in
    # minutes instead of hours when backfilling thousands of rows). Addresses
    # remain NULL for these rows and will be filled by future poll_trips
    # cycles only for new trips, not retroactively.
    python backfill_trips_enrichment.py --skip-geocode --apply
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

import argparse
import time

from ts_shared_rev import (
    get_conn,
    get_logger,
    log_ingestion,
    reverse_geocode,
)

log = get_logger("backfill_trips")

# Eight scalar sub-queries against position_history for one trip window.
# Placeholder order: (imei, start_time) x3, (imei, end_time) x3,
# then (imei, start_time, end_time) x2.
# Note: only the route_geom sub-query filters out NULL geoms, so the
# start/end columns may come back NULL even when waypoints_count > 0.
_ENRICH_QUERY = """
    SELECT
        (SELECT geom FROM tracksolid.position_history
          WHERE imei = %s AND gps_time >= %s
          ORDER BY gps_time ASC LIMIT 1) AS start_geom,
        (SELECT ST_Y(geom) FROM tracksolid.position_history
          WHERE imei = %s AND gps_time >= %s
          ORDER BY gps_time ASC LIMIT 1) AS start_lat,
        (SELECT ST_X(geom) FROM tracksolid.position_history
          WHERE imei = %s AND gps_time >= %s
          ORDER BY gps_time ASC LIMIT 1) AS start_lng,
        (SELECT geom FROM tracksolid.position_history
          WHERE imei = %s AND gps_time <= %s
          ORDER BY gps_time DESC LIMIT 1) AS end_geom,
        (SELECT ST_Y(geom) FROM tracksolid.position_history
          WHERE imei = %s AND gps_time <= %s
          ORDER BY gps_time DESC LIMIT 1) AS end_lat,
        (SELECT ST_X(geom) FROM tracksolid.position_history
          WHERE imei = %s AND gps_time <= %s
          ORDER BY gps_time DESC LIMIT 1) AS end_lng,
        (SELECT ST_MakeLine(geom ORDER BY gps_time)
           FROM tracksolid.position_history
          WHERE imei = %s AND gps_time BETWEEN %s AND %s
            AND geom IS NOT NULL) AS route_geom,
        (SELECT COUNT(*) FROM tracksolid.position_history
          WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count
"""


def _select_targets(cur, imei: str | None, since: str | None) -> list[tuple]:
    """Return trips that still need enrichment.

    Rows are ``(id, imei, start_time, end_time, vehicle_plate)`` tuples for
    finished trips whose ``route_geom`` or ``start_geom`` is NULL, ordered by
    ``start_time``. ``imei`` and ``since`` are optional extra filters; both
    are passed to the driver as bound parameters.
    """
    sql = """
        SELECT id, imei, start_time, end_time, vehicle_plate
          FROM tracksolid.trips
         WHERE (route_geom IS NULL OR start_geom IS NULL)
           AND end_time IS NOT NULL
    """
    params: list = []
    if imei:
        sql += " AND imei = %s"
        params.append(imei)
    if since:
        sql += " AND start_time >= %s"
        params.append(since)
    sql += " ORDER BY start_time"
    cur.execute(sql, params)
    return cur.fetchall()


def _load_plates_cache(cur) -> dict[str, str]:
    """Return an ``{imei: vehicle_number}`` map for devices that have a plate."""
    cur.execute("""
        SELECT imei, vehicle_number
          FROM tracksolid.devices
         WHERE vehicle_number IS NOT NULL
    """)
    return dict(cur.fetchall())


def _geocode_fix(lat, lng):
    """Reverse-geocode one fix, or return None when it has no coordinates.

    The boundary fix of a trip can have a NULL geom (the start/end
    sub-queries do not filter on it), in which case lat/lng are None and
    there is nothing to look up.
    """
    if lat is None or lng is None:
        return None
    return reverse_geocode(lat, lng)


def run(apply: bool, filter_imei: str | None, since: str | None,
        skip_geocode: bool = False) -> None:
    """Enrich every selected trip and print a summary.

    Args:
        apply: When False (dry-run) nothing is written to the database; the
            per-trip queries, logging and (unless skipped) geocoding still run.
        filter_imei: Restrict the backfill to a single device, or None.
        since: Only consider trips with ``start_time >= since``, or None.
        skip_geocode: Skip reverse-geocoding; addresses are left as None.

    Per-trip failures are logged and counted, never raised.
    """
    t0 = time.time()
    enriched = degenerate = no_fixes = failed = 0

    if skip_geocode:
        log.info("Reverse-geocoding disabled (--skip-geocode). Addresses will stay NULL.")

    with get_conn() as conn:
        with conn.cursor() as cur:
            plates = _load_plates_cache(cur)
            targets = _select_targets(cur, filter_imei, since)
            log.info(
                "Found %d trip(s) needing enrichment%s%s.",
                len(targets),
                f" for imei={filter_imei}" if filter_imei else "",
                f" since={since}" if since else "",
            )

            for trip_id, imei, start_time, end_time, existing_plate in targets:
                try:
                    cur.execute(_ENRICH_QUERY, (
                        imei, start_time,
                        imei, start_time,
                        imei, start_time,
                        imei, end_time,
                        imei, end_time,
                        imei, end_time,
                        imei, start_time, end_time,
                        imei, start_time, end_time,
                    ))
                    (start_geom, start_lat, start_lng,
                     end_geom, end_lat, end_lng,
                     route_geom, waypoints_count) = cur.fetchone()

                    if waypoints_count == 0:
                        no_fixes += 1
                        log.info(
                            " trip id=%s imei=%s start=%s — no GPS fixes in window, skipping",
                            trip_id, imei, start_time,
                        )
                        continue

                    if waypoints_count < 2:
                        # Not enough fixes for a polyline. Still capture the
                        # single endpoint geom and address, but make sure no
                        # one-point "line" is stored as the route. The row
                        # keeps route_geom NULL and stays eligible for a
                        # later re-run.
                        degenerate += 1
                        route_geom = None
                        log.info(
                            " trip id=%s imei=%s — only %d fix(es), no route_geom",
                            trip_id, imei, waypoints_count,
                        )

                    if skip_geocode:
                        start_address = end_address = None
                    else:
                        start_address = _geocode_fix(start_lat, start_lng)
                        end_address = _geocode_fix(end_lat, end_lng)

                    # A plate already on the trip row wins over the device table.
                    vehicle_plate = existing_plate or plates.get(imei)

                    log.info(
                        " trip id=%s imei=%s waypoints=%d start=%s end=%s",
                        trip_id, imei, waypoints_count, start_address, end_address,
                    )

                    if apply:
                        # COALESCE keeps any value that is already present.
                        cur.execute("""
                            UPDATE tracksolid.trips SET
                                start_geom = COALESCE(start_geom, %s),
                                end_geom = COALESCE(end_geom, %s),
                                route_geom = COALESCE(route_geom, %s),
                                waypoints_count = COALESCE(waypoints_count, %s),
                                start_address = COALESCE(start_address, %s),
                                end_address = COALESCE(end_address, %s),
                                vehicle_plate = COALESCE(vehicle_plate, %s)
                            WHERE id = %s
                        """, (
                            start_geom, end_geom, route_geom, waypoints_count,
                            start_address, end_address, vehicle_plate,
                            trip_id,
                        ))
                    # Counted in dry-run too, so the summary shows what
                    # --apply would touch.
                    enriched += 1
                except Exception:
                    # NOTE(review): if get_conn() yields a non-autocommit
                    # connection, a failed statement may leave the transaction
                    # aborted so that later rows fail as well — confirm, and
                    # consider a per-row SAVEPOINT if so.
                    failed += 1
                    log.warning(
                        "Failed to enrich trip id=%s imei=%s", trip_id, imei,
                        exc_info=True,
                    )

            if apply:
                log_ingestion(
                    cur, "backfill_trips_enrichment",
                    imei_count=len(targets),
                    upserted=0,
                    inserted=enriched,
                    duration_ms=int((time.time() - t0) * 1000),
                    success=(failed == 0),
                )

    mode = "APPLIED" if apply else "DRY-RUN"
    print(f"\n{'='*60}")
    print(f" {mode} COMPLETE")
    print(f"{'='*60}")
    print(f" Trips enriched : {enriched}")
    print(f" Degenerate (<2 fixes) : {degenerate}")
    print(f" Skipped (no fixes) : {no_fixes}")
    print(f" Failed : {failed}")
    if not apply:
        print("\n Run with --apply to commit changes.")


def main() -> None:
    """Parse command-line arguments and run the backfill."""
    parser = argparse.ArgumentParser(
        description="Backfill route_geom / start_geom / end_geom / addresses on tracksolid.trips."
    )
    parser.add_argument("--apply", action="store_true",
                        help="Write changes to DB (default: dry-run)")
    parser.add_argument("--imei", default=None,
                        help="Limit to a single IMEI")
    parser.add_argument("--since", default=None,
                        help="Only trips with start_time >= YYYY-MM-DD (UTC)")
    parser.add_argument("--skip-geocode", action="store_true",
                        help="Skip Nominatim reverse-geocoding (fast path for large backfills)")
    args = parser.parse_args()
    run(apply=args.apply, filter_imei=args.imei, since=args.since,
        skip_geocode=args.skip_geocode)


if __name__ == "__main__":
    main()