diff --git a/.env.example b/.env.example index 1f146ad..871d308 100644 --- a/.env.example +++ b/.env.example @@ -16,10 +16,15 @@ TRACKSOLID_PUSH_TOKEN=set-from-tracksolid-console # Tracksolid polled API TRACKSOLID_API_BASE_URL=https://eu-open.tracksolidpro.com/route/rest -TRACKSOLID_PROD_ACCOUNTS= # JSON array of {account_id, app_key, secret} — populated from current TARGETS env -TRACKSOLID_SANDBOX_ACCOUNT_ID= -TRACKSOLID_SANDBOX_APP_KEY= -TRACKSOLID_SANDBOX_SECRET= +TRACKSOLID_APP_KEY= +TRACKSOLID_APP_SECRET= +TRACKSOLID_USER_ID= +TRACKSOLID_PWD_MD5= +TRACKSOLID_TARGET_ACCOUNT= +TRACKSOLID_TOKEN_TTL_SEC=7200 +TRACKSOLID_POLL_INTERVAL_SEC=60 +TRACKSOLID_STALE_POLL_INTERVAL_SEC=600 +TRACKSOLID_STALE_AFTER_SEC=1800 # Geocoding (P2) NOMINATIM_BASE_URL= diff --git a/app/config.py b/app/config.py index 1cc2d75..fab4ff8 100644 --- a/app/config.py +++ b/app/config.py @@ -16,6 +16,17 @@ class Settings(BaseSettings): tracksolid_push_token: str = Field(default="", alias="TRACKSOLID_PUSH_TOKEN") tracksolid_api_base_url: str = Field(default="", alias="TRACKSOLID_API_BASE_URL") + tracksolid_app_key: str = Field(default="", alias="TRACKSOLID_APP_KEY") + tracksolid_app_secret: str = Field(default="", alias="TRACKSOLID_APP_SECRET") + tracksolid_user_id: str = Field(default="", alias="TRACKSOLID_USER_ID") + tracksolid_pwd_md5: str = Field(default="", alias="TRACKSOLID_PWD_MD5") + tracksolid_target_account: str = Field(default="", alias="TRACKSOLID_TARGET_ACCOUNT") + tracksolid_token_ttl_sec: int = Field(default=7200, alias="TRACKSOLID_TOKEN_TTL_SEC") + tracksolid_poll_interval_sec: int = Field(default=60, alias="TRACKSOLID_POLL_INTERVAL_SEC") + tracksolid_stale_poll_interval_sec: int = Field( + default=600, alias="TRACKSOLID_STALE_POLL_INTERVAL_SEC" + ) + tracksolid_stale_after_sec: int = Field(default=1800, alias="TRACKSOLID_STALE_AFTER_SEC") ntfy_base_url: str = Field(default="", alias="NTFY_BASE_URL") ntfy_topic: str = Field(default="fleet-slo-breach", alias="NTFY_TOPIC") diff --git a/app/entrypoints/cron.py b/app/entrypoints/cron.py index 4b58cf4..1ce8c22 100644 --- a/app/entrypoints/cron.py +++ b/app/entrypoints/cron.py @@ -1,8 +1,12 @@ """Cron entrypoint. Runs as a FastAPI app (for /health/cron) with APScheduler spawning the -time-triggered jobs. P1 jobs are added in week 2 task #8 (polling) and week 3 -tasks #12 (SLO measurement) and #13 (contract checker). +time-triggered jobs. P1 jobs: + + - poll_live_positions : every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) + - poll_stale_imeis : every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s) + +SLO measurement worker (#12) and contract checker (#13) land here later. """ from collections.abc import AsyncIterator @@ -10,12 +14,15 @@ from contextlib import asynccontextmanager import structlog from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger from fastapi import FastAPI from app.config import get_settings from app.db import close_pool, get_pool from app.health import router as health_router from app.logging_setup import configure_logging +from app.tracksolid.client import TracksolidClient +from app.workers import poller log = structlog.get_logger("cron") @@ -25,9 +32,54 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: configure_logging() settings = get_settings() await get_pool() - log.info("cron.starting", git_sha=settings.app_git_sha, mode=settings.app_mode) + log.info( + "cron.starting", + git_sha=settings.app_git_sha, + mode=settings.app_mode, + target_account=settings.tracksolid_target_account or "", + ) + + client = TracksolidClient(settings) scheduler = AsyncIOScheduler(timezone="UTC") + + async def _run_list() -> None: + await poller.poll_live_positions(client, settings) + + async def _run_stale() -> None: + await poller.poll_stale_imeis(client, settings) + + if settings.tracksolid_target_account and settings.tracksolid_app_key: + scheduler.add_job( + _run_list, + trigger=IntervalTrigger(seconds=settings.tracksolid_poll_interval_sec), + id="poll_live_positions", + max_instances=1, + coalesce=True, + misfire_grace_time=30, + ) + scheduler.add_job( + _run_stale, + trigger=IntervalTrigger(seconds=settings.tracksolid_stale_poll_interval_sec), + id="poll_stale_imeis", + max_instances=1, + coalesce=True, + misfire_grace_time=120, + ) + scheduler.add_job( + _run_list, + trigger="date", # fire once on startup + id="poll_live_positions_initial", + ) + log.info( + "cron.tracksolid_jobs_registered", + list_every_sec=settings.tracksolid_poll_interval_sec, + stale_every_sec=settings.tracksolid_stale_poll_interval_sec, + stale_after_sec=settings.tracksolid_stale_after_sec, + ) + else: + log.warning("cron.tracksolid_jobs_skipped_missing_creds") + scheduler.start() log.info("cron.scheduler_started") @@ -35,6 +87,7 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: yield finally: scheduler.shutdown(wait=False) + await client.close() await close_pool() diff --git a/app/parsers/jimi.py b/app/parsers/jimi.py index a53a59d..25f700e 100644 --- a/app/parsers/jimi.py +++ b/app/parsers/jimi.py @@ -182,11 +182,17 @@ def _parse_push_event(payload: dict[str, Any], account_id: str | None) -> list[P def _items_for_poll(payload: dict[str, Any]) -> list[dict[str, Any]]: - """Polled list/get responses come back wrapped under various keys.""" - for key in ("_list", "data", "records"): + """Polled list/get responses come back wrapped under various keys. + + Tracksolid Pro envelope is `{code, msg, result: [...]}` — extract `result`. + Synthetic fixtures and legacy formats may use `_list`, `data`, or `records`. + """ + for key in ("result", "_list", "data", "records"): v = payload.get(key) if isinstance(v, list): return [item for item in v if isinstance(item, dict)] + if isinstance(v, dict): + return [v] if "_raw" in payload: return [] return [payload] diff --git a/app/projectors/live_positions.py b/app/projectors/live_positions.py index 7b0062a..339e9d8 100644 --- a/app/projectors/live_positions.py +++ b/app/projectors/live_positions.py @@ -24,16 +24,64 @@ DRAIN_BATCH = 500 PROJECTED_FLAG_KEY = "live_positions_projected_at" -async def _resolve_device(cur: AsyncCursor[Any], imei: str) -> int | None: - """Returns vehicle_id for the device, or None if unmapped/unknown.""" +async def _resolve_device( + cur: AsyncCursor[Any], imei: str, *, account_id: str | None +) -> int | None: + """Returns vehicle_id for the device. + + Auto-provisions on first sight: when the polling worker sees an IMEI we've + never seen before, we create a placeholder vehicle (plate = "IMEI-") + and a device row with lifecycle='active'. The fleet admin can rename the + plate later via the (forthcoming) admin UI; until then the device is fully + operational. + + Returns None only when the IMEI is known but unmapped (vehicle_id IS NULL), + which shouldn't happen via this auto-provision path but is preserved for + manual edits. + """ await cur.execute( "SELECT vehicle_id FROM domain.devices WHERE imei = %s", (imei,), ) row = await cur.fetchone() - if row is None or row[0] is None: + if row is not None: + return None if row[0] is None else int(row[0]) + + if not account_id: return None - return int(row[0]) + + await cur.execute( + """INSERT INTO domain.accounts (account_id, name, app_key) + VALUES (%s, %s, '') + ON CONFLICT (account_id) DO NOTHING""", + (account_id, account_id), + ) + + plate = f"IMEI-{imei[-6:]}" + await cur.execute( + """INSERT INTO domain.vehicles (plate) VALUES (%s) + ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate + RETURNING vehicle_id""", + (plate,), + ) + row = await cur.fetchone() + assert row is not None + vehicle_id = int(row[0]) + + await cur.execute( + """INSERT INTO domain.devices + (imei, account_id, vehicle_id, device_type, lifecycle, activation_at) + VALUES (%s, %s, %s, 'tracker', 'active', now()) + ON CONFLICT (imei) DO UPDATE + SET account_id = EXCLUDED.account_id, + vehicle_id = COALESCE(domain.devices.vehicle_id, EXCLUDED.vehicle_id), + lifecycle = CASE WHEN domain.devices.lifecycle = 'provisioned' + THEN 'active' ELSE domain.devices.lifecycle END""", + (imei, account_id, vehicle_id), + ) + + log.info("projector.auto_provisioned_device", imei=imei, vehicle_id=vehicle_id, plate=plate) + return vehicle_id async def _project_one( @@ -48,7 +96,7 @@ async def _project_one( if lat is None or lng is None: return False - vehicle_id = await _resolve_device(cur, imei) + vehicle_id = await _resolve_device(cur, imei, account_id=payload.get("_account_id")) if vehicle_id is None: return False @@ -116,7 +164,7 @@ async def drain() -> int: async with pool.connection() as conn, conn.transaction(), conn.cursor() as cur: await cur.execute( """ - SELECT parsed_id, occurred_at, imei, payload + SELECT parsed_id, occurred_at, imei, account_id, payload FROM events.parsed WHERE kind = 'position_fix' AND NOT (payload ? %s) @@ -127,13 +175,14 @@ async def drain() -> int: (PROJECTED_FLAG_KEY, DRAIN_BATCH), ) rows = await cur.fetchall() - for _, occurred_at, imei, payload in rows: + for _, occurred_at, imei, account_id, payload in rows: + payload_with_acct = dict(payload, _account_id=account_id) try: await _project_one( cur, occurred_at=occurred_at, imei=imei, - payload=payload, + payload=payload_with_acct, ) except Exception: log.exception("projector.failed", imei=imei) @@ -147,7 +196,7 @@ async def drain() -> int: """, [ (Jsonb({PROJECTED_FLAG_KEY: "now"}), int(pid), occ) - for pid, occ, _, _ in rows + for pid, occ, _, _, _ in rows ], ) processed = len(rows) diff --git a/app/tracksolid/__init__.py b/app/tracksolid/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/tracksolid/client.py b/app/tracksolid/client.py new file mode 100644 index 0000000..ace8f0b --- /dev/null +++ b/app/tracksolid/client.py @@ -0,0 +1,133 @@ +"""Tracksolid Pro API client. + +Signature scheme (per https://tracksolidprodocs.jimicloud.com/integration/integration.html): + sign = MD5(app_secret + ''.join(k+v for k,v in sorted(params)) + app_secret) +upper-case hex, 32 chars. `sign` is then added to the params for transport. + +Token endpoint: jimi.oauth.token.get → returns accessToken + expiresIn (sec). +List endpoint: jimi.user.device.location.list (per target account). +Get endpoint: jimi.device.location.get (batch of up to 100 IMEIs). +""" + +import asyncio +import hashlib +from datetime import UTC, datetime, timedelta +from typing import Any + +import httpx +import structlog + +from app.config import Settings + +log = structlog.get_logger("tracksolid") + +REFRESH_LEAD_SECONDS = 300 # refresh 5 min before expiry + + +class TracksolidError(Exception): + """Tracksolid API returned a non-zero `code`.""" + + def __init__(self, code: int, msg: str, *, method: str) -> None: + super().__init__(f"{method} → code={code} msg={msg!r}") + self.code = code + self.msg = msg + self.method = method + + +def sign_params(params: dict[str, str], secret: str) -> str: + body = "".join(f"{k}{v}" for k, v in sorted(params.items())) + raw = f"{secret}{body}{secret}".encode() + return hashlib.md5(raw).hexdigest().upper() + + +def _utc_timestamp(at: datetime | None = None) -> str: + at = at or datetime.now(UTC) + return at.strftime("%Y-%m-%d %H:%M:%S") + + +class TracksolidClient: + """Thin async client. One instance per account is fine. + + Holds an access-token cache; refreshes on demand or on 401. + """ + + def __init__(self, settings: Settings, *, http_timeout: float = 15.0) -> None: + self._settings = settings + self._http = httpx.AsyncClient(timeout=http_timeout) + self._token: str | None = None + self._token_expires_at: datetime = datetime.now(UTC) + self._lock = asyncio.Lock() + + async def close(self) -> None: + await self._http.aclose() + + @property + def base_url(self) -> str: + return self._settings.tracksolid_api_base_url + + def _common(self) -> dict[str, str]: + return { + "app_key": self._settings.tracksolid_app_key, + "sign_method": "md5", + "timestamp": _utc_timestamp(), + "format": "json", + "v": "1.0", + } + + async def _post(self, params: dict[str, str]) -> dict[str, Any]: + signed = dict(params) + signed["sign"] = sign_params(params, self._settings.tracksolid_app_secret) + resp = await self._http.post(self.base_url, data=signed) + resp.raise_for_status() + body: dict[str, Any] = resp.json() + code = int(body.get("code", -1)) + if code != 0: + raise TracksolidError(code, str(body.get("msg")), method=params.get("method", "?")) + return body + + async def _ensure_token(self) -> str: + async with self._lock: + if ( + self._token is not None + and self._token_expires_at - datetime.now(UTC) + > timedelta(seconds=REFRESH_LEAD_SECONDS) + ): + return self._token + + params = self._common() | { + "method": "jimi.oauth.token.get", + "user_id": self._settings.tracksolid_user_id, + "user_pwd_md5": self._settings.tracksolid_pwd_md5, + "expires_in": str(self._settings.tracksolid_token_ttl_sec), + } + body = await self._post(params) + result = body.get("result", {}) + token = result.get("accessToken") + ttl = int(result.get("expiresIn", self._settings.tracksolid_token_ttl_sec)) + if not isinstance(token, str) or not token: + raise TracksolidError( + 0, f"missing accessToken in {result!r}", method="jimi.oauth.token.get" + ) + self._token = token + self._token_expires_at = datetime.now(UTC) + timedelta(seconds=ttl) + log.info( + "tracksolid.token_refreshed", + expires_at=self._token_expires_at.isoformat(), + ttl_sec=ttl, + ) + return token + + async def _data_call(self, method: str, extra: dict[str, str]) -> dict[str, Any]: + token = await self._ensure_token() + params = self._common() | {"method": method, "access_token": token} | extra + return await self._post(params) + + async def location_list(self, target: str) -> dict[str, Any]: + return await self._data_call("jimi.user.device.location.list", {"target": target}) + + async def location_get(self, imeis: list[str]) -> dict[str, Any]: + if not imeis: + return {"code": 0, "msg": "success", "result": []} + if len(imeis) > 100: + raise ValueError("location.get supports max 100 imeis per call") + return await self._data_call("jimi.device.location.get", {"imeis": ",".join(imeis)}) diff --git a/app/workers/poller.py b/app/workers/poller.py new file mode 100644 index 0000000..ec50ea4 --- /dev/null +++ b/app/workers/poller.py @@ -0,0 +1,105 @@ +"""Polling worker — writes Tracksolid responses to events.raw. + +P1 polling jobs (cron role, APScheduler-driven): + +- poll_live_positions: every TRACKSOLID_POLL_INTERVAL_SEC (default 60s), call + jimi.user.device.location.list for the configured target account, persist + the full response to events.raw with source='tracksolid_poll_list'. The + parser + projector handle the rest. + +- poll_stale_imeis: every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s), + query state.live_positions for IMEIs whose latest fix is older than + TRACKSOLID_STALE_AFTER_SEC (default 1800s), batch them into groups of 100, + call jimi.device.location.get for each batch. +""" + +from typing import Any + +import structlog +from psycopg.types.json import Jsonb + +from app.config import Settings, get_settings +from app.db import get_pool +from app.tracksolid.client import TracksolidClient, TracksolidError + +log = structlog.get_logger("worker.poller") + + +async def _insert_raw( + *, source: str, account_id: str, payload: dict[str, Any] +) -> int: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + "INSERT INTO events.raw (source, msg_type, account_id, payload) " + "VALUES (%s, %s, %s, %s) RETURNING event_id", + (source, None, account_id, Jsonb(payload)), + ) + row = await cur.fetchone() + assert row is not None + return int(row[0]) + + +async def poll_live_positions(client: TracksolidClient, settings: Settings) -> None: + target = settings.tracksolid_target_account + if not target: + log.warning("poller.list_skipped_no_target") + return + try: + body = await client.location_list(target=target) + except TracksolidError: + log.exception("poller.list_api_error") + return + except Exception: + log.exception("poller.list_crashed") + return + result = body.get("result") + n = len(result) if isinstance(result, list) else 0 + eid = await _insert_raw(source="tracksolid_poll_list", account_id=target, payload=body) + log.info("poller.list_ok", event_id=eid, devices=n) + + +async def _stale_imeis(settings: Settings) -> list[str]: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + """ + SELECT lp.imei + FROM state.live_positions lp + JOIN domain.devices d ON d.imei = lp.imei + WHERE d.lifecycle = 'active' + AND lp.occurred_at < now() - (%s::int * interval '1 second') + ORDER BY lp.occurred_at ASC + LIMIT 1000 + """, + (settings.tracksolid_stale_after_sec,), + ) + return [r[0] for r in await cur.fetchall()] + + +async def poll_stale_imeis(client: TracksolidClient, settings: Settings) -> None: + target = settings.tracksolid_target_account + if not target: + return + imeis = await _stale_imeis(settings) + if not imeis: + return + log.info("poller.stale_check_start", count=len(imeis)) + for i in range(0, len(imeis), 100): + batch = imeis[i : i + 100] + try: + body = await client.location_get(batch) + except TracksolidError: + log.exception("poller.get_api_error", batch_size=len(batch)) + continue + except Exception: + log.exception("poller.get_crashed", batch_size=len(batch)) + continue + eid = await _insert_raw( + source="tracksolid_poll_get", account_id=target, payload=body + ) + log.info("poller.get_ok", event_id=eid, batch_size=len(batch)) + + +def build_client() -> TracksolidClient: + return TracksolidClient(get_settings()) diff --git a/tests/test_tracksolid_client.py b/tests/test_tracksolid_client.py new file mode 100644 index 0000000..74bdde7 --- /dev/null +++ b/tests/test_tracksolid_client.py @@ -0,0 +1,47 @@ +"""Unit tests for Tracksolid signing — no network.""" + +import hashlib + +from app.tracksolid.client import sign_params + + +def test_sign_params_docs_example() -> None: + secret = "c0aa0226fddc4365a3c67fef45427f8a" + params = { + "app_key": "8FB345B8693CCD00CE073CAB5F094009339A22A4105B6558", + "expires_in": "7200", + "format": "json", + "method": "jimi.oauth.token.get", + "sign_method": "md5", + "timestamp": "2025-05-19 10:23:00", + "user_id": "JMTEST123", + "user_pwd_md5": "21218cca77804d2ba1922c33e0151105", + "v": "1.0", + } + body = "".join(f"{k}{v}" for k, v in sorted(params.items())) + expected_raw = f"{secret}{body}{secret}" + expected_sig = hashlib.md5(expected_raw.encode()).hexdigest().upper() + + sig = sign_params(params, secret) + + assert sig == expected_sig + assert len(sig) == 32 + assert sig == sig.upper() + + +def test_sign_params_alphabetical_invariance() -> None: + secret = "secret" + params = {"b": "2", "a": "1", "c": "3"} + sig1 = sign_params(params, secret) + sig2 = sign_params({"c": "3", "a": "1", "b": "2"}, secret) + assert sig1 == sig2 + + +def test_sign_params_handles_space_in_value() -> None: + """Values like timestamp ('2025-05-19 10:23:00') include spaces; they + should be signed verbatim, not url-encoded.""" + secret = "s" + params = {"timestamp": "2025-05-19 10:23:00", "method": "x"} + body = "method" + "x" + "timestamp" + "2025-05-19 10:23:00" + expected = hashlib.md5(f"{secret}{body}{secret}".encode()).hexdigest().upper() + assert sign_params(params, secret) == expected