Cherry-pick of c8f5907 (originally FIX-M20 on main) onto
quality-program-2026-04-12, renamed to FIX-M21 here to avoid clashing with
this branch's existing [FIX-M20] (trip enrichment, commit 144dede).
Behaviour and code are unchanged from the main-branch original; the
annotation tag is the only difference.

Background
----------
A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two
freshness gaps that share a single root cause: tracksolid.live_positions
was being written by only one path (the 60s polled sweep), and that path
silently omits devices that don't have a "current" fix in Jimi's
location.list response. Effect on the dashboard:

  * 18 vehicles show OFFLINE for days-to-months; last fix is whatever the
    sweep wrote before Jimi dropped them.
  * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback
    because their dedicated tracker has been silent; the camera's lat/lng
    arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we
    discard it after writing to tracksolid.alarms.

Verified upstream subscription state: only /pushalarm is registered with
Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are
inactive. This change uses only data that already arrives.

What's in this commit
---------------------
ts_shared_rev.py
  * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None):
    single time-guarded upsert all three writers will share. Guards on
    is_valid_fix() (filters Zero-Island and out-of-range) and
    EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or webhook
    retries can't rewind a fresher marker. COALESCE on optional columns so
    sparse callers don't blank dense ones' values.
  * get_stale_imeis(stale_minutes=30): SELECT enabled_flag=1 devices whose
    live_positions.gps_time is NULL or older than the threshold, ordered
    NULLS FIRST so worst-offenders are in batch #1.
  * ensure_device(cur, imei, device_name=None): relocated from
    webhook_receiver_rev so every live_positions writer can satisfy the FK
    without re-defining the helper. The original underscore-prefixed name
    in webhook_receiver_rev becomes a backwards-compat alias.

webhook_receiver_rev.py
  * /pushalarm: after the alarm row insert, call upsert_live_position with
    the alarm's lat/lng and alarmTime. Sits inside the existing per-item
    SAVEPOINT, so a cross-feed failure rolls back only that one alarm's
    cross-feed, not the alarm row.

ingest_movement_rev.py
  * poll_live_positions: inline INSERT replaced with upsert_live_position
    (extras dict carries the sweep-only columns). Same data, time-guarded.
  * get_device_locations: inline INSERT replaced; also gains an
    ensure_device call so it can be safely fed arbitrary IMEIs.
  * poll_stale_locations(): new wrapper. Pulls get_stale_imeis() and hands
    it to get_device_locations. Scheduled every 10 minutes plus a startup
    catch-up call. Uses jimi.device.location.get which returns
    *last-known* fix, so devices the 60s sweep drops can be re-warmed.

Expected post-deploy effect (estimates, see
06_live_location/260521_timescale_location_upgrade_major.md §4)
  * ~1,100-1,600 additional live_positions upserts/day from the alarm
    cross-feed, after the time-guard rejects ~70-80% of races vs the
    fresher 60s sweep.
  * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence
    (JC400P emits ~107 alarms/day per device).
  * 8-14 of the 24 OFFLINE plates expected to recover via location.get's
    last-known-fix path within the first 30 minutes.
  * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour.
  * No 06_live_location code changes required; reads through
    reporting.v_live_positions transparently.

Tests
-----
12 webhook integration tests pass (3 new: cross-feed fires on valid fix;
skips without lat/lng; skips Zero-Island). 8 new unit tests in
test_stale_imeis.py cover the stale selector, the poll wrapper, and the
time-guard contract on upsert_live_position. Full suite: 77 passed.

Deployment
----------
No schema migration. Both webhook_receiver and ingest_movement containers
must be rebuilt; source is image-baked, not bind-mounted. Rollback is
git revert + rebuild.

Plan & monitoring SQL:
  06_live_location/260521_timescale_location_upgrade_major.md
Verification playbook:
  06_live_location/260521_timescale_location_upgrade_verification.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
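
The time-guard described for upsert_live_position can be illustrated with a small standalone sketch. This is not the code in ts_shared_rev.py: it assumes an in-memory SQLite database standing in for PostgreSQL, a hypothetical five-column live_positions table, and a hypothetical upsert_sketch() helper. It only shows the three behaviours the message names: invalid fixes are skipped, an older gps_time cannot overwrite a newer one, and COALESCE keeps an optional column when a sparse caller omits it.

```python
import sqlite3

# Hypothetical, simplified schema; the real table has more columns.
SCHEMA = """
CREATE TABLE live_positions (
    imei TEXT PRIMARY KEY, lat REAL, lng REAL, gps_time TEXT, speed REAL
)
"""

# The WHERE clause on DO UPDATE is the time-guard: the update only applies
# when the incoming row is strictly newer than the stored one.
UPSERT_SQL = """
INSERT INTO live_positions (imei, lat, lng, gps_time, speed)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (imei) DO UPDATE SET
    lat = excluded.lat,
    lng = excluded.lng,
    gps_time = excluded.gps_time,
    speed = COALESCE(excluded.speed, live_positions.speed)
WHERE excluded.gps_time > live_positions.gps_time
"""


def is_valid_fix(lat, lng):
    """Reject missing, Zero-Island (0, 0) and out-of-range coordinates."""
    if lat is None or lng is None:
        return False
    if lat == 0 and lng == 0:
        return False
    return -90 <= lat <= 90 and -180 <= lng <= 180


def upsert_sketch(cur, imei, lat, lng, gps_time, speed=None):
    """Return 0 without touching the cursor when the input is unusable."""
    if not imei or gps_time is None or not is_valid_fix(lat, lng):
        return 0
    # Timestamps are same-format text here, so string comparison orders them.
    cur.execute(UPSERT_SQL, (imei, lat, lng, gps_time, speed))
    return cur.rowcount


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    cur = conn.cursor()
    # Dense writer (e.g. the sweep) stores a fix with speed.
    upsert_sketch(cur, "imei_x", -1.29, 36.82, "2026-05-21 10:00:00", speed=42.5)
    # Late-arriving alarm with an older timestamp: guarded out.
    upsert_sketch(cur, "imei_x", -1.30, 36.80, "2026-05-21 09:59:00")
    # Fresher but sparse alarm: position moves, stored speed is kept.
    upsert_sketch(cur, "imei_x", -1.31, 36.83, "2026-05-21 10:01:00")
    return cur.execute(
        "SELECT lat, lng, gps_time, speed FROM live_positions"
    ).fetchone()
```

Calling demo() leaves the marker at the 10:01:00 fix with the speed from the 10:00:00 write still in place, which is the property the dashboard relies on.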
112 lines
4.5 KiB
Python
"""Unit tests for the FIX-M21 stale-IMEI recovery helpers.

Covers:
  - ts_shared_rev.get_stale_imeis: the SQL selector
  - ingest_movement_rev.poll_stale_locations: the scheduler wrapper
"""
import sys
import os
import pytest
from unittest.mock import MagicMock, patch
from contextlib import contextmanager

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")


def _make_mock_conn(rows=None):
    """Cursor/connection double that returns `rows` from fetchall()."""
    mock_cur = MagicMock()
    mock_cur.fetchall.return_value = rows or []
    mock_conn = MagicMock()
    mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
    mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
    return mock_conn, mock_cur


@contextmanager
def _conn_ctx(mock_conn):
    yield mock_conn


class TestGetStaleImeis:
    def test_returns_devices_with_old_gps_time(self):
        from ts_shared_rev import get_stale_imeis
        mock_conn, mock_cur = _make_mock_conn(rows=[("imei_a",), ("imei_b",)])
        with patch("ts_shared_rev.get_conn") as mock_get_conn:
            mock_get_conn.return_value = _conn_ctx(mock_conn)
            result = get_stale_imeis(stale_minutes=30)

        assert result == ["imei_a", "imei_b"]
        sql = str(mock_cur.execute.call_args)
        assert "enabled_flag = 1" in sql
        assert "INTERVAL" in sql.upper()
        assert "NULLS FIRST" in sql

    def test_returns_empty_when_no_stale(self):
        from ts_shared_rev import get_stale_imeis
        mock_conn, _ = _make_mock_conn(rows=[])
        with patch("ts_shared_rev.get_conn") as mock_get_conn:
            mock_get_conn.return_value = _conn_ctx(mock_conn)
            assert get_stale_imeis() == []


class TestPollStaleLocations:
    def test_noop_when_empty(self):
        """If no IMEIs are stale, get_device_locations must NOT be called."""
        import ingest_movement_rev as mod
        with patch.object(mod, "get_stale_imeis", return_value=[]), \
             patch.object(mod, "get_device_locations") as mock_refresh:
            mod.poll_stale_locations()
        mock_refresh.assert_not_called()

    def test_invokes_refresh_with_stale_list(self):
        """When stale IMEIs are present, get_device_locations is called once
        with the exact list returned by get_stale_imeis."""
        import ingest_movement_rev as mod
        stale = ["imei_a", "imei_b", "imei_c"]
        with patch.object(mod, "get_stale_imeis", return_value=stale), \
             patch.object(mod, "get_device_locations") as mock_refresh:
            mod.poll_stale_locations()
        mock_refresh.assert_called_once_with(stale)


class TestUpsertLivePositionGuards:
    """Spot-checks on the time-guard contract: covers what the dashboard
    relies on (no rewinding the marker on stale alarm arrivals)."""

    def test_skips_invalid_fix(self):
        from ts_shared_rev import upsert_live_position
        mock_cur = MagicMock()
        # Zero-Island
        assert upsert_live_position(mock_cur, "imei_x", 0, 0, "2026-05-21 10:00:00") == 0
        mock_cur.execute.assert_not_called()

    def test_skips_missing_gps_time(self):
        from ts_shared_rev import upsert_live_position
        mock_cur = MagicMock()
        assert upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, None) == 0
        mock_cur.execute.assert_not_called()

    def test_skips_missing_imei(self):
        from ts_shared_rev import upsert_live_position
        mock_cur = MagicMock()
        assert upsert_live_position(mock_cur, None, -1.29, 36.82, "2026-05-21 10:00:00") == 0
        mock_cur.execute.assert_not_called()

    def test_executes_upsert_for_valid_fix(self):
        from ts_shared_rev import upsert_live_position
        mock_cur = MagicMock()
        mock_cur.rowcount = 1
        n = upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, "2026-05-21 10:00:00",
                                 speed=42.5)
        assert n == 1
        mock_cur.execute.assert_called_once()
        sql = str(mock_cur.execute.call_args)
        # The time-guard is the load-bearing detail; verify it's present.
        assert "EXCLUDED.gps_time > tracksolid.live_positions.gps_time" in sql
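
The selector tests above only check for SQL fragments against a mocked cursor. As a complement, here is a minimal sketch of the selection rule itself (enabled devices whose gps_time is NULL or older than the threshold, never-seen devices first), run against in-memory SQLite. The devices table, its columns, and stale_imeis_sketch() are assumptions made for illustration; the real query in ts_shared_rev.py targets PostgreSQL and may differ.

```python
import sqlite3
from datetime import datetime, timedelta

# Hypothetical schema: a devices table plus the live_positions marker table.
SCHEMA = """
CREATE TABLE devices (imei TEXT PRIMARY KEY, enabled_flag INTEGER);
CREATE TABLE live_positions (imei TEXT PRIMARY KEY, gps_time TEXT);
"""

# SQLite sorts NULLs first in ascending order by default. PostgreSQL sorts
# them last, which is why the real query needs an explicit NULLS FIRST.
STALE_SQL = """
SELECT d.imei
FROM devices AS d
LEFT JOIN live_positions AS lp ON lp.imei = d.imei
WHERE d.enabled_flag = 1
  AND (lp.gps_time IS NULL OR lp.gps_time < ?)
ORDER BY lp.gps_time ASC
"""


def stale_imeis_sketch(conn, now, stale_minutes=30):
    """Return enabled IMEIs with no fix, or a fix older than the cutoff."""
    cutoff = (now - timedelta(minutes=stale_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    return [row[0] for row in conn.execute(STALE_SQL, (cutoff,))]


def demo_stale():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO devices VALUES (?, ?)",
        [("imei_a", 1), ("imei_b", 1), ("imei_c", 1), ("imei_d", 0), ("imei_e", 1)],
    )
    conn.executemany(
        "INSERT INTO live_positions VALUES (?, ?)",
        [
            ("imei_b", "2026-05-01 08:00:00"),  # weeks old
            ("imei_c", "2026-05-21 09:50:00"),  # fresh, inside the window
            ("imei_e", "2026-05-21 09:00:00"),  # one hour old
        ],
    )
    now = datetime(2026, 5, 21, 10, 0, 0)
    return stale_imeis_sketch(conn, now)
```

In this data set imei_a has never reported, imei_d is disabled, and imei_c is fresh, so demo_stale() yields imei_a, then imei_b, then imei_e: the never-seen device leads the batch, followed by the oldest fixes.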