Migration 18: ops.contract_check_log table — append-only log of probes
against the Tracksolid Pro endpoints we depend on.
New worker app/workers/contract_check.py — per run:
- jimi.oauth.token.get (token refresh succeeds)
- jimi.user.device.location.list per configured target (parse first item
with JimiPollFix)
- jimi.device.location.get with a sample IMEI from the list (parse first
item with JimiPollFix)
Each probe logs success or {error_class, error_detail, sample}; failures
are recorded, not raised.
slo_metrics now also computes contract_drift_days = days since the most-
recent successful probe of the laggard endpoint. With threshold 1d (from
mig 5), a single failed daily run flips the badge red within 24h.
cron entrypoint registers the check daily at 02:00 UTC plus once on
startup, gated on TRACKSOLID_APP_KEY + a configured target.
111 lines
3.8 KiB
Python
111 lines
3.8 KiB
Python
"""SLO measurement worker — task #12.
|
|
|
|
Every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) we compute the current value
|
|
of each metric in `slo.targets` and append one row to `slo.measurements`. The
|
|
`slo.v_current_status` view does the rest — joining the latest measurement to
|
|
the target threshold so the dashboard panel can render green/red badges.
|
|
|
|
Metrics computed in P1:
|
|
fix_freshness_pct_60s → % of active devices whose latest fix is within
|
|
90 seconds (threshold 95)
|
|
parser_lag_p95_sec → p95 of (events.parsed.inserted_at - events.raw.received_at)
|
|
for events.parsed rows inserted in the last 5 minutes
|
|
(threshold 30s)
|
|
|
|
contract_drift_days → days since the most-recent successful probe of the
|
|
laggard endpoint in ops.contract_check_log
|
|
(threshold 1 day)
|
|
"""
|
|
|
|
import structlog
|
|
|
|
from app.db import get_pool
|
|
|
|
log = structlog.get_logger("worker.slo")
|
|
|
|
|
|
async def _record(metric: str, value: float) -> None:
|
|
pool = await get_pool()
|
|
async with pool.connection() as conn, conn.cursor() as cur:
|
|
await cur.execute(
|
|
"INSERT INTO slo.measurements (metric, value) VALUES (%s, %s)",
|
|
(metric, value),
|
|
)
|
|
|
|
|
|
async def _fix_freshness_pct_60s() -> float | None:
|
|
pool = await get_pool()
|
|
async with pool.connection() as conn, conn.cursor() as cur:
|
|
await cur.execute(
|
|
"""
|
|
SELECT 100.0
|
|
* count(*) FILTER (
|
|
WHERE lp.occurred_at > now() - interval '90 seconds'
|
|
)
|
|
/ NULLIF(count(*), 0) AS pct
|
|
FROM state.live_positions lp
|
|
JOIN domain.devices d ON d.imei = lp.imei
|
|
WHERE d.lifecycle = 'active'
|
|
"""
|
|
)
|
|
row = await cur.fetchone()
|
|
if row is None or row[0] is None:
|
|
return None
|
|
return float(row[0])
|
|
|
|
|
|
async def _parser_lag_p95_sec() -> float | None:
|
|
pool = await get_pool()
|
|
async with pool.connection() as conn, conn.cursor() as cur:
|
|
await cur.execute(
|
|
"""
|
|
SELECT EXTRACT(EPOCH FROM percentile_disc(0.95) WITHIN GROUP (
|
|
ORDER BY (p.inserted_at - r.received_at)
|
|
))
|
|
FROM events.parsed p
|
|
JOIN events.raw r
|
|
ON r.event_id = p.raw_event_id
|
|
AND r.received_at = p.raw_received_at
|
|
WHERE p.inserted_at > now() - interval '5 minutes'
|
|
"""
|
|
)
|
|
row = await cur.fetchone()
|
|
if row is None or row[0] is None:
|
|
return None
|
|
return float(row[0])
|
|
|
|
|
|
async def _contract_drift_days() -> float | None:
|
|
pool = await get_pool()
|
|
async with pool.connection() as conn, conn.cursor() as cur:
|
|
await cur.execute(
|
|
"""
|
|
SELECT EXTRACT(EPOCH FROM max(now() - latest)) / 86400.0
|
|
FROM (
|
|
SELECT endpoint, max(checked_at) AS latest
|
|
FROM ops.contract_check_log
|
|
WHERE success
|
|
GROUP BY endpoint
|
|
) e
|
|
"""
|
|
)
|
|
row = await cur.fetchone()
|
|
if row is None or row[0] is None:
|
|
return None
|
|
return float(row[0])
|
|
|
|
|
|
async def record_all() -> None:
|
|
measured: dict[str, float | None] = {
|
|
"fix_freshness_pct_60s": await _fix_freshness_pct_60s(),
|
|
"parser_lag_p95_sec": await _parser_lag_p95_sec(),
|
|
"contract_drift_days": await _contract_drift_days(),
|
|
}
|
|
written: dict[str, float] = {}
|
|
for metric, value in measured.items():
|
|
if value is None:
|
|
continue
|
|
await _record(metric, value)
|
|
written[metric] = round(value, 2)
|
|
if written:
|
|
log.info("slo.recorded", **{k: written[k] for k in written})
|