"""Parsers that turn raw Tracksolid/Jimi payloads into normalized events.

The public entry point is :func:`parse_raw`, which dispatches on
``(source, msg_type)`` and returns a list of :class:`ParsedEvent`.
"""

import logging
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from app.models.jimi import (
    JimiPollFix,
    JimiPushAlarm,
    JimiPushEvent,
    JimiPushGps,
    JimiPushHeartbeat,
)

_logger = logging.getLogger(__name__)

PARSER_VERSION = 1


@dataclass(slots=True, frozen=True)
class ParsedEvent:
    """One normalized event extracted from a raw payload.

    ``kind`` is one of the strings emitted by the parsers in this module:
    ``"position_fix"``, ``"alarm"``, ``"heartbeat"``, ``"device_login"`` or
    ``"device_logout"``. ``payload`` holds the kind-specific fields.
    """

    kind: str
    occurred_at: datetime
    imei: str
    account_id: str | None
    payload: dict[str, Any]


class UnsupportedMsgType(Exception):
    """Raised by :func:`parse_raw` when no parser matches the source/msg_type."""


def _is_valid_fix(lat: float | None, lng: float | None) -> bool:
    """Return True when ``lat``/``lng`` form a usable coordinate pair.

    Rejects missing values, the exact (0, 0) pair, and anything outside the
    [-90, 90] / [-180, 180] ranges. NaN is rejected as well, because every
    range comparison against NaN is false.
    """
    if lat is None or lng is None:
        return False
    if lat == 0.0 and lng == 0.0:
        return False  # "Null Island" (0, 0) — sensor error
    if not (-90.0 <= lat <= 90.0):
        return False
    return -180.0 <= lng <= 180.0


def _items_from_payload(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Coerce the gateway-stored payload into a list of dict items.

    The gateway stores the verbatim Tracksolid form `data` field as either a
    dict (single object) or {"_list": [...]} (array). Some callers may pass
    {"_raw": "..."} if JSON parsing failed at the gateway; that yields no
    items. Non-dict entries inside ``_list`` are dropped.
    """
    raw_list = payload.get("_list")
    if isinstance(raw_list, list):
        return [item for item in raw_list if isinstance(item, dict)]
    if "_raw" in payload:
        return []
    return [payload]


def _fix_payload(
    lat: float,
    lng: float,
    *,
    speed_kmh: float | None,
    direction_deg: float | None,
    altitude_m: float | None = None,
    satellites: int | None = None,
    acc: Any = None,
    source: str,
) -> dict[str, Any]:
    """Build the payload dict shared by every ``position_fix`` event."""
    return {
        "lat": lat,
        "lng": lng,
        "speed_kmh": speed_kmh,
        "direction_deg": direction_deg,
        "altitude_m": altitude_m,
        "satellites": satellites,
        "acc": acc,
        "source": source,
    }


def _parse_push_gps(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Parse a pushed GPS payload into ``position_fix`` events.

    Items whose coordinates fail :func:`_is_valid_fix` are skipped. An item
    that fails model validation raises and aborts the whole payload.
    """
    out: list[ParsedEvent] = []
    for item in _items_from_payload(payload):
        model = JimiPushGps.model_validate(item)
        if not _is_valid_fix(model.lat, model.lng):
            continue
        out.append(
            ParsedEvent(
                kind="position_fix",
                occurred_at=model.gps_time,
                imei=model.imei,
                account_id=account_id,
                payload=_fix_payload(
                    model.lat,
                    model.lng,
                    speed_kmh=model.speed_kmh,
                    direction_deg=model.direction_deg,
                    altitude_m=model.altitude_m,
                    satellites=model.satellites,
                    acc=model.acc,
                    source="tracksolid_push",
                ),
            )
        )
    return out


def _parse_push_alarm(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Parse a pushed alarm payload into ``alarm`` events.

    Every item yields one ``alarm`` event. When the alarm also carries a
    valid coordinate pair, an additional ``position_fix`` event is emitted
    with the alarm time and source ``"tracksolid_push_alarm_crossfeed"``.
    """
    out: list[ParsedEvent] = []
    for item in _items_from_payload(payload):
        item_copy = dict(item)
        # Fall back to `deviceImei` when `imei` is absent, null or empty.
        # Only a real value is written, so a missing IMEI stays missing
        # rather than being turned into an explicit None.
        if not item_copy.get("imei"):
            device_imei = item.get("deviceImei")
            if device_imei:
                item_copy["imei"] = device_imei
        model = JimiPushAlarm.model_validate(item_copy)
        out.append(
            ParsedEvent(
                kind="alarm",
                occurred_at=model.alarm_time,
                imei=model.imei,
                account_id=account_id,
                payload={
                    "alarm_type": model.alarm_type,
                    "alarm_name": model.alarm_name,
                    "lat": model.lat,
                    "lng": model.lng,
                    "speed_kmh": model.speed_kmh,
                    "device_name": model.device_name,
                },
            )
        )
        if _is_valid_fix(model.lat, model.lng):
            # Type narrowing only: _is_valid_fix already rejected None.
            assert model.lat is not None and model.lng is not None
            out.append(
                ParsedEvent(
                    kind="position_fix",
                    occurred_at=model.alarm_time,
                    imei=model.imei,
                    account_id=account_id,
                    payload=_fix_payload(
                        model.lat,
                        model.lng,
                        speed_kmh=model.speed_kmh,
                        direction_deg=None,
                        source="tracksolid_push_alarm_crossfeed",
                    ),
                )
            )
    return out


def _parse_push_heartbeat(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Parse a pushed heartbeat payload into ``heartbeat`` events."""
    out: list[ParsedEvent] = []
    for item in _items_from_payload(payload):
        model = JimiPushHeartbeat.model_validate(item)
        out.append(
            ParsedEvent(
                kind="heartbeat",
                occurred_at=model.gate_time,
                imei=model.imei,
                account_id=account_id,
                payload={
                    "power_level": model.power_level,
                    "gsm_signal": model.gsm_signal,
                    "acc": model.acc,
                    "power_status": model.power_status,
                },
            )
        )
    return out


def _parse_push_event(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Parse a pushed login/logout payload into device session events."""
    out: list[ParsedEvent] = []
    for item in _items_from_payload(payload):
        model = JimiPushEvent.model_validate(item)
        # NOTE(review): every event_type other than LOGIN (case-insensitive)
        # is classified as a logout — confirm no other event types are pushed.
        kind = "device_login" if model.event_type.upper() == "LOGIN" else "device_logout"
        out.append(
            ParsedEvent(
                kind=kind,
                occurred_at=model.event_time,
                imei=model.imei,
                account_id=account_id,
                payload={"event_type": model.event_type, "timezone": model.timezone_str},
            )
        )
    return out


def _items_for_poll(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Polled list/get responses come back wrapped under various keys.

    Tracksolid Pro envelope is `{code, msg, result: [...]}` — extract `result`.
    Synthetic fixtures and legacy formats may use `_list`, `data`, or `records`.
    The first key holding a list or dict wins; a dict is wrapped as a single
    item. With no such key, `{"_raw": ...}` yields nothing and anything else
    is treated as a single item.
    """
    for key in ("result", "_list", "data", "records"):
        v = payload.get(key)
        if isinstance(v, list):
            return [item for item in v if isinstance(item, dict)]
        if isinstance(v, dict):
            return [v]
    if "_raw" in payload:
        return []
    return [payload]


def _parse_poll_list(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
    """Parse a polled list/get response into ``position_fix`` events.

    Per-item tolerant: a polled batch of 55 devices may include 20 offline
    cameras with null gpsTime/lat/lng. Those are skipped silently. Items that
    fail model validation are skipped too, but a warning is logged so that
    unexpected shapes stay visible.
    """
    out: list[ParsedEvent] = []
    for item in _items_for_poll(payload):
        try:
            model = JimiPollFix.model_validate(item)
        except Exception as exc:
            # Malformed item — don't fail the whole batch, but leave a trace.
            _logger.warning(
                "skipping malformed poll item imei=%r: %s", item.get("imei"), exc
            )
            continue
        if model.gps_time is None:
            continue
        if not _is_valid_fix(model.lat, model.lng):
            continue
        # Type narrowing only: _is_valid_fix already rejected None.
        assert model.lat is not None and model.lng is not None
        out.append(
            ParsedEvent(
                kind="position_fix",
                occurred_at=model.gps_time,
                imei=model.imei,
                account_id=account_id,
                payload=_fix_payload(
                    model.lat,
                    model.lng,
                    speed_kmh=model.speed_kmh,
                    direction_deg=model.direction_deg,
                    altitude_m=model.altitude_m,
                    satellites=model.satellites,
                    acc=model.acc,
                    source="tracksolid_poll",
                ),
            )
        )
    return out


# Maps (source, msg_type) to its parser. A None msg_type is the per-source
# fallback used when no exact match exists.
_DISPATCH: dict[
    tuple[str, str | None],
    Callable[[dict[str, Any], str | None], list[ParsedEvent]],
] = {
    ("tracksolid_push", "pushgps"): _parse_push_gps,
    ("tracksolid_push", "pushalarm"): _parse_push_alarm,
    ("tracksolid_push", "pushhb"): _parse_push_heartbeat,
    ("tracksolid_push", "pushevent"): _parse_push_event,
    ("tracksolid_poll_list", None): _parse_poll_list,
    ("tracksolid_poll_get", None): _parse_poll_list,
}


def parse_raw(
    source: str,
    msg_type: str | None,
    payload: dict[str, Any],
    account_id: str | None,
) -> list[ParsedEvent]:
    """Parse one raw payload into zero or more :class:`ParsedEvent` objects.

    The handler is looked up by exact ``(source, msg_type)`` first, then by
    the ``(source, None)`` fallback.

    Raises:
        UnsupportedMsgType: no handler is registered for the combination.

    Exceptions raised by a handler (for example model validation failures
    in the push parsers) propagate to the caller.
    """
    handler = _DISPATCH.get((source, msg_type)) or _DISPATCH.get((source, None))
    if handler is None:
        raise UnsupportedMsgType(f"no parser for source={source!r} msg_type={msg_type!r}")
    return handler(payload, account_id)