From 0f91668256d3638b4bd40cebca62755c55717d18 Mon Sep 17 00:00:00 2001 From: kianiadee Date: Wed, 27 May 2026 11:58:29 +0300 Subject: [PATCH] Contract checker (#13, PRD F1.10) + contract_drift_days SLO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migration 18: ops.contract_check_log table — append-only log of probes against the Tracksolid Pro endpoints we depend on. New worker app/workers/contract_check.py — per run: - jimi.oauth.token.get (token refresh succeeds) - jimi.user.device.location.list per configured target (parse first item with JimiPollFix) - jimi.device.location.get with a sample IMEI from the list (parse first item with JimiPollFix) Each probe logs success or {error_class, error_detail, sample}; failures are recorded, not raised. slo_metrics now also computes contract_drift_days = days since the most-recent successful probe of the laggard endpoint. With threshold 1d (from mig 5), a single failed daily run flips the badge red within 24h. cron entrypoint registers the check daily at 02:00 UTC plus once on startup, gated on TRACKSOLID_APP_KEY + a configured target. --- app/entrypoints/cron.py | 28 +- app/workers/contract_check.py | 257 ++++++++++++++++++ app/workers/slo_metrics.py | 27 +- .../20260601000018_ops_contract_check_log.sql | 31 +++ 4 files changed, 337 insertions(+), 6 deletions(-) create mode 100644 app/workers/contract_check.py create mode 100644 db/migrations/20260601000018_ops_contract_check_log.sql diff --git a/app/entrypoints/cron.py b/app/entrypoints/cron.py index b4a1dce..85c0642 100644 --- a/app/entrypoints/cron.py +++ b/app/entrypoints/cron.py @@ -5,8 +5,9 @@ time-triggered jobs. P1 jobs: - poll_live_positions : every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) - poll_stale_imeis : every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s) - -SLO measurement worker (#12) and contract checker (#13) land here later. 
+ - geocode_pending : every GEOCODER_TICK_SEC + - slo_record_all : every 60s (writes slo.measurements) + - contract_check : daily at 02:00 UTC (writes ops.contract_check_log) """ from collections.abc import AsyncIterator @@ -14,6 +15,7 @@ from contextlib import asynccontextmanager import structlog from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from fastapi import FastAPI @@ -22,7 +24,7 @@ from app.db import close_pool, get_pool from app.health import router as health_router from app.logging_setup import configure_logging from app.tracksolid.client import TracksolidClient -from app.workers import geocoder, poller, slo_metrics +from app.workers import contract_check, geocoder, poller, slo_metrics log = structlog.get_logger("cron") @@ -108,6 +110,26 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: scheduler.add_job(slo_metrics.record_all, trigger="date", id="slo_initial") log.info("cron.slo_worker_registered") + # Contract checker — daily probe of Tracksolid endpoints (#13, PRD F1.10). + # Only registered when we actually have creds to call the API; otherwise + # the SLO stays at "unknown" rather than logging perpetual NoTarget errors. 
+ async def _run_contract_check() -> None: + await contract_check.run_checks(client, settings) + + if has_target and settings.tracksolid_app_key: + scheduler.add_job( + _run_contract_check, + trigger=CronTrigger(hour=2, minute=0, timezone="UTC"), # 02:00 UTC daily + id="contract_check_daily", + max_instances=1, + coalesce=True, + misfire_grace_time=3600, + ) + scheduler.add_job(_run_contract_check, trigger="date", id="contract_check_initial") + log.info("cron.contract_check_registered", schedule="daily 02:00 UTC") + else: + log.warning("cron.contract_check_skipped_missing_creds") + scheduler.start() log.info("cron.scheduler_started") diff --git a/app/workers/contract_check.py b/app/workers/contract_check.py new file mode 100644 index 0000000..dc733b1 --- /dev/null +++ b/app/workers/contract_check.py @@ -0,0 +1,257 @@ +"""Contract checker — task #13 (PRD F1.10). + +Daily probe of the Tracksolid Pro endpoints we depend on. For each endpoint, +call it, validate the response shape against the current Pydantic model, and +record one row in ops.contract_check_log. The SLO worker reads that log to +compute contract_drift_days = days since the most-recent successful probe of +the laggard endpoint. Threshold is 1 day, so a single failed daily run flips +the SLO badge red within 24h. + +Why P1 runs this against the live (not sandbox) account: there is no dedicated +sandbox subaccount yet (Q-for-Tracksolid in the onboarding doc). The probes +are read-only — token.get, location.list, location.get — so running them +against production is safe. When a sandbox account is provisioned, swap the +target in env and the check is unchanged. + +Failure is recorded, not raised. A crash in the checker would itself be a +silent contract drift on our side; we'd rather have a noisy log row than a +backtrace that someone has to notice. 
+""" + +from __future__ import annotations + +from typing import Any + +import httpx +import structlog +from psycopg.types.json import Jsonb +from pydantic import ValidationError + +from app.config import Settings, get_settings +from app.db import get_pool +from app.models.jimi import JimiPollFix +from app.tracksolid.client import TracksolidClient, TracksolidError +from app.workers.poller import _targets + +log = structlog.get_logger("worker.contract_check") + +TOKEN_METHOD = "jimi.oauth.token.get" +LIST_METHOD = "jimi.user.device.location.list" +GET_METHOD = "jimi.device.location.get" + + +async def _record( + *, + endpoint: str, + target: str | None, + success: bool, + error_class: str | None, + error_detail: str | None, + sample: dict[str, Any] | None, + parser_version: str, +) -> None: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + """INSERT INTO ops.contract_check_log + (endpoint, target, success, error_class, error_detail, sample, parser_version) + VALUES (%s, %s, %s, %s, %s, %s, %s)""", + ( + endpoint, + target, + success, + error_class, + (error_detail[:2000] if error_detail else None), + Jsonb(sample) if sample is not None else None, + parser_version, + ), + ) + + +def _truncate_sample(payload: Any) -> dict[str, Any] | None: + """Pull one item out of a Jimi response for the audit row.""" + if isinstance(payload, dict): + result = payload.get("result") + if isinstance(result, list) and result: + return result[0] if isinstance(result[0], dict) else {"item": result[0]} + if isinstance(result, dict): + return result + return {k: payload[k] for k in list(payload.keys())[:8]} + return None + + +async def _check_token(client: TracksolidClient, *, parser_version: str) -> None: + try: + await client._ensure_token() # noqa: SLF001 — intentional probe + except (TracksolidError, httpx.HTTPError) as exc: + await _record( + endpoint=TOKEN_METHOD, + target=None, + success=False, + 
error_class=type(exc).__name__, + error_detail=str(exc), + sample=None, + parser_version=parser_version, + ) + log.warning("contract_check.token_failed", error=str(exc)) + return + await _record( + endpoint=TOKEN_METHOD, + target=None, + success=True, + error_class=None, + error_detail=None, + sample=None, + parser_version=parser_version, + ) + + +async def _check_location_list( + client: TracksolidClient, target: str, *, parser_version: str +) -> str | None: + """Returns a sample IMEI from the response, for use by _check_location_get.""" + try: + body = await client.location_list(target=target) + except (TracksolidError, httpx.HTTPError) as exc: + await _record( + endpoint=LIST_METHOD, + target=target, + success=False, + error_class=type(exc).__name__, + error_detail=str(exc), + sample=None, + parser_version=parser_version, + ) + log.warning("contract_check.list_failed", target=target, error=str(exc)) + return None + + result = body.get("result") if isinstance(body, dict) else None + items = result if isinstance(result, list) else [] + if not items: + await _record( + endpoint=LIST_METHOD, + target=target, + success=False, + error_class="EmptyResult", + error_detail="location.list returned no items", + sample=_truncate_sample(body), + parser_version=parser_version, + ) + log.warning("contract_check.list_empty", target=target) + return None + + try: + JimiPollFix.model_validate(items[0]) + except ValidationError as exc: + await _record( + endpoint=LIST_METHOD, + target=target, + success=False, + error_class="ValidationError", + error_detail=str(exc), + sample=items[0] if isinstance(items[0], dict) else None, + parser_version=parser_version, + ) + log.warning("contract_check.list_drift", target=target, error=str(exc)) + return None + + await _record( + endpoint=LIST_METHOD, + target=target, + success=True, + error_class=None, + error_detail=None, + sample=items[0] if isinstance(items[0], dict) else None, + parser_version=parser_version, + ) + first = items[0] if 
isinstance(items[0], dict) else {} + sample_imei = first.get("imei") or first.get("deviceImei") + return str(sample_imei) if sample_imei else None + + +async def _check_location_get( + client: TracksolidClient, sample_imei: str, *, parser_version: str +) -> None: + try: + body = await client.location_get([sample_imei]) + except (TracksolidError, httpx.HTTPError) as exc: + await _record( + endpoint=GET_METHOD, + target=None, + success=False, + error_class=type(exc).__name__, + error_detail=str(exc), + sample={"imei": sample_imei}, + parser_version=parser_version, + ) + log.warning("contract_check.get_failed", imei=sample_imei, error=str(exc)) + return + + result = body.get("result") if isinstance(body, dict) else None + items = result if isinstance(result, list) else [] + if not items: + await _record( + endpoint=GET_METHOD, + target=None, + success=False, + error_class="EmptyResult", + error_detail=f"location.get({sample_imei}) returned no items", + sample=_truncate_sample(body), + parser_version=parser_version, + ) + log.warning("contract_check.get_empty", imei=sample_imei) + return + + try: + JimiPollFix.model_validate(items[0]) + except ValidationError as exc: + await _record( + endpoint=GET_METHOD, + target=None, + success=False, + error_class="ValidationError", + error_detail=str(exc), + sample=items[0] if isinstance(items[0], dict) else None, + parser_version=parser_version, + ) + log.warning("contract_check.get_drift", imei=sample_imei, error=str(exc)) + return + + await _record( + endpoint=GET_METHOD, + target=None, + success=True, + error_class=None, + error_detail=None, + sample=items[0] if isinstance(items[0], dict) else None, + parser_version=parser_version, + ) + + +async def run_checks(client: TracksolidClient, settings: Settings | None = None) -> None: + settings = settings or get_settings() + parser_version = settings.app_git_sha + + await _check_token(client, parser_version=parser_version) + + sample_imei: str | None = None + for target in 
_targets(settings): + imei = await _check_location_list(client, target, parser_version=parser_version) + if imei and sample_imei is None: + sample_imei = imei + + if sample_imei is not None: + await _check_location_get(client, sample_imei, parser_version=parser_version) + else: + await _record( + endpoint=GET_METHOD, + target=None, + success=False, + error_class="NoSampleImei", + error_detail="no location.list call yielded an IMEI to probe location.get", + sample=None, + parser_version=parser_version, + ) + log.warning("contract_check.get_skipped_no_sample_imei") + + log.info("contract_check.complete") diff --git a/app/workers/slo_metrics.py b/app/workers/slo_metrics.py index 8b62913..7ada116 100644 --- a/app/workers/slo_metrics.py +++ b/app/workers/slo_metrics.py @@ -12,9 +12,9 @@ Metrics computed in P1: for events.parsed rows inserted in the last 5 minutes (threshold 30s) -Not yet computed (waiting on dependent work): - contract_drift_days → needs the daily Tracksolid contract checker (#13) - and an ops.contract_check_log table + contract_drift_days → days since the laggard endpoint last validated in + ops.contract_check_log; never-validated endpoints age + from their first probe (threshold 1 day) """ import structlog @@ -75,10 +75,31 @@ async def _parser_lag_p95_sec() -> float | None: return float(row[0]) +async def _contract_drift_days() -> float | None: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + """ + SELECT EXTRACT(EPOCH FROM max(now() - latest)) / 86400.0 + FROM ( + SELECT endpoint, + coalesce(max(checked_at) FILTER (WHERE success), min(checked_at)) AS latest + FROM ops.contract_check_log + GROUP BY endpoint + ) e + """ + ) + row = await cur.fetchone() + if row is None or row[0] is None: + return None + return float(row[0]) + + async def record_all() -> None: measured: dict[str, float | None] = { "fix_freshness_pct_60s": await _fix_freshness_pct_60s(), "parser_lag_p95_sec": await _parser_lag_p95_sec(), + "contract_drift_days": await _contract_drift_days(), } 
written: dict[str, float] = {} for metric, value in measured.items(): diff --git a/db/migrations/20260601000018_ops_contract_check_log.sql b/db/migrations/20260601000018_ops_contract_check_log.sql new file mode 100644 index 0000000..0cb03c7 --- /dev/null +++ b/db/migrations/20260601000018_ops_contract_check_log.sql @@ -0,0 +1,31 @@ +-- migrate:up +-- +-- Append-only log of contract-check probes against the Tracksolid Pro API. +-- One row per (endpoint, attempt). The SLO worker reads this to compute +-- contract_drift_days = days since the most-recent successful probe for the +-- laggard endpoint. Threshold 1 day → red badge if any endpoint hasn't +-- validated cleanly in the last 24h. + +CREATE SCHEMA IF NOT EXISTS ops; + +CREATE TABLE ops.contract_check_log ( + check_id bigserial PRIMARY KEY, + endpoint text NOT NULL, + target text, + success bool NOT NULL, + error_class text, + error_detail text, + sample jsonb, + parser_version text, + checked_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX contract_check_endpoint_time_idx + ON ops.contract_check_log (endpoint, checked_at DESC); + +CREATE INDEX contract_check_success_time_idx + ON ops.contract_check_log (success, checked_at DESC); + +-- migrate:down + +DROP TABLE IF EXISTS ops.contract_check_log;