tracksolid_timescale_grafan.../tests/integration/test_webhook_endpoints.py

180 lines
7.5 KiB
Python
Raw Permalink Normal View History

"""Integration tests for FastAPI webhook endpoints."""
import sys
import os
import json
import pytest
from unittest.mock import MagicMock, patch, call
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
os.environ.setdefault("JIMI_WEBHOOK_TOKEN", "")
from fastapi.testclient import TestClient
import webhook_receiver_rev
from tests.fixtures.api_responses import (
WEBHOOK_ALARM_PAYLOAD,
WEBHOOK_ALARM_NULL_TYPE,
FIX-M21: alarm cross-feed + stale-IMEI recovery for live_positions Cherry-pick of c8f5907 (originally FIX-M20 on main) onto quality-program-2026-04-12 — renamed to FIX-M21 here to avoid clashing with this branch's existing [FIX-M20] (trip enrichment, commit 144dede). Behaviour and code are unchanged from the main-branch original; the annotation tag is the only difference. Background ---------- A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two freshness gaps that share a single root cause: tracksolid.live_positions was being written by only one path (the 60s polled sweep), and that path silently omits devices that don't have a "current" fix in Jimi's location.list response. Effect on the dashboard: * 18 vehicles show OFFLINE for days-to-months — last fix is whatever the sweep wrote before Jimi dropped them. * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback because their dedicated tracker has been silent; the camera's lat/lng arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we discard it after writing to tracksolid.alarms. Verified upstream subscription state: only /pushalarm is registered with Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are inactive. This change uses only data that already arrives. What's in this commit --------------------- ts_shared_rev.py * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None) — single time-guarded upsert all three writers will share. Guards on is_valid_fix() (filters Zero-Island and out-of-range) and EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or webhook retries can't rewind a fresher marker. COALESCE on optional columns so sparse callers don't blank dense ones' values. * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices whose live_positions.gps_time is NULL or older than the threshold, ordered NULLS FIRST so worst-offenders are in batch #1. * ensure_device(cur, imei, device_name=None) — relocated from webhook_receiver_rev so every live_positions writer can satisfy the FK without re-defining the helper. The original underscore-prefixed name in webhook_receiver_rev becomes a backwards-compat alias. webhook_receiver_rev.py * /pushalarm — after the alarm row insert, call upsert_live_position with the alarm's lat/lng and alarmTime. Sits inside the existing per-item SAVEPOINT, so a cross-feed failure rolls back only that one alarm's cross-feed, not the alarm row. ingest_movement_rev.py * poll_live_positions — inline INSERT replaced with upsert_live_position (extras dict carries the sweep-only columns). Same data, time-guarded. * get_device_locations — inline INSERT replaced; also gains an ensure_device call so it can be safely fed arbitrary IMEIs. * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and hands it to get_device_locations. Scheduled every 10 minutes plus a startup catch-up call. Uses jimi.device.location.get which returns *last-known* fix, so devices the 60s sweep drops can be re-warmed. Expected post-deploy effect (estimates, see 06_live_location/260521_timescale_location_upgrade_major.md §4) * ~1,100-1,600 additional live_positions upserts/day from the alarm cross-feed, after the time-guard rejects ~70-80% of races vs the fresher 60s sweep. * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence (JC400P emits ~107 alarms/day per device). * 8-14 of the 24 OFFLINE plates expected to recover via location.get's last-known-fix path within the first 30 minutes. * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour. * No 06_live_location code changes required — reads through reporting.v_live_positions transparently. Tests ----- 12 webhook integration tests pass (3 new: cross-feed fires on valid fix; skips without lat/lng; skips Zero-Island). 8 new unit tests in test_stale_imeis.py cover the stale selector, the poll wrapper, and the time-guard contract on upsert_live_position. Full suite: 77 passed. Deployment ---------- No schema migration. Both webhook_receiver and ingest_movement containers must be rebuilt — source is image-baked, not bind-mounted. Rollback is git revert + rebuild. Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md Verification playbook: 06_live_location/260521_timescale_location_upgrade_verification.md Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 18:05:26 +00:00
WEBHOOK_ALARM_NO_POSITION,
WEBHOOK_ALARM_ZERO_ISLAND,
WEBHOOK_TRIP_BCD_PAYLOAD,
WEBHOOK_TRIP_ISO_PAYLOAD,
WEBHOOK_OBD_PAYLOAD,
)
def make_mock_conn():
"""Create a mock DB connection with cursor support."""
mock_cur = MagicMock()
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return mock_conn, mock_cur
@contextmanager
def mock_get_conn_ctx(mock_conn):
yield mock_conn
@pytest.fixture
def client():
return TestClient(webhook_receiver_rev.app, raise_server_exceptions=True)
@pytest.fixture
def mock_db():
mock_conn, mock_cur = make_mock_conn()
with patch("webhook_receiver_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = mock_get_conn_ctx(mock_conn)
yield mock_conn, mock_cur
class TestHealth:
def test_health_returns_ok(self, client):
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
class TestPushAlarm:
def test_valid_alarm_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
def test_null_alarm_type_skipped(self, client, mock_db):
"""BUG-02 guard: NULL alarm_type must be rejected, not inserted."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_NULL_TYPE])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
# Verify no data INSERT was executed. log_ingestion always writes one
# row to tracksolid.ingestion_log — exclude it from the assertion.
data_inserts = [
c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c) and "ingestion_log" not in str(c)
]
assert len(data_inserts) == 0, "NULL alarm_type must not be inserted"
def test_empty_data_list_ok(self, client):
response = client.post("/pushalarm", data={"token": "", "data_list": ""})
assert response.status_code == 200
def test_batch_with_bad_item_processes_rest(self, client, mock_db):
"""BUG-04: One bad item must not abort the entire batch."""
mock_conn, mock_cur = mock_db
# One valid, one missing alarm_type (will be skipped, not crash)
items = [WEBHOOK_ALARM_PAYLOAD, WEBHOOK_ALARM_NULL_TYPE]
data_list = json.dumps(items)
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
FIX-M21: alarm cross-feed + stale-IMEI recovery for live_positions Cherry-pick of c8f5907 (originally FIX-M20 on main) onto quality-program-2026-04-12 — renamed to FIX-M21 here to avoid clashing with this branch's existing [FIX-M20] (trip enrichment, commit 144dede). Behaviour and code are unchanged from the main-branch original; the annotation tag is the only difference. Background ---------- A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two freshness gaps that share a single root cause: tracksolid.live_positions was being written by only one path (the 60s polled sweep), and that path silently omits devices that don't have a "current" fix in Jimi's location.list response. Effect on the dashboard: * 18 vehicles show OFFLINE for days-to-months — last fix is whatever the sweep wrote before Jimi dropped them. * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback because their dedicated tracker has been silent; the camera's lat/lng arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we discard it after writing to tracksolid.alarms. Verified upstream subscription state: only /pushalarm is registered with Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are inactive. This change uses only data that already arrives. What's in this commit --------------------- ts_shared_rev.py * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None) — single time-guarded upsert all three writers will share. Guards on is_valid_fix() (filters Zero-Island and out-of-range) and EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or webhook retries can't rewind a fresher marker. COALESCE on optional columns so sparse callers don't blank dense ones' values. * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices whose live_positions.gps_time is NULL or older than the threshold, ordered NULLS FIRST so worst-offenders are in batch #1. * ensure_device(cur, imei, device_name=None) — relocated from webhook_receiver_rev so every live_positions writer can satisfy the FK without re-defining the helper. The original underscore-prefixed name in webhook_receiver_rev becomes a backwards-compat alias. webhook_receiver_rev.py * /pushalarm — after the alarm row insert, call upsert_live_position with the alarm's lat/lng and alarmTime. Sits inside the existing per-item SAVEPOINT, so a cross-feed failure rolls back only that one alarm's cross-feed, not the alarm row. ingest_movement_rev.py * poll_live_positions — inline INSERT replaced with upsert_live_position (extras dict carries the sweep-only columns). Same data, time-guarded. * get_device_locations — inline INSERT replaced; also gains an ensure_device call so it can be safely fed arbitrary IMEIs. * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and hands it to get_device_locations. Scheduled every 10 minutes plus a startup catch-up call. Uses jimi.device.location.get which returns *last-known* fix, so devices the 60s sweep drops can be re-warmed. Expected post-deploy effect (estimates, see 06_live_location/260521_timescale_location_upgrade_major.md §4) * ~1,100-1,600 additional live_positions upserts/day from the alarm cross-feed, after the time-guard rejects ~70-80% of races vs the fresher 60s sweep. * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence (JC400P emits ~107 alarms/day per device). * 8-14 of the 24 OFFLINE plates expected to recover via location.get's last-known-fix path within the first 30 minutes. * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour. * No 06_live_location code changes required — reads through reporting.v_live_positions transparently. Tests ----- 12 webhook integration tests pass (3 new: cross-feed fires on valid fix; skips without lat/lng; skips Zero-Island). 8 new unit tests in test_stale_imeis.py cover the stale selector, the poll wrapper, and the time-guard contract on upsert_live_position. Full suite: 77 passed. Deployment ---------- No schema migration. Both webhook_receiver and ingest_movement containers must be rebuilt — source is image-baked, not bind-mounted. Rollback is git revert + rebuild. Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md Verification playbook: 06_live_location/260521_timescale_location_upgrade_verification.md Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 18:05:26 +00:00
def test_alarm_cross_feeds_live_position(self, client, mock_db):
"""FIX-M21: a valid alarm must additionally upsert live_positions."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
# Exactly one INSERT INTO live_positions should have fired.
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 1, "Cross-feed must upsert live_positions exactly once"
def test_alarm_without_lat_lng_skips_cross_feed(self, client, mock_db):
"""FIX-M21: an alarm without lat/lng must NOT touch live_positions
(but must still insert the alarm row)."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_NO_POSITION])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 0, "No live_positions write without a valid fix"
alarm_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.alarms" in str(c) and "INSERT" in str(c)
]
assert len(alarm_inserts) == 1, "Alarm row must still write"
def test_alarm_with_zero_island_skips_cross_feed(self, client, mock_db):
"""FIX-M21: a (0, 0) fix must NOT propagate to live_positions —
is_valid_fix in the shared helper guards Zero Island."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_ZERO_ISLAND])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 0, "Zero-Island fix must not reach live_positions"
class TestPushTripReport:
def test_bcd_timestamp_parsed(self, client, mock_db):
"""BUG-03: BCD timestamp 220415103000 must be parsed correctly."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_BCD_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
# Verify an INSERT was attempted
insert_calls = [c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c)]
assert len(insert_calls) > 0, "Trip with BCD timestamp must trigger INSERT"
def test_iso_timestamp_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_ISO_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
def test_missing_imei_skipped(self, client, mock_db):
mock_conn, mock_cur = mock_db
bad_trip = {"beginTime": "2024-04-12 07:00:00", "miles": 10.0}
data_list = json.dumps([bad_trip])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
class TestPushObd:
def test_valid_obd_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_OBD_PAYLOAD])
response = client.post("/pushobd", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0