"""Single-writer projector for state.live_positions + state.position_history.

Drains events.parsed of kind 'position_fix' that haven't been projected yet.
Each device (imei) owns one row in state.live_positions; the tracker-vs-camera
dedup happens at read time in serve.fn_live_view, not here.

Ordering invariant: state.live_positions never moves backwards in occurred_at
*per imei* (older fixes from the same device do not overwrite newer ones).
"""

import math
from datetime import datetime, timezone
from typing import Any

import structlog
from psycopg import AsyncCursor
from psycopg.types.json import Jsonb

from app.db import get_pool
from app.parsers.jimi import PARSER_VERSION

log = structlog.get_logger("projector.live_positions")

# Maximum number of events.parsed rows handled by one drain() call.
DRAIN_BATCH = 500
# Payload key whose presence marks an event as already handled by this projector.
PROJECTED_FLAG_KEY = "live_positions_projected_at"


async def _resolve_device(cur: AsyncCursor[Any], imei: str) -> int | None:
    """Returns vehicle_id for the device, or None if unmapped/unknown."""
    await cur.execute(
        "SELECT vehicle_id FROM domain.devices WHERE imei = %s",
        (imei,),
    )
    row = await cur.fetchone()
    # No row: unknown device. NULL vehicle_id: device exists but is unmapped.
    if row is None or row[0] is None:
        return None
    return int(row[0])


def _finite_float(value: Any) -> float | None:
    """Return ``value`` as a finite float, or None if it cannot be one."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


async def _project_one(
    cur: AsyncCursor[Any],
    *,
    occurred_at: datetime,
    imei: str,
    payload: dict[str, Any],
) -> bool:
    """Project one position_fix event into live_positions and position_history.

    Args:
        cur: Cursor on the transaction that owns the drain batch.
        occurred_at: Timestamp of the fix, taken from the events.parsed row.
        imei: Device identifier; conflict key of state.live_positions.
        payload: Event payload. Reads ``lat``, ``lng``, ``speed_kmh``,
            ``direction_deg``, ``acc``, ``altitude_m``, ``satellites`` and
            ``source``.

    Returns:
        True if the fix was written; False if it was skipped because the
        coordinates are missing/unusable or the device has no vehicle mapping.
    """
    # Validate before building WKT: a non-numeric or non-finite coordinate
    # would make ST_GeomFromText raise, abort the whole batch transaction and
    # leave the same event to be re-selected (and fail again) on every drain.
    lat = _finite_float(payload.get("lat"))
    lng = _finite_float(payload.get("lng"))
    if lat is None or lng is None:
        return False

    vehicle_id = await _resolve_device(cur, imei)
    if vehicle_id is None:
        return False

    # WKT axis order is X Y, i.e. longitude first.
    geom_wkt = f"POINT({lng} {lat})"
    speed_kmh = payload.get("speed_kmh")
    direction_deg = payload.get("direction_deg")
    acc = payload.get("acc")
    acc_state = acc if isinstance(acc, int) else None
    source = payload.get("source") or "unknown"

    # Upsert the device's single live row. The WHERE on the conflict branch
    # enforces the ordering invariant: an older fix never overwrites a newer one.
    await cur.execute(
        """
        INSERT INTO state.live_positions (
            imei, vehicle_id, occurred_at, geom,
            speed_kmh, direction_deg, acc_state,
            source, parser_version, updated_at
        ) VALUES (
            %s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
            %s, %s, %s,
            %s, %s, now()
        )
        ON CONFLICT (imei) DO UPDATE SET
            vehicle_id = EXCLUDED.vehicle_id,
            occurred_at = EXCLUDED.occurred_at,
            geom = EXCLUDED.geom,
            speed_kmh = EXCLUDED.speed_kmh,
            direction_deg = EXCLUDED.direction_deg,
            acc_state = EXCLUDED.acc_state,
            source = EXCLUDED.source,
            parser_version = EXCLUDED.parser_version,
            updated_at = now()
        WHERE EXCLUDED.occurred_at > state.live_positions.occurred_at
        """,
        (
            imei,
            vehicle_id,
            occurred_at,
            geom_wkt,
            speed_kmh,
            direction_deg,
            acc_state,
            source,
            PARSER_VERSION,
        ),
    )

    # History is append-only: every accepted fix is recorded, including ones
    # that were too old to update the live row above.
    await cur.execute(
        """
        INSERT INTO state.position_history (
            vehicle_id, imei, occurred_at, geom,
            speed_kmh, direction_deg, acc_state,
            altitude_m, satellites,
            source, parser_version
        ) VALUES (
            %s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326),
            %s, %s, %s,
            %s, %s,
            %s, %s
        )
        """,
        (
            vehicle_id,
            imei,
            occurred_at,
            geom_wkt,
            speed_kmh,
            direction_deg,
            acc_state,
            payload.get("altitude_m"),
            payload.get("satellites"),
            source,
            PARSER_VERSION,
        ),
    )
    return True


async def drain() -> int:
    """Process pending position_fix events. Returns count drained.

    Selects up to DRAIN_BATCH unflagged events (oldest first, skipping rows
    locked by another transaction), projects each one, then flags the whole
    batch in the same transaction. Events that ``_project_one`` skips are
    flagged too, so they are not re-selected on the next call; they are
    included in the returned count and reported separately as ``skipped`` in
    the log line.

    Raises:
        Exception: Whatever ``_project_one`` raises is logged and re-raised
            from inside the transaction block, so nothing in the batch is
            flagged.
    """
    pool = await get_pool()
    processed = 0
    skipped = 0
    async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur:
        await cur.execute(
            """
            SELECT parsed_id, occurred_at, imei, payload
            FROM events.parsed
            WHERE kind = 'position_fix'
              AND NOT (payload ? %s)
            ORDER BY occurred_at
            FOR UPDATE SKIP LOCKED
            LIMIT %s
            """,
            (PROJECTED_FLAG_KEY, DRAIN_BATCH),
        )
        rows = await cur.fetchall()

        for parsed_id, occurred_at, imei, payload in rows:
            try:
                projected = await _project_one(
                    cur,
                    occurred_at=occurred_at,
                    imei=imei,
                    payload=payload,
                )
            except Exception:
                log.exception("projector.failed", imei=imei, parsed_id=parsed_id)
                raise
            if not projected:
                skipped += 1

        if rows:
            # Store a real UTC timestamp under the "*_projected_at" key rather
            # than the literal string "now", which records nothing and would
            # resolve to the *read* time if ever cast to timestamptz.
            flag = Jsonb(
                {PROJECTED_FLAG_KEY: datetime.now(timezone.utc).isoformat()}
            )
            await cur.executemany(
                """
                UPDATE events.parsed
                SET payload = payload || %s::jsonb
                WHERE parsed_id = %s AND occurred_at = %s
                """,
                [(flag, int(pid), occ) for pid, occ, _, _ in rows],
            )
        processed = len(rows)

    if processed:
        log.info("projector.drained", count=processed, skipped=skipped)
    return processed