diff --git a/app/entrypoints/cron.py b/app/entrypoints/cron.py index 71a5683..b4a1dce 100644 --- a/app/entrypoints/cron.py +++ b/app/entrypoints/cron.py @@ -22,7 +22,7 @@ from app.db import close_pool, get_pool from app.health import router as health_router from app.logging_setup import configure_logging from app.tracksolid.client import TracksolidClient -from app.workers import geocoder, poller +from app.workers import geocoder, poller, slo_metrics log = structlog.get_logger("cron") @@ -95,6 +95,19 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: ) log.info("cron.geocoder_registered", tick_sec=settings.geocoder_tick_sec) + # SLO measurement worker — populates slo.measurements every 60s so + # slo.v_current_status (used by the dashboard SLO panel) has live data. + scheduler.add_job( + slo_metrics.record_all, + trigger=IntervalTrigger(seconds=60), + id="slo_record_all", + max_instances=1, + coalesce=True, + misfire_grace_time=30, + ) + scheduler.add_job(slo_metrics.record_all, trigger="date", id="slo_initial") + log.info("cron.slo_worker_registered") + scheduler.start() log.info("cron.scheduler_started") diff --git a/app/projectors/live_positions.py b/app/projectors/live_positions.py index a21438a..c0898c1 100644 --- a/app/projectors/live_positions.py +++ b/app/projectors/live_positions.py @@ -8,6 +8,7 @@ Ordering invariant: state.live_positions never moves backwards in occurred_at *per imei* (older fixes from the same device do not overwrite newer ones). 
""" +import re from datetime import datetime from typing import Any @@ -24,20 +25,35 @@ DRAIN_BATCH = 500 PROJECTED_FLAG_KEY = "live_positions_projected_at" +_PLATE_FROM_DEVICE_NAME = re.compile(r"^.* - (.+?)(?:_cam|_CAM)?$") + + +def _extract_plate_from_device_name(device_name: str | None) -> str | None: + """Same logic as serve._label_short / serve._driver_name in SQL — pulled into + Python so the auto-provisioner can link new devices to existing vehicle rows + by real plate (instead of creating a placeholder IMEI- row every time).""" + if not device_name: + return None + m = _PLATE_FROM_DEVICE_NAME.match(device_name) + if not m: + return None + candidate = m.group(1) + if not candidate or not re.search(r"[A-Z]", candidate) or not re.search(r"[0-9]", candidate): + return None + return candidate + + async def _resolve_device( - cur: AsyncCursor[Any], imei: str, *, account_id: str | None + cur: AsyncCursor[Any], imei: str, *, account_id: str | None, device_name: str | None ) -> int | None: """Returns vehicle_id for the device. - Auto-provisions on first sight: when the polling worker sees an IMEI we've - never seen before, we create a placeholder vehicle (plate = "IMEI-") - and a device row with lifecycle='active'. The fleet admin can rename the - plate later via the (forthcoming) admin UI; until then the device is fully - operational. - - Returns None only when the IMEI is known but unmapped (vehicle_id IS NULL), - which shouldn't happen via this auto-provision path but is preserved for - manual edits. + Auto-provisions on first sight. If device_name carries a derivable plate + (e.g. "John Mbugua - KDW 573B_cam" → "KDW 573B"), we link the new device to + the existing vehicle row that already has that plate — so a camera that + joins a truck already tracked by an X3 ends up on the same vehicle_id, and + serve.fn_live_view's tracker-first dedup picks the right fix. When the + plate isn't derivable we fall back to a placeholder "IMEI-LAST6" vehicle. 
""" await cur.execute( "SELECT vehicle_id FROM domain.devices WHERE imei = %s", @@ -57,16 +73,37 @@ async def _resolve_device( (account_id, account_id), ) - plate = f"IMEI-{imei[-6:]}" - await cur.execute( - """INSERT INTO domain.vehicles (plate) VALUES (%s) - ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate - RETURNING vehicle_id""", - (plate,), - ) - row = await cur.fetchone() - assert row is not None - vehicle_id = int(row[0]) + real_plate = _extract_plate_from_device_name(device_name) + vehicle_id: int | None = None + + if real_plate is not None: + await cur.execute( + "SELECT vehicle_id FROM domain.vehicles WHERE plate = %s", + (real_plate,), + ) + row = await cur.fetchone() + if row is not None: + vehicle_id = int(row[0]) + log.info( + "projector.linked_to_existing_vehicle", + imei=imei, vehicle_id=vehicle_id, plate=real_plate, + ) + + if vehicle_id is None: + plate = real_plate or f"IMEI-{imei[-6:]}" + await cur.execute( + """INSERT INTO domain.vehicles (plate) VALUES (%s) + ON CONFLICT (plate) DO UPDATE SET plate = EXCLUDED.plate + RETURNING vehicle_id""", + (plate,), + ) + row = await cur.fetchone() + assert row is not None + vehicle_id = int(row[0]) + log.info( + "projector.auto_provisioned_vehicle", + imei=imei, vehicle_id=vehicle_id, plate=plate, + ) await cur.execute( """INSERT INTO domain.devices @@ -80,7 +117,6 @@ async def _resolve_device( (imei, account_id, vehicle_id), ) - log.info("projector.auto_provisioned_device", imei=imei, vehicle_id=vehicle_id, plate=plate) return vehicle_id @@ -96,7 +132,12 @@ async def _project_one( if lat is None or lng is None: return False - vehicle_id = await _resolve_device(cur, imei, account_id=payload.get("_account_id")) + vehicle_id = await _resolve_device( + cur, + imei, + account_id=payload.get("_account_id"), + device_name=payload.get("device_name"), + ) if vehicle_id is None: return False diff --git a/app/workers/slo_metrics.py b/app/workers/slo_metrics.py new file mode 100644 index 0000000..8b62913 
"""SLO measurement worker — task #12.

``record_all`` is scheduled by the cron entrypoint on a fixed 60-second
interval. Each run computes the current value of the metrics below and appends
one row per metric to `slo.measurements`. The `slo.v_current_status` view does
the rest — joining the latest measurement to the target threshold so the
dashboard panel can render green/red badges.

Metrics computed in P1:
  fix_freshness_pct_60s  → % of active devices whose latest fix is within
                            90 seconds (threshold 95)
  parser_lag_p95_sec     → p95 of (events.parsed.inserted_at - events.raw.received_at)
                            for events.parsed rows inserted in the last 5 minutes
                            (threshold 30s)

Not yet computed (waiting on dependent work):
  contract_drift_days    → needs the daily Tracksolid contract checker (#13)
                            and an ops.contract_check_log table
"""

from collections.abc import Awaitable, Callable

import structlog

from app.db import get_pool

log = structlog.get_logger("worker.slo")


async def _record(metric: str, value: float) -> None:
    """Append one ``(metric, value)`` row to ``slo.measurements``."""
    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(
            "INSERT INTO slo.measurements (metric, value) VALUES (%s, %s)",
            (metric, value),
        )


async def _fix_freshness_pct_60s() -> float | None:
    """Return the percentage (0-100) of active devices with a fresh fix.

    "Fresh" means ``state.live_positions.occurred_at`` is newer than 90 seconds
    (note: the window is 90s even though the metric name says 60s).

    Returns:
        The percentage, or None when the join yields no rows — ``NULLIF`` turns
        the zero denominator into SQL NULL instead of a division error.
    """
    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        # NOTE(review): this is an inner join, so an active device that has no
        # row in state.live_positions is left out of the denominator rather
        # than counted as stale — confirm that is the intended SLO definition.
        await cur.execute(
            """
            SELECT 100.0
                 * count(*) FILTER (
                     WHERE lp.occurred_at > now() - interval '90 seconds'
                   )
                 / NULLIF(count(*), 0) AS pct
            FROM state.live_positions lp
            JOIN domain.devices d ON d.imei = lp.imei
            WHERE d.lifecycle = 'active'
            """
        )
        row = await cur.fetchone()
    if row is None or row[0] is None:
        return None
    return float(row[0])


async def _parser_lag_p95_sec() -> float | None:
    """Return the p95 parser lag in seconds over the last 5 minutes.

    Lag is ``events.parsed.inserted_at - events.raw.received_at`` for parsed
    rows inserted in the last 5 minutes, matched to their raw row on
    ``(event_id, received_at)``. ``percentile_disc`` picks an actual observed
    lag rather than interpolating.

    Returns:
        The lag in seconds, or None when no parsed rows fall in the window.
    """
    pool = await get_pool()
    async with pool.connection() as conn, conn.cursor() as cur:
        await cur.execute(
            """
            SELECT EXTRACT(EPOCH FROM percentile_disc(0.95) WITHIN GROUP (
                ORDER BY (p.inserted_at - r.received_at)
            ))
            FROM events.parsed p
            JOIN events.raw r
              ON r.event_id = p.raw_event_id
             AND r.received_at = p.raw_received_at
            WHERE p.inserted_at > now() - interval '5 minutes'
            """
        )
        row = await cur.fetchone()
    if row is None or row[0] is None:
        return None
    return float(row[0])


# Metric name → coroutine function computing its current value. Evaluated in
# this order by record_all().
_METRICS: tuple[tuple[str, Callable[[], Awaitable[float | None]]], ...] = (
    ("fix_freshness_pct_60s", _fix_freshness_pct_60s),
    ("parser_lag_p95_sec", _parser_lag_p95_sec),
)


async def record_all() -> None:
    """Compute every SLO metric and append the non-null values.

    Each metric is computed and written before the next one is attempted, so a
    failure in a later metric does not throw away measurements that were
    already obtained. Metrics whose value is None (nothing to measure) are
    skipped without writing a row.

    Raises:
        Any exception from the pool or a query propagates to the caller (the
        scheduler); metrics recorded before the failure stay recorded and are
        still logged.
    """
    written: dict[str, float] = {}
    try:
        for metric, compute in _METRICS:
            value = await compute()
            if value is None:
                continue
            await _record(metric, value)
            # Rounded for log readability only; the stored row keeps full precision.
            written[metric] = round(value, 2)
    finally:
        if written:
            log.info("slo.recorded", **written)
DO $migration$
BEGIN
    -- Step 1: derive a candidate plate for every device from the device_name
    -- on its live position: take everything after the last " - " and strip a
    -- trailing "_cam" / "_CAM".
    CREATE TEMP TABLE _plate_map ON COMMIT DROP AS
    SELECT
        d.imei,
        d.vehicle_id AS current_vehicle_id,
        regexp_replace(
            (regexp_match(lp.device_name, '^.* - (.+)$'))[1],
            '_(cam|CAM)$', ''
        ) AS new_plate
    FROM domain.devices d
    JOIN state.live_positions lp ON lp.imei = d.imei
    WHERE lp.device_name LIKE '% - %';

    -- Keep only candidates that look like a plate: at least one uppercase
    -- letter and at least one digit.
    DELETE FROM _plate_map
    WHERE new_plate IS NULL
       OR new_plate = ''
       OR new_plate !~ '[A-Z]'
       OR new_plate !~ '[0-9]';

    -- Step 2: choose one canonical vehicle per plate. A vehicle row that
    -- already carries the real plate wins (otherwise two rows would end up
    -- with the same plate and re-adding the UNIQUE constraint below would
    -- fail); failing that, use the lowest vehicle_id among the devices that
    -- report this plate.
    CREATE TEMP TABLE _canonical ON COMMIT DROP AS
    SELECT
        pm.new_plate,
        COALESCE(
            (SELECT min(v.vehicle_id)
               FROM domain.vehicles v
              WHERE v.plate = pm.new_plate),
            min(pm.current_vehicle_id)
        ) AS canonical_vehicle_id
    FROM _plate_map pm
    GROUP BY pm.new_plate;

    -- Drop UNIQUE so the multi-row plate assignment doesn't transiently violate
    ALTER TABLE domain.vehicles DROP CONSTRAINT IF EXISTS vehicles_plate_key;

    -- Step 3: repoint history, live positions and devices at the canonical
    -- vehicle. Rows already on the canonical id are left untouched.
    UPDATE state.position_history ph
       SET vehicle_id = c.canonical_vehicle_id
      FROM _plate_map pm
      JOIN _canonical c ON c.new_plate = pm.new_plate
     WHERE ph.imei = pm.imei
       AND ph.vehicle_id != c.canonical_vehicle_id;

    UPDATE state.live_positions lp
       SET vehicle_id = c.canonical_vehicle_id
      FROM _plate_map pm
      JOIN _canonical c ON c.new_plate = pm.new_plate
     WHERE lp.imei = pm.imei
       AND lp.vehicle_id != c.canonical_vehicle_id;

    UPDATE domain.devices d
       SET vehicle_id = c.canonical_vehicle_id
      FROM _plate_map pm
      JOIN _canonical c ON c.new_plate = pm.new_plate
     WHERE d.imei = pm.imei
       AND d.vehicle_id != c.canonical_vehicle_id;

    -- Step 4: delete only the vehicle rows this consolidation displaced, i.e.
    -- vehicles a mapped device pointed at before the remap (_plate_map still
    -- holds the pre-remap ids) that were not chosen as canonical. Vehicles
    -- unrelated to the consolidation are never touched, even if they currently
    -- have no device. The NOT EXISTS guards keep any displaced vehicle that is
    -- still referenced by another device, live position or history row.
    DELETE FROM domain.vehicles v
     WHERE v.vehicle_id IN (
               SELECT pm.current_vehicle_id
                 FROM _plate_map pm
                 JOIN _canonical c ON c.new_plate = pm.new_plate
                WHERE pm.current_vehicle_id != c.canonical_vehicle_id
           )
       AND NOT EXISTS (SELECT 1 FROM domain.devices d WHERE d.vehicle_id = v.vehicle_id)
       AND NOT EXISTS (SELECT 1 FROM state.live_positions lp WHERE lp.vehicle_id = v.vehicle_id)
       AND NOT EXISTS (SELECT 1 FROM state.position_history ph WHERE ph.vehicle_id = v.vehicle_id);

    -- Step 5: stamp the real plate onto each canonical vehicle.
    -- NOTE(review): if one vehicle is canonical for two different plates
    -- (devices on the same vehicle whose names disagree), which plate wins is
    -- unspecified — confirm no such data exists before running.
    UPDATE domain.vehicles v
       SET plate = c.new_plate, updated_at = now()
      FROM _canonical c
     WHERE v.vehicle_id = c.canonical_vehicle_id;

    -- Restore the uniqueness guarantee; this fails (and the DO block's changes
    -- are rolled back with the error) if any duplicate plate remains.
    ALTER TABLE domain.vehicles ADD CONSTRAINT vehicles_plate_key UNIQUE (plate);
END
$migration$;
+-- migrate:down +-- No-op: plate consolidation is a one-way operation.