From 45974b381011f00b5f336441b399f3328b15acb0 Mon Sep 17 00:00:00 2001 From: kianiadee Date: Sat, 23 May 2026 23:06:25 +0300 Subject: [PATCH] Geocoder: cron job + state.geocoded_positions; label uses device_name last 4 (fallback plate); popup address row --- app/config.py | 10 ++ app/entrypoints/cron.py | 16 +- app/workers/geocoder.py | 108 ++++++++++++ .../20260601000010_geocoded_positions.sql | 18 ++ .../20260601000011_serve_fn_live_view_v3.sql | 163 ++++++++++++++++++ web/fleet-core.js | 2 + web/index-live.html | 3 +- 7 files changed, 318 insertions(+), 2 deletions(-) create mode 100644 app/workers/geocoder.py create mode 100644 db/migrations/20260601000010_geocoded_positions.sql create mode 100644 db/migrations/20260601000011_serve_fn_live_view_v3.sql diff --git a/app/config.py b/app/config.py index 3170708..149c20b 100644 --- a/app/config.py +++ b/app/config.py @@ -33,6 +33,16 @@ class Settings(BaseSettings): ntfy_topic: str = Field(default="fleet-slo-breach", alias="NTFY_TOPIC") ntfy_token: str = Field(default="", alias="NTFY_TOKEN") + nominatim_base_url: str = Field( + default="https://nominatim.openstreetmap.org", alias="NOMINATIM_BASE_URL" + ) + nominatim_user_agent: str = Field( + default="fleet-platform/0.1 (admin@rahamafresh.com)", alias="NOMINATIM_USER_AGENT" + ) + geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK") + geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC") + geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC") + app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE") app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE") app_log_level: str = Field(default="INFO", alias="APP_LOG_LEVEL") diff --git a/app/entrypoints/cron.py b/app/entrypoints/cron.py index c2ebb77..71a5683 100644 --- a/app/entrypoints/cron.py +++ b/app/entrypoints/cron.py @@ -22,7 +22,7 @@ from app.db import close_pool, get_pool from app.health import router as health_router from app.logging_setup import configure_logging from app.tracksolid.client import TracksolidClient -from app.workers import poller +from app.workers import geocoder, poller log = structlog.get_logger("cron") @@ -81,6 +81,20 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: else: log.warning("cron.tracksolid_jobs_skipped_missing_creds") + # Reverse-geocoder (PRD F2.3, pulled forward so popup shows addresses). + async def _run_geocode() -> None: + await geocoder.geocode_pending(settings) + + scheduler.add_job( + _run_geocode, + trigger=IntervalTrigger(seconds=settings.geocoder_tick_sec), + id="geocode_pending", + max_instances=1, + coalesce=True, + misfire_grace_time=120, + ) + log.info("cron.geocoder_registered", tick_sec=settings.geocoder_tick_sec) + scheduler.start() log.info("cron.scheduler_started") diff --git a/app/workers/geocoder.py b/app/workers/geocoder.py new file mode 100644 index 0000000..250c671 --- /dev/null +++ b/app/workers/geocoder.py @@ -0,0 +1,108 @@ +"""Reverse geocoder worker — PRD F2.3 (pulled forward). + +Drains positions in `state.live_positions` that don't yet have a cached address +and fetches one from Nominatim. Keyed by lat/lng rounded to 4 decimals (~10m), +so parked vehicles cost one lookup ever; moving vehicles cost a handful per +minute. Rate-limited to comply with Nominatim's 1 req/sec policy. +""" + +import asyncio +from typing import Any + +import httpx +import structlog + +from app.config import Settings +from app.db import get_pool + +log = structlog.get_logger("worker.geocoder") + + +def _extract_short_address(payload: dict[str, Any]) -> str | None: + """Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string.""" + addr = payload.get("address") or {} + if not isinstance(addr, dict): + return None + road = addr.get("road") or addr.get("pedestrian") or addr.get("residential") + place = ( + addr.get("city") + or addr.get("town") + or addr.get("village") + or addr.get("suburb") + or addr.get("municipality") + or addr.get("county") + ) + parts = [p for p in (road, place) if p] + return ", ".join(parts) if parts else None + + +async def geocode_pending(settings: Settings) -> None: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + """ + SELECT DISTINCT + round(ST_Y(lp.geom)::numeric, 4) AS lat, + round(ST_X(lp.geom)::numeric, 4) AS lng + FROM state.live_positions lp + WHERE NOT EXISTS ( + SELECT 1 FROM state.geocoded_positions g + WHERE g.lat_rounded = round(ST_Y(lp.geom)::numeric, 4) + AND g.lng_rounded = round(ST_X(lp.geom)::numeric, 4) + ) + LIMIT %s + """, + (settings.geocoder_max_per_tick,), + ) + pending = list(await cur.fetchall()) + + if not pending: + return + + log.info("geocoder.batch_start", count=len(pending)) + async with httpx.AsyncClient( + base_url=settings.nominatim_base_url, + headers={"User-Agent": settings.nominatim_user_agent}, + timeout=httpx.Timeout(20.0), + ) as http: + for lat, lng in pending: + await asyncio.sleep(settings.geocoder_rate_limit_sec) + try: + r = await http.get( + "/reverse", + params={ + "lat": str(lat), + "lon": str(lng), + "format": "json", + "zoom": "17", + "addressdetails": "1", + }, + ) + r.raise_for_status() + data = r.json() + except Exception: + log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng)) + continue + + address = data.get("display_name") + short = _extract_short_address(data) + + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + """ + INSERT INTO state.geocoded_positions + (lat_rounded, lng_rounded, address, address_short, source) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (lat_rounded, lng_rounded) DO UPDATE + SET address = EXCLUDED.address, + address_short = EXCLUDED.address_short, + fetched_at = now() + """, + (lat, lng, address, short, "nominatim"), + ) + log.info( + "geocoder.cached", + lat=float(lat), + lng=float(lng), + address_short=short, + ) diff --git a/db/migrations/20260601000010_geocoded_positions.sql b/db/migrations/20260601000010_geocoded_positions.sql new file mode 100644 index 0000000..8d8765f --- /dev/null +++ b/db/migrations/20260601000010_geocoded_positions.sql @@ -0,0 +1,18 @@ +-- migrate:up + +-- Cache layer for reverse geocoding. Keyed by lat/lng rounded to 4 decimals +-- (~10m grid), so parked vehicles share a row and only moving vehicles trigger +-- fresh lookups. Filled asynchronously by the geocoder cron job (PRD F2.3). +CREATE TABLE state.geocoded_positions ( + lat_rounded numeric(8,4) NOT NULL, + lng_rounded numeric(8,4) NOT NULL, + address text, + address_short text, + source text NOT NULL DEFAULT 'nominatim', + fetched_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (lat_rounded, lng_rounded) +); + +-- migrate:down + +DROP TABLE IF EXISTS state.geocoded_positions; diff --git a/db/migrations/20260601000011_serve_fn_live_view_v3.sql b/db/migrations/20260601000011_serve_fn_live_view_v3.sql new file mode 100644 index 0000000..bbc65a2 --- /dev/null +++ b/db/migrations/20260601000011_serve_fn_live_view_v3.sql @@ -0,0 +1,163 @@ +-- migrate:up + +DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb); + +CREATE OR REPLACE FUNCTION serve.fn_live_view(filters jsonb) +RETURNS jsonb +LANGUAGE plpgsql STABLE +AS $$ +DECLARE + fresh_window interval := COALESCE((filters->>'fresh_window')::interval, interval '24 hours'); + offline_after interval := COALESCE((filters->>'offline_after')::interval, interval '5 minutes'); + move_speed_kmh numeric := COALESCE((filters->>'move_speed_kmh')::numeric, 5); + p_cost_centre text := filters->>'cost_centre'; + p_assigned_city text := filters->>'assigned_city'; + p_vehicle_numbers text[] := CASE + WHEN filters ? 'vehicle_numbers' + THEN ARRAY(SELECT jsonb_array_elements_text(filters->'vehicle_numbers')) + ELSE NULL + END; + result jsonb; +BEGIN + WITH candidates AS ( + SELECT + lp.imei, + lp.occurred_at, + lp.geom, + lp.speed_kmh, + lp.direction_deg, + lp.mc_type, + lp.current_mileage_km, + lp.gps_signal, + lp.satellites, + lp.device_name, + lp.pos_type, + d.device_type, + d.activation_at, + v.vehicle_id, + v.plate, + v.cost_centre, + v.assigned_city + FROM state.live_positions lp + JOIN domain.devices d ON d.imei = lp.imei + JOIN domain.vehicles v ON v.vehicle_id = d.vehicle_id + WHERE d.lifecycle = 'active' + AND (p_cost_centre IS NULL OR v.cost_centre = p_cost_centre) + AND (p_assigned_city IS NULL OR v.assigned_city = p_assigned_city) + AND (p_vehicle_numbers IS NULL OR v.plate = ANY (p_vehicle_numbers)) + ), + ranked AS ( + SELECT + c.*, + ROW_NUMBER() OVER ( + PARTITION BY c.vehicle_id + ORDER BY + CASE c.device_type WHEN 'tracker' THEN 0 ELSE 1 END, + CASE WHEN c.occurred_at > now() - fresh_window THEN 0 ELSE 1 END, + c.occurred_at DESC, + c.activation_at DESC NULLS LAST + ) AS rn + FROM candidates c + ), + deduped AS (SELECT * FROM ranked WHERE rn = 1), + enriched AS ( + SELECT + d.*, + CASE + WHEN d.occurred_at <= now() - offline_after THEN 'offline' + WHEN d.speed_kmh IS NOT NULL AND d.speed_kmh > move_speed_kmh THEN 'moving' + ELSE 'parked' + END AS operational_state, + serve._cost_centre_color(d.cost_centre) AS cost_centre_color, + EXTRACT(EPOCH FROM (now() - d.occurred_at))::int AS age_sec, + round(ST_Y(d.geom)::numeric, 4) AS lat_rounded, + round(ST_X(d.geom)::numeric, 4) AS lng_rounded + FROM deduped d + ), + with_addr AS ( + SELECT e.*, g.address, g.address_short + FROM enriched e + LEFT JOIN state.geocoded_positions g + ON g.lat_rounded = e.lat_rounded + AND g.lng_rounded = e.lng_rounded + ), + summary AS ( + SELECT jsonb_build_object( + 'total_active', count(*), + 'moving', count(*) FILTER (WHERE operational_state = 'moving'), + 'parked', count(*) FILTER (WHERE operational_state = 'parked'), + 'offline', count(*) FILTER (WHERE operational_state = 'offline'), + 'below_freshness_slo', count(*) FILTER ( + WHERE occurred_at <= now() - interval '90 seconds' + ), + 'as_of', to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') + ) AS s + FROM with_addr + ), + features AS ( + SELECT COALESCE(jsonb_agg( + jsonb_build_object( + 'type', 'Feature', + 'geometry', ST_AsGeoJSON(e.geom)::jsonb, + 'properties', jsonb_build_object( + 'vehicle_id', e.vehicle_id, + 'plate', e.plate, + -- label = last 4 of device_name (Tracksolid wire name, e.g. "JC400P-92732" → "2732") + -- fall back to last 4 of plate when device_name is null + 'plate_short', right(COALESCE(NULLIF(e.device_name, ''), e.plate), 4), + 'imei', e.imei, + 'device_type', e.device_type, + 'device_name', e.device_name, + 'mc_type', e.mc_type, + 'pos_type', e.pos_type, + 'cost_centre', e.cost_centre, + 'cost_centre_color', e.cost_centre_color, + 'assigned_city', e.assigned_city, + 'address', e.address, + 'address_short', e.address_short, + 'occurred_at', to_char(e.occurred_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'), + 'age_sec', e.age_sec, + 'speed_kmh', e.speed_kmh, + 'heading_deg', e.direction_deg, + 'gps_signal', e.gps_signal, + 'satellites', e.satellites, + 'current_mileage_km', e.current_mileage_km, + 'operational_state', e.operational_state, + 'style_class', 'vehicle-' || e.operational_state, + 'marker_color', CASE WHEN e.operational_state = 'moving' + THEN e.cost_centre_color + ELSE '#9ca3af' END, + 'show_arrow', (e.operational_state = 'moving' AND e.direction_deg IS NOT NULL) + ) + ) + ), '[]'::jsonb) AS feats + FROM with_addr e + ), + slo_block AS ( + SELECT COALESCE(jsonb_object_agg( + metric, + jsonb_build_object( + 'threshold', threshold, + 'current', current_value, + 'status', status + ) + ), '{}'::jsonb) AS ss + FROM slo.v_current_status + ) + SELECT jsonb_build_object( + 'summary', (SELECT s FROM summary), + 'geojson', jsonb_build_object( + 'type', 'FeatureCollection', + 'features', (SELECT feats FROM features) + ), + 'slo_status', (SELECT ss FROM slo_block) + ) + INTO result; + + RETURN result; +END; +$$; + +-- migrate:down + +DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb); diff --git a/web/fleet-core.js b/web/fleet-core.js index 62af671..8ead54b 100644 --- a/web/fleet-core.js +++ b/web/fleet-core.js @@ -234,6 +234,7 @@ function _popupHtml(props) { const tagLine = [props.cost_centre, props.assigned_city] .filter(Boolean).join(' · ').toLowerCase(); + const addressLine = props.address_short || props.address || null; const headingPart = (props.heading_deg != null) ? `heading ${Math.round(Number(props.heading_deg))}°` : null; @@ -258,6 +259,7 @@ function _popupHtml(props) { ${_esc(pillText)} ${tagLine ? `` : ''} + ${addressLine ? `` : ''} ${headingLine ? `` : ''} ${mileageLine ? `` : ''} diff --git a/web/index-live.html b/web/index-live.html index b31fa45..1bcf02f 100644 --- a/web/index-live.html +++ b/web/index-live.html @@ -63,7 +63,8 @@ .pill-parked { background: rgba(148,163,184,0.15); color: var(--muted); } .pill-offline { background: rgba(148,163,184,0.15); color: var(--muted); } .pill-unknown { background: rgba(148,163,184,0.15); color: var(--muted); } - .popup-meta { color: var(--muted); font-size: 12px; margin: 2px 0 8px; } + .popup-meta { color: var(--muted); font-size: 12px; margin: 2px 0 4px; } + .popup-address { color: var(--text); font-size: 13.5px; margin: 6px 0 8px; font-weight: 500; } .popup-row { color: #cbd5e1; font-size: 12.5px; margin: 4px 0; } form.filters { display: grid; gap: 8px; } form.filters input, form.filters select { width: 100%; background: #0b1220; color: var(--text); border: 1px solid #0b1220; border-radius: 4px; padding: 6px 8px; font-size: 12px; }