Geocoder circuit breaker + invalid-input regression tests
Geocoder trips open after N consecutive Nominatim failures and skips ticks
for a cooldown, instead of grinding the whole batch 1 req/sec on every tick.
Adds tests for _coerce_payload (malformed JSON degrades to {_raw}) and
parse_raw tolerating a malformed payload.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
7d250e120d
commit
4632990143
4 changed files with 82 additions and 0 deletions
|
|
@ -42,6 +42,10 @@ class Settings(BaseSettings):
|
||||||
geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK")
|
geocoder_max_per_tick: int = Field(default=8, alias="GEOCODER_MAX_PER_TICK")
|
||||||
geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC")
|
geocoder_tick_sec: int = Field(default=30, alias="GEOCODER_TICK_SEC")
|
||||||
geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC")
|
geocoder_rate_limit_sec: float = Field(default=1.1, alias="GEOCODER_RATE_LIMIT_SEC")
|
||||||
|
geocoder_breaker_threshold: int = Field(default=3, alias="GEOCODER_BREAKER_THRESHOLD")
|
||||||
|
geocoder_breaker_cooldown_sec: float = Field(
|
||||||
|
default=300.0, alias="GEOCODER_BREAKER_COOLDOWN_SEC"
|
||||||
|
)
|
||||||
|
|
||||||
app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE")
|
app_mode: Literal["dev", "prod"] = Field(default="prod", alias="APP_MODE")
|
||||||
app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE")
|
app_role: Literal["gateway", "worker", "cron"] = Field(default="gateway", alias="APP_ROLE")
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ minute. Rate-limited to comply with Nominatim's 1 req/sec policy.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
@ -17,6 +18,16 @@ from app.db import get_pool
|
||||||
|
|
||||||
log = structlog.get_logger("worker.geocoder")
|
log = structlog.get_logger("worker.geocoder")
|
||||||
|
|
||||||
|
# Circuit breaker: when Nominatim fails repeatedly (down or rate-limiting us),
|
||||||
|
# trip open and skip ticks for a cooldown rather than grinding through every
|
||||||
|
# batch 1 req/sec only to fail. Module-level so the open state survives across
|
||||||
|
# ticks; monotonic clock so it's immune to wall-clock jumps.
|
||||||
|
class _Breaker:
|
||||||
|
open_until: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
_breaker = _Breaker()
|
||||||
|
|
||||||
|
|
||||||
def _extract_short_address(payload: dict[str, Any]) -> str | None:
|
def _extract_short_address(payload: dict[str, Any]) -> str | None:
|
||||||
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
|
"""Build the screenshot-style 'Kiambu-Limuru Rd., Limuru' string."""
|
||||||
|
|
@ -37,6 +48,11 @@ def _extract_short_address(payload: dict[str, Any]) -> str | None:
|
||||||
|
|
||||||
|
|
||||||
async def geocode_pending(settings: Settings) -> None:
|
async def geocode_pending(settings: Settings) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now < _breaker.open_until:
|
||||||
|
log.info("geocoder.breaker_open", reopens_in_sec=round(_breaker.open_until - now, 1))
|
||||||
|
return
|
||||||
|
|
||||||
pool = await get_pool()
|
pool = await get_pool()
|
||||||
async with pool.connection() as conn, conn.cursor() as cur:
|
async with pool.connection() as conn, conn.cursor() as cur:
|
||||||
await cur.execute(
|
await cur.execute(
|
||||||
|
|
@ -65,6 +81,7 @@ async def geocode_pending(settings: Settings) -> None:
|
||||||
headers={"User-Agent": settings.nominatim_user_agent},
|
headers={"User-Agent": settings.nominatim_user_agent},
|
||||||
timeout=httpx.Timeout(20.0),
|
timeout=httpx.Timeout(20.0),
|
||||||
) as http:
|
) as http:
|
||||||
|
consecutive_failures = 0
|
||||||
for lat, lng in pending:
|
for lat, lng in pending:
|
||||||
await asyncio.sleep(settings.geocoder_rate_limit_sec)
|
await asyncio.sleep(settings.geocoder_rate_limit_sec)
|
||||||
try:
|
try:
|
||||||
|
|
@ -82,8 +99,21 @@ async def geocode_pending(settings: Settings) -> None:
|
||||||
data = r.json()
|
data = r.json()
|
||||||
except Exception:
|
except Exception:
|
||||||
log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
|
log.exception("geocoder.fetch_failed", lat=float(lat), lng=float(lng))
|
||||||
|
consecutive_failures += 1
|
||||||
|
if consecutive_failures >= settings.geocoder_breaker_threshold:
|
||||||
|
_breaker.open_until = (
|
||||||
|
time.monotonic() + settings.geocoder_breaker_cooldown_sec
|
||||||
|
)
|
||||||
|
log.warning(
|
||||||
|
"geocoder.breaker_tripped",
|
||||||
|
consecutive_failures=consecutive_failures,
|
||||||
|
cooldown_sec=settings.geocoder_breaker_cooldown_sec,
|
||||||
|
)
|
||||||
|
break
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
consecutive_failures = 0
|
||||||
|
|
||||||
address = data.get("display_name")
|
address = data.get("display_name")
|
||||||
short = _extract_short_address(data)
|
short = _extract_short_address(data)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -61,6 +61,14 @@ def test_unsupported_msg_type_raises() -> None:
|
||||||
parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1")
|
parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_raw_malformed_payload_no_crash() -> None:
|
||||||
|
# When JSON decoding fails at the gateway, the body is stored verbatim as
|
||||||
|
# {"_raw": ...}. The parser must tolerate it (yield nothing) rather than
|
||||||
|
# raise — otherwise the worker routes a non-error to events.parser_errors.
|
||||||
|
events = parse_raw("tracksolid_push", "pushgps", {"_raw": "garbage"}, account_id="acct-1")
|
||||||
|
assert events == []
|
||||||
|
|
||||||
|
|
||||||
def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None:
|
def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None:
|
||||||
"""Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix.
|
"""Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix.
|
||||||
Parser should skip the offline device silently and emit one position_fix."""
|
Parser should skip the offline device silently and emit one position_fix."""
|
||||||
|
|
|
||||||
40
tests/test_push_coerce.py
Normal file
40
tests/test_push_coerce.py
Normal file
|
|
@ -0,0 +1,40 @@
|
||||||
|
"""Unit tests for the push ingest payload coercion boundary.
|
||||||
|
|
||||||
|
`_coerce_payload` is where raw POST bodies first meet our parser: malformed
|
||||||
|
JSON must degrade to a stored `{"_raw": ...}` envelope rather than raising and
|
||||||
|
dropping the event.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from app.routers.push import MAX_ITEMS_PER_POST, _coerce_payload
|
||||||
|
|
||||||
|
|
||||||
|
def test_malformed_json_is_wrapped_not_raised() -> None:
|
||||||
|
garbage = '{"lat": -1.28, "lng":' # truncated / invalid JSON
|
||||||
|
assert _coerce_payload(garbage) == {"_raw": garbage}
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_string_yields_empty_raw() -> None:
|
||||||
|
assert _coerce_payload("") == {"_raw": ""}
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_object_passes_through() -> None:
|
||||||
|
assert _coerce_payload('{"imei": "860112050000001", "speed": 42}') == {
|
||||||
|
"imei": "860112050000001",
|
||||||
|
"speed": 42,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_top_level_array_is_listed() -> None:
|
||||||
|
assert _coerce_payload('[{"imei": "1"}, {"imei": "2"}]') == {
|
||||||
|
"_list": [{"imei": "1"}, {"imei": "2"}]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_oversized_array_is_truncated() -> None:
|
||||||
|
oversized = "[" + ",".join(["{}"] * (MAX_ITEMS_PER_POST + 5)) + "]"
|
||||||
|
result = _coerce_payload(oversized)
|
||||||
|
assert len(result["_list"]) == MAX_ITEMS_PER_POST
|
||||||
|
|
||||||
|
|
||||||
|
def test_scalar_json_falls_back_to_raw() -> None:
|
||||||
|
assert _coerce_payload("42") == {"_raw": "42"}
|
||||||
Loading…
Reference in a new issue