258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
|
|
"""Contract checker — task #13 (PRD F1.10).
|
||
|
|
|
||
|
|
Daily probe of the Tracksolid Pro endpoints we depend on. For each endpoint,
|
||
|
|
call it, validate the response shape against the current Pydantic model, and
|
||
|
|
record one row in ops.contract_check_log. The SLO worker reads that log to
|
||
|
|
compute contract_drift_days = days since the most-recent successful probe of
|
||
|
|
the laggard endpoint. Threshold is 1 day, so a single failed daily run flips
|
||
|
|
the SLO badge red within 24h.
|
||
|
|
|
||
|
|
Why P1 runs this against the live (not sandbox) account: there is no dedicated
|
||
|
|
sandbox subaccount yet (Q-for-Tracksolid in the onboarding doc). The probes
|
||
|
|
are read-only — token.get, location.list, location.get — so running them
|
||
|
|
against production is safe. When a sandbox account is provisioned, swap the
|
||
|
|
target in env and the check is unchanged.
|
||
|
|
|
||
|
|
Failure is recorded, not raised. A crash in the checker would itself be a
|
||
|
|
silent contract drift on our side; we'd rather have a noisy log row than a
|
||
|
|
backtrace that someone has to notice.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
import httpx
|
||
|
|
import structlog
|
||
|
|
from psycopg.types.json import Jsonb
|
||
|
|
from pydantic import ValidationError
|
||
|
|
|
||
|
|
from app.config import Settings, get_settings
|
||
|
|
from app.db import get_pool
|
||
|
|
from app.models.jimi import JimiPollFix
|
||
|
|
from app.tracksolid.client import TracksolidClient, TracksolidError
|
||
|
|
from app.workers.poller import _targets
|
||
|
|
|
||
|
|
log = structlog.get_logger("worker.contract_check")
|
||
|
|
|
||
|
|
TOKEN_METHOD = "jimi.oauth.token.get"
|
||
|
|
LIST_METHOD = "jimi.user.device.location.list"
|
||
|
|
GET_METHOD = "jimi.device.location.get"
|
||
|
|
|
||
|
|
|
||
|
|
async def _record(
|
||
|
|
*,
|
||
|
|
endpoint: str,
|
||
|
|
target: str | None,
|
||
|
|
success: bool,
|
||
|
|
error_class: str | None,
|
||
|
|
error_detail: str | None,
|
||
|
|
sample: dict[str, Any] | None,
|
||
|
|
parser_version: str,
|
||
|
|
) -> None:
|
||
|
|
pool = await get_pool()
|
||
|
|
async with pool.connection() as conn, conn.cursor() as cur:
|
||
|
|
await cur.execute(
|
||
|
|
"""INSERT INTO ops.contract_check_log
|
||
|
|
(endpoint, target, success, error_class, error_detail, sample, parser_version)
|
||
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
||
|
|
(
|
||
|
|
endpoint,
|
||
|
|
target,
|
||
|
|
success,
|
||
|
|
error_class,
|
||
|
|
(error_detail[:2000] if error_detail else None),
|
||
|
|
Jsonb(sample) if sample is not None else None,
|
||
|
|
parser_version,
|
||
|
|
),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _truncate_sample(payload: Any) -> dict[str, Any] | None:
|
||
|
|
"""Pull one item out of a Jimi response for the audit row."""
|
||
|
|
if isinstance(payload, dict):
|
||
|
|
result = payload.get("result")
|
||
|
|
if isinstance(result, list) and result:
|
||
|
|
return result[0] if isinstance(result[0], dict) else {"item": result[0]}
|
||
|
|
if isinstance(result, dict):
|
||
|
|
return result
|
||
|
|
return {k: payload[k] for k in list(payload.keys())[:8]}
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
async def _check_token(client: TracksolidClient, *, parser_version: str) -> None:
|
||
|
|
try:
|
||
|
|
await client._ensure_token() # noqa: SLF001 — intentional probe
|
||
|
|
except (TracksolidError, httpx.HTTPError) as exc:
|
||
|
|
await _record(
|
||
|
|
endpoint=TOKEN_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=False,
|
||
|
|
error_class=type(exc).__name__,
|
||
|
|
error_detail=str(exc),
|
||
|
|
sample=None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.token_failed", error=str(exc))
|
||
|
|
return
|
||
|
|
await _record(
|
||
|
|
endpoint=TOKEN_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=True,
|
||
|
|
error_class=None,
|
||
|
|
error_detail=None,
|
||
|
|
sample=None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
async def _check_location_list(
|
||
|
|
client: TracksolidClient, target: str, *, parser_version: str
|
||
|
|
) -> str | None:
|
||
|
|
"""Returns a sample IMEI from the response, for use by _check_location_get."""
|
||
|
|
try:
|
||
|
|
body = await client.location_list(target=target)
|
||
|
|
except (TracksolidError, httpx.HTTPError) as exc:
|
||
|
|
await _record(
|
||
|
|
endpoint=LIST_METHOD,
|
||
|
|
target=target,
|
||
|
|
success=False,
|
||
|
|
error_class=type(exc).__name__,
|
||
|
|
error_detail=str(exc),
|
||
|
|
sample=None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.list_failed", target=target, error=str(exc))
|
||
|
|
return None
|
||
|
|
|
||
|
|
result = body.get("result") if isinstance(body, dict) else None
|
||
|
|
items = result if isinstance(result, list) else []
|
||
|
|
if not items:
|
||
|
|
await _record(
|
||
|
|
endpoint=LIST_METHOD,
|
||
|
|
target=target,
|
||
|
|
success=False,
|
||
|
|
error_class="EmptyResult",
|
||
|
|
error_detail="location.list returned no items",
|
||
|
|
sample=_truncate_sample(body),
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.list_empty", target=target)
|
||
|
|
return None
|
||
|
|
|
||
|
|
try:
|
||
|
|
JimiPollFix.model_validate(items[0])
|
||
|
|
except ValidationError as exc:
|
||
|
|
await _record(
|
||
|
|
endpoint=LIST_METHOD,
|
||
|
|
target=target,
|
||
|
|
success=False,
|
||
|
|
error_class="ValidationError",
|
||
|
|
error_detail=str(exc),
|
||
|
|
sample=items[0] if isinstance(items[0], dict) else None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.list_drift", target=target, error=str(exc))
|
||
|
|
return None
|
||
|
|
|
||
|
|
await _record(
|
||
|
|
endpoint=LIST_METHOD,
|
||
|
|
target=target,
|
||
|
|
success=True,
|
||
|
|
error_class=None,
|
||
|
|
error_detail=None,
|
||
|
|
sample=items[0] if isinstance(items[0], dict) else None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
first = items[0] if isinstance(items[0], dict) else {}
|
||
|
|
sample_imei = first.get("imei") or first.get("deviceImei")
|
||
|
|
return str(sample_imei) if sample_imei else None
|
||
|
|
|
||
|
|
|
||
|
|
async def _check_location_get(
|
||
|
|
client: TracksolidClient, sample_imei: str, *, parser_version: str
|
||
|
|
) -> None:
|
||
|
|
try:
|
||
|
|
body = await client.location_get([sample_imei])
|
||
|
|
except (TracksolidError, httpx.HTTPError) as exc:
|
||
|
|
await _record(
|
||
|
|
endpoint=GET_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=False,
|
||
|
|
error_class=type(exc).__name__,
|
||
|
|
error_detail=str(exc),
|
||
|
|
sample={"imei": sample_imei},
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.get_failed", imei=sample_imei, error=str(exc))
|
||
|
|
return
|
||
|
|
|
||
|
|
result = body.get("result") if isinstance(body, dict) else None
|
||
|
|
items = result if isinstance(result, list) else []
|
||
|
|
if not items:
|
||
|
|
await _record(
|
||
|
|
endpoint=GET_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=False,
|
||
|
|
error_class="EmptyResult",
|
||
|
|
error_detail=f"location.get({sample_imei}) returned no items",
|
||
|
|
sample=_truncate_sample(body),
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.get_empty", imei=sample_imei)
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
JimiPollFix.model_validate(items[0])
|
||
|
|
except ValidationError as exc:
|
||
|
|
await _record(
|
||
|
|
endpoint=GET_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=False,
|
||
|
|
error_class="ValidationError",
|
||
|
|
error_detail=str(exc),
|
||
|
|
sample=items[0] if isinstance(items[0], dict) else None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.get_drift", imei=sample_imei, error=str(exc))
|
||
|
|
return
|
||
|
|
|
||
|
|
await _record(
|
||
|
|
endpoint=GET_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=True,
|
||
|
|
error_class=None,
|
||
|
|
error_detail=None,
|
||
|
|
sample=items[0] if isinstance(items[0], dict) else None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
async def run_checks(client: TracksolidClient, settings: Settings | None = None) -> None:
|
||
|
|
settings = settings or get_settings()
|
||
|
|
parser_version = settings.app_git_sha
|
||
|
|
|
||
|
|
await _check_token(client, parser_version=parser_version)
|
||
|
|
|
||
|
|
sample_imei: str | None = None
|
||
|
|
for target in _targets(settings):
|
||
|
|
imei = await _check_location_list(client, target, parser_version=parser_version)
|
||
|
|
if imei and sample_imei is None:
|
||
|
|
sample_imei = imei
|
||
|
|
|
||
|
|
if sample_imei is not None:
|
||
|
|
await _check_location_get(client, sample_imei, parser_version=parser_version)
|
||
|
|
else:
|
||
|
|
await _record(
|
||
|
|
endpoint=GET_METHOD,
|
||
|
|
target=None,
|
||
|
|
success=False,
|
||
|
|
error_class="NoSampleImei",
|
||
|
|
error_detail="no list.location call yielded an IMEI to probe location.get",
|
||
|
|
sample=None,
|
||
|
|
parser_version=parser_version,
|
||
|
|
)
|
||
|
|
log.warning("contract_check.get_skipped_no_sample_imei")
|
||
|
|
|
||
|
|
log.info("contract_check.complete")
|