fleet-platform/app/workers/geocoder.py

139 lines
4.9 KiB
Python
Raw Permalink Normal View History

"""Reverse geocoder worker — PRD F2.3 (pulled forward).
Finds positions in `state.live_positions` whose rounded coordinates have no
cached address yet, fetches one from Nominatim, and caches it in
`state.geocoded_positions`. Cache keys are lat/lng rounded to 4 decimals
(~10 m), so a parked vehicle costs one lookup ever and a moving vehicle a
handful per minute. Requests are rate-limited to comply with Nominatim's
1 req/sec usage policy, and a circuit breaker pauses lookups after repeated
failures.
"""
import asyncio
import time
from typing import Any
import httpx
import structlog
from app.config import Settings
from app.db import get_pool
# Module-level structlog logger; every event this worker emits uses the
# "geocoder." event-name prefix.
log = structlog.get_logger("worker.geocoder")
# Circuit breaker: when Nominatim fails repeatedly (down or rate-limiting us),
# trip open and skip ticks for a cooldown rather than grinding through every
# batch 1 req/sec only to fail. Module-level so the open state survives across
# ticks; monotonic clock so it's immune to wall-clock jumps.
class _Breaker:
open_until: float = 0.0
_breaker = _Breaker()
def _extract_short_address(payload: dict[str, Any]) -> str | None:
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
addr = payload.get("address") or {}
if not isinstance(addr, dict):
return None
road = addr.get("road") or addr.get("pedestrian") or addr.get("residential")
place = (
addr.get("city")
or addr.get("town")
or addr.get("village")
or addr.get("suburb")
or addr.get("municipality")
or addr.get("county")
)
parts = [p for p in (road, place) if p]
return ", ".join(parts) if parts else None
async def geocode_pending(settings: Settings) -> None:
    """Reverse-geocode one batch of un-cached live positions via Nominatim.

    Selects up to ``settings.geocoder_max_per_tick`` distinct (lat, lng)
    pairs, rounded to 4 decimals, from ``state.live_positions`` that have no
    row in ``state.geocoded_positions``. Each pair is fetched from
    Nominatim's ``/reverse`` endpoint, sleeping
    ``settings.geocoder_rate_limit_sec`` before every request, and the full
    (``display_name``) and short address are upserted into the cache.

    Circuit breaker: returns immediately while ``_breaker`` is open. After
    ``settings.geocoder_breaker_threshold`` consecutive failed fetches it
    opens the breaker for ``settings.geocoder_breaker_cooldown_sec`` seconds
    and abandons the rest of the batch.

    A fetch counts as failed if the request raises, the response has an
    error status, the body is not valid JSON, or the JSON is not an object.
    Database errors are not caught and propagate to the caller.
    """
    now = time.monotonic()
    if now < _breaker.open_until:
        log.info("geocoder.breaker_open", reopens_in_sec=round(_breaker.open_until - now, 1))
        return

    pool = await get_pool()
    # Collect the cells that need a lookup. This connection is released before
    # the rate-limited HTTP loop so it is not held for the whole batch.
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(
            """
            SELECT DISTINCT
                round(ST_Y(lp.geom)::numeric, 4) AS lat,
                round(ST_X(lp.geom)::numeric, 4) AS lng
            FROM state.live_positions lp
            WHERE NOT EXISTS (
                SELECT 1 FROM state.geocoded_positions g
                WHERE g.lat_rounded = round(ST_Y(lp.geom)::numeric, 4)
                  AND g.lng_rounded = round(ST_X(lp.geom)::numeric, 4)
            )
            LIMIT %s
            """,
            (settings.geocoder_max_per_tick,),
        )
        pending = list(await cur.fetchall())
    if not pending:
        return
    log.info("geocoder.batch_start", count=len(pending))

    async with httpx.AsyncClient(
        base_url=settings.nominatim_base_url,
        headers={"User-Agent": settings.nominatim_user_agent},
        timeout=httpx.Timeout(20.0),
    ) as http:
        consecutive_failures = 0
        for lat, lng in pending:
            # Sleep before every request, including the first of the batch,
            # presumably so spacing also holds between back-to-back ticks.
            await asyncio.sleep(settings.geocoder_rate_limit_sec)
            try:
                r = await http.get(
                    "/reverse",
                    params={
                        "lat": str(lat),
                        "lon": str(lng),
                        "format": "json",
                        "zoom": "17",
                        "addressdetails": "1",
                    },
                )
                r.raise_for_status()
                data = r.json()
                # The code below calls data.get(); a list/str/null body would
                # otherwise raise AttributeError outside this try, aborting the
                # tick and skipping the breaker accounting.
                if not isinstance(data, dict):
                    raise ValueError(
                        f"expected a JSON object from /reverse, got {type(data).__name__}"
                    )
            except Exception:
                log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
                consecutive_failures += 1
                if consecutive_failures >= settings.geocoder_breaker_threshold:
                    _breaker.open_until = (
                        time.monotonic() + settings.geocoder_breaker_cooldown_sec
                    )
                    log.warning(
                        "geocoder.breaker_tripped",
                        consecutive_failures=consecutive_failures,
                        cooldown_sec=settings.geocoder_breaker_cooldown_sec,
                    )
                    break
                continue

            consecutive_failures = 0
            address = data.get("display_name")
            short = _extract_short_address(data)
            # Upsert so a re-fetch of an existing cell refreshes its address.
            async with pool.connection() as conn, conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO state.geocoded_positions
                        (lat_rounded, lng_rounded, address, address_short, source)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (lat_rounded, lng_rounded) DO UPDATE
                      SET address = EXCLUDED.address,
                          address_short = EXCLUDED.address_short,
                          fetched_at = now()
                    """,
                    (lat, lng, address, short, "nominatim"),
                )
            log.info(
                "geocoder.cached",
                lat=float(lat),
                lng=float(lng),
                address_short=short,
            )