fleet-platform/app/workers/contract_check.py
kianiadee cf747d3efe
Some checks failed
build / build-push (push) Blocked by required conditions
build / lint-test (push) Failing after 6s
Fix CI lint failures so build-push can run and push an image
ruff: drop stale SLF001 noqa, wrap json.load in a context manager (SIM115), remove unused imports + placeholder-less f-strings; ignore PLR0912/PLR0915 for one-off scripts.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-29 01:11:19 +03:00

257 lines
8.6 KiB
Python

"""Contract checker — task #13 (PRD F1.10).
Daily probe of the Tracksolid Pro endpoints we depend on. For each endpoint,
call it, validate the response shape against the current Pydantic model, and
record one row in ops.contract_check_log. The SLO worker reads that log to
compute contract_drift_days = days since the most-recent successful probe of
the laggard endpoint. Threshold is 1 day, so a single failed daily run flips
the SLO badge red within 24h.
Why P1 runs this against the live (not sandbox) account: there is no dedicated
sandbox subaccount yet (Q-for-Tracksolid in the onboarding doc). The probes
are read-only — token.get, location.list, location.get — so running them
against production is safe. When a sandbox account is provisioned, swap the
target in env and the check is unchanged.
Failure is recorded, not raised. A crash in the checker would itself be a
silent contract drift on our side; we'd rather have a noisy log row than a
backtrace that someone has to notice.
"""
from __future__ import annotations
from typing import Any
import httpx
import structlog
from psycopg.types.json import Jsonb
from pydantic import ValidationError
from app.config import Settings, get_settings
from app.db import get_pool
from app.models.jimi import JimiPollFix
from app.tracksolid.client import TracksolidClient, TracksolidError
from app.workers.poller import _targets
# Module-level structured logger; every event emitted from this file goes
# through it under the "worker.contract_check" name.
log = structlog.get_logger("worker.contract_check")
# Endpoint identifiers written to the `endpoint` column of
# ops.contract_check_log, one per kind of probe run by this module.
# NOTE(review): these look like the Tracksolid/Jimi API method names — confirm
# against app.tracksolid.client before relying on that.
TOKEN_METHOD = "jimi.oauth.token.get"
LIST_METHOD = "jimi.user.device.location.list"
GET_METHOD = "jimi.device.location.get"
async def _record(
    *,
    endpoint: str,
    target: str | None,
    success: bool,
    error_class: str | None,
    error_detail: str | None,
    sample: dict[str, Any] | None,
    parser_version: str,
) -> None:
    """Insert one probe outcome into ``ops.contract_check_log``.

    All arguments are keyword-only and map one-to-one onto the columns of the
    INSERT statement below.

    Args:
        endpoint: Identifier of the probed endpoint.
        target: Account/target the probe ran against, or ``None``.
        success: Whether the probe passed.
        error_class: Short failure category, or ``None`` on success.
        error_detail: Free-text failure detail; stored truncated to 2000
            characters, and stored as NULL when empty or ``None``.
        sample: Optional dict stored as JSONB alongside the row.
        parser_version: Version tag of the code that ran the probe.

    NOTE(review): no explicit commit is issued here; whether the row is
    committed depends on how the pool from ``get_pool()`` is configured —
    confirm in app.db.
    """
    detail = error_detail[:2000] if error_detail else None
    sample_json = None if sample is None else Jsonb(sample)
    row = (
        endpoint,
        target,
        success,
        error_class,
        detail,
        sample_json,
        parser_version,
    )

    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(
            """INSERT INTO ops.contract_check_log
(endpoint, target, success, error_class, error_detail, sample, parser_version)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            row,
        )
def _truncate_sample(payload: Any) -> dict[str, Any] | None:
"""Pull one item out of a Jimi response for the audit row."""
if isinstance(payload, dict):
result = payload.get("result")
if isinstance(result, list) and result:
return result[0] if isinstance(result[0], dict) else {"item": result[0]}
if isinstance(result, dict):
return result
return {k: payload[k] for k in list(payload.keys())[:8]}
return None
async def _check_token(client: TracksolidClient, *, parser_version: str) -> None:
    """Probe the token endpoint and log exactly one row for the outcome.

    A ``TracksolidError`` or ``httpx.HTTPError`` raised by the client is
    recorded as a failed row (with the exception's class name and message) and
    a warning is logged; any other exception propagates to the caller.

    NOTE(review): whether ``_ensure_token`` always performs a network call or
    may return a cached token is not visible from this file — confirm in the
    client if a fresh round-trip is required for the probe to be meaningful.
    """
    # Fields shared by the success row and the failure row.
    shared = {
        "endpoint": TOKEN_METHOD,
        "target": None,
        "sample": None,
        "parser_version": parser_version,
    }

    try:
        await client._ensure_token()  # intentional probe of the token endpoint
    except (TracksolidError, httpx.HTTPError) as exc:
        await _record(
            success=False,
            error_class=type(exc).__name__,
            error_detail=str(exc),
            **shared,
        )
        log.warning("contract_check.token_failed", error=str(exc))
    else:
        await _record(
            success=True,
            error_class=None,
            error_detail=None,
            **shared,
        )
async def _check_location_list(
    client: TracksolidClient, target: str, *, parser_version: str
) -> str | None:
    """Probe the location-list endpoint for ``target`` and log one row.

    The first item of the response's ``result`` list is validated against
    ``JimiPollFix``. Exactly one row is recorded per call:

    * client error (``TracksolidError`` / ``httpx.HTTPError``) -> failure row
      carrying the exception class name and message;
    * ``result`` missing, not a list, or empty -> ``EmptyResult`` failure row;
    * first item fails validation -> ``ValidationError`` failure row;
    * otherwise -> success row.

    Returns:
        The ``imei`` (or, failing that, ``deviceImei``) of the first item as a
        string when the probe succeeded and the item carries one; ``None`` in
        every other case. The value is meant for ``_check_location_get``.
    """

    async def _log_row(
        *,
        success: bool,
        error_class: str | None = None,
        error_detail: str | None = None,
        sample: dict[str, Any] | None = None,
    ) -> None:
        # Every row written by this probe shares endpoint/target/parser_version.
        await _record(
            endpoint=LIST_METHOD,
            target=target,
            success=success,
            error_class=error_class,
            error_detail=error_detail,
            sample=sample,
            parser_version=parser_version,
        )

    try:
        body = await client.location_list(target=target)
    except (TracksolidError, httpx.HTTPError) as exc:
        await _log_row(
            success=False,
            error_class=type(exc).__name__,
            error_detail=str(exc),
        )
        log.warning("contract_check.list_failed", target=target, error=str(exc))
        return None

    result = body.get("result") if isinstance(body, dict) else None
    if not isinstance(result, list) or not result:
        await _log_row(
            success=False,
            error_class="EmptyResult",
            error_detail="location.list returned no items",
            sample=_truncate_sample(body),
        )
        log.warning("contract_check.list_empty", target=target)
        return None

    head = result[0]
    # Only dict items can be stored as the JSONB sample.
    head_sample = head if isinstance(head, dict) else None

    try:
        JimiPollFix.model_validate(head)
    except ValidationError as exc:
        await _log_row(
            success=False,
            error_class="ValidationError",
            error_detail=str(exc),
            sample=head_sample,
        )
        log.warning("contract_check.list_drift", target=target, error=str(exc))
        return None

    await _log_row(success=True, sample=head_sample)

    if head_sample is None:
        return None
    found_imei = head_sample.get("imei") or head_sample.get("deviceImei")
    return str(found_imei) if found_imei else None
async def _check_location_get(
    client: TracksolidClient, sample_imei: str, *, parser_version: str
) -> None:
    """Probe the location-get endpoint for one IMEI and log one row.

    The first item of the response's ``result`` list is validated against
    ``JimiPollFix``. Rows written here always carry ``target=None``. Outcomes:

    * client error (``TracksolidError`` / ``httpx.HTTPError``) -> failure row
      whose sample is ``{"imei": sample_imei}``;
    * ``result`` missing, not a list, or empty -> ``EmptyResult`` failure row;
    * first item fails validation -> ``ValidationError`` failure row;
    * otherwise -> success row with the first item as the sample.
    """

    async def _log_row(
        *,
        success: bool,
        error_class: str | None = None,
        error_detail: str | None = None,
        sample: dict[str, Any] | None = None,
    ) -> None:
        # Every row written by this probe shares endpoint/target/parser_version.
        await _record(
            endpoint=GET_METHOD,
            target=None,
            success=success,
            error_class=error_class,
            error_detail=error_detail,
            sample=sample,
            parser_version=parser_version,
        )

    try:
        body = await client.location_get([sample_imei])
    except (TracksolidError, httpx.HTTPError) as exc:
        await _log_row(
            success=False,
            error_class=type(exc).__name__,
            error_detail=str(exc),
            sample={"imei": sample_imei},
        )
        log.warning("contract_check.get_failed", imei=sample_imei, error=str(exc))
        return

    result = body.get("result") if isinstance(body, dict) else None
    if not isinstance(result, list) or not result:
        await _log_row(
            success=False,
            error_class="EmptyResult",
            error_detail=f"location.get({sample_imei}) returned no items",
            sample=_truncate_sample(body),
        )
        log.warning("contract_check.get_empty", imei=sample_imei)
        return

    head = result[0]
    # Only dict items can be stored as the JSONB sample.
    head_sample = head if isinstance(head, dict) else None

    try:
        JimiPollFix.model_validate(head)
    except ValidationError as exc:
        await _log_row(
            success=False,
            error_class="ValidationError",
            error_detail=str(exc),
            sample=head_sample,
        )
        log.warning("contract_check.get_drift", imei=sample_imei, error=str(exc))
        return

    await _log_row(success=True, sample=head_sample)
async def run_checks(client: TracksolidClient, settings: Settings | None = None) -> None:
    """Run every contract probe once, in order: token, list, then get.

    Args:
        client: Tracksolid client used for all probes.
        settings: Application settings; resolved via ``get_settings()`` when
            omitted. ``settings.app_git_sha`` is stamped on every row as the
            parser version.

    The list probe runs for every target yielded by ``_targets(settings)``.
    The first IMEI any of those probes returns is then used for the single
    get probe. If no list probe yields an IMEI, the get probe cannot run; a
    ``NoSampleImei`` failure row is recorded for the get endpoint instead, so
    that endpoint still gets a row for this run.
    """
    settings = settings or get_settings()
    parser_version = settings.app_git_sha

    await _check_token(client, parser_version=parser_version)

    sample_imei: str | None = None
    for target in _targets(settings):
        imei = await _check_location_list(client, target, parser_version=parser_version)
        # Keep the first IMEI found, but keep probing the remaining targets so
        # each one still gets its own row.
        if sample_imei is None and imei:
            sample_imei = imei

    if sample_imei is None:
        await _record(
            endpoint=GET_METHOD,
            target=None,
            success=False,
            error_class="NoSampleImei",
            # The endpoint is "location.list"; the previous text transposed it
            # to "list.location", unlike every other message in this module.
            error_detail="no location.list call yielded an IMEI to probe location.get",
            sample=None,
            parser_version=parser_version,
        )
        log.warning("contract_check.get_skipped_no_sample_imei")
    else:
        await _check_location_get(client, sample_imei, parser_version=parser_version)

    log.info("contract_check.complete")