tracksolid_timescale_grafan.../webhook_receiver_rev.py
David Kiania 87ecab4a72 Wire /pushevent to device_events table (was log-only)
LOGIN/LOGOUT events from Jimi now persist to tracksolid.device_events.
Table already existed with correct schema (imei, event_type, event_time,
timezone, unique constraint). Follows same SAVEPOINT + log_ingestion
pattern as all other DB-writing endpoints.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 18:42:22 +03:00

556 lines
24 KiB
Python

"""
webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
devices, providing real-time ingestion without polling. This is the ONLY way
to receive OBD diagnostics and DTC fault codes — those data types have no
polling endpoint.
ENDPOINTS:
/pushobd — OBD CAN bus diagnostics (Priority 1)
/pushfaultinfo — DTC fault codes (Priority 1)
/pushalarm — Alarm events (Priority 2)
/pushgps — GPS positions (Priority 2)
/pushhb — Device heartbeats (Priority 2)
/pushtripreport — Trip reports (Priority 2)
/pushevent — Device LOGIN/LOGOUT events (Priority 3)
/health — Healthcheck for Docker/monitoring
REVISIONS (QA-Verified):
[BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps).
[BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation).
[BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss.
[BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch).
[BUG-05] Added /pushevent endpoint → writes to tracksolid.device_events.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import json
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, Form, HTTPException
from fastapi.responses import JSONResponse
from ts_shared_rev import (
close_pool,
get_conn,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
is_valid_fix,
get_logger,
)
log = get_logger("webhook")
# ── Configuration ─────────────────────────────────────────────────────────────
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info("Webhook receiver starting (v1.1)...")
yield
log.info("Webhook receiver shutting down...")
close_pool()
app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
# ── Helpers ───────────────────────────────────────────────────────────────────
SUCCESS = {"code": 0, "msg": "success"}
def _validate_token(token: str) -> None:
"""Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
if WEBHOOK_TOKEN and token != WEBHOOK_TOKEN:
raise HTTPException(status_code=403, detail="Invalid token")
def _parse_data_list(raw: str) -> list[dict]:
"""Parse the JSON string from Jimi's data_list form field."""
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
return parsed
return [parsed]
except (json.JSONDecodeError, TypeError):
log.warning("Failed to parse data_list: %.200s", raw)
return []
def unix_to_ts(v) -> Optional[str]:
"""Convert Unix timestamp (seconds or milliseconds) to ISO string."""
if v is None:
return None
try:
ts = int(v)
if ts > 1e12:
ts = ts // 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError, OSError):
return None
def _parse_trip_ts(v) -> Optional[str]:
"""[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats."""
iso = clean_ts(v)
if iso:
return iso
s = clean(v)
if s is None:
return None
try:
if len(s) == 12: # YYMMDDHHmmss
return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
if len(s) == 14: # YYYYMMDDHHmmss
return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
pass
log.warning("Cannot parse trip timestamp: %r", v)
return None
def _make_geom_params(lat, lng):
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
return (lng, lat, lng, lat)
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok"}
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
@app.post("/pushobd")
def push_obd(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
obd = item.get("obdJson", {})
if isinstance(obd, str):
try:
obd = json.loads(obd)
except json.JSONDecodeError:
obd = {}
# [BUG-01] Try unix epoch first, fall back to ISO string.
event_time = (
unix_to_ts(obd.get("event_time"))
or clean_ts(obd.get("event_time"))
)
if not imei or not event_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat = clean_num(obd.get("lat"))
lng = clean_num(obd.get("lng"))
cur.execute("""
INSERT INTO tracksolid.obd_readings (
imei, reading_time, car_type, acc_state, status_flags,
lat, lng, geom, obd_data, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, NOW()
) ON CONFLICT (imei, reading_time) DO UPDATE SET
obd_data = EXCLUDED.obd_data,
updated_at = NOW()
""", (
imei, event_time,
clean_int(obd.get("car_type")),
clean_int(obd.get("AccState")),
clean_int(obd.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
json.dumps(obd),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushobd: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
@app.post("/pushfaultinfo")
def push_fault_info(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
fault_codes = item.get("faultCodeList", [])
if isinstance(fault_codes, str):
try:
fault_codes = json.loads(fault_codes)
except json.JSONDecodeError:
fault_codes = []
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
for code in fault_codes:
fault_code = clean(code)
# Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes
# because NULL != NULL in Postgres unique constraints.
if not fault_code:
continue
cur.execute("""
INSERT INTO tracksolid.fault_codes (
imei, reported_at, fault_code, status_flags,
lat, lng, geom, event_time
) VALUES (
%s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
""", (
imei, gate_time, fault_code,
clean_int(item.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
evt_time,
))
inserted += 1
cur.execute("RELEASE SAVEPOINT sp")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
@app.post("/pushalarm")
def push_alarm(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
alarm_type = clean(item.get("alarmType"))
alarm_time = clean_ts(item.get("gateTime"))
# [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint.
if not imei or not alarm_time or not alarm_type:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_name, alarm_time, geom,
lat, lng, speed, source, updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
""", (
imei, alarm_type, clean(item.get("alarmName")), alarm_time,
*_make_geom_params(lat, lng),
lat, lng,
clean_num(item.get("speed")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushalarm: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
@app.post("/pushgps")
def push_gps(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng, speed, direction,
acc_status, satellite, current_mileage,
altitude, post_type, source
) VALUES (
%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s, %s, %s, %s, 'push'
) ON CONFLICT (imei, gps_time) DO NOTHING
""", (
imei, gps_time, lng, lat,
lat, lng,
clean_num(item.get("gpsSpeed")),
clean_num(item.get("direction")),
str(item.get("acc")) if item.get("acc") is not None else None,
clean_int(item.get("satelliteNum")),
clean_num(item.get("distance")),
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushgps: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
@app.post("/pushhb")
def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.heartbeats (
imei, gate_time, power_level, gsm_signal,
acc_status, power_status, fortify
) VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gate_time) DO NOTHING
""", (
imei, gate_time,
clean_int(item.get("powerLevel")),
clean_int(item.get("gsmSign")),
clean_int(item.get("acc")),
clean_int(item.get("powerStatus")),
clean_int(item.get("fortify")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushhb: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
@app.post("/pushtripreport")
def push_trip_report(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
# [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss.
begin_time = _parse_trip_ts(item.get("beginTime"))
end_time = _parse_trip_ts(item.get("endTime"))
if not imei or not begin_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
# [FIX-M11] API sends km. Store directly as distance_km.
distance_km = clean_num(item.get("miles"))
begin_lat = clean_num(item.get("beginLat"))
begin_lng = clean_num(item.get("beginLng"))
end_lat = clean_num(item.get("endLat"))
end_lng = clean_num(item.get("endLng"))
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
start_geom, end_geom,
fuel_consumed_l, idle_time_s, trip_seq, source,
updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, start_time) DO UPDATE SET
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
end_geom = EXCLUDED.end_geom,
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
idle_time_s = EXCLUDED.idle_time_s,
updated_at = NOW()
""", (
imei, begin_time, end_time, distance_km,
begin_lng, begin_lat, begin_lng, begin_lat,
end_lng, end_lat, end_lng, end_lat,
clean_num(item.get("oils")),
clean_int(item.get("idleTimes")),
clean_int(item.get("tripSeq")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 7. Device Events (LOGIN / LOGOUT) ────────────────────────────────────────
@app.post("/pushevent")
def push_event(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
event_type = clean(item.get("type"))
event_time = clean_ts(item.get("gateTime"))
if not imei or not event_type or not event_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.device_events
(imei, event_type, event_time, timezone)
VALUES (%s, %s, %s, %s)
ON CONFLICT (imei, event_type, event_time) DO NOTHING
""", (imei, event_type, event_time, clean(item.get("timezone"))))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process event for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushevent", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushevent: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)