From 144dedee90b7011dba7711951f4720e535610983 Mon Sep 17 00:00:00 2001 From: David Kiania Date: Fri, 1 May 2026 21:30:20 +0300 Subject: [PATCH] feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Trips ingested by polling jimi.device.track.mileage carry no start/end coordinates, fuel, or trip sequence, and the idleSecond field of the poll response was being dropped at ingest — leaving most trip columns NULL. This change closes the coordinate, idle and trip-sequence gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs the first fix at/after start_time, the last fix at/before end_time, and the ST_MakeLine route + waypoint count for fixes BETWEEN start_time AND end_time, all from position_history • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). 
Co-Authored-By: Claude Opus 4.7 --- 09_trips_enrichment.sql | 82 +++++++++++++ backfill_trips_enrichment.py | 215 +++++++++++++++++++++++++++++++++++ ingest_movement_rev.py | 125 ++++++++++++++++++-- run_migrations.py | 1 + ts_shared_rev.py | 86 +++++++++++++- 5 files changed, 497 insertions(+), 12 deletions(-) create mode 100644 09_trips_enrichment.sql create mode 100644 backfill_trips_enrichment.py diff --git a/09_trips_enrichment.sql b/09_trips_enrichment.sql new file mode 100644 index 0000000..afe84db --- /dev/null +++ b/09_trips_enrichment.sql @@ -0,0 +1,82 @@ +-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +-- Migration 09 — tracksolid.trips Enrichment +-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +-- The polling endpoint jimi.device.track.mileage does not return start/end +-- coordinates, fuel, idle, or trip sequence. This migration adds the columns +-- needed to enrich every poll-ingested trip from data we already have: +-- • start/end coordinates and full route polyline reconstructed from +-- position_history at insert time (see ingest_movement_rev.py::poll_trips) +-- • reverse-geocoded human-readable addresses (Nominatim) +-- • denormalised vehicle_plate so trip displays don't need a join +-- • waypoint count for audit / data-quality checks +-- +-- Adds a v_trips_enriched view exposing daily_seq (Nth trip for IMEI on this +-- Africa/Nairobi date) — replaces reliance on the device-supplied trip_seq +-- which is only populated when the rarely-firing /pushtripreport webhook +-- delivers a payload. +-- +-- Run after migration 08. Safe to re-run (ADD COLUMN IF NOT EXISTS, +-- CREATE INDEX IF NOT EXISTS, CREATE OR REPLACE VIEW). +-- ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +BEGIN; + +-- ── 1. 
New columns on tracksolid.trips ────────────────────────────────────── + +ALTER TABLE tracksolid.trips + ADD COLUMN IF NOT EXISTS route_geom geometry(LineString, 4326), + ADD COLUMN IF NOT EXISTS start_address TEXT, + ADD COLUMN IF NOT EXISTS end_address TEXT, + ADD COLUMN IF NOT EXISTS vehicle_plate TEXT, + ADD COLUMN IF NOT EXISTS waypoints_count INTEGER; + +COMMENT ON COLUMN tracksolid.trips.route_geom IS + 'Full GPS route polyline built at ingest from position_history points ' + 'where gps_time BETWEEN start_time AND end_time. NULL when fewer than ' + '2 fixes are available for the trip window.'; +COMMENT ON COLUMN tracksolid.trips.start_address IS + 'Reverse-geocoded human-readable address near start_geom (Nominatim). ' + 'NULL on lookup failure; address is best-effort, not authoritative.'; +COMMENT ON COLUMN tracksolid.trips.end_address IS + 'Reverse-geocoded human-readable address near end_geom (Nominatim). ' + 'NULL on lookup failure; address is best-effort, not authoritative.'; +COMMENT ON COLUMN tracksolid.trips.vehicle_plate IS + 'Denormalised tracksolid.devices.vehicle_number cached at trip-insert ' + 'time. Avoids a join for trip displays; refreshed only on next ingest.'; +COMMENT ON COLUMN tracksolid.trips.waypoints_count IS + 'Number of position_history fixes that contributed to route_geom. ' + 'Audit aid: 0 or 1 means route_geom is NULL or degenerate.'; + +-- ── 2. Spatial indexes for replay / map queries ───────────────────────────── + +CREATE INDEX IF NOT EXISTS idx_trips_route_geom + ON tracksolid.trips USING GIST (route_geom); +CREATE INDEX IF NOT EXISTS idx_trips_start_geom + ON tracksolid.trips USING GIST (start_geom); +CREATE INDEX IF NOT EXISTS idx_trips_end_geom + ON tracksolid.trips USING GIST (end_geom); + +-- ── 3. 
v_trips_enriched view ──────────────────────────────────────────────── +-- Adds trip_date_eat (Africa/Nairobi local date) and daily_seq (Nth trip on +-- that date for the IMEI) without depending on the device-supplied trip_seq. + +CREATE OR REPLACE VIEW tracksolid.v_trips_enriched AS +SELECT + t.*, + (t.start_time AT TIME ZONE 'Africa/Nairobi')::date AS trip_date_eat, + ROW_NUMBER() OVER ( + PARTITION BY t.imei, (t.start_time AT TIME ZONE 'Africa/Nairobi')::date + ORDER BY t.start_time + ) AS daily_seq +FROM tracksolid.trips t; + +COMMENT ON VIEW tracksolid.v_trips_enriched IS + 'tracksolid.trips with computed daily_seq (Nth trip per IMEI per local ' + 'Africa/Nairobi day) and trip_date_eat. Replaces reliance on the ' + 'device-supplied trip_seq column, which is NULL for poll-ingested trips.'; + +-- ── 4. Read access for grafana_ro ─────────────────────────────────────────── + +GRANT SELECT ON tracksolid.v_trips_enriched TO grafana_ro; + +COMMIT; diff --git a/backfill_trips_enrichment.py b/backfill_trips_enrichment.py new file mode 100644 index 0000000..a619b9e --- /dev/null +++ b/backfill_trips_enrichment.py @@ -0,0 +1,215 @@ +""" +backfill_trips_enrichment.py — One-shot enrichment of historical tracksolid.trips rows +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Migration 09 added route_geom, start/end_address, vehicle_plate, waypoints_count. +poll_trips() fills these for new trips going forward; this script backfills +existing rows where the new columns are NULL by reconstructing data from +position_history (the GPS trail is already there) and Nominatim. + +Behaviour: + • Selects rows where route_geom IS NULL OR start_geom IS NULL + (covers the original 8 historical poll-ingested trips and any future + rows that landed before position_history caught up). + • Per row: runs the same 4-subquery enrichment poll_trips uses, then + reverse-geocodes start/end via Nominatim. 
+ • Writes only via COALESCE — never overwrites webhook-supplied data. + • Logs each run to tracksolid.ingestion_log with endpoint='backfill_trips_enrichment'. + +Usage: + # Dry-run — shows counts only, writes nothing + python backfill_trips_enrichment.py + + # Apply changes + python backfill_trips_enrichment.py --apply + + # Scope to a single device + python backfill_trips_enrichment.py --imei 862798052707896 --apply + + # Limit to trips since a date (UTC) + python backfill_trips_enrichment.py --since 2026-04-01 --apply +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +import argparse +import time + +from ts_shared_rev import ( + get_conn, + get_logger, + log_ingestion, + reverse_geocode, +) + +log = get_logger("backfill_trips") + +_ENRICH_QUERY = """ + SELECT + (SELECT geom FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_geom, + (SELECT ST_Y(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_lat, + (SELECT ST_X(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_lng, + (SELECT geom FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_geom, + (SELECT ST_Y(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_lat, + (SELECT ST_X(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_lng, + (SELECT ST_MakeLine(geom ORDER BY gps_time) + FROM tracksolid.position_history + WHERE imei = %s AND gps_time BETWEEN %s AND %s + AND geom IS NOT NULL) AS route_geom, + (SELECT COUNT(*) FROM tracksolid.position_history + WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count +""" + + +def _select_targets(cur, imei: str | None, since: str | None) -> 
list[tuple]: + """Return rows that need enrichment, as (id, imei, start_time, end_time, vehicle_plate).""" + sql = """ + SELECT id, imei, start_time, end_time, vehicle_plate + FROM tracksolid.trips + WHERE (route_geom IS NULL OR start_geom IS NULL) + AND end_time IS NOT NULL + """ + params: list = [] + if imei: + sql += " AND imei = %s" + params.append(imei) + if since: + sql += " AND start_time >= %s" + params.append(since) + sql += " ORDER BY start_time" + cur.execute(sql, params) + return cur.fetchall() + + +def _load_plates_cache(cur) -> dict[str, str]: + cur.execute(""" + SELECT imei, vehicle_number + FROM tracksolid.devices + WHERE vehicle_number IS NOT NULL + """) + return {imei: plate for imei, plate in cur.fetchall()} + + +def run(apply: bool, filter_imei: str | None, since: str | None) -> None: + t0 = time.time() + enriched = degenerate = no_fixes = failed = 0 + + with get_conn() as conn: + with conn.cursor() as cur: + plates = _load_plates_cache(cur) + targets = _select_targets(cur, filter_imei, since) + + log.info( + "Found %d trip(s) needing enrichment%s%s.", + len(targets), + f" for imei={filter_imei}" if filter_imei else "", + f" since={since}" if since else "", + ) + + for trip_id, imei, start_time, end_time, existing_plate in targets: + try: + cur.execute(_ENRICH_QUERY, ( + imei, start_time, + imei, start_time, + imei, start_time, + imei, end_time, + imei, end_time, + imei, end_time, + imei, start_time, end_time, + imei, start_time, end_time, + )) + (start_geom, start_lat, start_lng, + end_geom, end_lat, end_lng, + route_geom, waypoints_count) = cur.fetchone() + + if waypoints_count == 0: + no_fixes += 1 + log.info( + " trip id=%s imei=%s start=%s — no GPS fixes in window, skipping", + trip_id, imei, start_time, + ) + continue + + if waypoints_count < 2: + # Not enough fixes for a polyline. Still capture the + # single endpoint geom and address. 
+ degenerate += 1 + log.info( + " trip id=%s imei=%s — only %d fix(es), no route_geom", + trip_id, imei, waypoints_count, + ) + + start_address = reverse_geocode(start_lat, start_lng) + end_address = reverse_geocode(end_lat, end_lng) + vehicle_plate = existing_plate or plates.get(imei) + + log.info( + " trip id=%s imei=%s waypoints=%d start=%s end=%s", + trip_id, imei, waypoints_count, start_address, end_address, + ) + + if apply: + cur.execute(""" + UPDATE tracksolid.trips SET + start_geom = COALESCE(start_geom, %s), + end_geom = COALESCE(end_geom, %s), + route_geom = COALESCE(route_geom, %s), + waypoints_count = COALESCE(waypoints_count, %s), + start_address = COALESCE(start_address, %s), + end_address = COALESCE(end_address, %s), + vehicle_plate = COALESCE(vehicle_plate, %s) + WHERE id = %s + """, ( + start_geom, end_geom, route_geom, waypoints_count, + start_address, end_address, vehicle_plate, + trip_id, + )) + enriched += 1 + except Exception: + failed += 1 + log.warning( + "Failed to enrich trip id=%s imei=%s", + trip_id, imei, exc_info=True, + ) + + if apply: + log_ingestion( + cur, "backfill_trips_enrichment", + imei_count=len(targets), + upserted=0, inserted=enriched, + duration_ms=int((time.time() - t0) * 1000), + success=(failed == 0), + ) + + mode = "APPLIED" if apply else "DRY-RUN" + print(f"\n{'='*60}") + print(f" {mode} COMPLETE") + print(f"{'='*60}") + print(f" Trips enriched : {enriched}") + print(f" Degenerate (<2 fixes) : {degenerate}") + print(f" Skipped (no fixes) : {no_fixes}") + print(f" Failed : {failed}") + if not apply: + print("\n Run with --apply to commit changes.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Backfill route_geom / start_geom / end_geom / addresses on tracksolid.trips." 
+ ) + parser.add_argument("--apply", action="store_true", help="Write changes to DB (default: dry-run)") + parser.add_argument("--imei", default=None, help="Limit to a single IMEI") + parser.add_argument("--since", default=None, help="Only trips with start_time >= YYYY-MM-DD (UTC)") + args = parser.parse_args() + + run(apply=args.apply, filter_imei=args.imei, since=args.since) diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 90651d5..20498e6 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -38,6 +38,12 @@ REVISIONS (QA-Verified): Tracksolid sub-accounts. sync_devices, poll_live_positions and poll_parking now iterate every target in TRACKSOLID_TARGETS and dedupe/scope per-target before writing. + [FIX-M20] Trip enrichment: poll_trips now backfills start_geom/end_geom/ + route_geom/waypoints_count from position_history at insert + time, extracts idleSecond, reverse-geocodes start/end addresses + (Nominatim), and caches vehicle_plate from devices. Closes the + NULL-column gaps that were inherent to jimi.device.track.mileage + (it does not return coordinates, idle, or trip sequence). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ @@ -63,6 +69,7 @@ from ts_shared_rev import ( clean_int, clean_ts, get_logger, + reverse_geocode, safe_task, setup_shutdown, ) @@ -259,6 +266,57 @@ def poll_live_positions(): # ── 3. Trip Reports (Every 15m) ─────────────────────────────────────────────── +# [FIX-M20] Migration 09 added route_geom, start/end_address, vehicle_plate, +# waypoints_count to tracksolid.trips. poll_trips now enriches every poll- +# ingested trip from position_history (start/end fix + LineString polyline) +# and reverse-geocodes the endpoints, since jimi.device.track.mileage does +# not return coordinates. ON CONFLICT preserves webhook-supplied data when +# /pushtripreport later delivers native coords. + +# Per-trip enrichment from position_history. 
Eight readable scalar subqueries +# rather than a tighter CTE — runs sub-ms each given the (imei, gps_time) PK, +# and the readable form survives the edge case where start_time is just +# before the first available fix in the window (single CTE bounded by +# BETWEEN would return NULL there). +_ENRICH_QUERY = """ + SELECT + (SELECT geom FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_geom, + (SELECT ST_Y(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_lat, + (SELECT ST_X(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time >= %s + ORDER BY gps_time ASC LIMIT 1) AS start_lng, + (SELECT geom FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_geom, + (SELECT ST_Y(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_lat, + (SELECT ST_X(geom) FROM tracksolid.position_history + WHERE imei = %s AND gps_time <= %s + ORDER BY gps_time DESC LIMIT 1) AS end_lng, + (SELECT ST_MakeLine(geom ORDER BY gps_time) + FROM tracksolid.position_history + WHERE imei = %s AND gps_time BETWEEN %s AND %s + AND geom IS NOT NULL) AS route_geom, + (SELECT COUNT(*) FROM tracksolid.position_history + WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count +""" + + +def _load_plates_cache(cur) -> dict[str, str]: + """Build {imei: vehicle_number} for active devices once per poll cycle.""" + cur.execute(""" + SELECT imei, vehicle_number + FROM tracksolid.devices + WHERE enabled_flag = 1 AND vehicle_number IS NOT NULL + """) + return {imei: plate for imei, plate in cur.fetchall()} + + def poll_trips(): t0 = time.time() token, imeis = get_token(), get_active_imeis() @@ -270,6 +328,8 @@ def poll_trips(): with get_conn() as conn: with conn.cursor() as cur: + plates = _load_plates_cache(cur) + for i in range(0, len(imeis), 
50): batch = imeis[i:i+50] resp = api_post("jimi.device.track.mileage", { @@ -288,20 +348,67 @@ def poll_trips(): # Divide by 1000 to store as distance_km. raw_dist = clean_num(t.get("distance")) dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None + + imei = t.get("imei") + trip_start = clean_ts(t.get("startTime")) + trip_end = clean_ts(t.get("endTime")) + idle_s = clean_int(t.get("idleSecond")) + + # [FIX-M20] Enrich from position_history. trip_start/end + # may be None (rare malformed payload) — skip enrichment + # in that case so we still capture the row. + start_geom = end_geom = route_geom = None + start_lat = start_lng = end_lat = end_lng = None + waypoints_count = 0 + if trip_start and trip_end: + cur.execute(_ENRICH_QUERY, ( + imei, trip_start, # start_geom + imei, trip_start, # start_lat + imei, trip_start, # start_lng + imei, trip_end, # end_geom + imei, trip_end, # end_lat + imei, trip_end, # end_lng + imei, trip_start, trip_end, # route_geom + imei, trip_start, trip_end, # waypoints_count + )) + (start_geom, start_lat, start_lng, + end_geom, end_lat, end_lng, + route_geom, waypoints_count) = cur.fetchone() + + start_address = reverse_geocode(start_lat, start_lng) + end_address = reverse_geocode(end_lat, end_lng) + vehicle_plate = plates.get(imei) + cur.execute(""" INSERT INTO tracksolid.trips ( imei, start_time, end_time, distance_km, - avg_speed_kmh, max_speed_kmh, driving_time_s, source - ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll') + avg_speed_kmh, max_speed_kmh, driving_time_s, idle_time_s, + start_geom, end_geom, route_geom, waypoints_count, + start_address, end_address, vehicle_plate, source + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, 'poll') ON CONFLICT (imei, start_time) DO UPDATE SET - end_time = EXCLUDED.end_time, - distance_km = EXCLUDED.distance_km, - max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh), - driving_time_s = COALESCE(EXCLUDED.driving_time_s, 
tracksolid.trips.driving_time_s) + end_time = EXCLUDED.end_time, + distance_km = EXCLUDED.distance_km, + max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh), + driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s), + idle_time_s = COALESCE(EXCLUDED.idle_time_s, tracksolid.trips.idle_time_s), + start_geom = COALESCE(tracksolid.trips.start_geom, EXCLUDED.start_geom), + end_geom = COALESCE(EXCLUDED.end_geom, tracksolid.trips.end_geom), + route_geom = COALESCE(EXCLUDED.route_geom, tracksolid.trips.route_geom), + waypoints_count = EXCLUDED.waypoints_count, + start_address = COALESCE(tracksolid.trips.start_address, EXCLUDED.start_address), + end_address = COALESCE(EXCLUDED.end_address, tracksolid.trips.end_address), + vehicle_plate = COALESCE(EXCLUDED.vehicle_plate, tracksolid.trips.vehicle_plate) """, ( - t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")), - dist_km, clean_num(t.get("avgSpeed")), - clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")) + imei, trip_start, trip_end, dist_km, + clean_num(t.get("avgSpeed")), + clean_num(t.get("maxSpeed")), + clean_int(t.get("runTimeSecond")), + idle_s, + start_geom, end_geom, route_geom, waypoints_count, + start_address, end_address, vehicle_plate, )) cur.execute("RELEASE SAVEPOINT sp") inserted += cur.rowcount diff --git a/run_migrations.py b/run_migrations.py index 3ae4166..143f71e 100644 --- a/run_migrations.py +++ b/run_migrations.py @@ -32,6 +32,7 @@ MIGRATIONS = [ "06_business_analytics_migration.sql", # ops schema, dispatch_log, assigned_city "07_analytics_views.sql", # Grafana-facing views in tracksolid.* "08_analytics_config.sql", # ops.cost_rates, ops.kpi_targets + seed data + "09_trips_enrichment.sql", # trips.route_geom + addresses + plate + v_trips_enriched ] # ── Tables that must exist before the service is allowed to start ───────────── diff --git a/ts_shared_rev.py b/ts_shared_rev.py index e782faf..72a5504 100644 --- 
a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -24,9 +24,11 @@ import logging import os import signal import sys +import threading import time from contextlib import contextmanager from datetime import datetime, timezone, timedelta +from functools import lru_cache from typing import Optional, Any import psycopg2 @@ -304,9 +306,87 @@ def _update_token_cache(r: dict) -> str: cur.execute(""" INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at) VALUES (%s, %s, %s, %s) - ON CONFLICT (account) DO UPDATE SET - access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token, + ON CONFLICT (account) DO UPDATE SET + access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token, expires_at=EXCLUDED.expires_at, obtained_at=NOW() """, (USER_ID, token, r.get("refreshToken"), expires_at)) conn.commit() - return token \ No newline at end of file + return token + +# ── Reverse Geocoding (Nominatim) ──────────────────────────────────────────── +# Best-effort lookup used by poll_trips() to populate trips.start_address / +# end_address. Must NEVER raise — failure returns None and the trip insert +# proceeds without the address. 
+ +_NOMINATIM_URL = os.getenv( + "NOMINATIM_URL", + "https://nominatim.openstreetmap.org/reverse", +) +_NOMINATIM_USER_AGENT = os.getenv( + "NOMINATIM_USER_AGENT", + "fireside-tracksolid/1.0 (kianiadee@gmail.com)", +) +_GEOCODE_LOCK = threading.Lock() +_GEOCODE_LAST_CALL_AT: float = 0.0 +_GEOCODE_MIN_INTERVAL_S: float = 1.0 # Nominatim TOS — 1 req/sec absolute max + + +def _geocode_throttle() -> None: + """Sleep just long enough since the previous call to honour 1 req/sec.""" + global _GEOCODE_LAST_CALL_AT + with _GEOCODE_LOCK: + elapsed = time.monotonic() - _GEOCODE_LAST_CALL_AT + if elapsed < _GEOCODE_MIN_INTERVAL_S: + time.sleep(_GEOCODE_MIN_INTERVAL_S - elapsed) + _GEOCODE_LAST_CALL_AT = time.monotonic() + + +@lru_cache(maxsize=2048) +def _reverse_geocode_cached(lat_round: float, lng_round: float) -> Optional[str]: + """Cached HTTP call. Key is lat/lng rounded to 4 dp (~11 m precision).""" + _geocode_throttle() + try: + r = _session.get( + _NOMINATIM_URL, + params={ + "lat": lat_round, + "lon": lng_round, + "format": "json", + "zoom": 18, + "addressdetails": 0, + }, + headers={"User-Agent": _NOMINATIM_USER_AGENT}, + timeout=10, + ) + r.raise_for_status() + data = r.json() + addr = data.get("display_name") + if addr: + return addr.strip() + return None + except (requests.RequestException, ValueError) as e: + _log.warning("reverse_geocode failed lat=%s lng=%s: %s", + lat_round, lng_round, e) + return None + + +def reverse_geocode(lat: Any, lng: Any) -> Optional[str]: + """ + Reverse-geocode a coordinate to a human-readable address via Nominatim. + + Best-effort. Never raises. Returns None on: + • missing / invalid lat or lng + • HTTP/timeout/JSON failure + • Nominatim returns no display_name + + Cached on lat/lng rounded to 4 decimal places (~11 m), which keeps + repeated visits to the same depot/site from re-querying. 
+ """ + flat, flng = clean_num(lat), clean_num(lng) + if flat is None or flng is None: + return None + if flat == 0.0 and flng == 0.0: + return None + if not (-90 <= flat <= 90 and -180 <= flng <= 180): + return None + return _reverse_geocode_cached(round(flat, 4), round(flng, 4)) \ No newline at end of file