tracksolid_timescale_grafan.../tests/unit/test_stale_imeis.py
david kiania c8f5907d4f
Some checks failed
Static Analysis / static (push) Has been cancelled
Tests / test (push) Has been cancelled
Static Analysis / static (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled
FIX-M20: alarm cross-feed + stale-IMEI recovery for live_positions
Background
----------
A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two
freshness gaps that share a single root cause: tracksolid.live_positions
was being written by only one path (the 60s polled sweep), and that path
silently omits devices that don't have a "current" fix in Jimi's
location.list response. Effect on the dashboard:

  * 18 vehicles show OFFLINE for days-to-months — last fix is whatever
    the sweep wrote before Jimi dropped them.
  * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback
    because their dedicated tracker has been silent; the camera's lat/lng
    arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but
    we discard it after writing to tracksolid.alarms.

Verified upstream subscription state: only /pushalarm is registered with
Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are
inactive. This change uses only data that already arrives.

What's in this PR
-----------------
ts_shared_rev.py
  * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None)
    — single time-guarded upsert all three writers will share. Guards on
    is_valid_fix() (filters Zero-Island and out-of-range) and
    EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or
    webhook retries can't rewind a fresher marker. COALESCE on optional
    columns so sparse callers don't blank dense ones' values.
  * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices
    whose live_positions.gps_time is NULL or older than the threshold,
    ordered NULLS FIRST so worst-offenders are in batch #1.
  * ensure_device(cur, imei, device_name=None) — relocated from
    webhook_receiver_rev so every live_positions writer can satisfy the
    FK without re-defining the helper. The original underscore-prefixed
    name in webhook_receiver_rev becomes a backwards-compat alias.

webhook_receiver_rev.py
  * /pushalarm — after the alarm row insert, call upsert_live_position
    with the alarm's lat/lng and alarmTime. Sits inside the existing
    per-item SAVEPOINT, so a cross-feed failure rolls back only that
    one alarm's cross-feed, not the alarm row.

ingest_movement_rev.py
  * poll_live_positions — inline INSERT replaced with upsert_live_position
    (extras dict carries the sweep-only columns). Same data, time-guarded.
  * get_device_locations — inline INSERT replaced; also gains an
    ensure_device call so it can be safely fed arbitrary IMEIs.
  * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and
    hands it to get_device_locations. Scheduled every 10 minutes plus a
    startup catch-up call. Uses jimi.device.location.get which returns
    *last-known* fix, so devices the 60s sweep drops can be re-warmed.

Expected post-deploy effect (estimates, see
260521_timescale_location_upgrade_major.md §4)
  * ~1,100-1,600 additional live_positions upserts/day from the alarm
    cross-feed, after the time-guard rejects ~70-80% of races vs the
    fresher 60s sweep.
  * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence
    (JC400P emits ~107 alarms/day per device).
  * 8-14 of the 24 OFFLINE plates expected to recover via location.get's
    last-known-fix path within the first 30 minutes.
  * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour.
  * No 06_live_location code changes required — reads through
    reporting.v_live_positions transparently.

Tests
-----
12 webhook integration tests pass (3 new: cross-feed fires on valid fix;
skips without lat/lng; skips Zero-Island). 8 new unit tests in
test_stale_imeis.py cover the stale selector, the poll wrapper, and the
time-guard contract on upsert_live_position. Full suite: 77 passed.

Deployment
----------
No schema migration. Both webhook_receiver and ingest_movement
containers must be rebuilt — source is image-baked, not bind-mounted.
Rollback is git revert + rebuild.

Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 21:05:26 +03:00

112 lines
4.5 KiB
Python

"""Unit tests for the FIX-M20 stale-IMEI recovery helpers.
Covers:
- ts_shared_rev.get_stale_imeis — the SQL selector
- ingest_movement_rev.poll_stale_locations — the scheduler wrapper
"""
import sys
import os
import pytest
from unittest.mock import MagicMock, patch
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
def _make_mock_conn(rows=None):
"""Cursor/connection double that returns `rows` from fetchall()."""
mock_cur = MagicMock()
mock_cur.fetchall.return_value = rows or []
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return mock_conn, mock_cur
@contextmanager
def _conn_ctx(mock_conn):
yield mock_conn
class TestGetStaleImeis:
def test_returns_devices_with_old_gps_time(self):
from ts_shared_rev import get_stale_imeis
mock_conn, mock_cur = _make_mock_conn(rows=[("imei_a",), ("imei_b",)])
with patch("ts_shared_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = _conn_ctx(mock_conn)
result = get_stale_imeis(stale_minutes=30)
assert result == ["imei_a", "imei_b"]
sql = str(mock_cur.execute.call_args)
assert "enabled_flag = 1" in sql
assert "INTERVAL" in sql.upper() or "interval" in sql
assert "NULLS FIRST" in sql
def test_returns_empty_when_no_stale(self):
from ts_shared_rev import get_stale_imeis
mock_conn, _ = _make_mock_conn(rows=[])
with patch("ts_shared_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = _conn_ctx(mock_conn)
assert get_stale_imeis() == []
class TestPollStaleLocations:
def test_noop_when_empty(self):
"""If no IMEIs are stale, get_device_locations must NOT be called."""
import ingest_movement_rev as mod
with patch.object(mod, "get_stale_imeis", return_value=[]), \
patch.object(mod, "get_device_locations") as mock_refresh:
mod.poll_stale_locations()
mock_refresh.assert_not_called()
def test_invokes_refresh_with_stale_list(self):
"""When stale IMEIs are present, get_device_locations is called once
with the exact list returned by get_stale_imeis."""
import ingest_movement_rev as mod
stale = ["imei_a", "imei_b", "imei_c"]
with patch.object(mod, "get_stale_imeis", return_value=stale), \
patch.object(mod, "get_device_locations") as mock_refresh:
mod.poll_stale_locations()
mock_refresh.assert_called_once_with(stale)
class TestUpsertLivePositionGuards:
"""Spot-checks on the time-guard contract — covers what the dashboard
relies on (no rewinding the marker on stale alarm arrivals)."""
def test_skips_invalid_fix(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
# Zero-Island
assert upsert_live_position(mock_cur, "imei_x", 0, 0, "2026-05-21 10:00:00") == 0
mock_cur.execute.assert_not_called()
def test_skips_missing_gps_time(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
assert upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, None) == 0
mock_cur.execute.assert_not_called()
def test_skips_missing_imei(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
assert upsert_live_position(mock_cur, None, -1.29, 36.82, "2026-05-21 10:00:00") == 0
mock_cur.execute.assert_not_called()
def test_executes_upsert_for_valid_fix(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
mock_cur.rowcount = 1
n = upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, "2026-05-21 10:00:00",
speed=42.5)
assert n == 1
mock_cur.execute.assert_called_once()
sql = str(mock_cur.execute.call_args)
# The time-guard is the load-bearing detail — verify it's present.
assert "EXCLUDED.gps_time > tracksolid.live_positions.gps_time" in sql