Plate consolidation migration + projector links new devices to existing plates; SLO measurement worker (task #12)
Some checks are pending
build / lint-test (push) Waiting to run
build / build-push (push) Blocked by required conditions

This commit is contained in:
kianiadee 2026-05-23 23:42:45 +03:00
parent 8323f94a19
commit 6fd0d84560
4 changed files with 249 additions and 23 deletions

View file

@ -22,7 +22,7 @@ from app.db import close_pool, get_pool
from app.health import router as health_router from app.health import router as health_router
from app.logging_setup import configure_logging from app.logging_setup import configure_logging
from app.tracksolid.client import TracksolidClient from app.tracksolid.client import TracksolidClient
from app.workers import geocoder, poller from app.workers import geocoder, poller, slo_metrics
log = structlog.get_logger("cron") log = structlog.get_logger("cron")
@ -95,6 +95,19 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
) )
log.info("cron.geocoder_registered", tick_sec=settings.geocoder_tick_sec) log.info("cron.geocoder_registered", tick_sec=settings.geocoder_tick_sec)
# SLO measurement worker — populates slo.measurements every 60s so
# slo.v_current_status (used by the dashboard SLO panel) has live data.
scheduler.add_job(
slo_metrics.record_all,
trigger=IntervalTrigger(seconds=60),
id="slo_record_all",
max_instances=1,
coalesce=True,
misfire_grace_time=30,
)
scheduler.add_job(slo_metrics.record_all, trigger="date", id="slo_initial")
log.info("cron.slo_worker_registered")
scheduler.start() scheduler.start()
log.info("cron.scheduler_started") log.info("cron.scheduler_started")

View file

@ -8,6 +8,7 @@ Ordering invariant: state.live_positions never moves backwards in occurred_at
*per imei* (older fixes from the same device do not overwrite newer ones). *per imei* (older fixes from the same device do not overwrite newer ones).
""" """
import re
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
@ -24,20 +25,35 @@ DRAIN_BATCH = 500
PROJECTED_FLAG_KEY = "live_positions_projected_at" PROJECTED_FLAG_KEY = "live_positions_projected_at"
_PLATE_FROM_DEVICE_NAME = re.compile(r"^.* - (.+?)(?:_cam|_CAM)?$")
def _extract_plate_from_device_name(device_name: str | None) -> str | None:
"""Same logic as serve._label_short / serve._driver_name in SQL — pulled into
Python so the auto-provisioner can link new devices to existing vehicle rows
by real plate (instead of creating a placeholder IMEI- row every time)."""
if not device_name:
return None
m = _PLATE_FROM_DEVICE_NAME.match(device_name)
if not m:
return None
candidate = m.group(1)
if not candidate or not re.search(r"[A-Z]", candidate) or not re.search(r"[0-9]", candidate):
return None
return candidate
async def _resolve_device( async def _resolve_device(
cur: AsyncCursor[Any], imei: str, *, account_id: str | None cur: AsyncCursor[Any], imei: str, *, account_id: str | None, device_name: str | None
) -> int | None: ) -> int | None:
"""Returns vehicle_id for the device. """Returns vehicle_id for the device.
Auto-provisions on first sight: when the polling worker sees an IMEI we've Auto-provisions on first sight. If device_name carries a derivable plate
never seen before, we create a placeholder vehicle (plate = "IMEI-<last6>") (e.g. "John Mbugua - KDW 573B_cam" "KDW 573B"), we link the new device to
and a device row with lifecycle='active'. The fleet admin can rename the the existing vehicle row that already has that plate so a camera that
plate later via the (forthcoming) admin UI; until then the device is fully joins a truck already tracked by an X3 ends up on the same vehicle_id, and
operational. serve.fn_live_view's tracker-first dedup picks the right fix. When the
plate isn't derivable we fall back to a placeholder "IMEI-LAST6" vehicle.
Returns None only when the IMEI is known but unmapped (vehicle_id IS NULL),
which shouldn't happen via this auto-provision path but is preserved for
manual edits.
""" """
await cur.execute( await cur.execute(
"SELECT vehicle_id FROM domain.devices WHERE imei = %s", "SELECT vehicle_id FROM domain.devices WHERE imei = %s",
@ -57,16 +73,37 @@ async def _resolve_device(
(account_id, account_id), (account_id, account_id),
) )
plate = f"IMEI-{imei[-6:]}" real_plate = _extract_plate_from_device_name(device_name)
await cur.execute( vehicle_id: int | None = None
"""INSERT INTO domain.vehicles (plate) VALUES (%s)
ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate if real_plate is not None:
RETURNING vehicle_id""", await cur.execute(
(plate,), "SELECT vehicle_id FROM domain.vehicles WHERE plate = %s",
) (real_plate,),
row = await cur.fetchone() )
assert row is not None row = await cur.fetchone()
vehicle_id = int(row[0]) if row is not None:
vehicle_id = int(row[0])
log.info(
"projector.linked_to_existing_vehicle",
imei=imei, vehicle_id=vehicle_id, plate=real_plate,
)
if vehicle_id is None:
plate = real_plate or f"IMEI-{imei[-6:]}"
await cur.execute(
"""INSERT INTO domain.vehicles (plate) VALUES (%s)
ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate
RETURNING vehicle_id""",
(plate,),
)
row = await cur.fetchone()
assert row is not None
vehicle_id = int(row[0])
log.info(
"projector.auto_provisioned_vehicle",
imei=imei, vehicle_id=vehicle_id, plate=plate,
)
await cur.execute( await cur.execute(
"""INSERT INTO domain.devices """INSERT INTO domain.devices
@ -80,7 +117,6 @@ async def _resolve_device(
(imei, account_id, vehicle_id), (imei, account_id, vehicle_id),
) )
log.info("projector.auto_provisioned_device", imei=imei, vehicle_id=vehicle_id, plate=plate)
return vehicle_id return vehicle_id
@ -96,7 +132,12 @@ async def _project_one(
if lat is None or lng is None: if lat is None or lng is None:
return False return False
vehicle_id = await _resolve_device(cur, imei, account_id=payload.get("_account_id")) vehicle_id = await _resolve_device(
cur,
imei,
account_id=payload.get("_account_id"),
device_name=payload.get("device_name"),
)
if vehicle_id is None: if vehicle_id is None:
return False return False

View file

@ -0,0 +1,90 @@
"""SLO measurement worker — task #12.
Every TRACKSOLID_POLL_INTERVAL_SEC (default 60s) we compute the current value
of each metric in `slo.targets` and append one row to `slo.measurements`. The
`slo.v_current_status` view does the rest joining the latest measurement to
the target threshold so the dashboard panel can render green/red badges.
Metrics computed in P1:
fix_freshness_pct_60s % of active devices whose latest fix is within
90 seconds (threshold 95)
parser_lag_p95_sec p95 of (events.parsed.inserted_at - events.raw.received_at)
for events.parsed rows inserted in the last 5 minutes
(threshold 30s)
Not yet computed (waiting on dependent work):
contract_drift_days needs the daily Tracksolid contract checker (#13)
and an ops.contract_check_log table
"""
import structlog
from app.db import get_pool
log = structlog.get_logger("worker.slo")
async def _record(metric: str, value: float) -> None:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"INSERT INTO slo.measurements (metric, value) VALUES (%s, %s)",
(metric, value),
)
async def _fix_freshness_pct_60s() -> float | None:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""
SELECT 100.0
* count(*) FILTER (
WHERE lp.occurred_at > now() - interval '90 seconds'
)
/ NULLIF(count(*), 0) AS pct
FROM state.live_positions lp
JOIN domain.devices d ON d.imei = lp.imei
WHERE d.lifecycle = 'active'
"""
)
row = await cur.fetchone()
if row is None or row[0] is None:
return None
return float(row[0])
async def _parser_lag_p95_sec() -> float | None:
pool = await get_pool()
async with pool.connection() as conn, conn.cursor() as cur:
await cur.execute(
"""
SELECT EXTRACT(EPOCH FROM percentile_disc(0.95) WITHIN GROUP (
ORDER BY (p.inserted_at - r.received_at)
))
FROM events.parsed p
JOIN events.raw r
ON r.event_id = p.raw_event_id
AND r.received_at = p.raw_received_at
WHERE p.inserted_at > now() - interval '5 minutes'
"""
)
row = await cur.fetchone()
if row is None or row[0] is None:
return None
return float(row[0])
async def record_all() -> None:
measured: dict[str, float | None] = {
"fix_freshness_pct_60s": await _fix_freshness_pct_60s(),
"parser_lag_p95_sec": await _parser_lag_p95_sec(),
}
written: dict[str, float] = {}
for metric, value in measured.items():
if value is None:
continue
await _record(metric, value)
written[metric] = round(value, 2)
if written:
log.info("slo.recorded", **{k: written[k] for k in written})

View file

@ -0,0 +1,82 @@
-- migrate:up
--
-- One-shot plate consolidation:
-- 1. extract plate from each device's device_name (post " - ", strip _cam)
-- 2. for each plate appearing on >1 device (tracker+camera pair), pick the
-- lowest existing vehicle_id as canonical
-- 3. remap state.position_history, state.live_positions, domain.devices
-- to the canonical vehicle_id
-- 4. delete orphaned vehicle rows
-- 5. update canonical vehicles' plate to the extracted plate
--
-- After this runs, domain.vehicles holds one row per physical plate. The
-- PRD F1.6 dedup rule in serve.fn_live_view picks tracker-first when both
-- a tracker and camera report for the same vehicle.
--
-- Idempotent: re-running the migration only updates plates that have moved on.
DO $migration$
BEGIN
CREATE TEMP TABLE _plate_map ON COMMIT DROP AS
SELECT
d.imei,
d.vehicle_id AS current_vehicle_id,
regexp_replace(
(regexp_match(lp.device_name, '^.* - (.+)$'))[1],
'_(cam|CAM)$', ''
) AS new_plate
FROM domain.devices d
JOIN state.live_positions lp ON lp.imei = d.imei
WHERE lp.device_name LIKE '% - %';
DELETE FROM _plate_map
WHERE new_plate IS NULL
OR new_plate = ''
OR new_plate !~ '[A-Z]'
OR new_plate !~ '[0-9]';
CREATE TEMP TABLE _canonical ON COMMIT DROP AS
SELECT new_plate, min(current_vehicle_id) AS canonical_vehicle_id
FROM _plate_map
GROUP BY new_plate;
-- Drop UNIQUE so the multi-row plate assignment doesn't transiently violate
ALTER TABLE domain.vehicles DROP CONSTRAINT IF EXISTS vehicles_plate_key;
UPDATE state.position_history ph
SET vehicle_id = c.canonical_vehicle_id
FROM _plate_map pm
JOIN _canonical c ON c.new_plate = pm.new_plate
WHERE ph.imei = pm.imei
AND ph.vehicle_id != c.canonical_vehicle_id;
UPDATE state.live_positions lp
SET vehicle_id = c.canonical_vehicle_id
FROM _plate_map pm
JOIN _canonical c ON c.new_plate = pm.new_plate
WHERE lp.imei = pm.imei
AND lp.vehicle_id != c.canonical_vehicle_id;
UPDATE domain.devices d
SET vehicle_id = c.canonical_vehicle_id
FROM _plate_map pm
JOIN _canonical c ON c.new_plate = pm.new_plate
WHERE d.imei = pm.imei
AND d.vehicle_id != c.canonical_vehicle_id;
DELETE FROM domain.vehicles v
WHERE NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id)
AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id)
AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id);
UPDATE domain.vehicles v
SET plate = c.new_plate, updated_at = now()
FROM _canonical c
WHERE v.vehicle_id = c.canonical_vehicle_id;
ALTER TABLE domain.vehicles ADD CONSTRAINT vehicles_plate_key UNIQUE (plate);
END
$migration$;
-- migrate:down
-- No-op: plate consolidation is a one-way operation.