tracksolid_timescale_grafan.../webhook_receiver_rev.py
David Kiania c05b47abe2 Fix alarm field mapping, distance unit bug, parking params; add schema migrations
BUG-01 [FIX-E06]: jimi.device.alarm.list poll response uses alertTypeId/
alarmTypeName/alertTime, not the webhook field names. All 1,054 stored alarm
records had null alarm_type/alarm_name as a result. Corrected field mapping
in ingest_events_rev.py; also added alarm_name and source columns to INSERT.

BUG-02 [FIX-M11/M12]: trips.distance_m was storing millimetres due to an
erroneous * 1000 on an already-km API value. Removed the multiplication in
poll_trips() and push_trip_report(). Column renamed to distance_km in
migration 04 (historical rows divided by 1,000,000 to correct to km).
All SQL in both ingestion files updated to reference distance_km.

POLL-02 [FIX-M13]: parking poll returned 0 rows because the required
account and acc_type=0 parameters were missing. Also fixed response field
mapping: durSecond was incorrectly read as 'seconds'.

Migration 04: corrects and renames distance_m → distance_km.
Migration 05: adds normalized OBD columns, alarm/device enrichment columns,
new tables (device_events, fuel_readings, temperature_readings, lbs_readings,
geofences), expands dwh_gold fact table, and adds refresh_daily_metrics() ETL.

tracksolid_DB_manual.md updated to reflect column rename and mark fixed issues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 22:18:30 +03:00

459 lines
19 KiB
Python

"""
webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
devices, providing real-time ingestion without polling. This is the ONLY way
to receive OBD diagnostics and DTC fault codes — those data types have no
polling endpoint.
ENDPOINTS:
/pushobd — OBD CAN bus diagnostics (Priority 1)
/pushfaultinfo — DTC fault codes (Priority 1)
/pushalarm — Alarm events (Priority 2)
/pushgps — GPS positions (Priority 2)
/pushhb — Device heartbeats (Priority 2)
/pushtripreport — Trip reports (Priority 2)
/health — Healthcheck for Docker/monitoring
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import json
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, Form, HTTPException
from fastapi.responses import JSONResponse
from ts_shared_rev import (
close_pool,
get_conn,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
is_valid_fix,
get_logger,
)
log = get_logger("webhook")
# ── Configuration ─────────────────────────────────────────────────────────────
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
log.info("Webhook receiver starting (v1.0)...")
yield
log.info("Webhook receiver shutting down...")
close_pool()
app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
# ── Helpers ───────────────────────────────────────────────────────────────────
SUCCESS = {"code": 0, "msg": "success"}
def _validate_token(token: str) -> None:
"""Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
if WEBHOOK_TOKEN and token != WEBHOOK_TOKEN:
raise HTTPException(status_code=403, detail="Invalid token")
def _parse_data_list(raw: str) -> list[dict]:
"""Parse the JSON string from Jimi's data_list form field."""
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
return parsed
return [parsed]
except (json.JSONDecodeError, TypeError):
log.warning("Failed to parse data_list: %.200s", raw)
return []
def unix_to_ts(v) -> Optional[str]:
"""Convert Unix timestamp (seconds or milliseconds) to ISO string."""
if v is None:
return None
try:
ts = int(v)
if ts > 1e12:
ts = ts // 1000
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError, OSError):
return None
def _make_geom_params(lat, lng):
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
return (lng, lat, lng, lat)
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/health")
def health():
return {"status": "ok"}
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
@app.post("/pushobd")
def push_obd(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
obd = item.get("obdJson", {})
if isinstance(obd, str):
try:
obd = json.loads(obd)
except json.JSONDecodeError:
obd = {}
event_time = clean_ts(obd.get("event_time"))
if not imei or not event_time:
continue
lat = clean_num(obd.get("lat"))
lng = clean_num(obd.get("lng"))
cur.execute("""
INSERT INTO tracksolid.obd_readings (
imei, reading_time, car_type, acc_state, status_flags,
lat, lng, geom, obd_data, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, NOW()
) ON CONFLICT (imei, reading_time) DO UPDATE SET
obd_data = EXCLUDED.obd_data,
updated_at = NOW()
""", (
imei, event_time,
clean_int(obd.get("car_type")),
clean_int(obd.get("AccState")),
clean_int(obd.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
json.dumps(obd),
))
inserted += 1
except Exception:
log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushobd: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
@app.post("/pushfaultinfo")
def push_fault_info(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
continue
fault_codes = item.get("faultCodeList", [])
if isinstance(fault_codes, str):
try:
fault_codes = json.loads(fault_codes)
except json.JSONDecodeError:
fault_codes = []
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
for code in fault_codes:
cur.execute("""
INSERT INTO tracksolid.fault_codes (
imei, reported_at, fault_code, status_flags,
lat, lng, geom, event_time
) VALUES (
%s, %s, %s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
""", (
imei, gate_time, clean(code),
clean_int(item.get("statusFlags")),
lat, lng,
*_make_geom_params(lat, lng),
evt_time,
))
inserted += 1
except Exception:
log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
@app.post("/pushalarm")
def push_alarm(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
alarm_type = clean(item.get("alarmType"))
alarm_time = clean_ts(item.get("gateTime"))
if not imei or not alarm_time:
continue
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_name, alarm_time, geom,
lat, lng, speed, source, updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
""", (
imei, alarm_type, clean(item.get("alarmName")), alarm_time,
*_make_geom_params(lat, lng),
lat, lng,
clean_num(item.get("speed")),
))
inserted += 1
except Exception:
log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushalarm: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
@app.post("/pushgps")
def push_gps(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
continue
cur.execute("""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng, speed, direction,
acc_status, satellite, current_mileage,
altitude, post_type, source
) VALUES (
%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s, %s, %s, %s, 'push'
) ON CONFLICT (imei, gps_time) DO NOTHING
""", (
imei, gps_time, lng, lat,
lat, lng,
clean_num(item.get("gpsSpeed")),
clean_num(item.get("direction")),
str(item.get("acc")) if item.get("acc") is not None else None,
clean_int(item.get("satelliteNum")),
clean_num(item.get("distance")),
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
inserted += 1
except Exception:
log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushgps: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
@app.post("/pushhb")
def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
gate_time = clean_ts(item.get("gateTime"))
if not imei or not gate_time:
continue
cur.execute("""
INSERT INTO tracksolid.heartbeats (
imei, gate_time, power_level, gsm_signal,
acc_status, power_status, fortify
) VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gate_time) DO NOTHING
""", (
imei, gate_time,
clean_int(item.get("powerLevel")),
clean_int(item.get("gsmSign")),
clean_int(item.get("acc")),
clean_int(item.get("powerStatus")),
clean_int(item.get("fortify")),
))
inserted += 1
except Exception:
log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushhb: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
@app.post("/pushtripreport")
def push_trip_report(token: str = Form(""), data_list: str = Form("")):
_validate_token(token)
items = _parse_data_list(data_list)
if not items:
return JSONResponse(content=SUCCESS)
t0 = time.time()
inserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
imei = clean(item.get("deviceImei"))
begin_time = clean_ts(item.get("beginTime"))
end_time = clean_ts(item.get("endTime"))
if not imei or not begin_time:
continue
# [FIX-M11] API sends miles (km). Store directly as distance_km.
# Previous code multiplied by 1000, producing mm not m.
distance_km = clean_num(item.get("miles"))
begin_lat = clean_num(item.get("beginLat"))
begin_lng = clean_num(item.get("beginLng"))
end_lat = clean_num(item.get("endLat"))
end_lng = clean_num(item.get("endLng"))
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
start_geom, end_geom,
fuel_consumed_l, idle_time_s, trip_seq, source,
updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, 'push', NOW()
) ON CONFLICT (imei, start_time) DO UPDATE SET
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
end_geom = EXCLUDED.end_geom,
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
idle_time_s = EXCLUDED.idle_time_s,
updated_at = NOW()
""", (
imei, begin_time, end_time, distance_km,
begin_lng, begin_lat, begin_lng, begin_lat,
end_lng, end_lat, end_lng, end_lat,
clean_num(item.get("oils")),
clean_int(item.get("idleTimes")),
clean_int(item.get("tripSeq")),
))
inserted += 1
except Exception:
log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
conn.commit()
log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
return JSONResponse(content=SUCCESS)