Geocoder trips open after N consecutive Nominatim failures and skips ticks
for a cooldown, instead of grinding the whole batch 1 req/sec on every tick.
Adds tests for _coerce_payload (malformed JSON degrades to {_raw}) and
parse_raw tolerating a malformed payload.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
40 lines
1.2 KiB
Python
40 lines
1.2 KiB
Python
"""Unit tests for the push ingest payload coercion boundary.
|
|
|
|
`_coerce_payload` is where raw POST bodies first meet our parser: malformed
|
|
JSON must degrade to a stored `{"_raw": ...}` envelope rather than raising and
|
|
dropping the event.
|
|
"""
|
|
|
|
from app.routers.push import MAX_ITEMS_PER_POST, _coerce_payload
|
|
|
|
|
|
def test_malformed_json_is_wrapped_not_raised() -> None:
|
|
garbage = '{"lat": -1.28, "lng":' # truncated / invalid JSON
|
|
assert _coerce_payload(garbage) == {"_raw": garbage}
|
|
|
|
|
|
def test_empty_string_yields_empty_raw() -> None:
|
|
assert _coerce_payload("") == {"_raw": ""}
|
|
|
|
|
|
def test_valid_object_passes_through() -> None:
|
|
assert _coerce_payload('{"imei": "860112050000001", "speed": 42}') == {
|
|
"imei": "860112050000001",
|
|
"speed": 42,
|
|
}
|
|
|
|
|
|
def test_top_level_array_is_listed() -> None:
|
|
assert _coerce_payload('[{"imei": "1"}, {"imei": "2"}]') == {
|
|
"_list": [{"imei": "1"}, {"imei": "2"}]
|
|
}
|
|
|
|
|
|
def test_oversized_array_is_truncated() -> None:
|
|
oversized = "[" + ",".join(["{}"] * (MAX_ITEMS_PER_POST + 5)) + "]"
|
|
result = _coerce_payload(oversized)
|
|
assert len(result["_list"]) == MAX_ITEMS_PER_POST
|
|
|
|
|
|
def test_scalar_json_falls_back_to_raw() -> None:
|
|
assert _coerce_payload("42") == {"_raw": "42"}
|