fleet-platform/app/parsers/jimi.py

241 lines
7.5 KiB
Python

from dataclasses import dataclass
from datetime import datetime
from typing import Any
from app.models.jimi import (
JimiPollFix,
JimiPushAlarm,
JimiPushEvent,
JimiPushGps,
JimiPushHeartbeat,
)
# Revision number of this parser module.
# NOTE(review): not referenced anywhere in this file; presumably recorded by
# callers so parsed output can be tied to a parser revision — confirm.
PARSER_VERSION = 1
@dataclass(slots=True, frozen=True)
class ParsedEvent:
kind: str
occurred_at: datetime
imei: str
account_id: str | None
payload: dict[str, Any]
class UnsupportedMsgType(Exception):
    """Raised by ``parse_raw`` when no parser matches the source / msg_type pair."""
def _is_valid_fix(lat: float | None, lng: float | None) -> bool:
if lat is None or lng is None:
return False
if lat == 0.0 and lng == 0.0:
return False # zero island — sensor error
if not (-90.0 <= lat <= 90.0):
return False
return -180.0 <= lng <= 180.0
def _items_from_payload(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Normalize a gateway-stored push payload into a list of dict items.

    The gateway keeps the verbatim Tracksolid form ``data`` field either as a
    plain dict (single object) or as ``{"_list": [...]}`` (array). A payload
    of the form ``{"_raw": "..."}`` means JSON parsing failed at the gateway
    and produces no items.

    Non-dict entries inside ``_list`` are dropped. A payload without a list
    under ``_list`` and without ``_raw`` is returned as the sole item.
    """
    wrapped = payload.get("_list")
    if not isinstance(wrapped, list):
        # No array wrapper: either an unparseable body or a single object.
        return [] if "_raw" in payload else [payload]
    return list(filter(lambda entry: isinstance(entry, dict), wrapped))
def _fix_payload(
lat: float,
lng: float,
*,
speed_kmh: float | None,
direction_deg: float | None,
altitude_m: float | None = None,
satellites: int | None = None,
acc: Any = None,
source: str,
) -> dict[str, Any]:
return {
"lat": lat,
"lng": lng,
"speed_kmh": speed_kmh,
"direction_deg": direction_deg,
"altitude_m": altitude_m,
"satellites": satellites,
"acc": acc,
"source": source,
}
def _parse_push_gps(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Turn a pushed GPS payload into ``position_fix`` events.

    Each item is validated with ``JimiPushGps``; items whose coordinates are
    rejected by ``_is_valid_fix`` are skipped. No exception handling is done
    here, so a validation failure propagates to the caller.
    """
    # Lazy generator: each item is validated right before it is filtered and
    # converted, exactly as a plain loop would do.
    fixes = (JimiPushGps.model_validate(raw) for raw in _items_from_payload(payload))
    return [
        ParsedEvent(
            kind="position_fix",
            occurred_at=fix.gps_time,
            imei=fix.imei,
            account_id=account_id,
            payload=_fix_payload(
                fix.lat,
                fix.lng,
                speed_kmh=fix.speed_kmh,
                direction_deg=fix.direction_deg,
                altitude_m=fix.altitude_m,
                satellites=fix.satellites,
                acc=fix.acc,
                source="tracksolid_push",
            ),
        )
        for fix in fixes
        if _is_valid_fix(fix.lat, fix.lng)
    ]
def _parse_push_alarm(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Turn a pushed alarm payload into ``alarm`` events plus optional fixes.

    Every item produces one ``alarm`` event. When the alarm carries
    coordinates accepted by ``_is_valid_fix``, a ``position_fix`` event
    (source ``tracksolid_push_alarm_crossfeed``) stamped with the alarm time
    is appended directly after it.

    The device identifier is read from ``imei``; when that key is missing or
    empty, ``deviceImei`` is used instead. No exception handling is done
    here, so a ``JimiPushAlarm`` validation failure propagates to the caller.
    """
    out: list[ParsedEvent] = []
    for item in _items_from_payload(payload):
        # Work on a copy so the caller's payload is never mutated.
        item_copy = dict(item)
        if not item_copy.get("imei"):
            # Fall back to deviceImei when imei is missing OR present but
            # empty/null. The previous setdefault() only fired when the key
            # was absent, so an empty "imei" hid a usable "deviceImei".
            item_copy["imei"] = item.get("deviceImei") or item_copy.get("imei")
        model = JimiPushAlarm.model_validate(item_copy)
        out.append(
            ParsedEvent(
                kind="alarm",
                occurred_at=model.alarm_time,
                imei=model.imei,
                account_id=account_id,
                payload={
                    "alarm_type": model.alarm_type,
                    "alarm_name": model.alarm_name,
                    "lat": model.lat,
                    "lng": model.lng,
                    "speed_kmh": model.speed_kmh,
                    "device_name": model.device_name,
                },
            )
        )
        # Cross-feed: an alarm with usable coordinates doubles as a position
        # fix. The explicit None checks narrow the types without relying on
        # `assert`, which is stripped under `python -O`.
        lat, lng = model.lat, model.lng
        if lat is not None and lng is not None and _is_valid_fix(lat, lng):
            out.append(
                ParsedEvent(
                    kind="position_fix",
                    occurred_at=model.alarm_time,
                    imei=model.imei,
                    account_id=account_id,
                    payload=_fix_payload(
                        lat,
                        lng,
                        speed_kmh=model.speed_kmh,
                        direction_deg=None,
                        source="tracksolid_push_alarm_crossfeed",
                    ),
                )
            )
    return out
def _parse_push_heartbeat(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Turn a pushed heartbeat payload into ``heartbeat`` events.

    Each item is validated with ``JimiPushHeartbeat`` and yields exactly one
    event timestamped with the model's ``gate_time``. Validation failures
    propagate to the caller.
    """
    beats = (JimiPushHeartbeat.model_validate(raw) for raw in _items_from_payload(payload))
    return [
        ParsedEvent(
            kind="heartbeat",
            occurred_at=beat.gate_time,
            imei=beat.imei,
            account_id=account_id,
            payload={
                "power_level": beat.power_level,
                "gsm_signal": beat.gsm_signal,
                "acc": beat.acc,
                "power_status": beat.power_status,
            },
        )
        for beat in beats
    ]
def _parse_push_event(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Turn a pushed device-event payload into login / logout events.

    Each item is validated with ``JimiPushEvent``. An event type equal to
    ``LOGIN`` (case-insensitive) becomes ``device_login``; every other value
    becomes ``device_logout``. Validation failures propagate to the caller.
    """
    events: list[ParsedEvent] = []
    for raw in _items_from_payload(payload):
        parsed = JimiPushEvent.model_validate(raw)
        # NOTE(review): any non-LOGIN type is reported as a logout — confirm
        # the upstream feed only ever sends LOGIN / LOGOUT.
        if parsed.event_type.upper() == "LOGIN":
            kind = "device_login"
        else:
            kind = "device_logout"
        details = {"event_type": parsed.event_type, "timezone": parsed.timezone_str}
        events.append(
            ParsedEvent(
                kind=kind,
                occurred_at=parsed.event_time,
                imei=parsed.imei,
                account_id=account_id,
                payload=details,
            )
        )
    return events
def _items_for_poll(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract the dict items from a polled list/get response.

    The list may be wrapped under ``_list``, ``data`` or ``records``; the
    first of those keys (in that order) holding a list wins and its non-dict
    entries are dropped. Without such a list, a payload containing ``_raw``
    yields nothing and any other payload is returned as the sole item.
    """
    wrapper_keys = ("_list", "data", "records")
    candidates = (payload.get(key) for key in wrapper_keys)
    rows = next((value for value in candidates if isinstance(value, list)), None)
    if rows is not None:
        return [row for row in rows if isinstance(row, dict)]
    return [] if "_raw" in payload else [payload]
def _parse_poll_list(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Turn a polled response into ``position_fix`` events.

    Each item found by ``_items_for_poll`` is validated with ``JimiPollFix``;
    items whose coordinates are rejected by ``_is_valid_fix`` are skipped.
    Validation failures propagate to the caller.
    """
    fixes: list[ParsedEvent] = []
    for raw in _items_for_poll(payload):
        polled = JimiPollFix.model_validate(raw)
        if _is_valid_fix(polled.lat, polled.lng):
            position = _fix_payload(
                polled.lat,
                polled.lng,
                speed_kmh=polled.speed_kmh,
                direction_deg=polled.direction_deg,
                altitude_m=polled.altitude_m,
                satellites=polled.satellites,
                acc=polled.acc,
                source="tracksolid_poll",
            )
            fixes.append(
                ParsedEvent(
                    kind="position_fix",
                    occurred_at=polled.gps_time,
                    imei=polled.imei,
                    account_id=account_id,
                    payload=position,
                )
            )
    return fixes
# Parser lookup table keyed by (source, msg_type).
# A key whose msg_type is None is the per-source fallback: parse_raw tries
# the exact (source, msg_type) pair first and then (source, None).
_DISPATCH = {
    # Push feed: one parser per message type.
    ("tracksolid_push", "pushgps"): _parse_push_gps,
    ("tracksolid_push", "pushalarm"): _parse_push_alarm,
    ("tracksolid_push", "pushhb"): _parse_push_heartbeat,
    ("tracksolid_push", "pushevent"): _parse_push_event,
    # Poll feeds: msg_type is ignored; both sources share the same parser.
    ("tracksolid_poll_list", None): _parse_poll_list,
    ("tracksolid_poll_get", None): _parse_poll_list,
}
def parse_raw(
    source: str,
    msg_type: str | None,
    payload: dict[str, Any],
    account_id: str | None,
) -> list[ParsedEvent]:
    """Parse one raw payload into normalized events.

    The parser is looked up in ``_DISPATCH`` under ``(source, msg_type)``
    first and under ``(source, None)`` second.

    Args:
        source: Feed identifier used as the first half of the dispatch key.
        msg_type: Message type within the feed; may be None.
        payload: Raw payload dict handed to the selected parser.
        account_id: Passed through to the selected parser unchanged.

    Returns:
        The list of events produced by the selected parser.

    Raises:
        UnsupportedMsgType: If neither lookup finds a parser.
    """
    parser = _DISPATCH.get((source, msg_type))
    if not parser:
        # No exact match: try the source-wide fallback entry.
        parser = _DISPATCH.get((source, None))
    if parser is None:
        raise UnsupportedMsgType(f"no parser for source={source!r} msg_type={msg_type!r}")
    return parser(payload, account_id)