fleet-platform/app/workers/poller.py
kianiadee 6c5ba3b22b
Some checks are pending
build / lint-test (push) Waiting to run
build / build-push (push) Blocked by required conditions
UI: arrow + plate-short label + cost-centre marker palette + hover popup; richer state.live_positions + serve.fn_live_view v2; multi-target poll plumbing
2026-05-23 09:29:04 +03:00

123 lines
4.5 KiB
Python

"""Polling worker — writes Tracksolid responses to events.raw.
P1 polling jobs (cron role, APScheduler-driven):
- poll_live_positions: every TRACKSOLID_POLL_INTERVAL_SEC (default 60s), call
jimi.user.device.location.list for the configured target account, persist
the full response to events.raw with source='tracksolid_poll_list'. The
parser + projector handle the rest.
- poll_stale_imeis: every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s),
query state.live_positions for IMEIs whose latest fix is older than
TRACKSOLID_STALE_AFTER_SEC (default 1800s), batch them into groups of 100,
call jimi.device.location.get for each batch.
"""
from typing import Any
import structlog
from psycopg.types.json import Jsonb
from app.config import Settings, get_settings
from app.db import get_pool
from app.tracksolid.client import TracksolidClient, TracksolidError
log = structlog.get_logger("worker.poller")
async def _insert_raw(
*, source: str, account_id: str, payload: dict[str, Any]
) -> int:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"INSERT INTO events.raw (source, msg_type, account_id, payload) "
"VALUES (%s, %s, %s, %s) RETURNING event_id",
(source, None, account_id, Jsonb(payload)),
)
row = await cur.fetchone()
assert row is not None
return int(row[0])
def _targets(settings: Settings) -> list[str]:
"""Return the configured list of Tracksolid target accounts.
TRACKSOLID_TARGETS (comma-separated) takes precedence; falls back to the
single TRACKSOLID_TARGET_ACCOUNT for backward compatibility.
"""
if settings.tracksolid_targets:
return [t.strip() for t in settings.tracksolid_targets.split(",") if t.strip()]
if settings.tracksolid_target_account:
return [settings.tracksolid_target_account]
return []
async def poll_live_positions(client: TracksolidClient, settings: Settings) -> None:
targets = _targets(settings)
if not targets:
log.warning("poller.list_skipped_no_targets")
return
for target in targets:
try:
body = await client.location_list(target=target)
except TracksolidError:
log.exception("poller.list_api_error", target=target)
continue
except Exception:
log.exception("poller.list_crashed", target=target)
continue
result = body.get("result")
n = len(result) if isinstance(result, list) else 0
eid = await _insert_raw(source="tracksolid_poll_list", account_id=target, payload=body)
log.info("poller.list_ok", event_id=eid, devices=n, target=target)
async def _stale_imeis(settings: Settings) -> list[str]:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""
SELECT lp.imei
FROM state.live_positions lp
JOIN domain.devices d ON d.imei = lp.imei
WHERE d.lifecycle = 'active'
AND lp.occurred_at < now() - (%s::int * interval '1 second')
ORDER BY lp.occurred_at ASC
LIMIT 1000
""",
(settings.tracksolid_stale_after_sec,),
)
return [r[0] for r in await cur.fetchall()]
async def poll_stale_imeis(client: TracksolidClient, settings: Settings) -> None:
"""jimi.device.location.get is account-scoped via the access_token, not via
`target`. Stale-poll across all imeis regardless of which target they came
from; tag with the primary target for bookkeeping."""
targets = _targets(settings)
if not targets:
return
primary_target = targets[0]
imeis = await _stale_imeis(settings)
if not imeis:
return
log.info("poller.stale_check_start", count=len(imeis))
for i in range(0, len(imeis), 100):
batch = imeis[i : i + 100]
try:
body = await client.location_get(batch)
except TracksolidError:
log.exception("poller.get_api_error", batch_size=len(batch))
continue
except Exception:
log.exception("poller.get_crashed", batch_size=len(batch))
continue
eid = await _insert_raw(
source="tracksolid_poll_get", account_id=primary_target, payload=body
)
log.info("poller.get_ok", event_id=eid, batch_size=len(batch))
def build_client() -> TracksolidClient:
return TracksolidClient(get_settings())