From b1e4d6e85f7a9746b55e105c3c7f01860e229a6f Mon Sep 17 00:00:00 2001 From: David Kiania Date: Sat, 11 Apr 2026 18:19:13 +0300 Subject: [PATCH] Fix 5 webhook bugs: SAVEPOINTs, NULL guards, BCD timestamps, /pushevent, log NULL fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BUG-01: OBD event_time — try unix_to_ts before clean_ts (Jimi sends epoch ints) BUG-02: push_alarm — guard alarm_type not null (NULL breaks ON CONFLICT dedup) BUG-03: push_trip_report — _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss BUG-04: SAVEPOINT per item in all 5 DB endpoints (FK violation on one item no longer aborts the whole batch; SAVEPOINT now inside try for safety) BUG-05: Add /pushevent endpoint (log-only; was returning 404 to Jimi) FIX: push_fault_info — skip null fault_code (NULL != NULL in PG unique index) FIX: log_ingestion — pass SQL NULL not string "None" when no error occurred Co-Authored-By: Claude Sonnet 4.6 --- ts_shared_rev.py | 8 +++- webhook_receiver_rev.py | 95 +++++++++++++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 16 deletions(-) diff --git a/ts_shared_rev.py b/ts_shared_rev.py index 18589c4..251575f 100644 --- a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -231,10 +231,14 @@ def get_active_imeis() -> list[str]: def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None): cur.execute(""" - INSERT INTO tracksolid.ingestion_log + INSERT INTO tracksolid.ingestion_log (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - """, (endpoint[:100], imei_count, upserted, inserted, duration_ms, success, str(error_code)[:50], str(error_msg)[:500])) + """, ( + endpoint[:100], imei_count, upserted, inserted, duration_ms, success, + str(error_code)[:50] if error_code is not None else None, + str(error_msg)[:500] if error_msg is not None else None, + )) # ── Token Management ────────────────────────────────────────────────────────── diff --git a/webhook_receiver_rev.py b/webhook_receiver_rev.py index a5a73a3..a5c338e 100644 --- a/webhook_receiver_rev.py +++ b/webhook_receiver_rev.py @@ -15,7 +15,15 @@ ENDPOINTS: /pushgps — GPS positions (Priority 2) /pushhb — Device heartbeats (Priority 2) /pushtripreport — Trip reports (Priority 2) + /pushevent — Device events (Priority 3, log-only) /health — Healthcheck for Docker/monitoring + +REVISIONS (QA-Verified): + [BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps). + [BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation). + [BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss. + [BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch). + [BUG-05] Added /pushevent endpoint (log-only, prevents Jimi 404 errors). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ @@ -53,7 +61,7 @@ WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "") @asynccontextmanager async def lifespan(app: FastAPI): - log.info("Webhook receiver starting (v1.0)...") + log.info("Webhook receiver starting (v1.1)...") yield log.info("Webhook receiver shutting down...") close_pool() @@ -96,6 +104,25 @@ def unix_to_ts(v) -> Optional[str]: return None +def _parse_trip_ts(v) -> Optional[str]: + """[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats.""" + iso = clean_ts(v) + if iso: + return iso + s = clean(v) + if s is None: + return None + try: + if len(s) == 12: # YYMMDDHHmmss + return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S") + if len(s) == 14: # YYYYMMDDHHmmss + return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + pass + log.warning("Cannot parse trip timestamp: %r", v) + return None + + def _make_geom_params(lat, lng): """Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern.""" return (lng, lat, lng, lat) @@ -122,6 +149,7 @@ def push_obd(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) obd = item.get("obdJson", {}) if isinstance(obd, str): @@ -130,8 +158,13 @@ def push_obd(token: str = Form(""), data_list: str = Form("")): except json.JSONDecodeError: obd = {} - event_time = clean_ts(obd.get("event_time")) + # [BUG-01] Try unix epoch first, fall back to ISO string. + event_time = ( + unix_to_ts(obd.get("event_time")) + or clean_ts(obd.get("event_time")) + ) if not imei or not event_time: + cur.execute("RELEASE SAVEPOINT sp") continue lat = clean_num(obd.get("lat")) @@ -159,13 +192,14 @@ def push_obd(token: str = Form(""), data_list: str = Form("")): *_make_geom_params(lat, lng), json.dumps(obd), )) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushobd: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) @@ -186,9 +220,11 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) gate_time = clean_ts(item.get("gateTime")) if not imei or not gate_time: + cur.execute("RELEASE SAVEPOINT sp") continue fault_codes = item.get("faultCodeList", []) @@ -203,6 +239,11 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")): evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime")) for code in fault_codes: + fault_code = clean(code) + # Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes + # because NULL != NULL in Postgres unique constraints. + if not fault_code: + continue cur.execute(""" INSERT INTO tracksolid.fault_codes ( imei, reported_at, fault_code, status_flags, @@ -215,19 +256,20 @@ def push_fault_info(token: str = Form(""), data_list: str = Form("")): %s ) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING """, ( - imei, gate_time, clean(code), + imei, gate_time, fault_code, clean_int(item.get("statusFlags")), lat, lng, *_make_geom_params(lat, lng), evt_time, )) inserted += 1 + cur.execute("RELEASE SAVEPOINT sp") except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items)) return JSONResponse(content=SUCCESS) @@ -248,10 +290,13 @@ def push_alarm(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) alarm_type = clean(item.get("alarmType")) alarm_time = clean_ts(item.get("gateTime")) - if not imei or not alarm_time: + # [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint. + if not imei or not alarm_time or not alarm_type: + cur.execute("RELEASE SAVEPOINT sp") continue lat = clean_num(item.get("lat")) @@ -274,13 +319,14 @@ def push_alarm(token: str = Form(""), data_list: str = Form("")): lat, lng, clean_num(item.get("speed")), )) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushalarm: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) @@ -301,12 +347,14 @@ def push_gps(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) gps_time = clean_ts(item.get("gpsTime")) lat = clean_num(item.get("lat")) lng = clean_num(item.get("lng")) if not imei or not gps_time or not is_valid_fix(lat, lng): + cur.execute("RELEASE SAVEPOINT sp") continue cur.execute(""" @@ -329,13 +377,14 @@ def push_gps(token: str = Form(""), data_list: str = Form("")): clean_num(item.get("altitude")), clean_int(item.get("postType")), )) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushgps: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) @@ -356,9 +405,11 @@ def push_heartbeat(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) gate_time = clean_ts(item.get("gateTime")) if not imei or not gate_time: + cur.execute("RELEASE SAVEPOINT sp") continue cur.execute(""" @@ -375,13 +426,14 @@ def push_heartbeat(token: str = Form(""), data_list: str = Form("")): clean_int(item.get("powerStatus")), clean_int(item.get("fortify")), )) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushhb: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) @@ -402,14 +454,16 @@ def push_trip_report(token: str = Form(""), data_list: str = Form("")): with conn.cursor() as cur: for item in items: try: + cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) - begin_time = clean_ts(item.get("beginTime")) - end_time = clean_ts(item.get("endTime")) + # [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss. + begin_time = _parse_trip_ts(item.get("beginTime")) + end_time = _parse_trip_ts(item.get("endTime")) if not imei or not begin_time: + cur.execute("RELEASE SAVEPOINT sp") continue - # [FIX-M11] API sends miles (km). Store directly as distance_km. - # Previous code multiplied by 1000, producing mm not m. + # [FIX-M11] API sends km. Store directly as distance_km. distance_km = clean_num(item.get("miles")) begin_lat = clean_num(item.get("beginLat")) @@ -447,13 +501,26 @@ def push_trip_report(token: str = Form(""), data_list: str = Form("")): clean_int(item.get("idleTimes")), clean_int(item.get("tripSeq")), )) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted, int((time.time() - t0) * 1000), True) - conn.commit() log.info("pushtripreport: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) + +# ── 7. Device Events (Priority 3 — log only) ───────────────────────────────── + +@app.post("/pushevent") +def push_event(token: str = Form(""), data_list: str = Form("")): + """[BUG-05] Accept Jimi event pushes so they don't 404. Log for future schema work.""" + _validate_token(token) + items = _parse_data_list(data_list) + for item in items: + log.info("pushevent: imei=%s type=%s gateTime=%s", + item.get("deviceImei"), item.get("type"), item.get("gateTime")) + return JSONResponse(content=SUCCESS)