Contract checker (#13, PRD F1.10) + contract_drift_days SLO
Some checks are pending
build / lint-test (push) Waiting to run
build / build-push (push) Blocked by required conditions

Migration 18: ops.contract_check_log table — append-only log of probes
against the Tracksolid Pro endpoints we depend on.

New worker app/workers/contract_check.py — per run:
  - jimi.oauth.token.get   (token refresh succeeds)
  - jimi.user.device.location.list per configured target (parse first item
    with JimiPollFix)
  - jimi.device.location.get with a sample IMEI from the list (parse first
    item with JimiPollFix)
Each probe logs success or {error_class, error_detail, sample}; failures
are recorded, not raised.

slo_metrics now also computes contract_drift_days = days since the most-
recent successful probe of the laggard endpoint. With threshold 1d (from
mig 5), a single failed daily run flips the badge red within 24h.

cron entrypoint registers the check daily at 02:00 UTC plus once on
startup, gated on TRACKSOLID_APP_KEY + a configured target.
This commit is contained in:
kianiadee 2026-05-27 11:58:29 +03:00
parent 495bb2bd71
commit 0f91668256
4 changed files with 337 additions and 6 deletions

View file

@ -5,8 +5,9 @@ time-triggered jobs. P1 jobs:
- poll_live_positions : every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) - poll_live_positions : every TRACKSOLID_POLL_INTERVAL_SEC (default 60s)
- poll_stale_imeis : every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s) - poll_stale_imeis : every TRACKSOLID_STALE_POLL_INTERVAL_SEC (default 600s)
- geocode_pending : every GEOCODER_TICK_SEC
SLO measurement worker (#12) and contract checker (#13) land here later. - slo_record_all : every 60s (writes slo.measurements)
- contract_check : daily at 02:00 UTC (writes ops.contract_check_log)
""" """
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
@ -14,6 +15,7 @@ from contextlib import asynccontextmanager
import structlog import structlog
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI from fastapi import FastAPI
@ -22,7 +24,7 @@ from app.db import close_pool, get_pool
from app.health import router as health_router from app.health import router as health_router
from app.logging_setup import configure_logging from app.logging_setup import configure_logging
from app.tracksolid.client import TracksolidClient from app.tracksolid.client import TracksolidClient
from app.workers import geocoder, poller, slo_metrics from app.workers import contract_check, geocoder, poller, slo_metrics
log = structlog.get_logger("cron") log = structlog.get_logger("cron")
@ -108,6 +110,26 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
scheduler.add_job(slo_metrics.record_all, trigger="date", id="slo_initial") scheduler.add_job(slo_metrics.record_all, trigger="date", id="slo_initial")
log.info("cron.slo_worker_registered") log.info("cron.slo_worker_registered")
# Contract checker — daily probe of Tracksolid endpoints (#13, PRD F1.10).
# Only registered when we actually have creds to call the API; otherwise
# the SLO stays at "unknown" rather than logging perpetual NoTarget errors.
async def _run_contract_check() -> None:
await contract_check.run_checks(client, settings)
if has_target and settings.tracksolid_app_key:
scheduler.add_job(
_run_contract_check,
trigger=CronTrigger(hour=2, minute=0), # 02:00 UTC daily
id="contract_check_daily",
max_instances=1,
coalesce=True,
misfire_grace_time=3600,
)
scheduler.add_job(_run_contract_check, trigger="date", id="contract_check_initial")
log.info("cron.contract_check_registered", schedule="daily 02:00 UTC")
else:
log.warning("cron.contract_check_skipped_missing_creds")
scheduler.start() scheduler.start()
log.info("cron.scheduler_started") log.info("cron.scheduler_started")

View file

@ -0,0 +1,257 @@
"""Contract checker — task #13 (PRD F1.10).
Daily probe of the Tracksolid Pro endpoints we depend on. For each endpoint,
call it, validate the response shape against the current Pydantic model, and
record one row in ops.contract_check_log. The SLO worker reads that log to
compute contract_drift_days = days since the most-recent successful probe of
the laggard endpoint. Threshold is 1 day, so a single failed daily run flips
the SLO badge red within 24h.
Why P1 runs this against the live (not sandbox) account: there is no dedicated
sandbox subaccount yet (Q-for-Tracksolid in the onboarding doc). The probes
are read-only token.get, location.list, location.get so running them
against production is safe. When a sandbox account is provisioned, swap the
target in env and the check is unchanged.
Failure is recorded, not raised. A crash in the checker would itself be a
silent contract drift on our side; we'd rather have a noisy log row than a
backtrace that someone has to notice.
"""
from __future__ import annotations
from typing import Any
import httpx
import structlog
from psycopg.types.json import Jsonb
from pydantic import ValidationError
from app.config import Settings, get_settings
from app.db import get_pool
from app.models.jimi import JimiPollFix
from app.tracksolid.client import TracksolidClient, TracksolidError
from app.workers.poller import _targets
log = structlog.get_logger("worker.contract_check")
TOKEN_METHOD = "jimi.oauth.token.get"
LIST_METHOD = "jimi.user.device.location.list"
GET_METHOD = "jimi.device.location.get"
async def _record(
*,
endpoint: str,
target: str | None,
success: bool,
error_class: str | None,
error_detail: str | None,
sample: dict[str, Any] | None,
parser_version: str,
) -> None:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""INSERT INTO ops.contract_check_log
(endpoint, target, success, error_class, error_detail, sample, parser_version)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(
endpoint,
target,
success,
error_class,
(error_detail[:2000] if error_detail else None),
Jsonb(sample) if sample is not None else None,
parser_version,
),
)
def _truncate_sample(payload: Any) -> dict[str, Any] | None:
"""Pull one item out of a Jimi response for the audit row."""
if isinstance(payload, dict):
result = payload.get("result")
if isinstance(result, list) and result:
return result[0] if isinstance(result[0], dict) else {"item": result[0]}
if isinstance(result, dict):
return result
return {k: payload[k] for k in list(payload.keys())[:8]}
return None
async def _check_token(client: TracksolidClient, *, parser_version: str) -> None:
try:
await client._ensure_token() # noqa: SLF001 — intentional probe
except (TracksolidError, httpx.HTTPError) as exc:
await _record(
endpoint=TOKEN_METHOD,
target=None,
success=False,
error_class=type(exc).__name__,
error_detail=str(exc),
sample=None,
parser_version=parser_version,
)
log.warning("contract_check.token_failed", error=str(exc))
return
await _record(
endpoint=TOKEN_METHOD,
target=None,
success=True,
error_class=None,
error_detail=None,
sample=None,
parser_version=parser_version,
)
async def _check_location_list(
client: TracksolidClient, target: str, *, parser_version: str
) -> str | None:
"""Returns a sample IMEI from the response, for use by _check_location_get."""
try:
body = await client.location_list(target=target)
except (TracksolidError, httpx.HTTPError) as exc:
await _record(
endpoint=LIST_METHOD,
target=target,
success=False,
error_class=type(exc).__name__,
error_detail=str(exc),
sample=None,
parser_version=parser_version,
)
log.warning("contract_check.list_failed", target=target, error=str(exc))
return None
result = body.get("result") if isinstance(body, dict) else None
items = result if isinstance(result, list) else []
if not items:
await _record(
endpoint=LIST_METHOD,
target=target,
success=False,
error_class="EmptyResult",
error_detail="location.list returned no items",
sample=_truncate_sample(body),
parser_version=parser_version,
)
log.warning("contract_check.list_empty", target=target)
return None
try:
JimiPollFix.model_validate(items[0])
except ValidationError as exc:
await _record(
endpoint=LIST_METHOD,
target=target,
success=False,
error_class="ValidationError",
error_detail=str(exc),
sample=items[0] if isinstance(items[0], dict) else None,
parser_version=parser_version,
)
log.warning("contract_check.list_drift", target=target, error=str(exc))
return None
await _record(
endpoint=LIST_METHOD,
target=target,
success=True,
error_class=None,
error_detail=None,
sample=items[0] if isinstance(items[0], dict) else None,
parser_version=parser_version,
)
first = items[0] if isinstance(items[0], dict) else {}
sample_imei = first.get("imei") or first.get("deviceImei")
return str(sample_imei) if sample_imei else None
async def _check_location_get(
client: TracksolidClient, sample_imei: str, *, parser_version: str
) -> None:
try:
body = await client.location_get([sample_imei])
except (TracksolidError, httpx.HTTPError) as exc:
await _record(
endpoint=GET_METHOD,
target=None,
success=False,
error_class=type(exc).__name__,
error_detail=str(exc),
sample={"imei": sample_imei},
parser_version=parser_version,
)
log.warning("contract_check.get_failed", imei=sample_imei, error=str(exc))
return
result = body.get("result") if isinstance(body, dict) else None
items = result if isinstance(result, list) else []
if not items:
await _record(
endpoint=GET_METHOD,
target=None,
success=False,
error_class="EmptyResult",
error_detail=f"location.get({sample_imei}) returned no items",
sample=_truncate_sample(body),
parser_version=parser_version,
)
log.warning("contract_check.get_empty", imei=sample_imei)
return
try:
JimiPollFix.model_validate(items[0])
except ValidationError as exc:
await _record(
endpoint=GET_METHOD,
target=None,
success=False,
error_class="ValidationError",
error_detail=str(exc),
sample=items[0] if isinstance(items[0], dict) else None,
parser_version=parser_version,
)
log.warning("contract_check.get_drift", imei=sample_imei, error=str(exc))
return
await _record(
endpoint=GET_METHOD,
target=None,
success=True,
error_class=None,
error_detail=None,
sample=items[0] if isinstance(items[0], dict) else None,
parser_version=parser_version,
)
async def run_checks(client: TracksolidClient, settings: Settings | None = None) -> None:
settings = settings or get_settings()
parser_version = settings.app_git_sha
await _check_token(client, parser_version=parser_version)
sample_imei: str | None = None
for target in _targets(settings):
imei = await _check_location_list(client, target, parser_version=parser_version)
if imei and sample_imei is None:
sample_imei = imei
if sample_imei is not None:
await _check_location_get(client, sample_imei, parser_version=parser_version)
else:
await _record(
endpoint=GET_METHOD,
target=None,
success=False,
error_class="NoSampleImei",
error_detail="no list.location call yielded an IMEI to probe location.get",
sample=None,
parser_version=parser_version,
)
log.warning("contract_check.get_skipped_no_sample_imei")
log.info("contract_check.complete")

View file

@ -12,9 +12,9 @@ Metrics computed in P1:
for events.parsed rows inserted in the last 5 minutes for events.parsed rows inserted in the last 5 minutes
(threshold 30s) (threshold 30s)
Not yet computed (waiting on dependent work): contract_drift_days days since the most-recent successful probe of the
contract_drift_days needs the daily Tracksolid contract checker (#13) laggard endpoint in ops.contract_check_log
and an ops.contract_check_log table (threshold 1 day)
""" """
import structlog import structlog
@ -75,10 +75,31 @@ async def _parser_lag_p95_sec() -> float | None:
return float(row[0]) return float(row[0])
async def _contract_drift_days() -> float | None:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""
SELECT EXTRACT(EPOCH FROM max(now() - latest)) / 86400.0
FROM (
SELECT endpoint, max(checked_at) AS latest
FROM ops.contract_check_log
WHERE success
GROUP BY endpoint
) e
"""
)
row = await cur.fetchone()
if row is None or row[0] is None:
return None
return float(row[0])
async def record_all() -> None: async def record_all() -> None:
measured: dict[str, float | None] = { measured: dict[str, float | None] = {
"fix_freshness_pct_60s": await _fix_freshness_pct_60s(), "fix_freshness_pct_60s": await _fix_freshness_pct_60s(),
"parser_lag_p95_sec": await _parser_lag_p95_sec(), "parser_lag_p95_sec": await _parser_lag_p95_sec(),
"contract_drift_days": await _contract_drift_days(),
} }
written: dict[str, float] = {} written: dict[str, float] = {}
for metric, value in measured.items(): for metric, value in measured.items():

View file

@ -0,0 +1,31 @@
-- migrate:up
--
-- Append-only log of contract-check probes against the Tracksolid Pro API.
-- One row per (endpoint, attempt). The SLO worker reads this to compute
-- contract_drift_days = days since the most-recent successful probe for the
-- laggard endpoint. Threshold 1 day → red badge if any endpoint hasn't
-- validated cleanly in the last 24h.
CREATE SCHEMA IF NOT EXISTS ops;
CREATE TABLE ops.contract_check_log (
check_id bigserial PRIMARY KEY,
endpoint text NOT NULL,
target text,
success bool NOT NULL,
error_class text,
error_detail text,
sample jsonb,
parser_version text,
checked_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX contract_check_endpoint_time_idx
ON ops.contract_check_log (endpoint, checked_at DESC);
CREATE INDEX contract_check_success_time_idx
ON ops.contract_check_log (success, checked_at DESC);
-- migrate:down
DROP TABLE IF EXISTS ops.contract_check_log;