fleet-platform/app/workers/slo_metrics.py

112 lines
3.8 KiB
Python
Raw Permalink Normal View History

"""SLO measurement worker — task #12.
Every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) we compute the current value
of each metric in `slo.targets` and append one row to `slo.measurements`. The
`slo.v_current_status` view does the rest, joining the latest measurement to
the target threshold so the dashboard panel can render green/red badges.
Metrics computed in P1:
fix_freshness_pct_60s % of active devices whose latest fix is within
90 seconds; note the query window is 90s despite the "60s" in the
metric name (threshold 95)
parser_lag_p95_sec p95 of (events.parsed.inserted_at - events.raw.received_at)
for events.parsed rows inserted in the last 5 minutes
(threshold 30s)
contract_drift_days days since the most-recent successful probe of the
laggard endpoint in ops.contract_check_log
(threshold 1 day)
"""
import structlog
from app.db import get_pool
# Module-level structlog logger used by this worker; requested as "worker.slo".
log = structlog.get_logger("worker.slo")
async def _record(metric: str, value: float) -> None:
    """Append a single measurement row to ``slo.measurements``.

    Args:
        metric: Metric name written to the ``metric`` column.
        value: Measured value written to the ``value`` column.
    """
    insert_sql = "INSERT INTO slo.measurements (metric, value) VALUES (%s, %s)"
    db_pool = await get_pool()
    async with db_pool.connection() as connection:
        async with connection.cursor() as cursor:
            # Values are passed as bind parameters, never interpolated.
            await cursor.execute(insert_sql, (metric, value))
async def _fix_freshness_pct_60s() -> float | None:
    """Return the percentage of active devices with a fresh live position.

    A device counts as fresh when its ``state.live_positions.occurred_at`` is
    within the last 90 seconds. The denominator is every row of
    ``domain.devices`` whose ``lifecycle`` is ``'active'``.

    The query is driven from ``domain.devices`` with a LEFT JOIN so that an
    active device with no live-position row at all still counts in the
    denominator (as not fresh). An inner join would silently drop such
    devices and overstate the percentage.

    NOTE(review): the metric name says 60s while the window is 90 seconds;
    the window is kept as-is here — confirm which one is intended.

    Returns:
        The percentage as a float, or ``None`` when there are no active
        devices (the ``NULLIF`` guard turns a zero denominator into NULL).
    """
    freshness_sql = """
SELECT 100.0
* count(*) FILTER (
WHERE lp.occurred_at > now() - interval '90 seconds'
)
/ NULLIF(count(*), 0) AS pct
FROM domain.devices d
LEFT JOIN state.live_positions lp ON lp.imei = d.imei
WHERE d.lifecycle = 'active'
"""
    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(freshness_sql)
        row = await cur.fetchone()
    if row is None or row[0] is None:
        return None
    return float(row[0])
async def _parser_lag_p95_sec() -> float | None:
    """Return the p95 parser lag, in seconds, over the last five minutes.

    Lag is ``events.parsed.inserted_at - events.raw.received_at`` for parsed
    rows inserted within the last 5 minutes, matched to their raw event on
    both ``event_id`` and ``received_at``. The 95th percentile is taken with
    ``percentile_disc`` and converted to seconds via ``EXTRACT(EPOCH ...)``.

    Returns:
        The lag in seconds, or ``None`` when the query yields no value
        (for example, no parsed rows in the window).
    """
    lag_sql = """
SELECT EXTRACT(EPOCH FROM percentile_disc(0.95) WITHIN GROUP (
ORDER BY (p.inserted_at - r.received_at)
))
FROM events.parsed p
JOIN events.raw r
ON r.event_id = p.raw_event_id
AND r.received_at = p.raw_received_at
WHERE p.inserted_at > now() - interval '5 minutes'
"""
    db_pool = await get_pool()
    async with db_pool.connection() as connection:
        async with connection.cursor() as cursor:
            await cursor.execute(lag_sql)
            result = await cursor.fetchone()
    lag = None if result is None else result[0]
    return float(lag) if lag is not None else None
async def _contract_drift_days() -> float | None:
    """Return the age, in days, of the stalest endpoint's last successful probe.

    For each endpoint in ``ops.contract_check_log`` the latest successful
    ``checked_at`` is found; the result is the largest ``now() - latest``
    across endpoints, converted from seconds to days (divided by 86400).

    Returns:
        The drift in days, or ``None`` when the query yields no value
        (for example, no successful checks are logged).
    """
    drift_sql = """
SELECT EXTRACT(EPOCH FROM max(now() - latest)) / 86400.0
FROM (
SELECT endpoint, max(checked_at) AS latest
FROM ops.contract_check_log
WHERE success
GROUP BY endpoint
) e
"""
    db_pool = await get_pool()
    async with db_pool.connection() as connection:
        async with connection.cursor() as cursor:
            await cursor.execute(drift_sql)
            result = await cursor.fetchone()
    if result is not None and result[0] is not None:
        return float(result[0])
    return None
async def record_all() -> None:
    """Compute every SLO metric once and persist those that produced a value.

    Metrics are evaluated one after another (freshness, parser lag, contract
    drift). A metric whose computation returned ``None`` is skipped; every
    other metric is written via ``_record``. If at least one metric was
    written, a single ``slo.recorded`` log event is emitted carrying the
    written values rounded to two decimals.
    """
    fix_freshness = await _fix_freshness_pct_60s()
    parser_lag = await _parser_lag_p95_sec()
    contract_drift = await _contract_drift_days()
    readings: tuple[tuple[str, float | None], ...] = (
        ("fix_freshness_pct_60s", fix_freshness),
        ("parser_lag_p95_sec", parser_lag),
        ("contract_drift_days", contract_drift),
    )

    logged: dict[str, float] = {}
    for name, reading in readings:
        if reading is not None:
            await _record(name, reading)
            # Rounded only for the log line; the stored value is unrounded.
            logged[name] = round(reading, 2)

    if logged:
        log.info("slo.recorded", **logged)