Merge pull request 'fix: webhook handles JSON body push format from Jimi' (#6) from quality-program-2026-04-12 into main
Some checks are pending
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run

This commit is contained in:
kianiadee 2026-04-21 08:57:04 +00:00
commit 809dbb165c

View file

@ -41,7 +41,7 @@ from typing import Optional
# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200.
MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000"))
from fastapi import FastAPI, Form, HTTPException
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from psycopg2.extras import execute_values
@ -86,7 +86,7 @@ def _validate_token(token: str) -> None:
def _parse_data_list(raw: str) -> list[dict]:
"""Parse the JSON string from Jimi's data_list form field."""
"""Parse a JSON string into a list of dicts. raw may be a JSON array or single object."""
try:
parsed = json.loads(raw)
items = parsed if isinstance(parsed, list) else [parsed]
@ -96,10 +96,59 @@ def _parse_data_list(raw: str) -> list[dict]:
items = items[:MAX_ITEMS_PER_POST]
return items
except (json.JSONDecodeError, TypeError):
log.warning("Failed to parse data_list: %.200s", raw)
return []
async def _parse_request(request: Request) -> tuple[str, list[dict]]:
"""Extract token + data_list from either a JSON body or form-encoded body.
Jimi's integration push API sends:
Content-Type: application/json
Body: {"token": "...", "data_list": [{...}, ...]}
Some older/configured endpoints may still use form-encoded. This helper
handles both so each endpoint doesn't need to know which format arrived.
"""
content_type = request.headers.get("content-type", "")
body = await request.body()
if not body:
log.debug("push: empty request body")
return "", []
# ── Try JSON body first (integration push format) ──────────────────────────
if "application/json" in content_type or body.lstrip()[:1] == b"{":
try:
payload = json.loads(body)
token = str(payload.get("token", ""))
raw_dl = payload.get("data_list", [])
if isinstance(raw_dl, list):
items = raw_dl[:MAX_ITEMS_PER_POST]
elif isinstance(raw_dl, str):
items = _parse_data_list(raw_dl)
else:
items = []
log.debug("push: parsed JSON body — %d items", len(items))
return token, items
except (json.JSONDecodeError, TypeError):
log.warning("push: JSON body parse failed — body: %.200s",
body.decode("utf-8", errors="replace"))
# ── Fall back to form-encoded ───────────────────────────────────────────────
try:
form = await request.form()
token = str(form.get("token", ""))
raw_dl = str(form.get("data_list", ""))
items = _parse_data_list(raw_dl) if raw_dl else []
log.debug("push: parsed form body — %d items", len(items))
return token, items
except Exception:
log.warning("push: form body parse failed — body: %.200s",
body.decode("utf-8", errors="replace"), exc_info=True)
return "", []
def unix_to_ts(v) -> Optional[str]:
"""Convert Unix timestamp (seconds or milliseconds) to ISO string."""
if v is None:
@ -145,9 +194,9 @@ def health():
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
@app.post("/pushobd")
def push_obd(token: str = Form(""), data_list: str = Form("")):
async def push_obd(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -216,9 +265,9 @@ def push_obd(token: str = Form(""), data_list: str = Form("")):
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
@app.post("/pushfaultinfo")
def push_fault_info(token: str = Form(""), data_list: str = Form("")):
async def push_fault_info(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -286,9 +335,9 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")):
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
@app.post("/pushalarm")
def push_alarm(token: str = Form(""), data_list: str = Form("")):
async def push_alarm(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -343,9 +392,9 @@ def push_alarm(token: str = Form(""), data_list: str = Form("")):
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
@app.post("/pushgps")
def push_gps(token: str = Form(""), data_list: str = Form("")):
async def push_gps(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -407,9 +456,9 @@ def push_gps(token: str = Form(""), data_list: str = Form("")):
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
@app.post("/pushhb")
def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
async def push_heartbeat(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -456,9 +505,9 @@ def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
@app.post("/pushtripreport")
def push_trip_report(token: str = Form(""), data_list: str = Form("")):
async def push_trip_report(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
@ -531,9 +580,9 @@ def push_trip_report(token: str = Form(""), data_list: str = Form("")):
# ── 7. Device Events (LOGIN / LOGOUT) ────────────────────────────────────────
@app.post("/pushevent")
def push_event(token: str = Form(""), data_list: str = Form("")):
async def push_event(request: Request):
token, items = await _parse_request(request)
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)