tracksolid_timescale_grafan.../tests/integration/test_webhook_endpoints.py

134 lines
5.2 KiB
Python
Raw Normal View History

"""Integration tests for FastAPI webhook endpoints."""
import sys
import os
import json
import pytest
from unittest.mock import MagicMock, patch, call
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
os.environ.setdefault("JIMI_WEBHOOK_TOKEN", "")
from fastapi.testclient import TestClient
import webhook_receiver_rev
from tests.fixtures.api_responses import (
WEBHOOK_ALARM_PAYLOAD,
WEBHOOK_ALARM_NULL_TYPE,
WEBHOOK_TRIP_BCD_PAYLOAD,
WEBHOOK_TRIP_ISO_PAYLOAD,
WEBHOOK_OBD_PAYLOAD,
)
def make_mock_conn():
"""Create a mock DB connection with cursor support."""
mock_cur = MagicMock()
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return mock_conn, mock_cur
@contextmanager
def mock_get_conn_ctx(mock_conn):
yield mock_conn
@pytest.fixture
def client():
return TestClient(webhook_receiver_rev.app, raise_server_exceptions=True)
@pytest.fixture
def mock_db():
mock_conn, mock_cur = make_mock_conn()
with patch("webhook_receiver_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = mock_get_conn_ctx(mock_conn)
yield mock_conn, mock_cur
class TestHealth:
def test_health_returns_ok(self, client):
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
class TestPushAlarm:
def test_valid_alarm_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
def test_null_alarm_type_skipped(self, client, mock_db):
"""BUG-02 guard: NULL alarm_type must be rejected, not inserted."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_NULL_TYPE])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
# Verify no data INSERT was executed. log_ingestion always writes one
# row to tracksolid.ingestion_log — exclude it from the assertion.
data_inserts = [
c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c) and "ingestion_log" not in str(c)
]
assert len(data_inserts) == 0, "NULL alarm_type must not be inserted"
def test_empty_data_list_ok(self, client):
response = client.post("/pushalarm", data={"token": "", "data_list": ""})
assert response.status_code == 200
def test_batch_with_bad_item_processes_rest(self, client, mock_db):
"""BUG-04: One bad item must not abort the entire batch."""
mock_conn, mock_cur = mock_db
# One valid, one missing alarm_type (will be skipped, not crash)
items = [WEBHOOK_ALARM_PAYLOAD, WEBHOOK_ALARM_NULL_TYPE]
data_list = json.dumps(items)
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
class TestPushTripReport:
def test_bcd_timestamp_parsed(self, client, mock_db):
"""BUG-03: BCD timestamp 220415103000 must be parsed correctly."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_BCD_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
# Verify an INSERT was attempted
insert_calls = [c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c)]
assert len(insert_calls) > 0, "Trip with BCD timestamp must trigger INSERT"
def test_iso_timestamp_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_ISO_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
def test_missing_imei_skipped(self, client, mock_db):
mock_conn, mock_cur = mock_db
bad_trip = {"beginTime": "2024-04-12 07:00:00", "miles": 10.0}
data_list = json.dumps([bad_trip])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
class TestPushObd:
def test_valid_obd_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_OBD_PAYLOAD])
response = client.post("/pushobd", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0