fleet-platform/app/projectors/live_positions.py

228 lines
7.9 KiB
Python
Raw Normal View History

"""Single-writer projector for state.live_positions + state.position_history.
Drains events.parsed of kind 'position_fix' that haven't been projected yet.
Each device (imei) owns one row in state.live_positions; the tracker-vs-camera
dedup happens at read time in serve.fn_live_view, not here.
Ordering invariant: state.live_positions never moves backwards in occurred_at
*per imei* (older fixes from the same device do not overwrite newer ones).
"""
from datetime import datetime
from typing import Any
import structlog
from psycopg import AsyncCursor
from psycopg.types.json import Jsonb
from app.db import get_pool
from app.parsers.jimi import PARSER_VERSION
log = structlog.get_logger("projector.live_positions")
DRAIN_BATCH = 500
PROJECTED_FLAG_KEY = "live_positions_projected_at"
async def _resolve_device(
cur: AsyncCursor[Any], imei: str, *, account_id: str | None
) -> int | None:
"""Returns vehicle_id for the device.
Auto-provisions on first sight: when the polling worker sees an IMEI we've
never seen before, we create a placeholder vehicle (plate = "IMEI-<last6>")
and a device row with lifecycle='active'. The fleet admin can rename the
plate later via the (forthcoming) admin UI; until then the device is fully
operational.
Returns None only when the IMEI is known but unmapped (vehicle_id IS NULL),
which shouldn't happen via this auto-provision path but is preserved for
manual edits.
"""
await cur.execute(
"SELECT vehicle_id FROM domain.devices WHERE imei = %s",
(imei,),
)
row = await cur.fetchone()
if row is not None:
return None if row[0] is None else int(row[0])
if not account_id:
return None
await cur.execute(
"""INSERT INTO domain.accounts (account_id, name, app_key)
VALUES (%s, %s, '')
ON CONFLICT (account_id) DO NOTHING""",
(account_id, account_id),
)
plate = f"IMEI-{imei[-6:]}"
await cur.execute(
"""INSERT INTO domain.vehicles (plate) VALUES (%s)
ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate
RETURNING vehicle_id""",
(plate,),
)
row = await cur.fetchone()
assert row is not None
vehicle_id = int(row[0])
await cur.execute(
"""INSERT INTO domain.devices
(imei, account_id, vehicle_id, device_type, lifecycle, activation_at)
VALUES (%s, %s, %s, 'tracker', 'active', now())
ON CONFLICT (imei) DO UPDATE
SET account_id = EXCLUDED.account_id,
vehicle_id = COALESCE(domain.devices.vehicle_id, EXCLUDED.vehicle_id),
lifecycle = CASE WHEN domain.devices.lifecycle = 'provisioned'
THEN 'active' ELSE domain.devices.lifecycle END""",
(imei, account_id, vehicle_id),
)
log.info("projector.auto_provisioned_device", imei=imei, vehicle_id=vehicle_id, plate=plate)
return vehicle_id
async def _project_one(
cur: AsyncCursor[Any],
*,
occurred_at: datetime,
imei: str,
payload: dict[str, Any],
) -> bool:
lat = payload.get("lat")
lng = payload.get("lng")
if lat is None or lng is None:
return False
vehicle_id = await _resolve_device(cur, imei, account_id=payload.get("_account_id"))
if vehicle_id is None:
return False
geom_wkt = f"POINT({lng} {lat})"
acc_int = payload.get("acc") if isinstance(payload.get("acc"), int) else None
await cur.execute(
"""
INSERT INTO state.live_positions (
imei, vehicle_id, occurred_at, geom, speed_kmh, direction_deg,
acc_state, source, parser_version, updated_at,
mc_type, current_mileage_km, gps_signal, satellites, device_name, pos_type
) VALUES (
%s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
%s, %s, %s, %s, %s, now(),
%s, %s, %s, %s, %s, %s
)
ON CONFLICT (imei) DO UPDATE
SET vehicle_id = EXCLUDED.vehicle_id,
occurred_at = EXCLUDED.occurred_at,
geom = EXCLUDED.geom,
speed_kmh = EXCLUDED.speed_kmh,
direction_deg = EXCLUDED.direction_deg,
acc_state = EXCLUDED.acc_state,
source = EXCLUDED.source,
parser_version = EXCLUDED.parser_version,
updated_at = now(),
mc_type = EXCLUDED.mc_type,
current_mileage_km = EXCLUDED.current_mileage_km,
gps_signal = EXCLUDED.gps_signal,
satellites = EXCLUDED.satellites,
device_name = EXCLUDED.device_name,
pos_type = EXCLUDED.pos_type
WHERE EXCLUDED.occurred_at > state.live_positions.occurred_at
""",
(
imei, vehicle_id, occurred_at, geom_wkt,
payload.get("speed_kmh"),
payload.get("direction_deg"),
acc_int,
payload.get("source") or "unknown",
PARSER_VERSION,
payload.get("mc_type"),
payload.get("current_mileage_km"),
payload.get("gps_signal"),
payload.get("satellites"),
payload.get("device_name"),
payload.get("pos_type"),
),
)
await cur.execute(
"""
INSERT INTO state.position_history (
vehicle_id, imei, occurred_at, geom, speed_kmh, direction_deg,
acc_state, altitude_m, satellites, source, parser_version,
mc_type, current_mileage_km, gps_signal, pos_type
) VALUES (
%s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s
)
""",
(
vehicle_id, imei, occurred_at, geom_wkt,
payload.get("speed_kmh"),
payload.get("direction_deg"),
acc_int,
payload.get("altitude_m"),
payload.get("satellites"),
payload.get("source") or "unknown",
PARSER_VERSION,
payload.get("mc_type"),
payload.get("current_mileage_km"),
payload.get("gps_signal"),
payload.get("pos_type"),
),
)
return True
async def drain() -> int:
"""Process pending position_fix events. Returns count drained."""
pool = await get_pool()
processed = 0
async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur:
await cur.execute(
"""
SELECT parsed_id, occurred_at, imei, account_id, payload
FROM events.parsed
WHERE kind = 'position_fix'
AND NOT (payload ? %s)
ORDER BY occurred_at
FOR UPDATE SKIP LOCKED
LIMIT %s
""",
(PROJECTED_FLAG_KEY, DRAIN_BATCH),
)
rows = await cur.fetchall()
for _, occurred_at, imei, account_id, payload in rows:
payload_with_acct = dict(payload, _account_id=account_id)
try:
await _project_one(
cur,
occurred_at=occurred_at,
imei=imei,
payload=payload_with_acct,
)
except Exception:
log.exception("projector.failed", imei=imei)
raise
if rows:
await cur.executemany(
"""
UPDATE events.parsed
SET payload = payload || %s::jsonb
WHERE parsed_id = %s AND occurred_at = %s
""",
[
(Jsonb({PROJECTED_FLAG_KEY: "now"}), int(pid), occ)
for pid, occ, _, _, _ in rows
],
)
processed = len(rows)
if processed:
log.info("projector.drained", count=processed)
return processed