Geocoder: cron job + state.geocoded_positions; label uses device_name last 4 (fallback plate); popup address row
This commit is contained in:
parent
6c5ba3b22b
commit
45974b3810
7 changed files with 318 additions and 2 deletions
|
|
@ -33,6 +33,16 @@ class Settings(BaseSettings):
|
|||
ntfy_topic: str = Field(default="fleet-slo-breach", alias="NTFY_TOPIC")
|
||||
ntfy_token: str = Field(default="", alias="NTFY_TOKEN")
|
||||
|
||||
nominatim_base_url: str = Field(
|
||||
default="https://nominatim.openstreetmap.org", alias="NOMINATIM_BASE_URL"
|
||||
)
|
||||
nominatim_user_agent: str = Field(
|
||||
default="fleet-platform/0.1 (admin@rahamafresh.com)", alias="NOMINATIM_USER_AGENT"
|
||||
)
|
||||
geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK")
|
||||
geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC")
|
||||
geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC")
|
||||
|
||||
app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE")
|
||||
app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE")
|
||||
app_log_level: str = Field(default="INFO", alias="APP_LOG_LEVEL")
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ from app.db import close_pool, get_pool
|
|||
from app.health import router as health_router
|
||||
from app.logging_setup import configure_logging
|
||||
from app.tracksolid.client import TracksolidClient
|
||||
from app.workers import poller
|
||||
from app.workers import geocoder, poller
|
||||
|
||||
log = structlog.get_logger("cron")
|
||||
|
||||
|
|
@ -81,6 +81,20 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
|||
else:
|
||||
log.warning("cron.tracksolid_jobs_skipped_missing_creds")
|
||||
|
||||
# Reverse-geocoder (PRD F2.3, pulled forward so popup shows addresses).
|
||||
async def _run_geocode() -> None:
|
||||
await geocoder.geocode_pending(settings)
|
||||
|
||||
scheduler.add_job(
|
||||
_run_geocode,
|
||||
trigger=IntervalTrigger(seconds=settings.geocoder_tick_sec),
|
||||
id="geocode_pending",
|
||||
max_instances=1,
|
||||
coalesce=True,
|
||||
misfire_grace_time=120,
|
||||
)
|
||||
log.info("cron.geocoder_registered", tick_sec=settings.geocoder_tick_sec)
|
||||
|
||||
scheduler.start()
|
||||
log.info("cron.scheduler_started")
|
||||
|
||||
|
|
|
|||
108
app/workers/geocoder.py
Normal file
108
app/workers/geocoder.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
"""Reverse geocoder worker — PRD F2.3 (pulled forward).
|
||||
|
||||
Drains positions in `state.live_positions` that don't yet have a cached address
|
||||
and fetches one from Nominatim. Keyed by lat/lng rounded to 4 decimals (~10m),
|
||||
so parked vehicles cost one lookup ever; moving vehicles cost a handful per
|
||||
minute. Rate-limited to comply with Nominatim's 1 req/sec policy.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from app.config import Settings
|
||||
from app.db import get_pool
|
||||
|
||||
log = structlog.get_logger("worker.geocoder")
|
||||
|
||||
|
||||
def _extract_short_address(payload: dict[str, Any]) -> str | None:
|
||||
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
|
||||
addr = payload.get("address") or {}
|
||||
if not isinstance(addr, dict):
|
||||
return None
|
||||
road = addr.get("road") or addr.get("pedestrian") or addr.get("residential")
|
||||
place = (
|
||||
addr.get("city")
|
||||
or addr.get("town")
|
||||
or addr.get("village")
|
||||
or addr.get("suburb")
|
||||
or addr.get("municipality")
|
||||
or addr.get("county")
|
||||
)
|
||||
parts = [p for p in (road, place) if p]
|
||||
return ", ".join(parts) if parts else None
|
||||
|
||||
|
||||
async def geocode_pending(settings: Settings) -> None:
|
||||
pool = await get_pool()
|
||||
async with pool.connection() as conn, conn.cursor() as cur:
|
||||
await cur.execute(
|
||||
"""
|
||||
SELECT DISTINCT
|
||||
round(ST_Y(lp.geom)::numeric, 4) AS lat,
|
||||
round(ST_X(lp.geom)::numeric, 4) AS lng
|
||||
FROM state.live_positions lp
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM state.geocoded_positions g
|
||||
WHERE g.lat_rounded = round(ST_Y(lp.geom)::numeric, 4)
|
||||
AND g.lng_rounded = round(ST_X(lp.geom)::numeric, 4)
|
||||
)
|
||||
LIMIT %s
|
||||
""",
|
||||
(settings.geocoder_max_per_tick,),
|
||||
)
|
||||
pending = list(await cur.fetchall())
|
||||
|
||||
if not pending:
|
||||
return
|
||||
|
||||
log.info("geocoder.batch_start", count=len(pending))
|
||||
async with httpx.AsyncClient(
|
||||
base_url=settings.nominatim_base_url,
|
||||
headers={"User-Agent": settings.nominatim_user_agent},
|
||||
timeout=httpx.Timeout(20.0),
|
||||
) as http:
|
||||
for lat, lng in pending:
|
||||
await asyncio.sleep(settings.geocoder_rate_limit_sec)
|
||||
try:
|
||||
r = await http.get(
|
||||
"/reverse",
|
||||
params={
|
||||
"lat": str(lat),
|
||||
"lon": str(lng),
|
||||
"format": "json",
|
||||
"zoom": "17",
|
||||
"addressdetails": "1",
|
||||
},
|
||||
)
|
||||
r.raise_for_status()
|
||||
data = r.json()
|
||||
except Exception:
|
||||
log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
|
||||
continue
|
||||
|
||||
address = data.get("display_name")
|
||||
short = _extract_short_address(data)
|
||||
|
||||
async with pool.connection() as conn, conn.cursor() as cur:
|
||||
await cur.execute(
|
||||
"""
|
||||
INSERT INTO state.geocoded_positions
|
||||
(lat_rounded, lng_rounded, address, address_short, source)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
ON CONFLICT (lat_rounded, lng_rounded) DO UPDATE
|
||||
SET address = EXCLUDED.address,
|
||||
address_short = EXCLUDED.address_short,
|
||||
fetched_at = now()
|
||||
""",
|
||||
(lat, lng, address, short, "nominatim"),
|
||||
)
|
||||
log.info(
|
||||
"geocoder.cached",
|
||||
lat=float(lat),
|
||||
lng=float(lng),
|
||||
address_short=short,
|
||||
)
|
||||
18
db/migrations/20260601000010_geocoded_positions.sql
Normal file
18
db/migrations/20260601000010_geocoded_positions.sql
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
-- migrate:up
|
||||
|
||||
-- Cache layer for reverse geocoding. Keyed by lat/lng rounded to 4 decimals
|
||||
-- (~10m grid), so parked vehicles share a row and only moving vehicles trigger
|
||||
-- fresh lookups. Filled asynchronously by the geocoder cron job (PRD F2.3).
|
||||
CREATE TABLE state.geocoded_positions (
|
||||
lat_rounded numeric(8,4) NOT NULL,
|
||||
lng_rounded numeric(8,4) NOT NULL,
|
||||
address text,
|
||||
address_short text,
|
||||
source text NOT NULL DEFAULT 'nominatim',
|
||||
fetched_at timestamptz NOT NULL DEFAULT now(),
|
||||
PRIMARY KEY (lat_rounded, lng_rounded)
|
||||
);
|
||||
|
||||
-- migrate:down
|
||||
|
||||
DROP TABLE IF EXISTS state.geocoded_positions;
|
||||
163
db/migrations/20260601000011_serve_fn_live_view_v3.sql
Normal file
163
db/migrations/20260601000011_serve_fn_live_view_v3.sql
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
-- migrate:up
|
||||
|
||||
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);
|
||||
|
||||
CREATE OR REPLACE FUNCTION serve.fn_live_view(filters jsonb)
|
||||
RETURNS jsonb
|
||||
LANGUAGE plpgsql STABLE
|
||||
AS $$
|
||||
DECLARE
|
||||
fresh_window interval := COALESCE((filters->>'fresh_window')::interval, interval '24 hours');
|
||||
offline_after interval := COALESCE((filters->>'offline_after')::interval, interval '5 minutes');
|
||||
move_speed_kmh numeric := COALESCE((filters->>'move_speed_kmh')::numeric, 5);
|
||||
p_cost_centre text := filters->>'cost_centre';
|
||||
p_assigned_city text := filters->>'assigned_city';
|
||||
p_vehicle_numbers text[] := CASE
|
||||
WHEN filters ? 'vehicle_numbers'
|
||||
THEN ARRAY(SELECT jsonb_array_elements_text(filters->'vehicle_numbers'))
|
||||
ELSE NULL
|
||||
END;
|
||||
result jsonb;
|
||||
BEGIN
|
||||
WITH candidates AS (
|
||||
SELECT
|
||||
lp.imei,
|
||||
lp.occurred_at,
|
||||
lp.geom,
|
||||
lp.speed_kmh,
|
||||
lp.direction_deg,
|
||||
lp.mc_type,
|
||||
lp.current_mileage_km,
|
||||
lp.gps_signal,
|
||||
lp.satellites,
|
||||
lp.device_name,
|
||||
lp.pos_type,
|
||||
d.device_type,
|
||||
d.activation_at,
|
||||
v.vehicle_id,
|
||||
v.plate,
|
||||
v.cost_centre,
|
||||
v.assigned_city
|
||||
FROM state.live_positions lp
|
||||
JOIN domain.devices d ON d.imei = lp.imei
|
||||
JOIN domain.vehicles v ON v.vehicle_id = d.vehicle_id
|
||||
WHERE d.lifecycle = 'active'
|
||||
AND (p_cost_centre IS NULL OR v.cost_centre = p_cost_centre)
|
||||
AND (p_assigned_city IS NULL OR v.assigned_city = p_assigned_city)
|
||||
AND (p_vehicle_numbers IS NULL OR v.plate = ANY (p_vehicle_numbers))
|
||||
),
|
||||
ranked AS (
|
||||
SELECT
|
||||
c.*,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY c.vehicle_id
|
||||
ORDER BY
|
||||
CASE c.device_type WHEN 'tracker' THEN 0 ELSE 1 END,
|
||||
CASE WHEN c.occurred_at > now() - fresh_window THEN 0 ELSE 1 END,
|
||||
c.occurred_at DESC,
|
||||
c.activation_at DESC NULLS LAST
|
||||
) AS rn
|
||||
FROM candidates c
|
||||
),
|
||||
deduped AS (SELECT * FROM ranked WHERE rn = 1),
|
||||
enriched AS (
|
||||
SELECT
|
||||
d.*,
|
||||
CASE
|
||||
WHEN d.occurred_at <= now() - offline_after THEN 'offline'
|
||||
WHEN d.speed_kmh IS NOT NULL AND d.speed_kmh > move_speed_kmh THEN 'moving'
|
||||
ELSE 'parked'
|
||||
END AS operational_state,
|
||||
serve._cost_centre_color(d.cost_centre) AS cost_centre_color,
|
||||
EXTRACT(EPOCH FROM (now() - d.occurred_at))::int AS age_sec,
|
||||
round(ST_Y(d.geom)::numeric, 4) AS lat_rounded,
|
||||
round(ST_X(d.geom)::numeric, 4) AS lng_rounded
|
||||
FROM deduped d
|
||||
),
|
||||
with_addr AS (
|
||||
SELECT e.*, g.address, g.address_short
|
||||
FROM enriched e
|
||||
LEFT JOIN state.geocoded_positions g
|
||||
ON g.lat_rounded = e.lat_rounded
|
||||
AND g.lng_rounded = e.lng_rounded
|
||||
),
|
||||
summary AS (
|
||||
SELECT jsonb_build_object(
|
||||
'total_active', count(*),
|
||||
'moving', count(*) FILTER (WHERE operational_state = 'moving'),
|
||||
'parked', count(*) FILTER (WHERE operational_state = 'parked'),
|
||||
'offline', count(*) FILTER (WHERE operational_state = 'offline'),
|
||||
'below_freshness_slo', count(*) FILTER (
|
||||
WHERE occurred_at <= now() - interval '90 seconds'
|
||||
),
|
||||
'as_of', to_char(now() AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')
|
||||
) AS s
|
||||
FROM with_addr
|
||||
),
|
||||
features AS (
|
||||
SELECT COALESCE(jsonb_agg(
|
||||
jsonb_build_object(
|
||||
'type', 'Feature',
|
||||
'geometry', ST_AsGeoJSON(e.geom)::jsonb,
|
||||
'properties', jsonb_build_object(
|
||||
'vehicle_id', e.vehicle_id,
|
||||
'plate', e.plate,
|
||||
-- label = last 4 of device_name (Tracksolid wire name, e.g. "JC400P-92732" → "2732")
|
||||
-- fall back to last 4 of plate when device_name is null
|
||||
'plate_short', right(COALESCE(NULLIF(e.device_name, ''), e.plate), 4),
|
||||
'imei', e.imei,
|
||||
'device_type', e.device_type,
|
||||
'device_name', e.device_name,
|
||||
'mc_type', e.mc_type,
|
||||
'pos_type', e.pos_type,
|
||||
'cost_centre', e.cost_centre,
|
||||
'cost_centre_color', e.cost_centre_color,
|
||||
'assigned_city', e.assigned_city,
|
||||
'address', e.address,
|
||||
'address_short', e.address_short,
|
||||
'occurred_at', to_char(e.occurred_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"'),
|
||||
'age_sec', e.age_sec,
|
||||
'speed_kmh', e.speed_kmh,
|
||||
'heading_deg', e.direction_deg,
|
||||
'gps_signal', e.gps_signal,
|
||||
'satellites', e.satellites,
|
||||
'current_mileage_km', e.current_mileage_km,
|
||||
'operational_state', e.operational_state,
|
||||
'style_class', 'vehicle-' || e.operational_state,
|
||||
'marker_color', CASE WHEN e.operational_state = 'moving'
|
||||
THEN e.cost_centre_color
|
||||
ELSE '#9ca3af' END,
|
||||
'show_arrow', (e.operational_state = 'moving' AND e.direction_deg IS NOT NULL)
|
||||
)
|
||||
)
|
||||
), '[]'::jsonb) AS feats
|
||||
FROM with_addr e
|
||||
),
|
||||
slo_block AS (
|
||||
SELECT COALESCE(jsonb_object_agg(
|
||||
metric,
|
||||
jsonb_build_object(
|
||||
'threshold', threshold,
|
||||
'current', current_value,
|
||||
'status', status
|
||||
)
|
||||
), '{}'::jsonb) AS ss
|
||||
FROM slo.v_current_status
|
||||
)
|
||||
SELECT jsonb_build_object(
|
||||
'summary', (SELECT s FROM summary),
|
||||
'geojson', jsonb_build_object(
|
||||
'type', 'FeatureCollection',
|
||||
'features', (SELECT feats FROM features)
|
||||
),
|
||||
'slo_status', (SELECT ss FROM slo_block)
|
||||
)
|
||||
INTO result;
|
||||
|
||||
RETURN result;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- migrate:down
|
||||
|
||||
DROP FUNCTION IF EXISTS serve.fn_live_view(jsonb);
|
||||
|
|
@ -234,6 +234,7 @@ function _popupHtml(props) {
|
|||
|
||||
const tagLine = [props.cost_centre, props.assigned_city]
|
||||
.filter(Boolean).join(' · ').toLowerCase();
|
||||
const addressLine = props.address_short || props.address || null;
|
||||
|
||||
const headingPart = (props.heading_deg != null)
|
||||
? `heading ${Math.round(Number(props.heading_deg))}°` : null;
|
||||
|
|
@ -258,6 +259,7 @@ function _popupHtml(props) {
|
|||
<span class="popup-pill pill-${_esc(state)}">${_esc(pillText)}</span>
|
||||
</div>
|
||||
${tagLine ? `<div class="popup-meta">${_esc(tagLine)}</div>` : ''}
|
||||
${addressLine ? `<div class="popup-address">${_esc(addressLine)}</div>` : ''}
|
||||
${headingLine ? `<div class="popup-row">${_esc(headingLine)}</div>` : ''}
|
||||
${mileageLine ? `<div class="popup-row">${_esc(mileageLine)}</div>` : ''}
|
||||
<div class="popup-row">${_esc(ageLine)}</div>
|
||||
|
|
|
|||
|
|
@ -63,7 +63,8 @@
|
|||
.pill-parked { background: rgba(148,163,184,0.15); color: var(--muted); }
|
||||
.pill-offline { background: rgba(148,163,184,0.15); color: var(--muted); }
|
||||
.pill-unknown { background: rgba(148,163,184,0.15); color: var(--muted); }
|
||||
.popup-meta { color: var(--muted); font-size: 12px; margin: 2px 0 8px; }
|
||||
.popup-meta { color: var(--muted); font-size: 12px; margin: 2px 0 4px; }
|
||||
.popup-address { color: var(--text); font-size: 13.5px; margin: 6px 0 8px; font-weight: 500; }
|
||||
.popup-row { color: #cbd5e1; font-size: 12.5px; margin: 4px 0; }
|
||||
form.filters { display: grid; gap: 8px; }
|
||||
form.filters input, form.filters select { width: 100%; background: #0b1220; color: var(--text); border: 1px solid #0b1220; border-radius: 4px; padding: 6px 8px; font-size: 12px; }
|
||||
|
|
|
|||
Loading…
Reference in a new issue