"""Polling worker — writes Tracksolid responses to events.raw. P1 polling jobs (cron role, APScheduler-driven): - poll_live_positions: every TRACKSOLID_POLL_INTERVAL_SEC (default 60s), call jimi.user.device.location.list for the configured target account, persist the full response to events.raw with source='tracksolid_poll_list'. The parser + projector handle the rest. - poll_stale_imeis: every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s), query state.live_positions for IMEIs whose latest fix is older than TRACKSOLID_STALE_AFTER_SEC (default 1800s), batch them into groups of 100, call jimi.device.location.get for each batch. """ from typing import Any import structlog from psycopg.types.json import Jsonb from app.config import Settings, get_settings from app.db import get_pool from app.tracksolid.client import TracksolidClient, TracksolidError log = structlog.get_logger("worker.poller") async def _insert_raw( *, source: str, account_id: str, payload: dict[str, Any] ) -> int: pool = await get_pool() async with pool.connection() as conn, conn.cursor() as cur: await cur.execute( "INSERT INTO events.raw (source, msg_type, account_id, payload) " "VALUES (%s, %s, %s, %s) RETURNING event_id", (source, None, account_id, Jsonb(payload)), ) row = await cur.fetchone() assert row is not None return int(row[0]) def _targets(settings: Settings) -> list[str]: """Return the configured list of Tracksolid target accounts. TRACKSOLID_TARGETS (comma-separated) takes precedence; falls back to the single TRACKSOLID_TARGET_ACCOUNT for backward compatibility. """ if settings.tracksolid_targets: return [t.strip() for t in settings.tracksolid_targets.split(",") if t.strip()] if settings.tracksolid_target_account: return [settings.tracksolid_target_account] return [] async def poll_live_positions(client: TracksolidClient, settings: Settings) -> None: targets = _targets(settings) if not targets: log.warning("poller.list_skipped_no_targets") return for target in targets: try: body = await client.location_list(target=target) except TracksolidError: log.exception("poller.list_api_error", target=target) continue except Exception: log.exception("poller.list_crashed", target=target) continue result = body.get("result") n = len(result) if isinstance(result, list) else 0 eid = await _insert_raw(source="tracksolid_poll_list", account_id=target, payload=body) log.info("poller.list_ok", event_id=eid, devices=n, target=target) async def _stale_imeis(settings: Settings) -> list[str]: pool = await get_pool() async with pool.connection() as conn, conn.cursor() as cur: await cur.execute( """ SELECT lp.imei FROM state.live_positions lp JOIN domain.devices d ON d.imei = lp.imei WHERE d.lifecycle = 'active' AND lp.occurred_at < now() - (%s::int * interval '1 second') ORDER BY lp.occurred_at ASC LIMIT 1000 """, (settings.tracksolid_stale_after_sec,), ) return [r[0] for r in await cur.fetchall()] async def poll_stale_imeis(client: TracksolidClient, settings: Settings) -> None: """jimi.device.location.get is account-scoped via the access_token, not via `target`. Stale-poll across all imeis regardless of which target they came from; tag with the primary target for bookkeeping.""" targets = _targets(settings) if not targets: return primary_target = targets[0] imeis = await _stale_imeis(settings) if not imeis: return log.info("poller.stale_check_start", count=len(imeis)) for i in range(0, len(imeis), 100): batch = imeis[i : i + 100] try: body = await client.location_get(batch) except TracksolidError: log.exception("poller.get_api_error", batch_size=len(batch)) continue except Exception: log.exception("poller.get_crashed", batch_size=len(batch)) continue eid = await _insert_raw( source="tracksolid_poll_get", account_id=primary_target, payload=body ) log.info("poller.get_ok", event_id=eid, batch_size=len(batch)) def build_client() -> TracksolidClient: return TracksolidClient(get_settings())