diff --git a/app/config.py b/app/config.py index 149c20b..3bf0a61 100644 --- a/app/config.py +++ b/app/config.py @@ -42,6 +42,10 @@ class Settings(BaseSettings): geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK") geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC") geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC") + geocoder_breaker_threshold: int = Field(default=3, alias="GEOCODER_BREAKER_THRESHOLD") + geocoder_breaker_cooldown_sec: float = Field( + default=300.0, alias="GEOCODER_BREAKER_COOLDOWN_SEC" + ) app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE") app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE") diff --git a/app/workers/geocoder.py b/app/workers/geocoder.py index 250c671..05ec8f6 100644 --- a/app/workers/geocoder.py +++ b/app/workers/geocoder.py @@ -7,6 +7,7 @@ minute. Rate-limited to comply with Nominatim's 1 req/sec policy. """ import asyncio +import time from typing import Any import httpx @@ -17,6 +18,16 @@ from app.db import get_pool log = structlog.get_logger("worker.geocoder") +# Circuit breaker: when Nominatim fails repeatedly (down or rate-limiting us), +# trip open and skip ticks for a cooldown rather than grinding through every +# batch at 1 req/sec only to fail. Module-level so the open state survives across +# ticks; monotonic clock so it's immune to wall-clock jumps. 
+class _Breaker: + open_until: float = 0.0 + + +_breaker = _Breaker() + def _extract_short_address(payload: dict[str, Any]) -> str | None: """Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string.""" @@ -37,6 +48,11 @@ def _extract_short_address(payload: dict[str, Any]) -> str | None: async def geocode_pending(settings: Settings) -> None: + now = time.monotonic() + if now < _breaker.open_until: + log.info("geocoder.breaker_open", reopens_in_sec=round(_breaker.open_until - now, 1)) + return + pool = await get_pool() async with pool.connection() as conn, conn.cursor() as cur: await cur.execute( @@ -65,6 +81,7 @@ async def geocode_pending(settings: Settings) -> None: headers={"User-Agent": settings.nominatim_user_agent}, timeout=httpx.Timeout(20.0), ) as http: + consecutive_failures = 0 for lat, lng in pending: await asyncio.sleep(settings.geocoder_rate_limit_sec) try: @@ -82,8 +99,21 @@ async def geocode_pending(settings: Settings) -> None: data = r.json() except Exception: log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng)) + consecutive_failures += 1 + if consecutive_failures >= settings.geocoder_breaker_threshold: + _breaker.open_until = ( + time.monotonic() + settings.geocoder_breaker_cooldown_sec + ) + log.warning( + "geocoder.breaker_tripped", + consecutive_failures=consecutive_failures, + cooldown_sec=settings.geocoder_breaker_cooldown_sec, + ) + break continue + consecutive_failures = 0 + address = data.get("display_name") short = _extract_short_address(data) diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 614b9d8..b998545 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -61,6 +61,14 @@ def test_unsupported_msg_type_raises() -> None: parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1") +def test_parse_raw_malformed_payload_no_crash() -> None: + # When JSON decoding fails at the gateway, the body is stored verbatim as + # {"_raw": ...}. 
The parser must tolerate it (yield nothing) rather than + # raise — otherwise the worker routes a non-error to events.parser_errors. + events = parse_raw("tracksolid_push", "pushgps", {"_raw": "garbage"}, account_id="acct-1") + assert events == [] + + def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None: """Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix. Parser should skip the offline device silently and emit one position_fix.""" diff --git a/tests/test_push_coerce.py b/tests/test_push_coerce.py new file mode 100644 index 0000000..ff7b723 --- /dev/null +++ b/tests/test_push_coerce.py @@ -0,0 +1,40 @@ +"""Unit tests for the push ingest payload coercion boundary. + +`_coerce_payload` is where raw POST bodies first meet our parser: malformed +JSON must degrade to a stored `{"_raw": ...}` envelope rather than raising and +dropping the event. +""" + +from app.routers.push import MAX_ITEMS_PER_POST, _coerce_payload + + +def test_malformed_json_is_wrapped_not_raised() -> None: + garbage = '{"lat": -1.28, "lng":' # truncated / invalid JSON + assert _coerce_payload(garbage) == {"_raw": garbage} + + +def test_empty_string_yields_empty_raw() -> None: + assert _coerce_payload("") == {"_raw": ""} + + +def test_valid_object_passes_through() -> None: + assert _coerce_payload('{"imei": "860112050000001", "speed": 42}') == { + "imei": "860112050000001", + "speed": 42, + } + + +def test_top_level_array_is_listed() -> None: + assert _coerce_payload('[{"imei": "1"}, {"imei": "2"}]') == { + "_list": [{"imei": "1"}, {"imei": "2"}] + } + + +def test_oversized_array_is_truncated() -> None: + oversized = "[" + ",".join(["{}"] * (MAX_ITEMS_PER_POST + 5)) + "]" + result = _coerce_payload(oversized) + assert len(result["_list"]) == MAX_ITEMS_PER_POST + + +def test_scalar_json_falls_back_to_raw() -> None: + assert _coerce_payload("42") == {"_raw": "42"}