BUG-01 [FIX-E06]: jimi.device.alarm.list poll response uses alertTypeId/ alarmTypeName/alertTime, not the webhook field names. All 1,054 stored alarm records had null alarm_type/alarm_name as a result. Corrected field mapping in ingest_events_rev.py; also added alarm_name and source columns to INSERT. BUG-02 [FIX-M11/M12]: trips.distance_m was storing millimetres due to an erroneous * 1000 on an already-km API value. Removed the multiplication in poll_trips() and push_trip_report(). Column renamed to distance_km in migration 04 (historical rows divided by 1,000,000 to correct to km). All SQL in both ingestion files updated to reference distance_km. POLL-02 [FIX-M13]: parking poll returned 0 rows because the required account and acc_type=0 parameters were missing. Also fixed response field mapping: durSecond was incorrectly read as 'seconds'. Migration 04: corrects and renames distance_m → distance_km. Migration 05: adds normalized OBD columns, alarm/device enrichment columns, new tables (device_events, fuel_readings, temperature_readings, lbs_readings, geofences), expands dwh_gold fact table, and adds refresh_daily_metrics() ETL. tracksolid_DB_manual.md updated to reflect column rename and mark fixed issues. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
459 lines
19 KiB
Python
459 lines
19 KiB
Python
"""
|
|
webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
|
|
|
|
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
|
|
devices, providing real-time ingestion without polling. This is the ONLY way
|
|
to receive OBD diagnostics and DTC fault codes — those data types have no
|
|
polling endpoint.
|
|
|
|
ENDPOINTS:
|
|
/pushobd — OBD CAN bus diagnostics (Priority 1)
|
|
/pushfaultinfo — DTC fault codes (Priority 1)
|
|
/pushalarm — Alarm events (Priority 2)
|
|
/pushgps — GPS positions (Priority 2)
|
|
/pushhb — Device heartbeats (Priority 2)
|
|
/pushtripreport — Trip reports (Priority 2)
|
|
/health — Healthcheck for Docker/monitoring
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, Form, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from ts_shared_rev import (
|
|
close_pool,
|
|
get_conn,
|
|
log_ingestion,
|
|
clean,
|
|
clean_num,
|
|
clean_int,
|
|
clean_ts,
|
|
is_valid_fix,
|
|
get_logger,
|
|
)
|
|
|
|
log = get_logger("webhook")
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
|
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
|
|
|
|
# ── Lifespan ──────────────────────────────────────────────────────────────────
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
log.info("Webhook receiver starting (v1.0)...")
|
|
yield
|
|
log.info("Webhook receiver shutting down...")
|
|
close_pool()
|
|
|
|
app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
SUCCESS = {"code": 0, "msg": "success"}
|
|
|
|
|
|
def _validate_token(token: str) -> None:
|
|
"""Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
|
|
if WEBHOOK_TOKEN and token != WEBHOOK_TOKEN:
|
|
raise HTTPException(status_code=403, detail="Invalid token")
|
|
|
|
|
|
def _parse_data_list(raw: str) -> list[dict]:
|
|
"""Parse the JSON string from Jimi's data_list form field."""
|
|
try:
|
|
parsed = json.loads(raw)
|
|
if isinstance(parsed, list):
|
|
return parsed
|
|
return [parsed]
|
|
except (json.JSONDecodeError, TypeError):
|
|
log.warning("Failed to parse data_list: %.200s", raw)
|
|
return []
|
|
|
|
|
|
def unix_to_ts(v) -> Optional[str]:
|
|
"""Convert Unix timestamp (seconds or milliseconds) to ISO string."""
|
|
if v is None:
|
|
return None
|
|
try:
|
|
ts = int(v)
|
|
if ts > 1e12:
|
|
ts = ts // 1000
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
|
except (ValueError, TypeError, OSError):
|
|
return None
|
|
|
|
|
|
def _make_geom_params(lat, lng):
|
|
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
|
|
return (lng, lat, lng, lat)
|
|
|
|
# ── Health Check ──────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/health")
|
|
def health():
|
|
return {"status": "ok"}
|
|
|
|
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
|
|
|
|
@app.post("/pushobd")
|
|
def push_obd(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
obd = item.get("obdJson", {})
|
|
if isinstance(obd, str):
|
|
try:
|
|
obd = json.loads(obd)
|
|
except json.JSONDecodeError:
|
|
obd = {}
|
|
|
|
event_time = clean_ts(obd.get("event_time"))
|
|
if not imei or not event_time:
|
|
continue
|
|
|
|
lat = clean_num(obd.get("lat"))
|
|
lng = clean_num(obd.get("lng"))
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.obd_readings (
|
|
imei, reading_time, car_type, acc_state, status_flags,
|
|
lat, lng, geom, obd_data, updated_at
|
|
) VALUES (
|
|
%s, %s, %s, %s, %s, %s, %s,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
%s, NOW()
|
|
) ON CONFLICT (imei, reading_time) DO UPDATE SET
|
|
obd_data = EXCLUDED.obd_data,
|
|
updated_at = NOW()
|
|
""", (
|
|
imei, event_time,
|
|
clean_int(obd.get("car_type")),
|
|
clean_int(obd.get("AccState")),
|
|
clean_int(obd.get("statusFlags")),
|
|
lat, lng,
|
|
*_make_geom_params(lat, lng),
|
|
json.dumps(obd),
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushobd: %d/%d items processed.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
|
|
|
|
@app.post("/pushfaultinfo")
|
|
def push_fault_info(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
gate_time = clean_ts(item.get("gateTime"))
|
|
if not imei or not gate_time:
|
|
continue
|
|
|
|
fault_codes = item.get("faultCodeList", [])
|
|
if isinstance(fault_codes, str):
|
|
try:
|
|
fault_codes = json.loads(fault_codes)
|
|
except json.JSONDecodeError:
|
|
fault_codes = []
|
|
|
|
lat = clean_num(item.get("lat"))
|
|
lng = clean_num(item.get("lng"))
|
|
evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
|
|
|
|
for code in fault_codes:
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.fault_codes (
|
|
imei, reported_at, fault_code, status_flags,
|
|
lat, lng, geom, event_time
|
|
) VALUES (
|
|
%s, %s, %s, %s, %s, %s,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
%s
|
|
) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
|
|
""", (
|
|
imei, gate_time, clean(code),
|
|
clean_int(item.get("statusFlags")),
|
|
lat, lng,
|
|
*_make_geom_params(lat, lng),
|
|
evt_time,
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
|
|
|
|
@app.post("/pushalarm")
|
|
def push_alarm(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
alarm_type = clean(item.get("alarmType"))
|
|
alarm_time = clean_ts(item.get("gateTime"))
|
|
if not imei or not alarm_time:
|
|
continue
|
|
|
|
lat = clean_num(item.get("lat"))
|
|
lng = clean_num(item.get("lng"))
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.alarms (
|
|
imei, alarm_type, alarm_name, alarm_time, geom,
|
|
lat, lng, speed, source, updated_at
|
|
) VALUES (
|
|
%s, %s, %s, %s,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
%s, %s, %s, 'push', NOW()
|
|
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
|
|
""", (
|
|
imei, alarm_type, clean(item.get("alarmName")), alarm_time,
|
|
*_make_geom_params(lat, lng),
|
|
lat, lng,
|
|
clean_num(item.get("speed")),
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushalarm: %d/%d items processed.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
|
|
|
|
@app.post("/pushgps")
|
|
def push_gps(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
gps_time = clean_ts(item.get("gpsTime"))
|
|
lat = clean_num(item.get("lat"))
|
|
lng = clean_num(item.get("lng"))
|
|
|
|
if not imei or not gps_time or not is_valid_fix(lat, lng):
|
|
continue
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.position_history (
|
|
imei, gps_time, geom, lat, lng, speed, direction,
|
|
acc_status, satellite, current_mileage,
|
|
altitude, post_type, source
|
|
) VALUES (
|
|
%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
|
|
%s, %s, %s, %s, %s, %s, %s, %s, %s, 'push'
|
|
) ON CONFLICT (imei, gps_time) DO NOTHING
|
|
""", (
|
|
imei, gps_time, lng, lat,
|
|
lat, lng,
|
|
clean_num(item.get("gpsSpeed")),
|
|
clean_num(item.get("direction")),
|
|
str(item.get("acc")) if item.get("acc") is not None else None,
|
|
clean_int(item.get("satelliteNum")),
|
|
clean_num(item.get("distance")),
|
|
clean_num(item.get("altitude")),
|
|
clean_int(item.get("postType")),
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushgps: %d/%d items processed.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
|
|
|
|
@app.post("/pushhb")
|
|
def push_heartbeat(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
gate_time = clean_ts(item.get("gateTime"))
|
|
if not imei or not gate_time:
|
|
continue
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.heartbeats (
|
|
imei, gate_time, power_level, gsm_signal,
|
|
acc_status, power_status, fortify
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (imei, gate_time) DO NOTHING
|
|
""", (
|
|
imei, gate_time,
|
|
clean_int(item.get("powerLevel")),
|
|
clean_int(item.get("gsmSign")),
|
|
clean_int(item.get("acc")),
|
|
clean_int(item.get("powerStatus")),
|
|
clean_int(item.get("fortify")),
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushhb: %d/%d items processed.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
|
|
|
|
@app.post("/pushtripreport")
|
|
def push_trip_report(token: str = Form(""), data_list: str = Form("")):
|
|
_validate_token(token)
|
|
items = _parse_data_list(data_list)
|
|
if not items:
|
|
return JSONResponse(content=SUCCESS)
|
|
|
|
t0 = time.time()
|
|
inserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for item in items:
|
|
try:
|
|
imei = clean(item.get("deviceImei"))
|
|
begin_time = clean_ts(item.get("beginTime"))
|
|
end_time = clean_ts(item.get("endTime"))
|
|
if not imei or not begin_time:
|
|
continue
|
|
|
|
# [FIX-M11] API sends miles (km). Store directly as distance_km.
|
|
# Previous code multiplied by 1000, producing mm not m.
|
|
distance_km = clean_num(item.get("miles"))
|
|
|
|
begin_lat = clean_num(item.get("beginLat"))
|
|
begin_lng = clean_num(item.get("beginLng"))
|
|
end_lat = clean_num(item.get("endLat"))
|
|
end_lng = clean_num(item.get("endLng"))
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.trips (
|
|
imei, start_time, end_time, distance_km,
|
|
start_geom, end_geom,
|
|
fuel_consumed_l, idle_time_s, trip_seq, source,
|
|
updated_at
|
|
) VALUES (
|
|
%s, %s, %s, %s,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
%s, %s, %s, 'push', NOW()
|
|
) ON CONFLICT (imei, start_time) DO UPDATE SET
|
|
end_time = EXCLUDED.end_time,
|
|
distance_km = EXCLUDED.distance_km,
|
|
end_geom = EXCLUDED.end_geom,
|
|
fuel_consumed_l = EXCLUDED.fuel_consumed_l,
|
|
idle_time_s = EXCLUDED.idle_time_s,
|
|
updated_at = NOW()
|
|
""", (
|
|
imei, begin_time, end_time, distance_km,
|
|
begin_lng, begin_lat, begin_lng, begin_lat,
|
|
end_lng, end_lat, end_lng, end_lat,
|
|
clean_num(item.get("oils")),
|
|
clean_int(item.get("idleTimes")),
|
|
clean_int(item.get("tripSeq")),
|
|
))
|
|
inserted += 1
|
|
except Exception:
|
|
log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
|
|
|
|
log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
conn.commit()
|
|
|
|
log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
|
|
return JSONResponse(content=SUCCESS)
|