Compare commits

..

No commits in common. "3bd9ee07cd4446321142aa7b21732705582a6a3c" and "34afe60927f5f9013d855d3edd1e64e36585ec75" have entirely different histories.

8 changed files with 9 additions and 787 deletions

View file

@ -42,10 +42,6 @@ class Settings(BaseSettings):
geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK") geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK")
geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC") geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC")
geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC") geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC")
geocoder_breaker_threshold: int = Field(default=3, alias="GEOCODER_BREAKER_THRESHOLD")
geocoder_breaker_cooldown_sec: float = Field(
default=300.0, alias="GEOCODER_BREAKER_COOLDOWN_SEC"
)
app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE") app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE")
app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE") app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE")

View file

@ -7,7 +7,6 @@ minute. Rate-limited to comply with Nominatim's 1 req/sec policy.
""" """
import asyncio import asyncio
import time
from typing import Any from typing import Any
import httpx import httpx
@ -18,16 +17,6 @@ from app.db import get_pool
log = structlog.get_logger("worker.geocoder") log = structlog.get_logger("worker.geocoder")
# Circuit breaker: when Nominatim fails repeatedly (down or rate-limiting us),
# trip open and skip ticks for a cooldown rather than grinding through every
# batch 1 req/sec only to fail. Module-level so the open state survives across
# ticks; monotonic clock so it's immune to wall-clock jumps.
class _Breaker:
open_until: float = 0.0
_breaker = _Breaker()
def _extract_short_address(payload: dict[str, Any]) -> str | None: def _extract_short_address(payload: dict[str, Any]) -> str | None:
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string.""" """Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
@ -48,11 +37,6 @@ def _extract_short_address(payload: dict[str, Any]) -> str | None:
async def geocode_pending(settings: Settings) -> None: async def geocode_pending(settings: Settings) -> None:
now = time.monotonic()
if now < _breaker.open_until:
log.info("geocoder.breaker_open", reopens_in_sec=round(_breaker.open_until - now, 1))
return
pool = await get_pool() pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur: async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute( await cur.execute(
@ -81,7 +65,6 @@ async def geocode_pending(settings: Settings) -> None:
headers={"User-Agent": settings.nominatim_user_agent}, headers={"User-Agent": settings.nominatim_user_agent},
timeout=httpx.Timeout(20.0), timeout=httpx.Timeout(20.0),
) as http: ) as http:
consecutive_failures = 0
for lat, lng in pending: for lat, lng in pending:
await asyncio.sleep(settings.geocoder_rate_limit_sec) await asyncio.sleep(settings.geocoder_rate_limit_sec)
try: try:
@ -99,21 +82,8 @@ async def geocode_pending(settings: Settings) -> None:
data = r.json() data = r.json()
except Exception: except Exception:
log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng)) log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
consecutive_failures += 1
if consecutive_failures >= settings.geocoder_breaker_threshold:
_breaker.open_until = (
time.monotonic() + settings.geocoder_breaker_cooldown_sec
)
log.warning(
"geocoder.breaker_tripped",
consecutive_failures=consecutive_failures,
cooldown_sec=settings.geocoder_breaker_cooldown_sec,
)
break
continue continue
consecutive_failures = 0
address = data.get("display_name") address = data.get("display_name")
short = _extract_short_address(data) short = _extract_short_address(data)

View file

@ -1,407 +0,0 @@
-- migrate:up
--
-- Trip detection on demand from state.position_history.
--
-- v2 (accuracy fix): the source query now
-- (1) restricts to the vehicle's primary tracker IMEI (v_imei) instead of
-- every device, so a camera-paired vehicle's tracker + camera fixes no
-- longer interleave and zig-zag, and
-- (2) excludes low-accuracy LBS/WIFI fixes (pos_type IN ('LBS','WIFI')).
-- Those cell-tower/wifi fixes can land kilometres off; drawn as straight
-- ST_MakeLine segments they produced the "starburst" of spikes radiating from
-- a parked vehicle. NULL pos_type (push/crossfeed GPS) is kept — same rule the
-- serve.fn_live_view `low_accuracy` flag uses. Everything else is identical to
-- the prior definition.
--
-- Algorithm (state machine, single forward pass over the day's positions):
--
-- reporting_time := first occurred_at where acc_state = 1
--
-- A trip starts at:
-- - an ACC_ON transition (or the first row of the day if it's already ACC_ON)
-- A trip ends at:
-- - ACC_OFF + stationary (speed < 5 km/h) for >= 5 min → end_reason='work_stop'
-- - a fix-reporting silence of >= 5 min → end_reason='nofix_stop'
-- - a fix gap > 30 min → end_reason='long_gap'
-- - the end of the day's data → end_reason='day_end'
--
-- The nofix_stop rule matches legacy dispatcher semantics: when a polled
-- device goes silent for >=5 min mid-trip we assume the engine is off
-- (validated against legacy 638J full-day: 15 trips legacy = 15 trips here).
-- Pure stop-and-go traffic patterns still consolidate cleanly because each
-- stop produces explicit slow/stationary fixes, not silences.
--
-- Within a trip, an ACC_ON + stationary stretch of >= 5 min is recorded
-- as an idling segment (no trip split — engine still running, treated
-- as a customer-stop with engine on).
--
-- Distance accumulates per moving step (curr.speed_kmh >= 5 km/h);
-- GPS jitter at standstill is excluded.
--
-- Day totals (driving_min / idling_min / stopped_min / unknown_min) are
-- bucketed by the classification of the *previous* fix at each step.
--
-- Falls back to movement-only segmentation (has_acc_data=false in the
-- response) when every position for the day has acc_state IS NULL.
-- Drop any prior signature; the function identity is (name + arg types),
-- so a CREATE OR REPLACE with a different arg type would create a sibling.
DROP FUNCTION IF EXISTS serve.fn_vehicle_trips(integer, date);
DROP FUNCTION IF EXISTS serve.fn_vehicle_trips(bigint, date);
CREATE OR REPLACE FUNCTION serve.fn_vehicle_trips(
p_vehicle_id bigint,
p_date_eat date
) RETURNS jsonb
LANGUAGE plpgsql STABLE
AS $fn$
DECLARE
EAT_OFFSET interval := interval '3 hours';
STOP_THRESH interval := interval '5 minutes';
NOFIX_THRESH interval := interval '5 minutes';
GAP_THRESH interval := interval '30 minutes';
STAT_KMH numeric := 5;
day_start_utc timestamptz := (p_date_eat::timestamp - EAT_OFFSET) AT TIME ZONE 'UTC';
day_end_utc timestamptz := day_start_utc + interval '24 hours';
rec record;
v_plate text;
v_imei text;
reporting_time timestamptz;
has_acc bool := false;
-- trip-in-progress state
in_trip bool := false;
trip_started_at timestamptz;
trip_started_geom geometry;
trip_path geometry[] := ARRAY[]::geometry[];
trip_path_times timestamptz[] := ARRAY[]::timestamptz[];
trip_path_speeds numeric[] := ARRAY[]::numeric[];
trip_distance_m numeric := 0;
trip_idling_sec numeric := 0;
trip_stops jsonb := '[]'::jsonb;
-- mid-trip ACC_OFF run (might become a work stop)
off_run_start timestamptz;
off_run_geom geometry;
-- mid-trip idle (ACC_ON + stationary) run
idle_run_start timestamptz;
idle_run_geom geometry;
prev_at timestamptz;
prev_geom geometry;
prev_state text; -- 'moving' | 'idling' | 'stopped' | 'unknown'
trips_out jsonb := '[]'::jsonb;
n_trips int := 0;
total_distance_m numeric := 0;
total_driving_sec numeric := 0;
total_idling_sec numeric := 0;
total_stopped_sec numeric := 0;
total_unknown_sec numeric := 0;
longest_gap_sec numeric := 0;
fix_count int := 0;
pos_state text;
step_sec numeric;
step_m numeric;
-- closure helper inline (PL/pgSQL has no nested funcs, so we inline)
BEGIN
SELECT v.plate INTO v_plate FROM domain.vehicles v WHERE v.vehicle_id = p_vehicle_id;
IF v_plate IS NULL THEN
RETURN jsonb_build_object(
'error', 'vehicle not found',
'vehicle_id', p_vehicle_id,
'date', to_char(p_date_eat, 'YYYY-MM-DD')
);
END IF;
SELECT d.imei INTO v_imei
FROM domain.devices d
WHERE d.vehicle_id = p_vehicle_id
ORDER BY CASE d.device_type WHEN 'tracker' THEN 0 ELSE 1 END, d.imei
LIMIT 1;
FOR rec IN
SELECT occurred_at,
geom,
COALESCE(speed_kmh, 0)::numeric AS speed_kmh,
acc_state
FROM state.position_history
WHERE vehicle_id = p_vehicle_id
AND imei = v_imei
AND COALESCE(pos_type, '') NOT IN ('LBS', 'WIFI')
AND occurred_at >= day_start_utc
AND occurred_at < day_end_utc
ORDER BY occurred_at
LOOP
fix_count := fix_count + 1;
IF rec.acc_state IS NOT NULL THEN
has_acc := true;
END IF;
IF rec.acc_state = 1 AND reporting_time IS NULL THEN
reporting_time := rec.occurred_at;
END IF;
-- nofix_stop: if mid-trip and the prior fix was >= NOFIX_THRESH ago
-- but < GAP_THRESH, treat the silence as engine-off and close the
-- trip at prev_at before processing this row.
IF in_trip
AND prev_at IS NOT NULL
AND rec.occurred_at - prev_at >= NOFIX_THRESH
AND rec.occurred_at - prev_at <= GAP_THRESH
THEN
trips_out := trips_out || jsonb_build_object(
'trip_id', n_trips + 1,
'started_at', trip_started_at,
'ended_at', prev_at,
'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1),
'distance_km', round((trip_distance_m / 1000.0)::numeric, 2),
'idling_min', round((trip_idling_sec / 60.0)::numeric, 1),
'end_reason', 'nofix_stop',
'stops', trip_stops,
'path', CASE WHEN array_length(trip_path,1) >= 2
THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb
ELSE NULL END
);
n_trips := n_trips + 1;
in_trip := false;
trip_path := ARRAY[]::geometry[];
trip_path_times := ARRAY[]::timestamptz[];
trip_path_speeds := ARRAY[]::numeric[];
trip_distance_m := 0;
trip_idling_sec := 0;
trip_stops := '[]'::jsonb;
off_run_start := NULL;
idle_run_start := NULL;
END IF;
-- Classify this position
IF prev_at IS NOT NULL AND rec.occurred_at - prev_at > GAP_THRESH THEN
pos_state := 'unknown';
ELSIF rec.speed_kmh >= STAT_KMH THEN
pos_state := 'moving';
ELSIF rec.acc_state = 1 THEN
pos_state := 'idling';
ELSIF rec.acc_state = 0 THEN
pos_state := 'stopped';
ELSE
-- no acc data: fall back to speed-only
pos_state := CASE WHEN rec.speed_kmh >= STAT_KMH THEN 'moving' ELSE 'stopped' END;
END IF;
-- Bucket the time *since* prev_at into totals
IF prev_at IS NOT NULL THEN
step_sec := EXTRACT(EPOCH FROM (rec.occurred_at - prev_at));
longest_gap_sec := GREATEST(longest_gap_sec, step_sec);
CASE prev_state
WHEN 'moving' THEN total_driving_sec := total_driving_sec + step_sec;
WHEN 'idling' THEN total_idling_sec := total_idling_sec + step_sec;
WHEN 'stopped' THEN total_stopped_sec := total_stopped_sec + step_sec;
ELSE total_unknown_sec := total_unknown_sec + step_sec;
END CASE;
IF in_trip AND prev_state = 'moving' AND pos_state IN ('moving','idling') THEN
step_m := ST_Distance(prev_geom::geography, rec.geom::geography);
trip_distance_m := trip_distance_m + step_m;
total_distance_m := total_distance_m + step_m;
ELSIF prev_state = 'moving' THEN
step_m := ST_Distance(prev_geom::geography, rec.geom::geography);
total_distance_m := total_distance_m + step_m;
END IF;
END IF;
----------------------------------------------------------------------
-- State machine
----------------------------------------------------------------------
IF pos_state = 'unknown' THEN
-- Gap longer than GAP_THRESH; close any open trip.
IF in_trip THEN
trips_out := trips_out || jsonb_build_object(
'trip_id', n_trips + 1,
'started_at', trip_started_at,
'ended_at', prev_at,
'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1),
'distance_km', round((trip_distance_m / 1000.0)::numeric, 2),
'idling_min', round((trip_idling_sec / 60.0)::numeric, 1),
'end_reason', 'long_gap',
'stops', trip_stops,
'path', CASE WHEN array_length(trip_path,1) >= 2
THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb
ELSE NULL END
);
n_trips := n_trips + 1;
in_trip := false;
trip_path := ARRAY[]::geometry[];
trip_path_times := ARRAY[]::timestamptz[];
trip_path_speeds := ARRAY[]::numeric[];
trip_distance_m := 0;
trip_idling_sec := 0;
trip_stops := '[]'::jsonb;
off_run_start := NULL;
idle_run_start := NULL;
END IF;
ELSIF NOT in_trip THEN
-- Outside any trip. Start one when we see motion or ACC_ON.
IF pos_state IN ('moving','idling') OR rec.acc_state = 1 THEN
in_trip := true;
trip_started_at := rec.occurred_at;
trip_started_geom := rec.geom;
trip_path := ARRAY[rec.geom];
trip_path_times := ARRAY[rec.occurred_at];
trip_path_speeds := ARRAY[rec.speed_kmh];
trip_distance_m := 0;
trip_idling_sec := 0;
trip_stops := '[]'::jsonb;
off_run_start := NULL;
idle_run_start := CASE WHEN pos_state = 'idling' THEN rec.occurred_at ELSE NULL END;
END IF;
ELSE
-- In trip — append point, then handle stop/idle runs.
trip_path := trip_path || rec.geom;
trip_path_times := trip_path_times || rec.occurred_at;
trip_path_speeds := trip_path_speeds || rec.speed_kmh;
IF pos_state = 'stopped' THEN
IF off_run_start IS NULL THEN
off_run_start := rec.occurred_at;
off_run_geom := rec.geom;
END IF;
IF idle_run_start IS NOT NULL THEN
idle_run_start := NULL;
END IF;
IF rec.occurred_at - off_run_start >= STOP_THRESH THEN
-- Work stop confirmed: close the trip at off_run_start.
trips_out := trips_out || jsonb_build_object(
'trip_id', n_trips + 1,
'started_at', trip_started_at,
'ended_at', off_run_start,
'duration_min', round(EXTRACT(EPOCH FROM (off_run_start - trip_started_at))/60.0, 1),
'distance_km', round((trip_distance_m / 1000.0)::numeric, 2),
'idling_min', round((trip_idling_sec / 60.0)::numeric, 1),
'end_reason', 'work_stop',
'stops', trip_stops,
'path', CASE WHEN array_length(trip_path,1) >= 2
THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb
ELSE NULL END
);
n_trips := n_trips + 1;
in_trip := false;
trip_path := ARRAY[]::geometry[];
trip_path_times := ARRAY[]::timestamptz[];
trip_path_speeds := ARRAY[]::numeric[];
trip_distance_m := 0;
trip_idling_sec := 0;
trip_stops := '[]'::jsonb;
off_run_start := NULL;
END IF;
ELSIF pos_state = 'idling' THEN
IF off_run_start IS NOT NULL THEN
-- ACC came back on before STOP_THRESH; abandon the off-run.
off_run_start := NULL;
END IF;
IF idle_run_start IS NULL THEN
idle_run_start := rec.occurred_at;
idle_run_geom := rec.geom;
ELSIF rec.occurred_at - idle_run_start >= STOP_THRESH THEN
-- Already recorded as idling within trip; nothing to flush
-- until run ends.
NULL;
END IF;
ELSE -- moving
IF off_run_start IS NOT NULL THEN
off_run_start := NULL;
END IF;
IF idle_run_start IS NOT NULL THEN
-- Idle run ended; if it was long enough, record it.
IF rec.occurred_at - idle_run_start >= STOP_THRESH THEN
trip_idling_sec := trip_idling_sec
+ EXTRACT(EPOCH FROM (rec.occurred_at - idle_run_start));
trip_stops := trip_stops || jsonb_build_object(
'at', idle_run_start,
'duration_min', round(EXTRACT(EPOCH FROM (rec.occurred_at - idle_run_start))/60.0, 1),
'kind', 'idling',
'lng', ST_X(idle_run_geom),
'lat', ST_Y(idle_run_geom)
);
END IF;
idle_run_start := NULL;
END IF;
END IF;
END IF;
prev_at := rec.occurred_at;
prev_geom := rec.geom;
prev_state := pos_state;
END LOOP;
-- Close any trip still open at end of loop.
IF in_trip THEN
-- If an unflushed idle run is open, account for it.
IF idle_run_start IS NOT NULL AND prev_at - idle_run_start >= STOP_THRESH THEN
trip_idling_sec := trip_idling_sec
+ EXTRACT(EPOCH FROM (prev_at - idle_run_start));
trip_stops := trip_stops || jsonb_build_object(
'at', idle_run_start,
'duration_min', round(EXTRACT(EPOCH FROM (prev_at - idle_run_start))/60.0, 1),
'kind', 'idling',
'lng', ST_X(idle_run_geom),
'lat', ST_Y(idle_run_geom)
);
END IF;
trips_out := trips_out || jsonb_build_object(
'trip_id', n_trips + 1,
'started_at', trip_started_at,
'ended_at', prev_at,
'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1),
'distance_km', round((trip_distance_m / 1000.0)::numeric, 2),
'idling_min', round((trip_idling_sec / 60.0)::numeric, 1),
'end_reason', 'day_end',
'stops', trip_stops,
'path', CASE WHEN array_length(trip_path,1) >= 2
THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb
ELSE NULL END
);
n_trips := n_trips + 1;
END IF;
RETURN jsonb_build_object(
'vehicle_id', p_vehicle_id,
'plate', v_plate,
'imei', v_imei,
'date', to_char(p_date_eat, 'YYYY-MM-DD'),
'reporting_time', reporting_time,
'totals', jsonb_build_object(
'distance_km', round((total_distance_m / 1000.0)::numeric, 2),
'driving_min', round((total_driving_sec / 60.0)::numeric, 1),
'idling_min', round((total_idling_sec / 60.0)::numeric, 1),
'stopped_min', round((total_stopped_sec / 60.0)::numeric, 1),
'unknown_min', round((total_unknown_sec / 60.0)::numeric, 1),
'trip_count', n_trips
),
'data_quality', jsonb_build_object(
'fix_count', fix_count,
'has_acc_data', has_acc,
'longest_gap_sec', round(longest_gap_sec::numeric, 0)
),
'trips', trips_out
);
END
$fn$;
-- migrate:down
DROP FUNCTION IF EXISTS serve.fn_vehicle_trips(bigint, date);
DROP FUNCTION IF EXISTS serve.fn_vehicle_trips(integer, date);

View file

@ -1,61 +0,0 @@
-- migrate:up
--
-- Continuous day track for a vehicle: one GeoJSON LineString of every fix in
-- the EAT day, in time order. serve.fn_vehicle_trips returns each trip as its
-- own ST_MakeLine, so on the map a day with reporting-gap trip splits reads as
-- several disconnected segments. The frontend draws this track as a faint base
-- line under the coloured per-trip segments, so the route looks like one
-- continuous drive while individual trips stay highlighted.
--
-- EAT day boundary matches serve.fn_vehicle_trips (UTC+3). Light ST_Simplify
-- (~3 m) trims redundant points to keep the payload small without visibly
-- changing the shape. Returns NULL when there are fewer than 2 fixes.
--
-- v2 (accuracy fix): excludes low-accuracy LBS/WIFI fixes (pos_type IN
-- ('LBS','WIFI')). Cell-tower/wifi fixes can land kilometres off and, drawn as
-- straight segments, spiked the base line out and back to a parked vehicle.
-- NULL pos_type (push/crossfeed GPS) is kept — same rule as fn_vehicle_trips
-- and the serve.fn_live_view `low_accuracy` flag.
CREATE OR REPLACE FUNCTION serve.fn_vehicle_day_track(
p_vehicle_id bigint,
p_date_eat date
) RETURNS jsonb
LANGUAGE sql STABLE
AS $$
WITH bounds AS (
SELECT (p_date_eat::timestamp - interval '3 hours') AT TIME ZONE 'UTC' AS day_start
),
-- Use one device's fixes (tracker-first, same canonical pick as
-- fn_vehicle_trips) so a camera-paired vehicle's track doesn't zig-zag
-- between the tracker and camera positions.
primary_imei AS (
SELECT d.imei
FROM domain.devices d
WHERE d.vehicle_id = p_vehicle_id
ORDER BY CASE d.device_type WHEN 'tracker' THEN 0 ELSE 1 END, d.imei
LIMIT 1
),
pts AS (
SELECT ph.geom, ph.occurred_at
FROM state.position_history ph, bounds b, primary_imei pi
WHERE ph.vehicle_id = p_vehicle_id
AND ph.imei = pi.imei
AND COALESCE(ph.pos_type, '') NOT IN ('LBS', 'WIFI')
AND ph.occurred_at >= b.day_start
AND ph.occurred_at < b.day_start + interval '24 hours'
)
SELECT CASE
WHEN count(*) >= 2
THEN ST_AsGeoJSON(
ST_Simplify(ST_MakeLine(geom ORDER BY occurred_at), 0.00003)
)::jsonb
ELSE NULL
END
FROM pts;
$$;
-- migrate:down
DROP FUNCTION IF EXISTS serve.fn_vehicle_day_track(bigint, date);

View file

@ -61,14 +61,6 @@ def test_unsupported_msg_type_raises() -> None:
parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1") parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1")
def test_parse_raw_malformed_payload_no_crash() -> None:
# When JSON decoding fails at the gateway, the body is stored verbatim as
# {"_raw": ...}. The parser must tolerate it (yield nothing) rather than
# raise — otherwise the worker routes a non-error to events.parser_errors.
events = parse_raw("tracksolid_push", "pushgps", {"_raw": "garbage"}, account_id="acct-1")
assert events == []
def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None: def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None:
"""Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix. """Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix.
Parser should skip the offline device silently and emit one position_fix.""" Parser should skip the offline device silently and emit one position_fix."""

View file

@ -1,40 +0,0 @@
"""Unit tests for the push ingest payload coercion boundary.
`_coerce_payload` is where raw POST bodies first meet our parser: malformed
JSON must degrade to a stored `{"_raw": ...}` envelope rather than raising and
dropping the event.
"""
from app.routers.push import MAX_ITEMS_PER_POST, _coerce_payload
def test_malformed_json_is_wrapped_not_raised() -> None:
garbage = '{"lat": -1.28, "lng":' # truncated / invalid JSON
assert _coerce_payload(garbage) == {"_raw": garbage}
def test_empty_string_yields_empty_raw() -> None:
assert _coerce_payload("") == {"_raw": ""}
def test_valid_object_passes_through() -> None:
assert _coerce_payload('{"imei": "860112050000001", "speed": 42}') == {
"imei": "860112050000001",
"speed": 42,
}
def test_top_level_array_is_listed() -> None:
assert _coerce_payload('[{"imei": "1"}, {"imei": "2"}]') == {
"_list": [{"imei": "1"}, {"imei": "2"}]
}
def test_oversized_array_is_truncated() -> None:
oversized = "[" + ",".join(["{}"] * (MAX_ITEMS_PER_POST + 5)) + "]"
result = _coerce_payload(oversized)
assert len(result["_list"]) == MAX_ITEMS_PER_POST
def test_scalar_json_falls_back_to_raw() -> None:
assert _coerce_payload("42") == {"_raw": "42"}

View file

@ -486,7 +486,7 @@ function _renderSummary(root, summary) {
* as an OR by sending no filter and letting the renderer hide the rest. * as an OR by sending no filter and letting the renderer hide the rest.
* That keeps the SQL contract unchanged for P1. * That keeps the SQL contract unchanged for P1.
*/ */
export function initFilters(root, onChange, onVehiclePick) { export function initFilters(root, onChange) {
const ccWidget = _buildMultiSelect( const ccWidget = _buildMultiSelect(
root.querySelector('#flt-cost-centre'), root.querySelector('#flt-cost-centre'),
{ label: 'cost centre', plural: 'cost centres', showSwatch: true }, { label: 'cost centre', plural: 'cost centres', showSwatch: true },
@ -509,24 +509,6 @@ export function initFilters(root, onChange, onVehiclePick) {
ccWidget.onChange(emit); ccWidget.onChange(emit);
cityWidget.onChange(emit); cityWidget.onChange(emit);
// Persistent fleet registry so the finder always lists every vehicle seen
// this session, even after a cost-centre/city filter narrows the live view.
const vehReg = new Map();
const vehFinder = _buildVehicleSelect(root.querySelector('#flt-vehicle'), {
onSelect(meta) {
// Match the two filters to the vehicle, then emit once (single refresh).
ccWidget.setSelection(meta.cost_centre ? [meta.cost_centre] : []);
cityWidget.setSelection(meta.assigned_city ? [meta.assigned_city] : []);
emit();
onVehiclePick && onVehiclePick(meta);
},
onClear() {
ccWidget.setSelection([]);
cityWidget.setSelection([]);
emit();
},
});
return { return {
updateOptions(features) { updateOptions(features) {
const cc = new Map(); const cc = new Map();
@ -535,26 +517,11 @@ export function initFilters(root, onChange, onVehiclePick) {
const p = f.properties || {}; const p = f.properties || {};
if (p.cost_centre) cc.set(p.cost_centre, p.cost_centre_color || '#94a3b8'); if (p.cost_centre) cc.set(p.cost_centre, p.cost_centre_color || '#94a3b8');
if (p.assigned_city) city.add(p.assigned_city); if (p.assigned_city) city.add(p.assigned_city);
if (p.vehicle_id != null) {
const coords = (f.geometry && f.geometry.coordinates) || [];
vehReg.set(p.vehicle_id, {
vehicle_id: p.vehicle_id,
plate: p.plate,
driver_name: p.driver_name,
cost_centre: p.cost_centre,
assigned_city: p.assigned_city,
occurred_at: p.occurred_at,
lng: coords[0] ?? null,
lat: coords[1] ?? null,
});
}
} }
ccWidget.setOptions([...cc.entries()].sort() ccWidget.setOptions([...cc.entries()].sort()
.map(([value, color]) => ({ value, color }))); .map(([value, color]) => ({ value, color })));
cityWidget.setOptions([...city].sort() cityWidget.setOptions([...city].sort()
.map(value => ({ value }))); .map(value => ({ value })));
vehFinder.setOptions([...vehReg.values()]
.sort((a, b) => (a.plate || '').localeCompare(b.plate || '')));
}, },
getActive() { getActive() {
return { costCentres: ccWidget.getValues(), cities: cityWidget.getValues() }; return { costCentres: ccWidget.getValues(), cities: cityWidget.getValues() };
@ -587,18 +554,8 @@ function _buildMultiSelect(root, { label, plural, showSwatch }) {
const listeners = []; const listeners = [];
let options = []; // [{value, color?}] let options = []; // [{value, color?}]
let lastSig = ''; // skip no-op rebuilds on the 15s live refresh let lastSig = ''; // skip no-op rebuilds on the 15s live refresh
// Non-null = a selection forced programmatically (by the vehicle finder).
// Enforced across rebuilds until the user touches the widget themselves.
let _forced = null;
const updateLabel = () => { const updateLabel = () => {
if (_forced && _forced.length) {
// A forced (vehicle-matched) selection stays labelled by its value even
// when the live view collapses the option list down to just that value.
btnLabel.textContent = _forced.length === 1 ? _forced[0] : `${_forced.length} ${plural}`;
allBox.checked = false;
return;
}
const checked = [...optsRoot.querySelectorAll('input:checked')]; const checked = [...optsRoot.querySelectorAll('input:checked')];
if (checked.length === 0 || checked.length === options.length) { if (checked.length === 0 || checked.length === options.length) {
btnLabel.textContent = `All ${plural}`; btnLabel.textContent = `All ${plural}`;
@ -632,7 +589,6 @@ function _buildMultiSelect(root, { label, plural, showSwatch }) {
}); });
allBox.addEventListener('change', () => { allBox.addEventListener('change', () => {
_forced = null; // manual interaction takes back control
const checked = allBox.checked; const checked = allBox.checked;
optsRoot.querySelectorAll('input').forEach(cb => { cb.checked = checked; }); optsRoot.querySelectorAll('input').forEach(cb => { cb.checked = checked; });
if (!checked) allBox.checked = false; // "All" un-check = clear if (!checked) allBox.checked = false; // "All" un-check = clear
@ -653,39 +609,20 @@ function _buildMultiSelect(root, { label, plural, showSwatch }) {
const prevChecked = new Set( const prevChecked = new Set(
[...optsRoot.querySelectorAll('input:checked')].map(cb => cb.value), [...optsRoot.querySelectorAll('input:checked')].map(cb => cb.value),
); );
const forced = _forced ? new Set(_forced) : null;
const wasAll = prevChecked.size === 0 || allBox.checked; const wasAll = prevChecked.size === 0 || allBox.checked;
const isChecked = (value) => forced
? forced.has(value)
: (wasAll || prevChecked.has(value));
optsRoot.innerHTML = opts.map(({ value, color }) => ` optsRoot.innerHTML = opts.map(({ value, color }) => `
<label class="ms-row"> <label class="ms-row">
<input type="checkbox" value="${_esc(value)}" ${isChecked(value) ? 'checked' : ''} /> <input type="checkbox" value="${_esc(value)}" ${wasAll || prevChecked.has(value) ? 'checked' : ''} />
${showSwatch && color ? `<span class="ms-swatch" style="background:${_esc(color)}"></span>` : ''} ${showSwatch && color ? `<span class="ms-swatch" style="background:${_esc(color)}"></span>` : ''}
<span class="ms-row-label">${_esc(value)}</span> <span class="ms-row-label">${_esc(value)}</span>
</label> </label>
`).join(''); `).join('');
optsRoot.querySelectorAll('input').forEach(cb => { optsRoot.querySelectorAll('input').forEach(cb => {
cb.addEventListener('change', () => { _forced = null; updateLabel(); fire(); }); cb.addEventListener('change', () => { updateLabel(); fire(); });
});
updateLabel();
},
// Force a selection from outside (the vehicle finder). Pass [] to reset to
// "All". Updates the DOM + label now and is re-applied on later rebuilds;
// does not fire onChange — the caller emits once after setting both widgets.
setSelection(values) {
_forced = (values && values.length) ? [...values] : null;
const want = _forced ? new Set(_forced) : null;
optsRoot.querySelectorAll('input').forEach(cb => {
cb.checked = want ? want.has(cb.value) : true;
}); });
updateLabel(); updateLabel();
}, },
getValues() { getValues() {
// Forced selection is authoritative until the user touches the widget,
// so the cost-centre/city filter survives even after the option list
// collapses to the single matched value on the next refresh.
if (_forced && _forced.length) return [..._forced];
const checked = [...optsRoot.querySelectorAll('input:checked')]; const checked = [...optsRoot.querySelectorAll('input:checked')];
if (checked.length === 0 || checked.length === options.length) return []; if (checked.length === 0 || checked.length === options.length) return [];
return checked.map(cb => cb.value); return checked.map(cb => cb.value);
@ -694,104 +631,6 @@ function _buildMultiSelect(root, { label, plural, showSwatch }) {
}; };
} }
/**
* Single-select vehicle finder with a search box. Lists the whole known fleet
* (plate + driver/device name); typing narrows by plate or driver. Picking a
* row calls onSelect(meta); the "All vehicles" row calls onClear().
*
* meta = { vehicle_id, plate, driver_name, cost_centre, assigned_city,
* occurred_at, lng, lat } enough for the caller to set the matching
* cost-centre/city filters and fly to / open the vehicle.
*/
function _buildVehicleSelect(root, { onSelect, onClear }) {
root.classList.add('ms', 'ms-vehicle');
root.innerHTML = `
<button type="button" class="ms-btn" aria-haspopup="listbox" aria-expanded="false">
<span class="ms-btn-label">Find vehicle</span>
<span class="ms-caret"></span>
</button>
<div class="ms-pop" role="listbox" hidden>
<input type="text" class="ms-search" placeholder="Search plate or driver…" autocomplete="off" />
<div class="ms-row ms-row-all ms-veh-all" role="option"><span>All vehicles</span></div>
<div class="ms-options"></div>
</div>
`;
const btn = root.querySelector('.ms-btn');
const btnLabel = root.querySelector('.ms-btn-label');
const pop = root.querySelector('.ms-pop');
const search = root.querySelector('.ms-search');
const allRow = root.querySelector('.ms-veh-all');
const optsRoot = root.querySelector('.ms-options');
let options = []; // [{vehicle_id, plate, driver_name, ...meta}]
let selectedId = null;
const renderList = (q) => {
const needle = (q || '').trim().toLowerCase();
const rows = options.filter(o => !needle
|| (o.plate || '').toLowerCase().includes(needle)
|| (o.driver_name || '').toLowerCase().includes(needle));
optsRoot.innerHTML = rows.length ? rows.map(o => `
<div class="ms-row ms-vehicle-row" role="option" data-vid="${_esc(String(o.vehicle_id))}">
<span class="ms-row-label">${_esc(o.plate || ('Vehicle ' + o.vehicle_id))}</span>
${o.driver_name ? `<span class="ms-row-sub">${_esc(o.driver_name)}</span>` : ''}
</div>
`).join('') : '<div class="ms-empty">No match</div>';
optsRoot.querySelectorAll('.ms-vehicle-row').forEach(el => {
el.addEventListener('click', () => {
const vid = Number(el.getAttribute('data-vid'));
const meta = options.find(o => o.vehicle_id === vid);
if (!meta) return;
selectedId = vid;
btnLabel.textContent = meta.plate || (`Vehicle ${vid}`);
close();
onSelect && onSelect(meta);
});
});
};
const open = () => {
pop.removeAttribute('hidden');
btn.setAttribute('aria-expanded', 'true');
search.value = '';
renderList('');
search.focus();
};
const close = () => {
pop.setAttribute('hidden', '');
btn.setAttribute('aria-expanded', 'false');
};
btn.addEventListener('click', (e) => {
e.stopPropagation();
pop.hasAttribute('hidden') ? open() : close();
});
document.addEventListener('click', (e) => {
if (!root.contains(e.target) && !pop.hasAttribute('hidden')) close();
});
search.addEventListener('click', (e) => e.stopPropagation());
search.addEventListener('input', () => renderList(search.value));
allRow.addEventListener('click', () => {
selectedId = null;
btnLabel.textContent = 'Find vehicle…';
close();
onClear && onClear();
});
return {
// Refresh the backing list. Don't rebuild the open popover mid-search;
// the next keystroke re-renders from the updated data.
setOptions(list) {
options = list;
if (selectedId != null) {
const m = list.find(o => o.vehicle_id === selectedId);
if (m) btnLabel.textContent = m.plate || (`Vehicle ${selectedId}`);
}
},
getValue() { return selectedId; },
};
}
/** /**
* Client-side narrowing: after rendering, hide markers whose cost_centre * Client-side narrowing: after rendering, hide markers whose cost_centre
* or assigned_city isn't in the multi-select set. Used when the user picks * or assigned_city isn't in the multi-select set. Used when the user picks
@ -951,53 +790,6 @@ export function initTripPanel(map, panelRoot) {
_renderDock(map, els); _renderDock(map, els);
} }
}); });
// Open a single vehicle programmatically (from the vehicle finder). Mirrors a
// plain map-click: jumps to the vehicle's last-active day unless the user has
// already picked a date, draws its day, and frames the route on the map.
async function openVehicle(vid, meta = {}) {
if (!_dateUserPicked) {
els.date.value = _eatDate(meta.occurred_at);
} else if (!els.date.value) {
els.date.value = _todayEat();
}
_currentDate = els.date.value;
panelRoot.classList.add('open');
panelRoot.setAttribute('aria-hidden', 'false');
_clearSelection(map);
_addVehicle(vid, meta.plate || `Vehicle ${vid}`, meta.driver_name || '');
await _fetchAndDraw(map, vid);
_renderDock(map, els);
_fitToVehicle(map, _selection.get(vid), meta);
}
return { openVehicle };
}
// Frame a vehicle's day on the map: fit to its day-track/trip path when there
// is one, else fly to its live position.
function _fitToVehicle(map, entry, meta) {
let coords = [];
const p = entry && entry.payload;
if (p && p.day_track && Array.isArray(p.day_track.coordinates)) {
coords = p.day_track.coordinates;
} else if (p && Array.isArray(p.trips)) {
for (const t of p.trips) {
if (t.path && Array.isArray(t.path.coordinates)) {
coords = coords.concat(t.path.coordinates);
}
}
}
// eslint-disable-next-line no-undef
if (coords.length >= 2 && typeof maplibregl !== 'undefined') {
const bounds = coords.reduce(
// eslint-disable-next-line no-undef
(b, c) => b.extend(c), new maplibregl.LngLatBounds(coords[0], coords[0]),
);
map.fitBounds(bounds, { padding: 80, maxZoom: 15, duration: 600 });
} else if (meta && meta.lng != null && meta.lat != null) {
map.flyTo({ center: [meta.lng, meta.lat], zoom: Math.max(map.getZoom(), 14), duration: 600 });
}
} }
function _addVehicle(vid, plate, driver) { function _addVehicle(vid, plate, driver) {

View file

@ -90,19 +90,6 @@
} }
.ms-row-label { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } .ms-row-label { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
/* vehicle finder: single-select with a search box */
.ms-vehicle { min-width: 210px; }
.ms-search {
width: 100%; box-sizing: border-box; margin-bottom: 6px;
background: var(--panel-2); color: var(--text);
border: 1px solid var(--panel-2); border-radius: 4px;
padding: 6px 8px; font-size: 12px; font-family: inherit;
}
.ms-search:focus { outline: none; border-color: var(--muted); }
.ms-vehicle-row { justify-content: space-between; }
.ms-row-sub { color: var(--muted); font-size: 11px; margin-left: 8px; flex-shrink: 0; }
.ms-empty { padding: 8px; color: var(--muted); font-size: 12px; }
/* ─────────── map (fills remaining height) ─────────── */ /* ─────────── map (fills remaining height) ─────────── */
#map-container { position: relative; min-height: 0; } #map-container { position: relative; min-height: 0; }
#map { position: absolute; inset: 0; } #map { position: absolute; inset: 0; }
@ -222,7 +209,6 @@
<div class="band-block"> <div class="band-block">
<div class="band-title">Filters</div> <div class="band-title">Filters</div>
<div class="band-row" id="filters"> <div class="band-row" id="filters">
<div id="flt-vehicle"></div>
<div id="flt-cost-centre"></div> <div id="flt-cost-centre"></div>
<div id="flt-assigned-city"></div> <div id="flt-assigned-city"></div>
</div> </div>
@ -281,19 +267,13 @@
} }
} }
const tripApi = initTripPanel(map, document.getElementById('trip-panel')); const filters = initFilters(document.getElementById('filters'), (serverFilters, selection) => {
const filters = initFilters(
document.getElementById('filters'),
(serverFilters, selection) => {
currentFilters = serverFilters; currentFilters = serverFilters;
activeSelection = selection; activeSelection = selection;
refresh(); refresh();
}, });
// Vehicle picked from the finder → its cost-centre/city filters are set by
// initFilters; here we locate it on the map and open its trips. initTripPanel(map, document.getElementById('trip-panel'));
(meta) => { tripApi.openVehicle(meta.vehicle_id, meta); },
);
refresh(); refresh();
setInterval(refresh, 15000); setInterval(refresh, 15000);