"""
webhook_receiver_rev.py - Fireside Communications · Tracksolid Webhook Receiver
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
devices, providing real-time ingestion without polling. This is the ONLY way
to receive OBD diagnostics and DTC fault codes: those data types have no
polling endpoint.
ENDPOINTS:
/pushobd        - OBD CAN bus diagnostics (Priority 1)
/pushfaultinfo  - DTC fault codes (Priority 1)
/pushalarm      - Alarm events (Priority 2)
/pushgps        - GPS positions (Priority 2)
/pushhb         - Device heartbeats (Priority 2)
/pushtripreport - Trip reports (Priority 2)
/pushevent      - Device LOGIN/LOGOUT events (Priority 3)
/health         - Healthcheck for Docker/monitoring
REVISIONS (QA-Verified):
[BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps).
[BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation).
[BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss.
[BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch).
[BUG-05] Added /pushevent endpoint writes to tracksolid.device_events.
"""
from __future__ import annotations
import hmac
import json
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
# Cap on items per webhook POST. Prevents a malformed/malicious push from
# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200.
MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000"))
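# Example override (illustrative; the uvicorn invocation is assumed, not part
# of this repo): WEBHOOK_MAX_ITEMS=200 uvicorn webhook_receiver_rev:app
# tightens the cap to Jimi's typical batch size.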
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from psycopg2.extras import execute_values
from ts_shared_rev import (
close_pool,
get_conn,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
is_valid_fix,
get_logger,
)
log = get_logger("webhook")
# ── Configuration ─────────────────────────────────────────────────────────────
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info("Webhook receiver starting (v1.1)...")
yield
log.info("Webhook receiver shutting down...")
close_pool()
app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
# ── Helpers ───────────────────────────────────────────────────────────────────
SUCCESS = {"code": 0, "msg": "success"}
def _validate_token(token: str) -> None:
"""Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
if WEBHOOK_TOKEN and not hmac.compare_digest(token, WEBHOOK_TOKEN):
raise HTTPException(status_code=403, detail="Invalid token")
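# Illustrative behaviour (token values assumed; with JIMI_WEBHOOK_TOKEN="s3cret"):
#   _validate_token("s3cret")  -> returns None (accepted)
#   _validate_token("wrong")   -> raises HTTPException(403)
# With JIMI_WEBHOOK_TOKEN unset or empty, every token is accepted.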
def _parse_data_list(raw: str) -> list[dict]:
"""Parse a JSON string into a list of dicts. raw may be a JSON array or single object."""
try:
parsed = json.loads(raw)
items = parsed if isinstance(parsed, list) else [parsed]
if len(items) > MAX_ITEMS_PER_POST:
log.warning("data_list truncated: %d items exceeded cap of %d",
len(items), MAX_ITEMS_PER_POST)
items = items[:MAX_ITEMS_PER_POST]
return items
except (json.JSONDecodeError, TypeError):
return []
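# Illustrative behaviour (payloads assumed):
#   _parse_data_list('[{"imei": "1"}, {"imei": "2"}]')  -> [{"imei": "1"}, {"imei": "2"}]
#   _parse_data_list('{"imei": "1"}')                   -> [{"imei": "1"}]
#   _parse_data_list('not json')                        -> []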
async def _parse_request(request: Request) -> tuple[str, list[dict]]:
"""Extract token + data_list from either a JSON body or form-encoded body.
Jimi's integration push API sends:
Content-Type: application/json
Body: {"token": "...", "data_list": [{...}, ...]}
Some older/configured endpoints may still use form-encoded. This helper
handles both so each endpoint doesn't need to know which format arrived.
"""
body = await request.body()
if not body:
return "", []
# Jimi integration push format (observed live):
# Content-Type: application/x-www-form-urlencoded
# Body: msgType=<topic>&data=<URL-encoded JSON object or array>
# The `data` field holds a single JSON object per event, not an array.
try:
form = await request.form()
except Exception:
log.warning("push: form parse failed", exc_info=True)
return "", []
token = str(form.get("token", ""))
raw_data = form.get("data") or form.get("data_list") or ""
if not raw_data:
return token, []
try:
parsed = json.loads(raw_data)
except (json.JSONDecodeError, TypeError):
log.warning("push: data JSON parse failed — %.200s", raw_data)
return token, []
items = parsed if isinstance(parsed, list) else [parsed]
if len(items) > MAX_ITEMS_PER_POST:
log.warning("push: truncated %d%d items", len(items), MAX_ITEMS_PER_POST)
items = items[:MAX_ITEMS_PER_POST]
return token, items
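# Illustrative round-trip (field values assumed, not from a live push):
#   body:    token=abc&data=%7B%22imei%22%3A%22860000000000001%22%7D
#   returns: ("abc", [{"imei": "860000000000001"}])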
def unix_to_ts(v) -> Optional[str]:
"""Convert Unix timestamp (seconds or milliseconds) to ISO string."""
if v is None:
return None
try:
ts = int(v)
if ts > 1e12:
ts = ts // 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError, OSError):
return None
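# Illustrative behaviour (UTC; input values assumed):
#   unix_to_ts(1713389635)     -> "2024-04-17 21:33:55"   (seconds)
#   unix_to_ts(1713389635000)  -> "2024-04-17 21:33:55"   (milliseconds)
#   unix_to_ts("garbage")      -> None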
def _parse_trip_ts(v) -> Optional[str]:
"""[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats."""
iso = clean_ts(v)
if iso:
return iso
s = clean(v)
if s is None:
return None
try:
if len(s) == 12: # YYMMDDHHmmss
return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
if len(s) == 14: # YYYYMMDDHHmmss
return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
pass
log.warning("Cannot parse trip timestamp: %r", v)
return None
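# Illustrative behaviour (values assumed; relies on clean_ts returning None
# for BCD-style digit strings):
#   _parse_trip_ts("240417213355")    -> "2024-04-17 21:33:55"  (YYMMDDHHmmss)
#   _parse_trip_ts("20240417213355")  -> "2024-04-17 21:33:55"  (YYYYMMDDHHmmss)
#   _parse_trip_ts("nonsense")        -> None (and logs a warning)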
def _make_geom_params(lat, lng):
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
return (lng, lat, lng, lat)
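# The four values line up with the repeated SQL pattern
#   CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
#        THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END
# first (lng, lat) for the NULL guard, then (lng, lat) again for ST_MakePoint,
# since PostGIS points take x = longitude, y = latitude.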
def _ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
"""Upsert a stub row into tracksolid.devices so FK-constrained inserts don't fail.
Jimi pushes alarms/GPS for devices that may not yet be in our devices table
(neither API-sync'd nor in the onboarding CSV). Rather than drop the event,
register the IMEI with whatever context the push carried; the nightly
`sync_devices()` and the CSV import fill in the remaining fields later.
"""
cur.execute(
"""
INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
VALUES (%s, %s, 'unknown', NOW(), NOW())
ON CONFLICT (imei) DO NOTHING
""",
(imei, device_name),
)
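# Illustrative effect: repeated pushes for the same unseen IMEI leave exactly
# one stub row; ON CONFLICT (imei) DO NOTHING makes the call idempotent.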
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok"}
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
@app.post("/pushobd")
async def push_obd(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
obd = item.get("obdJson", {})
if isinstance(obd, str):
try:
obd = json.loads(obd)
except json.JSONDecodeError:
obd = {}
# [BUG-01] Try unix epoch first, fall back to ISO string.
event_time = (
unix_to_ts(obd.get("event_time"))
or clean_ts(obd.get("event_time"))
)
if not imei or not event_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat = clean_num(obd.get("lat"))
lng = clean_num(obd.get("lng"))
cur.execute("""
INSERT INTO tracksolid.obd_readings (
imei, reading_time, car_type, acc_state, status_flags,
lat, lng, geom, obd_data, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, NOW()
) ON CONFLICT (imei, reading_time) DO UPDATE SET
obd_data = EXCLUDED.obd_data,
updated_at = NOW()
""", (
imei, event_time,
clean_int(obd.get("car_type")),
clean_int(obd.get("AccState")),
clean_int(obd.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
json.dumps(obd),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushobd: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
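# The SAVEPOINT-per-item pattern above ([BUG-04]) is shared by every
# row-at-a-time handler below: SAVEPOINT before each item, RELEASE on success,
# ROLLBACK TO SAVEPOINT on failure, so one bad item is skipped without
# aborting the rest of the batch's transaction.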
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
@app.post("/pushfaultinfo")
async def push_fault_info(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
fault_codes = item.get("faultCodeList", [])
if isinstance(fault_codes, str):
try:
fault_codes = json.loads(fault_codes)
except json.JSONDecodeError:
fault_codes = []
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
for code in fault_codes:
fault_code = clean(code)
# Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes
# because NULL != NULL in Postgres unique constraints.
if not fault_code:
continue
cur.execute("""
INSERT INTO tracksolid.fault_codes (
imei, reported_at, fault_code, status_flags,
lat, lng, geom, event_time
) VALUES (
%s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
""", (
imei, gate_time, fault_code,
clean_int(item.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
evt_time,
))
inserted += 1
cur.execute("RELEASE SAVEPOINT sp")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
@app.post("/pushalarm")
async def push_alarm(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
# Jimi integration push uses `imei` + `alarmTime`, NOT the
# `deviceImei` + `gateTime` fields shown in the API docs.
imei = clean(item.get("imei") or item.get("deviceImei"))
alarm_type = clean(item.get("alarmType"))
alarm_time = clean_ts(item.get("alarmTime") or item.get("gateTime"))
# [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint.
if not imei or not alarm_time or not alarm_type:
cur.execute("RELEASE SAVEPOINT sp")
continue
# Ensure parent devices row exists to satisfy FK constraint.
_ensure_device(cur, imei, clean(item.get("deviceName")))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_name, alarm_time, geom,
lat, lng, speed, source, updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
""", (
imei, alarm_type, clean(item.get("alarmName")), alarm_time,
*_make_geom_params(lat, lng),
lat, lng,
clean_num(item.get("speed")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process alarm for %s",
item.get("imei") or item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushalarm: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
@app.post("/pushgps")
async def push_gps(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
# Validation phase — pre-clean and filter without touching the DB.
# Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume;
# one batched execute_values is 10-50× faster for the same rows.
rows = []
for item in items:
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
continue
rows.append((
imei, gps_time, lng, lat, lat, lng,
clean_num(item.get("gpsSpeed")),
clean_num(item.get("direction")),
str(item.get("acc")) if item.get("acc") is not None else None,
clean_int(item.get("satelliteNum")),
clean_num(item.get("distance")),
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
inserted = 0
if rows:
with get_conn() as conn:
with conn.cursor() as cur:
execute_values(
cur,
"""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng, speed, direction,
acc_status, satellite, current_mileage,
altitude, post_type, source
) VALUES %s
ON CONFLICT (imei, gps_time) DO NOTHING
""",
rows,
template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
" %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')",
page_size=len(rows),
)
inserted = cur.rowcount
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
else:
# No valid rows, still record the call for observability.
with get_conn() as conn:
with conn.cursor() as cur:
log_ingestion(cur, "webhook/pushgps", len(items), 0, 0,
int((time.time() - t0) * 1000), True)
log.info("pushgps: %d/%d items inserted.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
@app.post("/pushhb")
async def push_heartbeat(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.heartbeats (
imei, gate_time, power_level, gsm_signal,
acc_status, power_status, fortify
) VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gate_time) DO NOTHING
""", (
imei, gate_time,
clean_int(item.get("powerLevel")),
clean_int(item.get("gsmSign")),
clean_int(item.get("acc")),
clean_int(item.get("powerStatus")),
clean_int(item.get("fortify")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushhb: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
@app.post("/pushtripreport")
async def push_trip_report(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
# [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss.
begin_time = _parse_trip_ts(item.get("beginTime"))
end_time = _parse_trip_ts(item.get("endTime"))
if not imei or not begin_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
# [FIX-M11] API sends km. Store directly as distance_km.
distance_km = clean_num(item.get("miles"))
begin_lat = clean_num(item.get("beginLat"))
begin_lng = clean_num(item.get("beginLng"))
end_lat = clean_num(item.get("endLat"))
end_lng = clean_num(item.get("endLng"))
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
start_geom, end_geom,
fuel_consumed_l, idle_time_s, trip_seq, source,
updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, start_time) DO UPDATE SET
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
end_geom = EXCLUDED.end_geom,
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
idle_time_s = EXCLUDED.idle_time_s,
updated_at = NOW()
""", (
imei, begin_time, end_time, distance_km,
begin_lng, begin_lat, begin_lng, begin_lat,
end_lng, end_lat, end_lng, end_lat,
clean_num(item.get("oils")),
clean_int(item.get("idleTimes")),
clean_int(item.get("tripSeq")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 7. Device Events (LOGIN / LOGOUT) ────────────────────────────────────────
@app.post("/pushevent")
async def push_event(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
event_type = clean(item.get("type"))
event_time = clean_ts(item.get("gateTime"))
if not imei or not event_type or not event_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.device_events
(imei, event_type, event_time, timezone)
VALUES (%s, %s, %s, %s)
ON CONFLICT (imei, event_type, event_time) DO NOTHING
""", (imei, event_type, event_time, clean(item.get("timezone"))))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process event for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushevent", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushevent: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)