109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
|
|
"""Reverse geocoder worker — PRD F2.3 (pulled forward).
|
||
|
|
|
||
|
|
Drains positions in `state.live_positions` that don't yet have a cached address
|
||
|
|
and fetches one from Nominatim. Keyed by lat/lng rounded to 4 decimals (~10m),
|
||
|
|
so parked vehicles cost one lookup ever; moving vehicles cost a handful per
|
||
|
|
minute. Rate-limited to comply with Nominatim's 1 req/sec policy.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
import structlog
|
||
|
|
|
||
|
|
from app.config import Settings
|
||
|
|
from app.db import get_pool
|
||
|
|
|
||
|
|
log = structlog.get_logger("worker.geocoder")
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_short_address(payload: dict[str, Any]) -> str | None:
|
||
|
|
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
|
||
|
|
addr = payload.get("address") or {}
|
||
|
|
if not isinstance(addr, dict):
|
||
|
|
return None
|
||
|
|
road = addr.get("road") or addr.get("pedestrian") or addr.get("residential")
|
||
|
|
place = (
|
||
|
|
addr.get("city")
|
||
|
|
or addr.get("town")
|
||
|
|
or addr.get("village")
|
||
|
|
or addr.get("suburb")
|
||
|
|
or addr.get("municipality")
|
||
|
|
or addr.get("county")
|
||
|
|
)
|
||
|
|
parts = [p for p in (road, place) if p]
|
||
|
|
return ", ".join(parts) if parts else None
|
||
|
|
|
||
|
|
|
||
|
|
async def geocode_pending(settings: Settings) -> None:
    """Reverse-geocode live positions that have no cached address yet.

    Selects up to ``settings.geocoder_max_per_tick`` distinct coordinate pairs
    (latitude/longitude rounded to 4 decimals) from ``state.live_positions``
    that have no matching row in ``state.geocoded_positions``, queries the
    ``/reverse`` endpoint at ``settings.nominatim_base_url`` for each one, and
    upserts the returned address into ``state.geocoded_positions``.

    A lookup that fails -- request error, non-success HTTP status, undecodable
    body, or a body that is not a JSON object -- is logged and skipped without
    storing anything, so the coordinate stays eligible on a later call.

    Args:
        settings: Provides ``geocoder_max_per_tick``,
            ``geocoder_rate_limit_sec``, ``nominatim_base_url`` and
            ``nominatim_user_agent``.
    """
    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(
            """
            SELECT DISTINCT
                round(ST_Y(lp.geom)::numeric, 4) AS lat,
                round(ST_X(lp.geom)::numeric, 4) AS lng
            FROM state.live_positions lp
            WHERE NOT EXISTS (
                SELECT 1 FROM state.geocoded_positions g
                WHERE g.lat_rounded = round(ST_Y(lp.geom)::numeric, 4)
                  AND g.lng_rounded = round(ST_X(lp.geom)::numeric, 4)
            )
            LIMIT %s
            """,
            (settings.geocoder_max_per_tick,),
        )
        pending = list(await cur.fetchall())

    if not pending:
        return

    log.info("geocoder.batch_start", count=len(pending))
    async with httpx.AsyncClient(
        base_url=settings.nominatim_base_url,
        headers={"User-Agent": settings.nominatim_user_agent},
        timeout=httpx.Timeout(20.0),
    ) as http:
        for lat, lng in pending:
            # Pause before every request (including the first) so consecutive
            # calls stay within the configured rate limit.
            await asyncio.sleep(settings.geocoder_rate_limit_sec)
            try:
                r = await http.get(
                    "/reverse",
                    params={
                        "lat": str(lat),
                        "lon": str(lng),
                        "format": "json",
                        "zoom": "17",
                        "addressdetails": "1",
                    },
                )
                r.raise_for_status()
                data = r.json()
            except Exception:
                # Best-effort worker: one failed lookup must not abort the batch.
                log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
                continue
            else:
                # A decodable body is not necessarily a JSON object; calling
                # .get() on a list/str/None would raise and kill the batch.
                if not isinstance(data, dict):
                    log.warning(
                        "geocoder.bad_payload",
                        lat=float(lat),
                        lng=float(lng),
                        payload_type=type(data).__name__,
                    )
                    continue

            address = data.get("display_name")
            short = _extract_short_address(data)

            # Each result is written on its own connection, so an address is
            # persisted as soon as it is fetched.
            async with pool.connection() as conn, conn.cursor() as cur:
                await cur.execute(
                    """
                    INSERT INTO state.geocoded_positions
                        (lat_rounded, lng_rounded, address, address_short, source)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (lat_rounded, lng_rounded) DO UPDATE
                        SET address = EXCLUDED.address,
                            address_short = EXCLUDED.address_short,
                            fetched_at = now()
                    """,
                    (lat, lng, address, short, "nominatim"),
                )
            log.info(
                "geocoder.cached",
                lat=float(lat),
                lng=float(lng),
                address_short=short,
            )
|