tracksolid_timescale_grafan.../backfill_trips_enrichment.py

230 lines
9.9 KiB
Python
Raw Permalink Normal View History

feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
"""
backfill_trips_enrichment.py One-shot enrichment of historical tracksolid.trips rows
Migration 09 added route_geom, start/end_address, vehicle_plate, waypoints_count.
poll_trips() fills these for new trips going forward; this script backfills
existing rows where the new columns are NULL by reconstructing data from
position_history (the GPS trail is already there) and Nominatim.
Behaviour:
Selects rows where route_geom IS NULL OR start_geom IS NULL
(covers the original 8 historical poll-ingested trips and any future
rows that landed before position_history caught up).
Per row: runs the same 4-subquery enrichment poll_trips uses, then
reverse-geocodes start/end via Nominatim.
Writes only via COALESCE never overwrites webhook-supplied data.
Logs each run to tracksolid.ingestion_log with endpoint='backfill_trips_enrichment'.
Usage:
# Dry-run — shows counts only, writes nothing
python backfill_trips_enrichment.py
# Apply changes
python backfill_trips_enrichment.py --apply
# Scope to a single device
python backfill_trips_enrichment.py --imei 862798052707896 --apply
# Limit to trips since a date (UTC)
python backfill_trips_enrichment.py --since 2026-04-01 --apply
# Skip Nominatim reverse-geocoding (geometry/plate/idle only — runs in
# minutes instead of hours when backfilling thousands of rows). Addresses
# remain NULL for these rows and will be filled by future poll_trips
# cycles only for new trips, not retroactively.
python backfill_trips_enrichment.py --skip-geocode --apply
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
"""
import argparse
import time
from ts_shared_rev import (
get_conn,
get_logger,
log_ingestion,
reverse_geocode,
)
log = get_logger("backfill_trips")
_ENRICH_QUERY = """
SELECT
(SELECT geom FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_geom,
(SELECT ST_Y(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_lat,
(SELECT ST_X(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_lng,
(SELECT geom FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_geom,
(SELECT ST_Y(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_lat,
(SELECT ST_X(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_lng,
(SELECT ST_MakeLine(geom ORDER BY gps_time)
FROM tracksolid.position_history
WHERE imei = %s AND gps_time BETWEEN %s AND %s
AND geom IS NOT NULL) AS route_geom,
(SELECT COUNT(*) FROM tracksolid.position_history
WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count
"""
def _select_targets(cur, imei: str | None, since: str | None) -> list[tuple]:
"""Return rows that need enrichment, as (id, imei, start_time, end_time)."""
sql = """
SELECT id, imei, start_time, end_time, vehicle_plate
FROM tracksolid.trips
WHERE (route_geom IS NULL OR start_geom IS NULL)
AND end_time IS NOT NULL
"""
params: list = []
if imei:
sql += " AND imei = %s"
params.append(imei)
if since:
sql += " AND start_time >= %s"
params.append(since)
sql += " ORDER BY start_time"
cur.execute(sql, params)
return cur.fetchall()
def _load_plates_cache(cur) -> dict[str, str]:
cur.execute("""
SELECT imei, vehicle_number
FROM tracksolid.devices
WHERE vehicle_number IS NOT NULL
""")
return {imei: plate for imei, plate in cur.fetchall()}
def run(apply: bool, filter_imei: str | None, since: str | None,
skip_geocode: bool = False) -> None:
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
t0 = time.time()
enriched = degenerate = no_fixes = failed = 0
if skip_geocode:
log.info("Reverse-geocoding disabled (--skip-geocode). Addresses will stay NULL.")
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
with get_conn() as conn:
with conn.cursor() as cur:
plates = _load_plates_cache(cur)
targets = _select_targets(cur, filter_imei, since)
log.info(
"Found %d trip(s) needing enrichment%s%s.",
len(targets),
f" for imei={filter_imei}" if filter_imei else "",
f" since={since}" if since else "",
)
for trip_id, imei, start_time, end_time, existing_plate in targets:
try:
cur.execute(_ENRICH_QUERY, (
imei, start_time,
imei, start_time,
imei, start_time,
imei, end_time,
imei, end_time,
imei, end_time,
imei, start_time, end_time,
imei, start_time, end_time,
))
(start_geom, start_lat, start_lng,
end_geom, end_lat, end_lng,
route_geom, waypoints_count) = cur.fetchone()
if waypoints_count == 0:
no_fixes += 1
log.info(
" trip id=%s imei=%s start=%s — no GPS fixes in window, skipping",
trip_id, imei, start_time,
)
continue
if waypoints_count < 2:
# Not enough fixes for a polyline. Still capture the
# single endpoint geom and address.
degenerate += 1
log.info(
" trip id=%s imei=%s — only %d fix(es), no route_geom",
trip_id, imei, waypoints_count,
)
if skip_geocode:
start_address = end_address = None
else:
start_address = reverse_geocode(start_lat, start_lng)
end_address = reverse_geocode(end_lat, end_lng)
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
vehicle_plate = existing_plate or plates.get(imei)
log.info(
" trip id=%s imei=%s waypoints=%d start=%s end=%s",
trip_id, imei, waypoints_count, start_address, end_address,
)
if apply:
cur.execute("""
UPDATE tracksolid.trips SET
start_geom = COALESCE(start_geom, %s),
end_geom = COALESCE(end_geom, %s),
route_geom = COALESCE(route_geom, %s),
waypoints_count = COALESCE(waypoints_count, %s),
start_address = COALESCE(start_address, %s),
end_address = COALESCE(end_address, %s),
vehicle_plate = COALESCE(vehicle_plate, %s)
WHERE id = %s
""", (
start_geom, end_geom, route_geom, waypoints_count,
start_address, end_address, vehicle_plate,
trip_id,
))
enriched += 1
except Exception:
failed += 1
log.warning(
"Failed to enrich trip id=%s imei=%s",
trip_id, imei, exc_info=True,
)
if apply:
log_ingestion(
cur, "backfill_trips_enrichment",
imei_count=len(targets),
upserted=0, inserted=enriched,
duration_ms=int((time.time() - t0) * 1000),
success=(failed == 0),
)
mode = "APPLIED" if apply else "DRY-RUN"
print(f"\n{'='*60}")
print(f" {mode} COMPLETE")
print(f"{'='*60}")
print(f" Trips enriched : {enriched}")
print(f" Degenerate (<2 fixes) : {degenerate}")
print(f" Skipped (no fixes) : {no_fixes}")
print(f" Failed : {failed}")
if not apply:
print("\n Run with --apply to commit changes.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Backfill route_geom / start_geom / end_geom / addresses on tracksolid.trips."
)
parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)")
parser.add_argument("--imei", default=None, help="Limit to a single IMEI")
parser.add_argument("--since", default=None, help="Only trips with start_time >= YYYY-MM-DD (UTC)")
parser.add_argument("--skip-geocode", action="store_true", help="Skip Nominatim reverse-geocoding (fast path for large backfills)")
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
args = parser.parse_args()
run(apply=args.apply, filter_imei=args.imei, since=args.since,
skip_geocode=args.skip_geocode)