fleet-platform/app/projectors/live_positions.py

313 lines
11 KiB
Python
Raw Normal View History

"""Single-writer projector for state.live_positions + state.position_history.
Drains events.parsed of kind 'position_fix' that haven't been projected yet.
Each device (imei) owns one row in state.live_positions; the tracker-vs-camera
dedup happens at read time in serve.fn_live_view, not here.
Ordering invariant: state.live_positions never moves backwards in occurred_at
*per imei* (older fixes from the same device do not overwrite newer ones).
"""
import re
from datetime import datetime, timezone
from typing import Any

import structlog
from psycopg import AsyncCursor
from psycopg.types.json import Jsonb

from app.db import get_pool
from app.parsers.jimi import PARSER_VERSION
# Module-level structured logger, obtained under the name
# "projector.live_positions"; every log event in this module goes through it.
log = structlog.get_logger("projector.live_positions")

# Upper bound on how many events.parsed rows one drain() call selects (it is
# bound as the LIMIT of the SELECT ... FOR UPDATE SKIP LOCKED query).
DRAIN_BATCH = 500

# JSONB key merged into events.parsed.payload once drain() has handled a row.
# drain() only selects rows whose payload does not contain this key yet.
PROJECTED_FLAG_KEY = "live_positions_projected_at"

# Low-accuracy positioning modes (cell-tower / wifi) reported in Tracksolid's
# `posType`. These can be kilometres off a real GPS fix, so we don't let one
# overwrite a *fresh* GPS fix in the live view (it would teleport the marker).
# A GPS fix older than this window does yield to LBS/WIFI, so a genuinely
# GPS-dark vehicle still updates rather than freezing forever.
LOW_ACCURACY_POS_TYPES = ("LBS", "WIFI")

# NOTE: this text is concatenated verbatim into the live_positions upsert in
# _project_one (it is not a bound parameter), so it must remain a trusted,
# hard-coded SQL interval expression.
GPS_PREFERENCE_WINDOW_SQL = "interval '10 minutes'"

# Captures whatever follows the last " - " in a device name, minus an optional
# trailing "_cam" / "_CAM" (e.g. "John Mbugua - KDW 573B_cam" -> "KDW 573B").
# NOTE(review): only the all-lower and all-upper suffix spellings are stripped
# here, whereas _CAMERA_NAME_RE below is case-insensitive; a mixed-case suffix
# such as "_Cam" would be classified as a camera but stay inside the captured
# plate. Confirm against the SQL-side helpers before changing either pattern.
_PLATE_FROM_DEVICE_NAME = re.compile(r"^.* - (.+?)(?:_cam|_CAM)?$")

# Matches a device name that ends in "_cam", in any letter case.
_CAMERA_NAME_RE = re.compile(r"_cam$", re.IGNORECASE)
def _classify_device_type(device_name: str | None) -> str:
    """Return "camera" or "tracker" for a Tracksolid device name.

    Camera units carry a "_cam" suffix in their device_name
    (e.g. "John Mbugua - KDW 573B_cam"); anything else, including a missing
    or empty name, is treated as a GPS tracker.

    The tracker-first dedup in serve.fn_live_view (PRD F1.6) relies on this
    label: when a camera and a tracker share one vehicle_id the accurate
    tracker fix must win. Provisioning every device as "tracker" would
    silently defeat that tie-break, so the device is classified at first
    sight here.
    """
    if not device_name:
        return "tracker"
    is_camera = _CAMERA_NAME_RE.search(device_name) is not None
    return "camera" if is_camera else "tracker"
def _extract_plate_from_device_name(device_name: str | None) -> str | None:
    """Derive a vehicle plate from a Tracksolid device name, if possible.

    Python counterpart of the SQL helpers serve._label_short /
    serve._driver_name, kept here so the auto-provisioner can link a new
    device to an existing vehicle row by its real plate instead of creating a
    placeholder "IMEI-..." row every time.

    Returns the captured plate, or None when the name is missing, does not
    match the "<something> - <plate>" shape, or the captured text lacks either
    an uppercase ASCII letter or a digit (i.e. does not look like a plate).
    """
    if not device_name:
        return None

    matched = _PLATE_FROM_DEVICE_NAME.match(device_name)
    if matched is None:
        return None

    plate = matched.group(1)
    has_upper = re.search(r"[A-Z]", plate) is not None
    has_digit = re.search(r"[0-9]", plate) is not None
    return plate if (plate and has_upper and has_digit) else None
async def _resolve_device(
    cur: AsyncCursor[Any], imei: str, *, account_id: str | None, device_name: str | None
) -> int | None:
    """Return the vehicle_id that owns ``imei``, auto-provisioning on first sight.

    Resolution order:

    1. If domain.devices already has the imei, return its vehicle_id
       (None when the device exists but is not linked to a vehicle).
    2. Otherwise, without an ``account_id`` nothing can be provisioned: None.
    3. Otherwise make sure the account row exists, then pick a vehicle:
       when device_name carries a derivable plate
       (e.g. "John Mbugua - KDW 573B_cam" -> "KDW 573B") and a vehicle with
       that plate exists, link to it, so a camera joining a truck already
       tracked by an X3 lands on the same vehicle_id and
       serve.fn_live_view's tracker-first dedup picks the right fix.
       If no such vehicle exists, one is created under the derived plate, or
       under a placeholder "IMEI-<last 6 digits>" when no plate is derivable.
    4. Register the device against that vehicle and return the vehicle_id.
    """
    # --- 1. Already-known device -------------------------------------------
    await cur.execute(
        "SELECT vehicle_id FROM domain.devices WHERE imei = %s",
        (imei,),
    )
    known_device = await cur.fetchone()
    if known_device is not None:
        linked = known_device[0]
        return int(linked) if linked is not None else None

    # --- 2. Unknown device with no owning account: nothing to provision ----
    if not account_id:
        return None

    # --- 3. Ensure the account exists, then find or create the vehicle -----
    await cur.execute(
        """INSERT INTO domain.accounts (account_id, name, app_key)
VALUES (%s, %s, '')
ON CONFLICT (account_id) DO NOTHING""",
        (account_id, account_id),
    )

    derived_plate = _extract_plate_from_device_name(device_name)
    vehicle_id: int | None = None

    if derived_plate is not None:
        await cur.execute(
            "SELECT vehicle_id FROM domain.vehicles WHERE plate = %s",
            (derived_plate,),
        )
        existing_vehicle = await cur.fetchone()
        if existing_vehicle is not None:
            vehicle_id = int(existing_vehicle[0])
            log.info(
                "projector.linked_to_existing_vehicle",
                imei=imei, vehicle_id=vehicle_id, plate=derived_plate,
            )

    if vehicle_id is None:
        new_plate = derived_plate if derived_plate else f"IMEI-{imei[-6:]}"
        # The no-op DO UPDATE makes RETURNING yield the row even when the
        # plate already exists, so a row is always expected back.
        await cur.execute(
            """INSERT INTO domain.vehicles (plate) VALUES (%s)
ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate
RETURNING vehicle_id""",
            (new_plate,),
        )
        created = await cur.fetchone()
        assert created is not None
        vehicle_id = int(created[0])
        log.info(
            "projector.auto_provisioned_vehicle",
            imei=imei, vehicle_id=vehicle_id, plate=new_plate,
        )

    # --- 4. Register the device --------------------------------------------
    # On an imei conflict an existing vehicle link is kept (COALESCE), and
    # only a 'provisioned' lifecycle is promoted to 'active'.
    kind = _classify_device_type(device_name)
    await cur.execute(
        """INSERT INTO domain.devices
(imei, account_id, vehicle_id, device_type, lifecycle, activation_at)
VALUES (%s, %s, %s, %s, 'active', now())
ON CONFLICT (imei) DO UPDATE
SET account_id = EXCLUDED.account_id,
vehicle_id = COALESCE(domain.devices.vehicle_id, EXCLUDED.vehicle_id),
device_type = EXCLUDED.device_type,
lifecycle = CASE WHEN domain.devices.lifecycle = 'provisioned'
THEN 'active' ELSE domain.devices.lifecycle END""",
        (imei, account_id, vehicle_id, kind),
    )
    return vehicle_id
async def _project_one(
    cur: AsyncCursor[Any],
    *,
    occurred_at: datetime,
    imei: str,
    payload: dict[str, Any],
) -> bool:
    """Project a single position fix into the live and history tables.

    Args:
        cur: Cursor inside the caller's transaction.
        occurred_at: Timestamp of the fix.
        imei: Reporting device.
        payload: Parsed event payload; ``_account_id`` is expected to have
            been added by the caller for device auto-provisioning.

    Returns:
        False when the payload has no lat/lng or the device cannot be
        resolved to a vehicle (nothing is written); True once both statements
        have been issued, whether or not their guards let a row through.
    """
    lat, lng = payload.get("lat"), payload.get("lng")
    if lat is None or lng is None:
        return False

    device_name = payload.get("device_name")
    vehicle_id = await _resolve_device(
        cur,
        imei,
        account_id=payload.get("_account_id"),
        device_name=device_name,
    )
    if vehicle_id is None:
        return False

    # Fields shared by both statements are read from the payload once.
    geom_wkt = f"POINT({lng} {lat})"
    raw_acc = payload.get("acc")
    acc_int = raw_acc if isinstance(raw_acc, int) else None
    speed_kmh = payload.get("speed_kmh")
    direction_deg = payload.get("direction_deg")
    source = payload.get("source") or "unknown"
    mc_type = payload.get("mc_type")
    mileage_km = payload.get("current_mileage_km")
    gps_signal = payload.get("gps_signal")
    satellites = payload.get("satellites")
    pos_type = payload.get("pos_type")

    # Upsert of the device's single live row. The WHERE guard rejects fixes
    # that are not strictly newer than the stored one, and keeps a
    # low-accuracy fix from replacing a GPS fix that is still inside the
    # preference window; COALESCE(..., false) lets the update through when
    # the comparison is NULL.
    live_upsert_sql = (
        """
INSERT INTO state.live_positions (
imei, vehicle_id, occurred_at, geom, speed_kmh, direction_deg,
acc_state, source, parser_version, updated_at,
mc_type, current_mileage_km, gps_signal, satellites, device_name, pos_type
) VALUES (
%s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
%s, %s, %s, %s, %s, now(),
%s, %s, %s, %s, %s, %s
)
ON CONFLICT (imei) DO UPDATE
SET vehicle_id = EXCLUDED.vehicle_id,
occurred_at = EXCLUDED.occurred_at,
geom = EXCLUDED.geom,
speed_kmh = EXCLUDED.speed_kmh,
direction_deg = EXCLUDED.direction_deg,
acc_state = EXCLUDED.acc_state,
source = EXCLUDED.source,
parser_version = EXCLUDED.parser_version,
updated_at = now(),
mc_type = EXCLUDED.mc_type,
current_mileage_km = EXCLUDED.current_mileage_km,
gps_signal = EXCLUDED.gps_signal,
satellites = EXCLUDED.satellites,
device_name = EXCLUDED.device_name,
pos_type = EXCLUDED.pos_type
WHERE EXCLUDED.occurred_at > state.live_positions.occurred_at
AND NOT COALESCE(
EXCLUDED.pos_type = ANY (%s)
AND state.live_positions.pos_type = 'GPS'
AND state.live_positions.occurred_at
> now() - """
        + GPS_PREFERENCE_WINDOW_SQL
        + """,
false
)
"""
    )
    live_params = (
        imei, vehicle_id, occurred_at, geom_wkt,
        speed_kmh,
        direction_deg,
        acc_int,
        source,
        PARSER_VERSION,
        mc_type,
        mileage_km,
        gps_signal,
        satellites,
        device_name,
        pos_type,
        [*LOW_ACCURACY_POS_TYPES],
    )
    await cur.execute(live_upsert_sql, live_params)

    # History append with write-time de-duplication: a parked device
    # re-reports the same gpsTime on every poll, which would otherwise stack
    # identical (imei, occurred_at) rows in the history hypertable and inflate
    # the trip "fix count". With a single writer and the (imei, occurred_at)
    # index the NOT EXISTS probe is cheap, and since it relies on no unique
    # constraint it cannot fail if deployed before the dedup migration.
    history_insert_sql = """
INSERT INTO state.position_history (
vehicle_id, imei, occurred_at, geom, speed_kmh, direction_deg,
acc_state, altitude_m, satellites, source, parser_version,
mc_type, current_mileage_km, gps_signal, pos_type
)
SELECT
%s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s
WHERE NOT EXISTS (
SELECT 1 FROM state.position_history p
WHERE p.imei = %s AND p.occurred_at = %s
)
"""
    history_params = (
        vehicle_id, imei, occurred_at, geom_wkt,
        speed_kmh,
        direction_deg,
        acc_int,
        payload.get("altitude_m"),
        satellites,
        source,
        PARSER_VERSION,
        mc_type,
        mileage_km,
        gps_signal,
        pos_type,
        imei, occurred_at,
    )
    await cur.execute(history_insert_sql, history_params)
    return True
async def drain() -> int:
    """Project one batch of pending 'position_fix' events.

    Inside a single transaction this locks up to DRAIN_BATCH rows of
    events.parsed that do not carry PROJECTED_FLAG_KEY yet (oldest
    occurred_at first, skipping rows locked by another session), projects
    each through _project_one, and then stamps every selected row with
    PROJECTED_FLAG_KEY so it is not picked up again.

    Rows that _project_one declines (it returned False) are stamped as well
    and are included in the returned count.

    Returns:
        Number of events selected and stamped in this call.

    Raises:
        Exception: whatever _project_one raised; it is logged and re-raised,
            so the transaction block exits with the error and none of the
            batch is stamped.
    """
    pool = await get_pool()
    async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur:
        await cur.execute(
            """
SELECT parsed_id, occurred_at, imei, account_id, payload
FROM events.parsed
WHERE kind = 'position_fix'
AND NOT (payload ? %s)
ORDER BY occurred_at
FOR UPDATE SKIP LOCKED
LIMIT %s
""",
            (PROJECTED_FLAG_KEY, DRAIN_BATCH),
        )
        rows = await cur.fetchall()

        for _, occurred_at, imei, account_id, payload in rows:
            # The account travels inside the payload so _project_one can
            # auto-provision a device it has never seen before.
            payload_with_acct = dict(payload, _account_id=account_id)
            try:
                await _project_one(
                    cur,
                    occurred_at=occurred_at,
                    imei=imei,
                    payload=payload_with_acct,
                )
            except Exception:
                log.exception("projector.failed", imei=imei)
                raise

        if rows:
            # Stamp a real UTC timestamp. The previous literal "now" carried
            # no information, and because 'now' is a PostgreSQL special
            # timestamp input, casting it later would yield the cast-time
            # clock instead of the projection time.
            projected_at = datetime.now(timezone.utc).isoformat()
            flag = Jsonb({PROJECTED_FLAG_KEY: projected_at})
            await cur.executemany(
                """
UPDATE events.parsed
SET payload = payload || %s::jsonb
WHERE parsed_id = %s AND occurred_at = %s
""",
                [(flag, int(pid), occ) for pid, occ, _, _, _ in rows],
            )

    processed = len(rows)
    if processed:
        log.info("projector.drained", count=processed)
    return processed