288 lines
9.2 KiB
Python
288 lines
9.2 KiB
Python
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from app.models.jimi import (
|
|
JimiPollFix,
|
|
JimiPushAlarm,
|
|
JimiPushEvent,
|
|
JimiPushGps,
|
|
JimiPushHeartbeat,
|
|
)
|
|
|
|
PARSER_VERSION = 1
|
|
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class ParsedEvent:
|
|
kind: str
|
|
occurred_at: datetime
|
|
imei: str
|
|
account_id: str | None
|
|
payload: dict[str, Any]
|
|
|
|
|
|
class UnsupportedMsgType(Exception):
|
|
pass
|
|
|
|
|
|
def _is_valid_fix(lat: float | None, lng: float | None) -> bool:
|
|
if lat is None or lng is None:
|
|
return False
|
|
if lat == 0.0 and lng == 0.0:
|
|
return False # zero island — sensor error
|
|
if not (-90.0 <= lat <= 90.0):
|
|
return False
|
|
return -180.0 <= lng <= 180.0
|
|
|
|
|
|
def _items_from_payload(payload: dict[str, Any]) -> list[dict[str, Any]]:
|
|
"""Coerce the gateway-stored payload into a list of dict items.
|
|
|
|
The gateway stores the verbatim Tracksolid form `data` field as either
|
|
a dict (single object) or {"_list": [...]} (array). Some callers may pass
|
|
{"_raw": "..."} if JSON parsing failed at the gateway.
|
|
"""
|
|
raw_list = payload.get("_list")
|
|
if isinstance(raw_list, list):
|
|
return [item for item in raw_list if isinstance(item, dict)]
|
|
if "_raw" in payload:
|
|
return []
|
|
return [payload]
|
|
|
|
|
|
def _fix_payload(
|
|
lat: float,
|
|
lng: float,
|
|
*,
|
|
speed_kmh: float | None,
|
|
direction_deg: float | None,
|
|
altitude_m: float | None = None,
|
|
satellites: int | None = None,
|
|
acc: Any = None,
|
|
source: str,
|
|
mc_type: str | None = None,
|
|
device_name: str | None = None,
|
|
current_mileage_km: float | None = None,
|
|
gps_signal: int | None = None,
|
|
pos_type: str | None = None,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"lat": lat,
|
|
"lng": lng,
|
|
"speed_kmh": speed_kmh,
|
|
"direction_deg": direction_deg,
|
|
"altitude_m": altitude_m,
|
|
"satellites": satellites,
|
|
"acc": acc,
|
|
"source": source,
|
|
"mc_type": mc_type,
|
|
"device_name": device_name,
|
|
"current_mileage_km": current_mileage_km,
|
|
"gps_signal": gps_signal,
|
|
"pos_type": pos_type,
|
|
}
|
|
|
|
|
|
def _parse_push_gps(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
|
|
out: list[ParsedEvent] = []
|
|
for item in _items_from_payload(payload):
|
|
model = JimiPushGps.model_validate(item)
|
|
if not _is_valid_fix(model.lat, model.lng):
|
|
continue
|
|
out.append(
|
|
ParsedEvent(
|
|
kind="position_fix",
|
|
occurred_at=model.gps_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload=_fix_payload(
|
|
model.lat,
|
|
model.lng,
|
|
speed_kmh=model.speed_kmh,
|
|
direction_deg=model.direction_deg,
|
|
altitude_m=model.altitude_m,
|
|
satellites=model.satellites,
|
|
acc=model.acc,
|
|
source="tracksolid_push",
|
|
),
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def _parse_push_alarm(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
|
|
out: list[ParsedEvent] = []
|
|
for item in _items_from_payload(payload):
|
|
item_copy = dict(item)
|
|
item_copy.setdefault("imei", item.get("deviceImei") or item.get("imei"))
|
|
model = JimiPushAlarm.model_validate(item_copy)
|
|
out.append(
|
|
ParsedEvent(
|
|
kind="alarm",
|
|
occurred_at=model.alarm_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload={
|
|
"alarm_type": model.alarm_type,
|
|
"alarm_name": model.alarm_name,
|
|
"lat": model.lat,
|
|
"lng": model.lng,
|
|
"speed_kmh": model.speed_kmh,
|
|
"device_name": model.device_name,
|
|
},
|
|
)
|
|
)
|
|
if _is_valid_fix(model.lat, model.lng):
|
|
assert model.lat is not None and model.lng is not None
|
|
out.append(
|
|
ParsedEvent(
|
|
kind="position_fix",
|
|
occurred_at=model.alarm_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload=_fix_payload(
|
|
model.lat,
|
|
model.lng,
|
|
speed_kmh=model.speed_kmh,
|
|
direction_deg=None,
|
|
source="tracksolid_push_alarm_crossfeed",
|
|
),
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def _parse_push_heartbeat(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
|
|
out: list[ParsedEvent] = []
|
|
for item in _items_from_payload(payload):
|
|
model = JimiPushHeartbeat.model_validate(item)
|
|
out.append(
|
|
ParsedEvent(
|
|
kind="heartbeat",
|
|
occurred_at=model.gate_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload={
|
|
"power_level": model.power_level,
|
|
"gsm_signal": model.gsm_signal,
|
|
"acc": model.acc,
|
|
"power_status": model.power_status,
|
|
},
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def _parse_push_event(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
|
|
out: list[ParsedEvent] = []
|
|
for item in _items_from_payload(payload):
|
|
model = JimiPushEvent.model_validate(item)
|
|
kind = "device_login" if model.event_type.upper() == "LOGIN" else "device_logout"
|
|
out.append(
|
|
ParsedEvent(
|
|
kind=kind,
|
|
occurred_at=model.event_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload={"event_type": model.event_type, "timezone": model.timezone_str},
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
def _items_for_poll(payload: dict[str, Any]) -> list[dict[str, Any]]:
|
|
"""Polled list/get responses come back wrapped under various keys.
|
|
|
|
Tracksolid Pro envelope is `{code, msg, result: [...]}` — extract `result`.
|
|
Synthetic fixtures and legacy formats may use `_list`, `data`, or `records`.
|
|
"""
|
|
for key in ("result", "_list", "data", "records"):
|
|
v = payload.get(key)
|
|
if isinstance(v, list):
|
|
return [item for item in v if isinstance(item, dict)]
|
|
if isinstance(v, dict):
|
|
return [v]
|
|
if "_raw" in payload:
|
|
return []
|
|
return [payload]
|
|
|
|
|
|
def _parse_poll_list(payload: dict[str, Any], account_id: str | None) -> list[ParsedEvent]:
|
|
"""Per-item tolerant: a polled batch of 55 devices may include 20 offline
|
|
cameras with null gpsTime/lat/lng. Skip those silently; surface only
|
|
truly unexpected shapes as parser errors."""
|
|
out: list[ParsedEvent] = []
|
|
for item in _items_for_poll(payload):
|
|
try:
|
|
model = JimiPollFix.model_validate(item)
|
|
except Exception:
|
|
continue
|
|
if model.gps_time is None:
|
|
continue
|
|
if not _is_valid_fix(model.lat, model.lng):
|
|
continue
|
|
assert model.lat is not None and model.lng is not None
|
|
|
|
# Tracksolid sends these as strings in the wire payload; coerce now.
|
|
def _f(v: Any) -> float | None:
|
|
if v in (None, ""):
|
|
return None
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
def _i(v: Any) -> int | None:
|
|
f = _f(v)
|
|
return int(f) if f is not None else None
|
|
|
|
raw_signal = item.get("gpsSignal")
|
|
raw_mileage = item.get("currentMileage")
|
|
|
|
out.append(
|
|
ParsedEvent(
|
|
kind="position_fix",
|
|
occurred_at=model.gps_time,
|
|
imei=model.imei,
|
|
account_id=account_id,
|
|
payload=_fix_payload(
|
|
model.lat,
|
|
model.lng,
|
|
speed_kmh=model.speed_kmh,
|
|
direction_deg=model.direction_deg,
|
|
altitude_m=model.altitude_m,
|
|
satellites=model.satellites,
|
|
acc=model.acc,
|
|
source="tracksolid_poll",
|
|
mc_type=model.mc_type,
|
|
device_name=model.device_name,
|
|
current_mileage_km=_f(raw_mileage),
|
|
gps_signal=_i(raw_signal),
|
|
pos_type=str(model.pos_type) if model.pos_type is not None else None,
|
|
),
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
_DISPATCH = {
|
|
("tracksolid_push", "pushgps"): _parse_push_gps,
|
|
("tracksolid_push", "pushalarm"): _parse_push_alarm,
|
|
("tracksolid_push", "pushhb"): _parse_push_heartbeat,
|
|
("tracksolid_push", "pushevent"): _parse_push_event,
|
|
("tracksolid_poll_list", None): _parse_poll_list,
|
|
("tracksolid_poll_get", None): _parse_poll_list,
|
|
}
|
|
|
|
|
|
def parse_raw(
|
|
source: str,
|
|
msg_type: str | None,
|
|
payload: dict[str, Any],
|
|
account_id: str | None,
|
|
) -> list[ParsedEvent]:
|
|
handler = _DISPATCH.get((source, msg_type)) or _DISPATCH.get((source, None))
|
|
if handler is None:
|
|
raise UnsupportedMsgType(f"no parser for source={source!r} msg_type={msg_type!r}")
|
|
return handler(payload, account_id)
|