"""Single-writer projector for state.live_positions + state.position_history. Drains events.parsed of kind 'position_fix' that haven't been projected yet. Each device (imei) owns one row in state.live_positions; the tracker-vs-camera dedup happens at read time in serve.fn_live_view, not here. Ordering invariant: state.live_positions never moves backwards in occurred_at *per imei* (older fixes from the same device do not overwrite newer ones). """ from datetime import datetime from typing import Any import structlog from psycopg import AsyncCursor from psycopg.types.json import Jsonb from app.db import get_pool from app.parsers.jimi import PARSER_VERSION log = structlog.get_logger("projector.live_positions") DRAIN_BATCH = 500 PROJECTED_FLAG_KEY = "live_positions_projected_at" async def _resolve_device( cur: AsyncCursor[Any], imei: str, *, account_id: str | None ) -> int | None: """Returns vehicle_id for the device. Auto-provisions on first sight: when the polling worker sees an IMEI we've never seen before, we create a placeholder vehicle (plate = "IMEI-") and a device row with lifecycle='active'. The fleet admin can rename the plate later via the (forthcoming) admin UI; until then the device is fully operational. Returns None only when the IMEI is known but unmapped (vehicle_id IS NULL), which shouldn't happen via this auto-provision path but is preserved for manual edits. """ await cur.execute( "SELECT vehicle_id FROM domain.devices WHERE imei = %s", (imei,), ) row = await cur.fetchone() if row is not None: return None if row[0] is None else int(row[0]) if not account_id: return None await cur.execute( """INSERT INTO domain.accounts (account_id, name, app_key) VALUES (%s, %s, '') ON CONFLICT (account_id) DO NOTHING""", (account_id, account_id), ) plate = f"IMEI-{imei[-6:]}" await cur.execute( """INSERT INTO domain.vehicles (plate) VALUES (%s) ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate RETURNING vehicle_id""", (plate,), ) row = await cur.fetchone() assert row is not None vehicle_id = int(row[0]) await cur.execute( """INSERT INTO domain.devices (imei, account_id, vehicle_id, device_type, lifecycle, activation_at) VALUES (%s, %s, %s, 'tracker', 'active', now()) ON CONFLICT (imei) DO UPDATE SET account_id = EXCLUDED.account_id, vehicle_id = COALESCE(domain.devices.vehicle_id, EXCLUDED.vehicle_id), lifecycle = CASE WHEN domain.devices.lifecycle = 'provisioned' THEN 'active' ELSE domain.devices.lifecycle END""", (imei, account_id, vehicle_id), ) log.info("projector.auto_provisioned_device", imei=imei, vehicle_id=vehicle_id, plate=plate) return vehicle_id async def _project_one( cur: AsyncCursor[Any], *, occurred_at: datetime, imei: str, payload: dict[str, Any], ) -> bool: lat = payload.get("lat") lng = payload.get("lng") if lat is None or lng is None: return False vehicle_id = await _resolve_device(cur, imei, account_id=payload.get("_account_id")) if vehicle_id is None: return False geom_wkt = f"POINT({lng} {lat})" await cur.execute( """ INSERT INTO state.live_positions ( imei, vehicle_id, occurred_at, geom, speed_kmh, direction_deg, acc_state, source, parser_version, updated_at ) VALUES ( %s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326), %s, %s, %s, %s, %s, now() ) ON CONFLICT (imei) DO UPDATE SET vehicle_id = EXCLUDED.vehicle_id, occurred_at = EXCLUDED.occurred_at, geom = EXCLUDED.geom, speed_kmh = EXCLUDED.speed_kmh, direction_deg = EXCLUDED.direction_deg, acc_state = EXCLUDED.acc_state, source = EXCLUDED.source, parser_version = EXCLUDED.parser_version, updated_at = now() WHERE EXCLUDED.occurred_at > state.live_positions.occurred_at """, ( imei, vehicle_id, occurred_at, geom_wkt, payload.get("speed_kmh"), payload.get("direction_deg"), payload.get("acc") if isinstance(payload.get("acc"), int) else None, payload.get("source") or "unknown", PARSER_VERSION, ), ) await cur.execute( """ INSERT INTO state.position_history ( vehicle_id, imei, occurred_at, geom, speed_kmh, direction_deg, acc_state, altitude_m, satellites, source, parser_version ) VALUES ( %s, %s, %s, ST_SetSRID(ST_GeomFromText(%s), 4326), %s, %s, %s, %s, %s, %s, %s ) """, ( vehicle_id, imei, occurred_at, geom_wkt, payload.get("speed_kmh"), payload.get("direction_deg"), payload.get("acc") if isinstance(payload.get("acc"), int) else None, payload.get("altitude_m"), payload.get("satellites"), payload.get("source") or "unknown", PARSER_VERSION, ), ) return True async def drain() -> int: """Process pending position_fix events. Returns count drained.""" pool = await get_pool() processed = 0 async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur: await cur.execute( """ SELECT parsed_id, occurred_at, imei, account_id, payload FROM events.parsed WHERE kind = 'position_fix' AND NOT (payload ? %s) ORDER BY occurred_at FOR UPDATE SKIP LOCKED LIMIT %s """, (PROJECTED_FLAG_KEY, DRAIN_BATCH), ) rows = await cur.fetchall() for _, occurred_at, imei, account_id, payload in rows: payload_with_acct = dict(payload, _account_id=account_id) try: await _project_one( cur, occurred_at=occurred_at, imei=imei, payload=payload_with_acct, ) except Exception: log.exception("projector.failed", imei=imei) raise if rows: await cur.executemany( """ UPDATE events.parsed SET payload = payload || %s::jsonb WHERE parsed_id = %s AND occurred_at = %s """, [ (Jsonb({PROJECTED_FLAG_KEY: "now"}), int(pid), occ) for pid, occ, _, _, _ in rows ], ) processed = len(rows) if processed: log.info("projector.drained", count=processed) return processed