""" webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers. Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from devices, providing real-time ingestion without polling. This is the ONLY way to receive OBD diagnostics and DTC fault codes — those data types have no polling endpoint. ENDPOINTS: /pushobd — OBD CAN bus diagnostics (Priority 1) /pushfaultinfo — DTC fault codes (Priority 1) /pushalarm — Alarm events (Priority 2) /pushgps — GPS positions (Priority 2) /pushhb — Device heartbeats (Priority 2) /pushtripreport — Trip reports (Priority 2) /pushevent — Device LOGIN/LOGOUT events (Priority 3) /health — Healthcheck for Docker/monitoring REVISIONS (QA-Verified): [BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps). [BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation). [BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss. [BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch). [BUG-05] Added /pushevent endpoint → writes to tracksolid.device_events. [FIX-W01] 260702 SEC-02: startup CRITICAL warning when JIMI_WEBHOOK_TOKEN is empty; WEBHOOK_REQUIRE_TOKEN=1 refuses to start unauthenticated. [FIX-W02] 260702 BUG-P3: _parse_request handles application/json bodies ({"token", "data_list"}) in addition to the observed form-encoded msgType/data format — a vendor-side format switch no longer silently discards pushes. [FIX-W03] 260702 BUG-P2: all DB work moved off the asyncio event loop into sync _process_* functions run via asyncio.to_thread — a large push no longer stalls /health and concurrent requests on the worker. [FIX-W04] 260702 BUG-P8: push_alarm rejects alarm_time outside a sane window (WEBHOOK_EVENT_MAX_AGE_DAYS / WEBHOOK_EVENT_MAX_FUTURE_DAYS) so devices with reset clocks (2019 timestamps observed live) stop polluting tracksolid.alarms. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ from __future__ import annotations import asyncio import hmac import json import os import time from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from typing import Optional # Cap on items per webhook POST. Prevents a malformed/malicious push from # monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200. MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000")) from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from psycopg2.extras import execute_values from ts_shared_rev import ( close_pool, get_conn, log_ingestion, clean, clean_num, clean_int, clean_ts, is_valid_fix, get_logger, ensure_device, upsert_live_position, ) log = get_logger("webhook") # ── Configuration ───────────────────────────────────────────────────────────── WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "") # [FIX-W04] Sanity window for pushed event timestamps. Devices with reset # clocks push alarm_time values years in the past (2019 observed live). _EVENT_MAX_AGE = timedelta(days=int(os.getenv("WEBHOOK_EVENT_MAX_AGE_DAYS", "30"))) _EVENT_MAX_FUTURE = timedelta(days=int(os.getenv("WEBHOOK_EVENT_MAX_FUTURE_DAYS", "2"))) # ── Lifespan ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): # [FIX-W01] Fail loudly (or closed) when running without push authentication. if not WEBHOOK_TOKEN: if os.getenv("WEBHOOK_REQUIRE_TOKEN", "0") == "1": raise RuntimeError( "WEBHOOK_REQUIRE_TOKEN=1 but JIMI_WEBHOOK_TOKEN is empty — " "refusing to start an unauthenticated public webhook." ) log.critical( "JIMI_WEBHOOK_TOKEN is EMPTY — every /push* endpoint accepts " "UNAUTHENTICATED writes. Configure a push token in the Tracksolid " "console and set JIMI_WEBHOOK_TOKEN (+ WEBHOOK_REQUIRE_TOKEN=1)." ) log.info("Webhook receiver starting (v1.2)...") yield log.info("Webhook receiver shutting down...") close_pool() app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan) # ── Helpers ─────────────────────────────────────────────────────────────────── SUCCESS = {"code": 0, "msg": "success"} def _validate_token(token: str) -> None: """Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty.""" if WEBHOOK_TOKEN and not hmac.compare_digest(token, WEBHOOK_TOKEN): raise HTTPException(status_code=403, detail="Invalid token") def _cap_items(parsed) -> list[dict]: items = parsed if isinstance(parsed, list) else [parsed] if len(items) > MAX_ITEMS_PER_POST: log.warning("push: truncated %d → %d items", len(items), MAX_ITEMS_PER_POST) items = items[:MAX_ITEMS_PER_POST] return items async def _parse_request(request: Request) -> tuple[str, list[dict]]: """Extract token + items from either a JSON body or a form-encoded body. Two formats exist in the wild: 1. Observed live (integration push): Content-Type: application/x-www-form-urlencoded Body: msgType=&data= (`data` holds one JSON object per event, not an array.) 2. Documented Data Push API [FIX-W02]: Content-Type: application/json Body: {"token": "...", "data_list": [{...}, ...]} Both are handled here so no endpoint needs to know which format arrived — and a vendor-side format switch can't silently discard data. """ body = await request.body() if not body: return "", [] ctype = request.headers.get("content-type", "").lower() # [FIX-W02] JSON push format. if "application/json" in ctype: try: payload = json.loads(body) except (json.JSONDecodeError, UnicodeDecodeError): log.warning("push: JSON body parse failed — %.200s", body) return "", [] if not isinstance(payload, dict): return "", _cap_items(payload) if isinstance(payload, list) else [] token = str(payload.get("token", "") or "") raw = payload.get("data_list") or payload.get("data") or [] if isinstance(raw, str): try: raw = json.loads(raw) except (json.JSONDecodeError, TypeError): log.warning("push: data JSON parse failed — %.200s", raw) return token, [] return token, _cap_items(raw) # Form-encoded push format (observed live). try: form = await request.form() except Exception: log.warning("push: form parse failed", exc_info=True) return "", [] token = str(form.get("token", "")) raw_data = form.get("data") or form.get("data_list") or "" if not raw_data: return token, [] try: parsed = json.loads(raw_data) except (json.JSONDecodeError, TypeError): log.warning("push: data JSON parse failed — %.200s", raw_data) return token, [] return token, _cap_items(parsed) def unix_to_ts(v) -> Optional[str]: """Convert Unix timestamp (seconds or milliseconds) to ISO string.""" if v is None: return None try: ts = int(v) if ts > 1e12: ts = ts // 1000 return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError, OSError): return None def _parse_trip_ts(v) -> Optional[str]: """[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats.""" iso = clean_ts(v) if iso: return iso s = clean(v) if s is None: return None try: if len(s) == 12: # YYMMDDHHmmss return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S") if len(s) == 14: # YYYYMMDDHHmmss return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S") except (ValueError, TypeError): pass log.warning("Cannot parse trip timestamp: %r", v) return None def _is_sane_event_ts(ts: Optional[str]) -> bool: """[FIX-W04] True when a pushed event timestamp falls inside the sane window (not older than WEBHOOK_EVENT_MAX_AGE_DAYS, not further ahead than WEBHOOK_EVENT_MAX_FUTURE_DAYS). Unparseable values count as insane.""" if not ts: return False try: dt = datetime.fromisoformat(str(ts).replace("Z", "+00:00")) except (ValueError, TypeError): return False if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) now = datetime.now(timezone.utc) return (now - _EVENT_MAX_AGE) <= dt <= (now + _EVENT_MAX_FUTURE) def _make_geom_params(lat, lng): """Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern.""" return (lng, lat, lng, lat) # Backwards-compat shim. The implementation was relocated to ts_shared_rev # (as `ensure_device`) so ingest_movement_rev and any future writer can share # the FK-guard without re-defining it. Existing call sites in this file # continue to use the underscore-prefixed name. _ensure_device = ensure_device # ── Health Check ────────────────────────────────────────────────────────────── @app.get("/health") def health(): return {"status": "ok"} # ── 1. OBD Diagnostics (Priority 1) ────────────────────────────────────────── def _process_obd(items: list[dict]) -> int: """Blocking DB work for /pushobd — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) obd = item.get("obdJson", {}) if isinstance(obd, str): try: obd = json.loads(obd) except json.JSONDecodeError: obd = {} # [BUG-01] Try unix epoch first, fall back to ISO string. event_time = ( unix_to_ts(obd.get("event_time")) or clean_ts(obd.get("event_time")) ) if not imei or not event_time: cur.execute("RELEASE SAVEPOINT sp") continue lat = clean_num(obd.get("lat")) lng = clean_num(obd.get("lng")) cur.execute(""" INSERT INTO tracksolid.obd_readings ( imei, reading_time, car_type, acc_state, status_flags, lat, lng, geom, obd_data, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, %s, NOW() ) ON CONFLICT (imei, reading_time) DO UPDATE SET obd_data = EXCLUDED.obd_data, updated_at = NOW() """, ( imei, event_time, clean_int(obd.get("car_type")), clean_int(obd.get("AccState")), clean_int(obd.get("statusFlags")), lat, lng, *_make_geom_params(lat, lng), json.dumps(obd), )) cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushobd") async def push_obd(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_obd, items) log.info("pushobd: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 2. DTC Fault Codes (Priority 1) ────────────────────────────────────────── def _process_fault_info(items: list[dict]) -> int: """Blocking DB work for /pushfaultinfo — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) gate_time = clean_ts(item.get("gateTime")) if not imei or not gate_time: cur.execute("RELEASE SAVEPOINT sp") continue fault_codes = item.get("faultCodeList", []) if isinstance(fault_codes, str): try: fault_codes = json.loads(fault_codes) except json.JSONDecodeError: fault_codes = [] lat = clean_num(item.get("lat")) lng = clean_num(item.get("lng")) evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime")) for code in fault_codes: fault_code = clean(code) # Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes # because NULL != NULL in Postgres unique constraints. if not fault_code: continue cur.execute(""" INSERT INTO tracksolid.fault_codes ( imei, reported_at, fault_code, status_flags, lat, lng, geom, event_time ) VALUES ( %s, %s, %s, %s, %s, %s, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, %s ) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING """, ( imei, gate_time, fault_code, clean_int(item.get("statusFlags")), lat, lng, *_make_geom_params(lat, lng), evt_time, )) inserted += 1 cur.execute("RELEASE SAVEPOINT sp") except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushfaultinfo") async def push_fault_info(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_fault_info, items) log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 3. Alarm Events (Priority 2) ───────────────────────────────────────────── def _process_alarms(items: list[dict]) -> int: """Blocking DB work for /pushalarm — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") # Jimi integration push uses `imei` + `alarmTime`, NOT the # `deviceImei` + `gateTime` fields shown in the API docs. imei = clean(item.get("imei") or item.get("deviceImei")) alarm_type = clean(item.get("alarmType")) alarm_time = clean_ts(item.get("alarmTime") or item.get("gateTime")) # [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint. if not imei or not alarm_time or not alarm_type: cur.execute("RELEASE SAVEPOINT sp") continue # [FIX-W04] Reject device-clock garbage (2019 timestamps observed). if not _is_sane_event_ts(alarm_time): log.warning("pushalarm: rejected insane alarm_time %r for %s", alarm_time, imei) cur.execute("RELEASE SAVEPOINT sp") continue # Ensure parent devices row exists to satisfy FK constraint. _ensure_device(cur, imei, clean(item.get("deviceName"))) lat = clean_num(item.get("lat")) lng = clean_num(item.get("lng")) cur.execute(""" INSERT INTO tracksolid.alarms ( imei, alarm_type, alarm_name, alarm_time, geom, lat, lng, speed, source, updated_at ) VALUES ( %s, %s, %s, %s, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, %s, %s, %s, 'push', NOW() ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING """, ( imei, alarm_type, clean(item.get("alarmName")), alarm_time, *_make_geom_params(lat, lng), lat, lng, clean_num(item.get("speed")), )) # [FIX-M21] Cross-feed: every Jimi alarm carries lat/lng. # Refresh live_positions so dashboard markers don't have to # wait up to 60s for the next polled sweep. Time-guarded # inside the helper — alarms older than the current fix lose. upsert_live_position( cur, imei, lat, lng, alarm_time, speed=clean_num(item.get("speed")), ) cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process alarm for %s", item.get("imei") or item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushalarm") async def push_alarm(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_alarms, items) log.info("pushalarm: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 4. GPS Positions (Priority 2) ──────────────────────────────────────────── def _process_gps(items: list[dict]) -> int: """Blocking validate+write for /pushgps — runs in a worker thread [FIX-W03].""" t0 = time.time() # Validation phase — pre-clean and filter without touching the DB. # Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume; # one batched execute_values is 10-50× faster for the same rows. rows = [] for item in items: imei = clean(item.get("deviceImei")) gps_time = clean_ts(item.get("gpsTime")) lat = clean_num(item.get("lat")) lng = clean_num(item.get("lng")) if not imei or not gps_time or not is_valid_fix(lat, lng): continue rows.append(( imei, gps_time, lng, lat, lat, lng, clean_num(item.get("gpsSpeed")), clean_num(item.get("direction")), str(item.get("acc")) if item.get("acc") is not None else None, clean_int(item.get("satelliteNum")), clean_num(item.get("distance")), clean_num(item.get("altitude")), clean_int(item.get("postType")), )) inserted = 0 if rows: with get_conn() as conn: with conn.cursor() as cur: execute_values( cur, """ INSERT INTO tracksolid.position_history ( imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage, altitude, post_type, source ) VALUES %s ON CONFLICT (imei, gps_time) DO NOTHING """, rows, template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326)," " %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')", page_size=len(rows), ) inserted = cur.rowcount log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted, int((time.time() - t0) * 1000), True) else: # No valid rows, still record the call for observability. with get_conn() as conn: with conn.cursor() as cur: log_ingestion(cur, "webhook/pushgps", len(items), 0, 0, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushgps") async def push_gps(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_gps, items) log.info("pushgps: %d/%d items inserted.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 5. Device Heartbeats (Priority 2) ──────────────────────────────────────── def _process_heartbeats(items: list[dict]) -> int: """Blocking DB work for /pushhb — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) gate_time = clean_ts(item.get("gateTime")) if not imei or not gate_time: cur.execute("RELEASE SAVEPOINT sp") continue cur.execute(""" INSERT INTO tracksolid.heartbeats ( imei, gate_time, power_level, gsm_signal, acc_status, power_status, fortify ) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (imei, gate_time) DO NOTHING """, ( imei, gate_time, clean_int(item.get("powerLevel")), clean_int(item.get("gsmSign")), clean_int(item.get("acc")), clean_int(item.get("powerStatus")), clean_int(item.get("fortify")), )) cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushhb") async def push_heartbeat(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_heartbeats, items) log.info("pushhb: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 6. Trip Reports (Priority 2) ───────────────────────────────────────────── def _process_trip_reports(items: list[dict]) -> int: """Blocking DB work for /pushtripreport — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) # [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss. begin_time = _parse_trip_ts(item.get("beginTime")) end_time = _parse_trip_ts(item.get("endTime")) if not imei or not begin_time: cur.execute("RELEASE SAVEPOINT sp") continue # [FIX-M11] API sends km. Store directly as distance_km. distance_km = clean_num(item.get("miles")) begin_lat = clean_num(item.get("beginLat")) begin_lng = clean_num(item.get("beginLng")) end_lat = clean_num(item.get("endLat")) end_lng = clean_num(item.get("endLng")) cur.execute(""" INSERT INTO tracksolid.trips ( imei, start_time, end_time, distance_km, start_geom, end_geom, fuel_consumed_l, idle_time_s, trip_seq, source, updated_at ) VALUES ( %s, %s, %s, %s, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, %s, %s, %s, 'push', NOW() ) ON CONFLICT (imei, start_time) DO UPDATE SET end_time = EXCLUDED.end_time, distance_km = EXCLUDED.distance_km, end_geom = EXCLUDED.end_geom, fuel_consumed_l = EXCLUDED.fuel_consumed_l, idle_time_s = EXCLUDED.idle_time_s, updated_at = NOW() """, ( imei, begin_time, end_time, distance_km, begin_lng, begin_lat, begin_lng, begin_lat, end_lng, end_lat, end_lng, end_lat, clean_num(item.get("oils")), clean_int(item.get("idleTimes")), clean_int(item.get("tripSeq")), )) cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushtripreport") async def push_trip_report(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_trip_reports, items) log.info("pushtripreport: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 7. Device Events (LOGIN / LOGOUT) ──────────────────────────────────────── def _process_events(items: list[dict]) -> int: """Blocking DB work for /pushevent — runs in a worker thread [FIX-W03].""" t0 = time.time() inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for item in items: try: cur.execute("SAVEPOINT sp") imei = clean(item.get("deviceImei")) event_type = clean(item.get("type")) event_time = clean_ts(item.get("gateTime")) if not imei or not event_type or not event_time: cur.execute("RELEASE SAVEPOINT sp") continue cur.execute(""" INSERT INTO tracksolid.device_events (imei, event_type, event_time, timezone) VALUES (%s, %s, %s, %s) ON CONFLICT (imei, event_type, event_time) DO NOTHING """, (imei, event_type, event_time, clean(item.get("timezone")))) cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process event for %s", item.get("deviceImei"), exc_info=True) log_ingestion(cur, "webhook/pushevent", len(items), 0, inserted, int((time.time() - t0) * 1000), True) return inserted @app.post("/pushevent") async def push_event(request: Request): token, items = await _parse_request(request) _validate_token(token) if not items: return JSONResponse(content=SUCCESS) inserted = await asyncio.to_thread(_process_events, items) log.info("pushevent: %d/%d items processed.", inserted, len(items)) return JSONResponse(content=SUCCESS)