fleet-platform/tests/test_parsers.py

79 lines
2.6 KiB
Python
Raw Normal View History

import pytest
from app.parsers.jimi import UnsupportedMsgType, parse_raw
from tests.fixtures.jimi_payloads import (
POLL_LOCATION_LIST_RESPONSE,
PUSH_ALARM_SINGLE,
PUSH_EVENT_LOGIN,
PUSH_GPS_SINGLE,
PUSH_HEARTBEAT_SINGLE,
TRACKSOLID_POLL_LIST_REAL,
ZERO_ISLAND_FIX,
)
def test_push_gps_single_produces_one_position_fix() -> None:
events = parse_raw("tracksolid_push", "pushgps", PUSH_GPS_SINGLE, account_id="acct-1")
assert len(events) == 1
ev = events[0]
assert ev.kind == "position_fix"
assert ev.imei == "860112050000001"
assert ev.account_id == "acct-1"
assert ev.payload["lat"] == -1.2864
assert ev.payload["speed_kmh"] == 42.5
def test_zero_island_is_dropped() -> None:
events = parse_raw("tracksolid_push", "pushgps", ZERO_ISLAND_FIX, account_id="acct-1")
assert events == []
def test_push_alarm_produces_alarm_and_crossfeed_position() -> None:
events = parse_raw("tracksolid_push", "pushalarm", PUSH_ALARM_SINGLE, account_id="acct-1")
kinds = sorted(e.kind for e in events)
assert kinds == ["alarm", "position_fix"]
def test_push_heartbeat_kind() -> None:
events = parse_raw("tracksolid_push", "pushhb", PUSH_HEARTBEAT_SINGLE, account_id="acct-1")
assert len(events) == 1
assert events[0].kind == "heartbeat"
assert events[0].payload["power_level"] == 88
def test_push_event_login() -> None:
events = parse_raw("tracksolid_push", "pushevent", PUSH_EVENT_LOGIN, account_id="acct-1")
assert len(events) == 1
assert events[0].kind == "device_login"
def test_poll_location_list_yields_one_event_per_device() -> None:
events = parse_raw(
"tracksolid_poll_list", None, POLL_LOCATION_LIST_RESPONSE, account_id="acct-1"
)
assert len(events) == 2
assert {e.imei for e in events} == {"860112050000001", "860112050000002"}
assert all(e.kind == "position_fix" for e in events)
def test_unsupported_msg_type_raises() -> None:
with pytest.raises(UnsupportedMsgType):
parse_raw("tracksolid_push", "pushobd", {}, account_id="acct-1")
def test_tracksolid_real_poll_list_drops_offline_keeps_valid() -> None:
"""Production Tracksolid response: 1 offline JC400P (null gpsTime), 1 GT06E with a fix.
Parser should skip the offline device silently and emit one position_fix."""
events = parse_raw(
"tracksolid_poll_list",
None,
TRACKSOLID_POLL_LIST_REAL,
account_id="Fireside Communications",
)
assert len(events) == 1
ev = events[0]
assert ev.imei == "868000000000123"
assert ev.kind == "position_fix"
assert ev.payload["lat"] == -1.2864
assert ev.payload["speed_kmh"] == 42.5