tracksolid_timescale_grafan.../webhook_receiver_rev.py
david kiania 8d386bf27a
fix: BUG-01 ETL type crash, BUG-02 multi-account audit, BUG-03 diagnostic
BUG-01 (CRITICAL): dwh_gold.refresh_daily_metrics inserted t.imei (TEXT) into
fact_daily_fleet_metrics.vehicle_key (INTEGER REFERENCES dim_vehicles), so the
nightly ETL would have raised "invalid input syntax for type integer" on every
run. Migration 08 backfills dim_vehicles from tracksolid.devices and rewrites
the function to JOIN through dim_vehicles, returning the serial vehicle_key.
The function also re-syncs dim_vehicles at the top of each call so newly
registered devices appear in the warehouse without manual seeding.
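A minimal sketch of the BUG-01 pattern, using Python's built-in `sqlite3` as a stand-in for Postgres. SQLite's `INTEGER PRIMARY KEY` plays the role of the serial `vehicle_key`. Only `dim_vehicles`, `fact_daily_fleet_metrics`, `vehicle_key`, `imei` and the trip columns come from this repo. The `metric_date`/`total_km` fact columns, the unprefixed table names and `refresh_daily_metrics_sketch` are hypothetical, and this is not the body of migration 08. It shows the two steps described above: re-sync the dimension from the devices table, then aggregate through a JOIN so each fact row carries the integer key instead of the TEXT IMEI.

```python
import sqlite3


def refresh_daily_metrics_sketch(conn: sqlite3.Connection) -> None:
    """Hypothetical SQLite stand-in for the BUG-01 pattern (not migration 08)."""
    cur = conn.cursor()
    # 1. Re-sync: every known device gets a dim_vehicles row, and with it an
    #    auto-assigned integer vehicle_key. Existing IMEIs are left alone.
    cur.execute("INSERT OR IGNORE INTO dim_vehicles (imei) SELECT imei FROM devices")
    # 2. Aggregate through a JOIN so the fact row stores the INTEGER
    #    vehicle_key from dim_vehicles, never the TEXT imei from trips.
    cur.execute(
        """
        INSERT INTO fact_daily_fleet_metrics (vehicle_key, metric_date, total_km)
        SELECT v.vehicle_key, date(t.start_time), SUM(t.distance_km)
        FROM trips AS t
        JOIN dim_vehicles AS v ON v.imei = t.imei
        GROUP BY v.vehicle_key, date(t.start_time)
        """
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE devices (imei TEXT PRIMARY KEY);
    CREATE TABLE dim_vehicles (
        vehicle_key INTEGER PRIMARY KEY,  -- plays the role of SERIAL
        imei TEXT UNIQUE NOT NULL
    );
    CREATE TABLE trips (imei TEXT, start_time TEXT, distance_km REAL);
    CREATE TABLE fact_daily_fleet_metrics (
        vehicle_key INTEGER REFERENCES dim_vehicles (vehicle_key),
        metric_date TEXT,
        total_km REAL
    );
    INSERT INTO devices VALUES ('860000000000001'), ('860000000000002');
    INSERT INTO trips VALUES
        ('860000000000001', '2026-05-14 08:00:00', 12.5),
        ('860000000000001', '2026-05-14 17:30:00', 7.5),
        ('860000000000002', '2026-05-14 09:15:00', 3.0);
    """
)
refresh_daily_metrics_sketch(conn)
```

Running this sketch twice would append duplicate fact rows. A rerunnable nightly refresh needs an upsert or a per-day delete-then-insert.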

BUG-02 (HIGH): sync_driver_audit.py only queried TARGET_ACCOUNT, ignoring the
Fireside@HQ and Fireside_MSA sub-accounts. The audit now iterates TARGETS
(matching FIX-M19 in ingest_movement_rev.sync_devices), dedupes devices by
IMEI, and tolerates per-target failures.
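The BUG-02 audit loop can be sketched as follows: iterate every target, dedupe by IMEI, and tolerate per-target failures. `collect_devices`, the injected `fetch_devices` callable, the added `target` tag and the `main-account` name in the usage example are all hypothetical. The real `sync_driver_audit.py` and its Tracksolid API call are not shown here. In this sketch the first target to report an IMEI wins; that is one reasonable tie-break, not necessarily the one the script uses.

```python
import logging
from typing import Callable, Dict, Iterable, List

log = logging.getLogger("driver_audit_sketch")


def collect_devices(
    targets: Iterable[str],
    fetch_devices: Callable[[str], List[dict]],
) -> Dict[str, dict]:
    """Merge device lists from several target accounts, deduplicated by IMEI.

    A failing target is logged and skipped so the remaining targets still
    contribute. The first target that reports a given IMEI wins.
    """
    by_imei: Dict[str, dict] = {}
    for target in targets:
        try:
            devices = fetch_devices(target)
        except Exception:
            log.warning("audit: target %s failed; skipping", target, exc_info=True)
            continue
        for dev in devices:
            imei = dev.get("imei")
            if imei and imei not in by_imei:
                by_imei[imei] = {**dev, "target": target}
    return by_imei


# Usage with a fake fetcher; "main-account" is a hypothetical name that fails.
FAKE_ACCOUNTS = {
    "Fireside@HQ": [{"imei": "860000000000001"}, {"imei": "860000000000002"}],
    "Fireside_MSA": [{"imei": "860000000000002"}, {"imei": "860000000000003"}],
}


def fake_fetch(target: str) -> List[dict]:
    if target not in FAKE_ACCOUNTS:
        raise RuntimeError(f"API error for {target}")
    return FAKE_ACCOUNTS[target]


devices = collect_devices(["main-account", "Fireside@HQ", "Fireside_MSA"], fake_fetch)
```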

BUG-03 (HIGH, diagnostic only): the webhook trip handler stores item["miles"]
straight into distance_km. The field name is suspicious and FIX-M16 already
proved the polling endpoint mis-documents its units. Added a SQL diagnostic
that compares the distribution of stored-km / great-circle-km for push-source
vs poll-source trips over 30 days — the ratio test will tell us whether the
push value needs a ×1.609 (miles to km), /1000 (metres to km), or no conversion. The
existing calculation is left unchanged until the data confirms the unit; the
old FIX-M11 comment is replaced with a BUG-03 pointer to the diagnostic.
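The diagnostic itself is a SQL file whose body isn't reproduced here. The following is a hedged Python restatement of the ratio test. It assumes poll-source `distance_km` is correct after FIX-M16, and it takes each trip as a hypothetical `(stored_distance, start_lat, start_lng, end_lat, end_lng)` tuple. For each trip it computes stored distance divided by great-circle distance, takes the median per source, and compares push against poll. A value reported in miles must be multiplied by 1.609344 to get km.

```python
import math
from statistics import median

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius

# Expected (push median ratio / poll median ratio) for each candidate push unit,
# assuming poll-source distances are already in km.
CANDIDATES = {
    "km (no conversion)": 1.0,
    "miles (multiply by 1.609344)": 1 / 1.609344,
    "metres (divide by 1000)": 1000.0,
}


def haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance between two WGS84 points, in kilometres."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = p2 - p1
    dl = math.radians(lng2 - lng1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def median_ratio(trips, min_gc_km: float = 0.5) -> float:
    """Median of stored distance / great-circle distance over trips.

    Trips whose endpoints are closer than min_gc_km are skipped, because the
    ratio explodes for near-zero straight-line distances (e.g. round trips).
    Raises statistics.StatisticsError if no trip qualifies.
    """
    ratios = []
    for stored, lat1, lng1, lat2, lng2 in trips:
        gc = haversine_km(lat1, lng1, lat2, lng2)
        if gc >= min_gc_km:
            ratios.append(stored / gc)
    return median(ratios)


def classify_push_unit(push_trips, poll_trips) -> str:
    """Return the CANDIDATES label whose expected ratio best matches the data."""
    rel = median_ratio(push_trips) / median_ratio(poll_trips)
    # Compare in log space so a 1000x gap and a 1.6x gap are weighed fairly.
    return min(CANDIDATES, key=lambda name: abs(math.log(rel / CANDIDATES[name])))
```

Comparing against the poll-source median rather than an absolute threshold matters. Road distance is never shorter than the great-circle distance, so km values give raw ratios of roughly 1 and up and mile values give roughly 0.62 and up. The two bands overlap on winding routes. Dividing by the poll median cancels that route-dependent detour factor. The 0.5 km `min_gc_km` cutoff is an arbitrary choice for the sketch.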

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 15:34:43 +03:00

643 lines
28 KiB
Python

"""
webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
devices, providing real-time ingestion without polling. This is the ONLY way
to receive OBD diagnostics and DTC fault codes, because those data types have
no polling endpoint.

ENDPOINTS:
    /pushobd         OBD CAN bus diagnostics            (Priority 1)
    /pushfaultinfo   DTC fault codes                    (Priority 1)
    /pushalarm       Alarm events                       (Priority 2)
    /pushgps         GPS positions                      (Priority 2)
    /pushhb          Device heartbeats                  (Priority 2)
    /pushtripreport  Trip reports                       (Priority 2)
    /pushevent       Device LOGIN/LOGOUT events         (Priority 3)
    /health          Healthcheck for Docker/monitoring

REVISIONS (QA-Verified):
    [BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps).
    [BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation).
    [BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss.
    [BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch).
    [BUG-05] Added /pushevent endpoint → writes to tracksolid.device_events.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import hmac
import json
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from psycopg2.extras import execute_values

from ts_shared_rev import (
    close_pool,
    get_conn,
    log_ingestion,
    clean,
    clean_num,
    clean_int,
    clean_ts,
    is_valid_fix,
    get_logger,
)

log = get_logger("webhook")

# ── Configuration ─────────────────────────────────────────────────────────────
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
# Cap on items per webhook POST. Prevents a malformed/malicious push from
# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200.
MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000"))
# ── Lifespan ──────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    log.info("Webhook receiver starting (v1.1)...")
    yield
    log.info("Webhook receiver shutting down...")
    close_pool()
app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
# ── Helpers ───────────────────────────────────────────────────────────────────
SUCCESS = {"code": 0, "msg": "success"}
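def _validate_token(token: str) -> None:
    """Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
    # Compare as bytes: hmac.compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would surface as a 500, not a 403.
    if WEBHOOK_TOKEN and not hmac.compare_digest(
        token.encode("utf-8"), WEBHOOK_TOKEN.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid token")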
def _parse_data_list(raw) -> list[dict]:
    """Normalise a push payload into a list of dicts.

    raw may be a JSON string (array or single object) or an already-decoded
    list/dict. Non-dict entries are dropped so endpoints can safely call
    item.get(), and the result is capped at MAX_ITEMS_PER_POST.
    """
    if isinstance(raw, (str, bytes)):
        try:
            raw = json.loads(raw)
        except (ValueError, TypeError):  # JSONDecodeError is a ValueError
            log.warning("push: data JSON parse failed: %.200r", raw)
            return []
    items = raw if isinstance(raw, list) else [raw]
    items = [item for item in items if isinstance(item, dict)]
    if len(items) > MAX_ITEMS_PER_POST:
        log.warning("push: truncated %d -> %d items", len(items), MAX_ITEMS_PER_POST)
        items = items[:MAX_ITEMS_PER_POST]
    return items


async def _parse_request(request: Request) -> tuple[str, list[dict]]:
    """Extract token + data items from either a form-encoded or a JSON body.

    Jimi integration push format (observed live):
        Content-Type: application/x-www-form-urlencoded
        Body: msgType=<topic>&data=<URL-encoded JSON object or array>
    The `data` field holds a single JSON object per event, not an array.

    JSON format:
        Content-Type: application/json
        Body: {"token": "...", "data_list": [{...}, ...]}

    This helper handles both so each endpoint doesn't need to know which
    format arrived.
    """
    body = await request.body()
    if not body:
        return "", []
    content_type = request.headers.get("content-type", "").lower()
    if content_type.startswith("application/json"):
        try:
            payload = json.loads(body)
        except (ValueError, TypeError):
            log.warning("push: JSON body parse failed: %.200r", body)
            return "", []
        if not isinstance(payload, dict):
            return "", _parse_data_list(payload)
        token = str(payload.get("token", ""))
        raw_data = payload.get("data_list") or payload.get("data") or []
        return token, _parse_data_list(raw_data)
    # Form-encoded body. Starlette caches the body read above, so form()
    # can still parse it.
    try:
        form = await request.form()
    except Exception:
        log.warning("push: form parse failed", exc_info=True)
        return "", []
    token = str(form.get("token", ""))
    raw_data = form.get("data") or form.get("data_list") or ""
    if not raw_data:
        return token, []
    return token, _parse_data_list(raw_data)
def unix_to_ts(v) -> Optional[str]:
    """Convert a Unix timestamp (seconds or milliseconds) to a UTC
    'YYYY-MM-DD HH:MM:SS' string. Returns None if v is not a usable epoch."""
    if v is None:
        return None
    try:
        ts = int(v)
        if ts > 1e12:  # treat as milliseconds
            ts = ts // 1000
        return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError, OSError, OverflowError):
        return None
def _parse_trip_ts(v) -> Optional[str]:
    """[BUG-03] Parse trip timestamps. Handles ISO strings and Jimi BCD formats."""
    iso = clean_ts(v)
    if iso:
        return iso
    s = clean(v)
    if s is None:
        return None
    try:
        if len(s) == 12:  # YYMMDDHHmmss
            return datetime.strptime(s, "%y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        if len(s) == 14:  # YYYYMMDDHHmmss
            return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        pass
    log.warning("Cannot parse trip timestamp: %r", v)
    return None
def _make_geom_params(lat, lng):
    """Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
    return (lng, lat, lng, lat)
def _ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
    """Insert a stub row into tracksolid.devices so FK-constrained inserts don't fail.

    Jimi pushes alarms/GPS for devices that may not yet be in our devices table
    (neither API-synced nor in the onboarding CSV). Rather than drop the event,
    register the IMEI with whatever context the push carried; the nightly
    `sync_devices()` and the CSV import fill in the remaining fields later.
    Existing rows are left untouched (ON CONFLICT DO NOTHING).
    """
    cur.execute(
        """
        INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
        VALUES (%s, %s, 'unknown', NOW(), NOW())
        ON CONFLICT (imei) DO NOTHING
        """,
        (imei, device_name),
    )
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/health")
def health():
    return {"status": "ok"}
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────
@app.post("/pushobd")
async def push_obd(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    # obdJson may be missing, null, or a JSON-encoded string.
                    obd = item.get("obdJson") or {}
                    if isinstance(obd, str):
                        try:
                            obd = json.loads(obd)
                        except json.JSONDecodeError:
                            obd = {}
                    # [BUG-01] Try unix epoch first, fall back to ISO string.
                    event_time = (
                        unix_to_ts(obd.get("event_time"))
                        or clean_ts(obd.get("event_time"))
                    )
                    if not imei or not event_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    lat = clean_num(obd.get("lat"))
                    lng = clean_num(obd.get("lng"))
                    cur.execute("""
                        INSERT INTO tracksolid.obd_readings (
                            imei, reading_time, car_type, acc_state, status_flags,
                            lat, lng, geom, obd_data, updated_at
                        ) VALUES (
                            %s, %s, %s, %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, NOW()
                        ) ON CONFLICT (imei, reading_time) DO UPDATE SET
                            obd_data = EXCLUDED.obd_data,
                            updated_at = NOW()
                    """, (
                        imei, event_time,
                        clean_int(obd.get("car_type")),
                        clean_int(obd.get("AccState")),
                        clean_int(obd.get("statusFlags")),
                        lat, lng,
                        *_make_geom_params(lat, lng),
                        json.dumps(obd),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushobd: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────
@app.post("/pushfaultinfo")
async def push_fault_info(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    gate_time = clean_ts(item.get("gateTime"))
                    if not imei or not gate_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    # faultCodeList may be missing, null, or a JSON-encoded string.
                    fault_codes = item.get("faultCodeList") or []
                    if isinstance(fault_codes, str):
                        try:
                            fault_codes = json.loads(fault_codes)
                        except json.JSONDecodeError:
                            fault_codes = []
                    lat = clean_num(item.get("lat"))
                    lng = clean_num(item.get("lng"))
                    evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))
                    item_inserted = 0
                    for code in fault_codes:
                        fault_code = clean(code)
                        # Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes
                        # because NULL != NULL in Postgres unique constraints.
                        if not fault_code:
                            continue
                        cur.execute("""
                            INSERT INTO tracksolid.fault_codes (
                                imei, reported_at, fault_code, status_flags,
                                lat, lng, geom, event_time
                            ) VALUES (
                                %s, %s, %s, %s, %s, %s,
                                CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                     THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                     ELSE NULL END,
                                %s
                            ) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
                        """, (
                            imei, gate_time, fault_code,
                            clean_int(item.get("statusFlags")),
                            lat, lng,
                            *_make_geom_params(lat, lng),
                            evt_time,
                        ))
                        item_inserted += 1
                    cur.execute("RELEASE SAVEPOINT sp")
                    # Count only after the savepoint is released, so a failure
                    # part-way through one item's codes can't inflate the total.
                    inserted += item_inserted
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────
@app.post("/pushalarm")
async def push_alarm(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    # Jimi integration push uses `imei` + `alarmTime`, NOT the
                    # `deviceImei` + `gateTime` fields shown in the API docs.
                    imei = clean(item.get("imei") or item.get("deviceImei"))
                    alarm_type = clean(item.get("alarmType"))
                    alarm_time = clean_ts(item.get("alarmTime") or item.get("gateTime"))
                    # [BUG-02] Also guard alarm_type: NULL alarm_type violates the NOT NULL constraint.
                    if not imei or not alarm_time or not alarm_type:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    # Ensure parent devices row exists to satisfy FK constraint.
                    _ensure_device(cur, imei, clean(item.get("deviceName")))
                    lat = clean_num(item.get("lat"))
                    lng = clean_num(item.get("lng"))
                    cur.execute("""
                        INSERT INTO tracksolid.alarms (
                            imei, alarm_type, alarm_name, alarm_time, geom,
                            lat, lng, speed, source, updated_at
                        ) VALUES (
                            %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, %s, %s, 'push', NOW()
                        ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
                    """, (
                        imei, alarm_type, clean(item.get("alarmName")), alarm_time,
                        *_make_geom_params(lat, lng),
                        lat, lng,
                        clean_num(item.get("speed")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process alarm for %s",
                                item.get("imei") or item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushalarm: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────
@app.post("/pushgps")
async def push_gps(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    # Validation phase: pre-clean and filter without touching the DB.
    # Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume;
    # one batched execute_values is 10-50× faster for the same rows.
    rows = []
    for item in items:
        imei = clean(item.get("deviceImei"))
        gps_time = clean_ts(item.get("gpsTime"))
        lat = clean_num(item.get("lat"))
        lng = clean_num(item.get("lng"))
        if not imei or not gps_time or not is_valid_fix(lat, lng):
            continue
        rows.append((
            imei, gps_time, lng, lat, lat, lng,
            clean_num(item.get("gpsSpeed")),
            clean_num(item.get("direction")),
            str(item.get("acc")) if item.get("acc") is not None else None,
            clean_int(item.get("satelliteNum")),
            clean_num(item.get("distance")),
            clean_num(item.get("altitude")),
            clean_int(item.get("postType")),
        ))
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            if rows:
                execute_values(
                    cur,
                    """
                    INSERT INTO tracksolid.position_history (
                        imei, gps_time, geom, lat, lng, speed, direction,
                        acc_status, satellite, current_mileage,
                        altitude, post_type, source
                    ) VALUES %s
                    ON CONFLICT (imei, gps_time) DO NOTHING
                    """,
                    rows,
                    template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
                             " %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')",
                    # A single page, so cur.rowcount covers the whole batch.
                    page_size=len(rows),
                )
                inserted = cur.rowcount
            # Record the call even when no rows were valid, for observability.
            log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushgps: %d/%d items inserted.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────
@app.post("/pushhb")
async def push_heartbeat(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    gate_time = clean_ts(item.get("gateTime"))
                    if not imei or not gate_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    cur.execute("""
                        INSERT INTO tracksolid.heartbeats (
                            imei, gate_time, power_level, gsm_signal,
                            acc_status, power_status, fortify
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (imei, gate_time) DO NOTHING
                    """, (
                        imei, gate_time,
                        clean_int(item.get("powerLevel")),
                        clean_int(item.get("gsmSign")),
                        clean_int(item.get("acc")),
                        clean_int(item.get("powerStatus")),
                        clean_int(item.get("fortify")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushhb: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────
@app.post("/pushtripreport")
async def push_trip_report(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    # [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss.
                    begin_time = _parse_trip_ts(item.get("beginTime"))
                    end_time = _parse_trip_ts(item.get("endTime"))
                    if not imei or not begin_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    # [BUG-03, under investigation] The field is named `miles`
                    # but FIX-M11 claimed it is already km. The polling endpoint
                    # was found to return metres despite docs (FIX-M16), so the
                    # webhook unit is suspect. Until the diagnostic
                    # db_audit/checks/bug03_webhook_distance_units.sql confirms
                    # the unit, store the raw value unchanged and rely on the
                    # diagnostic's ratio test to decide whether a * 1.609
                    # (miles to km) or / 1000 (metres to km) conversion is needed.
                    distance_km = clean_num(item.get("miles"))
                    begin_lat = clean_num(item.get("beginLat"))
                    begin_lng = clean_num(item.get("beginLng"))
                    end_lat = clean_num(item.get("endLat"))
                    end_lng = clean_num(item.get("endLng"))
                    cur.execute("""
                        INSERT INTO tracksolid.trips (
                            imei, start_time, end_time, distance_km,
                            start_geom, end_geom,
                            fuel_consumed_l, idle_time_s, trip_seq, source,
                            updated_at
                        ) VALUES (
                            %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, %s, %s, 'push', NOW()
                        ) ON CONFLICT (imei, start_time) DO UPDATE SET
                            end_time        = EXCLUDED.end_time,
                            distance_km     = EXCLUDED.distance_km,
                            end_geom        = EXCLUDED.end_geom,
                            fuel_consumed_l = EXCLUDED.fuel_consumed_l,
                            idle_time_s     = EXCLUDED.idle_time_s,
                            updated_at      = NOW()
                    """, (
                        imei, begin_time, end_time, distance_km,
                        begin_lng, begin_lat, begin_lng, begin_lat,
                        end_lng, end_lat, end_lng, end_lat,
                        clean_num(item.get("oils")),
                        clean_int(item.get("idleTimes")),
                        clean_int(item.get("tripSeq")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
# ── 7. Device Events (LOGIN / LOGOUT) ────────────────────────────────────────
@app.post("/pushevent")
async def push_event(request: Request):
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)
    t0 = time.time()
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    event_type = clean(item.get("type"))
                    event_time = clean_ts(item.get("gateTime"))
                    if not imei or not event_type or not event_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    cur.execute("""
                        INSERT INTO tracksolid.device_events
                            (imei, event_type, event_time, timezone)
                        VALUES (%s, %s, %s, %s)
                        ON CONFLICT (imei, event_type, event_time) DO NOTHING
                    """, (imei, event_type, event_time, clean(item.get("timezone"))))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process event for %s", item.get("deviceImei"), exc_info=True)
            log_ingestion(cur, "webhook/pushevent", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("pushevent: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)