Fix 5 webhook bugs: SAVEPOINTs, NULL guards, BCD timestamps, /pushevent, log NULL fix

BUG-01: OBD event_time — try unix_to_ts before clean_ts (Jimi sends epoch ints)
BUG-02: push_alarm — guard alarm_type not null (NULL breaks ON CONFLICT dedup)
BUG-03: push_trip_report — _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss
BUG-04: SAVEPOINT per item in all 5 DB endpoints (FK violation on one item no
        longer aborts the whole batch; SAVEPOINT now inside try for safety)
BUG-05: Add /pushevent endpoint (log-only; was returning 404 to Jimi)
FIX:    push_fault_info — skip null fault_code (NULL != NULL in PG unique index)
FIX:    log_ingestion — pass SQL NULL not string "None" when no error occurred

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
David Kiania 2026-04-11 18:19:13 +03:00
parent 1f11a65b0b
commit b1e4d6e85f
2 changed files with 87 additions and 16 deletions

View file

@ -231,10 +231,14 @@ def get_active_imeis() -> list[str]:
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None):
cur.execute("""
INSERT INTO tracksolid.ingestion_log
INSERT INTO tracksolid.ingestion_log
(endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (endpoint[:100], imei_count, upserted, inserted, duration_ms, success, str(error_code)[:50], str(error_msg)[:500]))
""", (
endpoint[:100], imei_count, upserted, inserted, duration_ms, success,
str(error_code)[:50] if error_code is not None else None,
str(error_msg)[:500] if error_msg is not None else None,
))
# ── Token Management ──────────────────────────────────────────────────────────

View file

@ -15,7 +15,15 @@ ENDPOINTS:
/pushgps GPS positions (Priority 2)
/pushhb Device heartbeats (Priority 2)
/pushtripreport Trip reports (Priority 2)
/pushevent Device events (Priority 3, log-only)
/health Healthcheck for Docker/monitoring
REVISIONS (QA-Verified):
[BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps).
[BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation).
[BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss.
[BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch).
[BUG-05] Added /pushevent endpoint (log-only, prevents Jimi 404 errors).
"""
@ -53,7 +61,7 @@ WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info("Webhook receiver starting (v1.0)...")
log.info("Webhook receiver starting (v1.1)...")
yield
log.info("Webhook receiver shutting down...")
close_pool()
@ -96,6 +104,25 @@ def unix_to_ts(v) -> Optional[str]:
return None
def _parse_trip_ts(v) -> Optional[str]:
"""[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats."""
iso = clean_ts(v)
if iso:
return iso
s = clean(v)
if s is None:
return None
try:
if len(s) == 12: # YYMMDDHHmmss
return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
if len(s) == 14: # YYYYMMDDHHmmss
return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
pass
log.warning("Cannot parse trip timestamp: %r", v)
return None
def _make_geom_params(lat, lng):
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
return (lng, lat, lng, lat)
@ -122,6 +149,7 @@ def push_obd(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
obd = item.get("obdJson", {})
if isinstance(obd, str):
@ -130,8 +158,13 @@ def push_obd(token: str = Form(""), data_list: str = Form("")):
except json.JSONDecodeError:
obd = {}
event_time = clean_ts(obd.get("event_time"))
# [BUG-01] Try unix epoch first, fall back to ISO string.
event_time = (
unix_to_ts(obd.get("event_time"))
or clean_ts(obd.get("event_time"))
)
if not imei or not event_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat = clean_num(obd.get("lat"))
@ -159,13 +192,14 @@ def push_obd(token: str = Form(""), data_list: str = Form("")):
*_make_geom_params(lat, lng),
json.dumps(obd),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushobd: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
@ -186,9 +220,11 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
fault_codes = item.get("faultCodeList", [])
@ -203,6 +239,11 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")):
evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
for code in fault_codes:
fault_code = clean(code)
# Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes
# because NULL != NULL in Postgres unique constraints.
if not fault_code:
continue
cur.execute("""
INSERT INTO tracksolid.fault_codes (
imei, reported_at, fault_code, status_flags,
@ -215,19 +256,20 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")):
%s
) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
""", (
imei, gate_time, clean(code),
imei, gate_time, fault_code,
clean_int(item.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
evt_time,
))
inserted += 1
cur.execute("RELEASE SAVEPOINT sp")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
return JSONResponse(content=SUCCESS)
@ -248,10 +290,13 @@ def push_alarm(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
alarm_type = clean(item.get("alarmType"))
alarm_time = clean_ts(item.get("gateTime"))
if not imei or not alarm_time:
# [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint.
if not imei or not alarm_time or not alarm_type:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat = clean_num(item.get("lat"))
@ -274,13 +319,14 @@ def push_alarm(token: str = Form(""), data_list: str = Form("")):
lat, lng,
clean_num(item.get("speed")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushalarm: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
@ -301,12 +347,14 @@ def push_gps(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
@ -329,13 +377,14 @@ def push_gps(token: str = Form(""), data_list: str = Form("")):
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushgps: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
@ -356,9 +405,11 @@ def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
@ -375,13 +426,14 @@ def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
clean_int(item.get("powerStatus")),
clean_int(item.get("fortify")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushhb: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
@ -402,14 +454,16 @@ def push_trip_report(token: str = Form(""), data_list: str = Form("")):
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
begin_time = clean_ts(item.get("beginTime"))
end_time = clean_ts(item.get("endTime"))
# [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss.
begin_time = _parse_trip_ts(item.get("beginTime"))
end_time = _parse_trip_ts(item.get("endTime"))
if not imei or not begin_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
# [FIX-M11] API sends miles (km). Store directly as distance_km.
# Previous code multiplied by 1000, producing mm not m.
# [FIX-M11] API sends km. Store directly as distance_km.
distance_km = clean_num(item.get("miles"))
begin_lat = clean_num(item.get("beginLat"))
@ -447,13 +501,26 @@ def push_trip_report(token: str = Form(""), data_list: str = Form("")):
clean_int(item.get("idleTimes")),
clean_int(item.get("tripSeq")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 7. Device Events (Priority 3 — log only) ─────────────────────────────────
@app.post("/pushevent")
def push_event(token: str = Form(""), data_list: str = Form("")):
"""[BUG-05] Accept Jimi event pushes so they don't 404. Log for future schema work."""
_validate_token(token)
items = _parse_data_list(data_list)
for item in items:
log.info("pushevent: imei=%s type=%s gateTime=%s",
item.get("deviceImei"), item.get("type"), item.get("gateTime"))
return JSONResponse(content=SUCCESS)