Temporary diagnostic to see what format Jimi actually sends on /pushalarm. New container is parsing to empty items (pushes arrive but no DB insert), so we need to see the real body shape. Remove once format is confirmed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
622 lines
27 KiB
Python
622 lines
27 KiB
Python
"""
|
||
webhook_receiver_rev.py — Fireside Communications · Tracksolid Webhook Receiver
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
RESPONSIBILITY: Receives real-time push data from Jimi Tracksolid Pro servers.
|
||
|
||
Jimi's Data Push API POSTs telemetry to these endpoints as it arrives from
|
||
devices, providing real-time ingestion without polling. This is the ONLY way
|
||
to receive OBD diagnostics and DTC fault codes — those data types have no
|
||
polling endpoint.
|
||
|
||
ENDPOINTS:
|
||
/pushobd — OBD CAN bus diagnostics (Priority 1)
|
||
/pushfaultinfo — DTC fault codes (Priority 1)
|
||
/pushalarm — Alarm events (Priority 2)
|
||
/pushgps — GPS positions (Priority 2)
|
||
/pushhb — Device heartbeats (Priority 2)
|
||
/pushtripreport — Trip reports (Priority 2)
|
||
/pushevent — Device LOGIN/LOGOUT events (Priority 3)
|
||
/health — Healthcheck for Docker/monitoring
|
||
|
||
REVISIONS (QA-Verified):
|
||
[BUG-01] OBD event_time: try unix_to_ts before clean_ts (handles epoch timestamps).
|
||
[BUG-02] push_alarm: guard also checks alarm_type is not null (prevents FK violation).
|
||
[BUG-03] push_trip_report: _parse_trip_ts handles Jimi BCD format YYMMDDHHmmss.
|
||
[BUG-04] SAVEPOINT per item in all DB-writing endpoints (one bad item won't abort batch).
|
||
[BUG-05] Added /pushevent endpoint → writes to tracksolid.device_events.
|
||
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hmac
|
||
import json
|
||
import os
|
||
import time
|
||
from contextlib import asynccontextmanager
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
# Cap on items per webhook POST. Prevents a malformed/malicious push from
# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200.
# Override via WEBHOOK_MAX_ITEMS; default 5000. Note: an int() failure on a
# bad env value will abort import — intentional fail-fast at startup.
MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000"))
|
||
|
||
from fastapi import FastAPI, HTTPException, Request
|
||
from fastapi.responses import JSONResponse
|
||
from psycopg2.extras import execute_values
|
||
|
||
from ts_shared_rev import (
|
||
close_pool,
|
||
get_conn,
|
||
log_ingestion,
|
||
clean,
|
||
clean_num,
|
||
clean_int,
|
||
clean_ts,
|
||
is_valid_fix,
|
||
get_logger,
|
||
)
|
||
|
||
# Module-level logger shared by every endpoint in this receiver.
log = get_logger("webhook")

# ── Configuration ─────────────────────────────────────────────────────────────

# Shared secret that Jimi includes with each push. Empty/unset disables
# token validation entirely (see _validate_token).
WEBHOOK_TOKEN = os.getenv("JIMI_WEBHOOK_TOKEN", "")
|
||
|
||
# ── Lifespan ──────────────────────────────────────────────────────────────────
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: log startup, then on shutdown close the shared
    DB connection pool so connections are not leaked on container stop."""
    log.info("Webhook receiver starting (v1.1)...")
    yield
    log.info("Webhook receiver shutting down...")
    close_pool()


app = FastAPI(title="Tracksolid Webhook Receiver", lifespan=lifespan)
|
||
|
||
# ── Helpers ───────────────────────────────────────────────────────────────────

# Canonical ACK body returned by every endpoint, even on partial failure.
# NOTE(review): presumably Jimi re-pushes on a non-success response — confirm
# against the Data Push API docs before changing the always-ACK behaviour.
SUCCESS = {"code": 0, "msg": "success"}
|
||
|
||
|
||
def _validate_token(token: str) -> None:
    """Reject the request with HTTP 403 unless *token* matches the configured
    JIMI_WEBHOOK_TOKEN.

    Validation is a no-op when JIMI_WEBHOOK_TOKEN is unset/empty. The match
    uses hmac.compare_digest so the comparison is constant-time.
    """
    if not WEBHOOK_TOKEN:
        return  # validation disabled by configuration
    if hmac.compare_digest(token, WEBHOOK_TOKEN):
        return
    raise HTTPException(status_code=403, detail="Invalid token")
|
||
|
||
|
||
def _parse_data_list(raw: str) -> list[dict]:
    """Decode *raw* — a JSON array or a single JSON object — into a list.

    Returns [] when the input cannot be decoded. Results are capped at
    MAX_ITEMS_PER_POST; truncation is logged as a warning, not an error.
    """
    try:
        decoded = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []
    items = decoded if isinstance(decoded, list) else [decoded]
    if len(items) > MAX_ITEMS_PER_POST:
        log.warning("data_list truncated: %d items exceeded cap of %d",
                    len(items), MAX_ITEMS_PER_POST)
        items = items[:MAX_ITEMS_PER_POST]
    return items
|
||
|
||
|
||
async def _parse_request(request: Request) -> tuple[str, list[dict]]:
    """Extract token + data_list from either a JSON body or form-encoded body.

    Jimi's integration push API sends:
        Content-Type: application/json
        Body: {"token": "...", "data_list": [{...}, ...]}

    Some older/configured endpoints may still use form-encoded. This helper
    handles both so each endpoint doesn't need to know which format arrived.

    Returns ("", []) when the body is empty or unparseable in both formats.
    """
    content_type = request.headers.get("content-type", "")
    body = await request.body()

    # TEMP DIAGNOSTIC: log every push so we can see what Jimi actually sends.
    # NOTE(review): the logged body can include the shared webhook token —
    # acceptable only while diagnosing the /pushalarm format, then remove.
    log.info("push %s: content-type=%r body=%.300s",
             request.url.path, content_type,
             body.decode("utf-8", errors="replace") if body else "<empty>")

    if not body:
        return "", []

    # ── Try JSON body first (integration push format) ──────────────────────────
    # NOTE(review): only object bodies (leading "{") are sniffed. A bare JSON
    # array body with a non-JSON content-type falls through to the form parser
    # (→ 0 items); and with an application/json content-type, json.loads gives
    # a list whose .get raises AttributeError, which is NOT caught below.
    # Either path could explain "pushes arrive but items parse to empty".
    if "application/json" in content_type or body.lstrip()[:1] == b"{":
        try:
            payload = json.loads(body)
            token = str(payload.get("token", ""))
            raw_dl = payload.get("data_list", [])
            if isinstance(raw_dl, list):
                items = raw_dl[:MAX_ITEMS_PER_POST]
            elif isinstance(raw_dl, str):
                # data_list itself double-encoded as a JSON string.
                items = _parse_data_list(raw_dl)
            else:
                items = []
            log.info("push: parsed JSON body — %d items", len(items))
            return token, items
        except (json.JSONDecodeError, TypeError):
            log.warning("push: JSON body parse failed")

    # ── Fall back to form-encoded ───────────────────────────────────────────────
    try:
        form = await request.form()
        token = str(form.get("token", ""))
        raw_dl = str(form.get("data_list", ""))
        items = _parse_data_list(raw_dl) if raw_dl else []
        log.info("push: parsed form body — %d items", len(items))
        return token, items
    except Exception:
        log.warning("push: form body parse failed", exc_info=True)

    return "", []
|
||
|
||
|
||
def unix_to_ts(v) -> Optional[str]:
    """Convert a Unix timestamp to a 'YYYY-MM-DD HH:MM:SS' UTC string.

    Accepts seconds or milliseconds (values above 1e12 are treated as ms),
    supplied as an int, float, or numeric string. Returns None for anything
    that is not a finite numeric timestamp, so callers can chain it with
    clean_ts() as a fallback.
    """
    if v is None:
        return None
    try:
        # int(float(v)) also accepts float inputs and numeric strings with a
        # fractional part (e.g. "1700000000.5"); plain int(v) raised
        # ValueError on those and silently dropped the timestamp.
        ts = int(float(v))
        if ts > 1e12:  # heuristically milliseconds, not seconds
            ts = ts // 1000
        return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError, OverflowError, OSError):
        # ValueError: non-numeric / NaN; OverflowError: inf;
        # OSError: out-of-range for the platform's fromtimestamp.
        return None
|
||
|
||
|
||
def _parse_trip_ts(v) -> Optional[str]:
    """[BUG-03] Normalise a trip timestamp to 'YYYY-MM-DD HH:MM:SS'.

    Tries the shared ISO parser (clean_ts) first, then Jimi's packed digit
    strings: 12 digits = YYMMDDHHmmss, 14 digits = YYYYMMDDHHmmss. Logs and
    returns None when nothing matches.
    """
    iso = clean_ts(v)
    if iso:
        return iso
    s = clean(v)
    if s is not None:
        fmt = {12: "%y%m%d%H%M%S", 14: "%Y%m%d%H%M%S"}.get(len(s))
        if fmt is not None:
            try:
                return datetime.strptime(s, fmt).strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, TypeError):
                pass
    log.warning("Cannot parse trip timestamp: %r", v)
    return None
|
||
|
||
|
||
def _make_geom_params(lat, lng):
|
||
"""Return (lng, lat, lng, lat) tuple for the CASE WHEN ST_MakePoint pattern."""
|
||
return (lng, lat, lng, lat)
|
||
|
||
# ── Health Check ──────────────────────────────────────────────────────────────

@app.get("/health")
def health():
    """Liveness probe for Docker/monitoring. Touches no DB; always 200."""
    return {"status": "ok"}
|
||
|
||
# ── 1. OBD Diagnostics (Priority 1) ──────────────────────────────────────────

@app.post("/pushobd")
async def push_obd(request: Request):
    """Ingest OBD CAN-bus diagnostic pushes into tracksolid.obd_readings.

    One upsert per item keyed on (imei, reading_time); a replayed reading
    refreshes obd_data/updated_at. Always ACKs SUCCESS so partial failures
    are not reported to the sender as delivery errors.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT: one bad row rolls back
                    # itself only, not the whole batch transaction.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    obd = item.get("obdJson", {})
                    # obdJson may arrive double-encoded (JSON string in JSON).
                    if isinstance(obd, str):
                        try:
                            obd = json.loads(obd)
                        except json.JSONDecodeError:
                            obd = {}

                    # [BUG-01] Try unix epoch first, fall back to ISO string.
                    event_time = (
                        unix_to_ts(obd.get("event_time"))
                        or clean_ts(obd.get("event_time"))
                    )
                    if not imei or not event_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    lat = clean_num(obd.get("lat"))
                    lng = clean_num(obd.get("lng"))

                    # Placeholder order: 7 scalar columns, then the geom
                    # 4-tuple from _make_geom_params (lng/lat NULL guard +
                    # ST_MakePoint(x=lng, y=lat)), then the raw obd blob.
                    cur.execute("""
                        INSERT INTO tracksolid.obd_readings (
                            imei, reading_time, car_type, acc_state, status_flags,
                            lat, lng, geom, obd_data, updated_at
                        ) VALUES (
                            %s, %s, %s, %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, NOW()
                        ) ON CONFLICT (imei, reading_time) DO UPDATE SET
                            obd_data = EXCLUDED.obd_data,
                            updated_at = NOW()
                    """, (
                        imei, event_time,
                        clean_int(obd.get("car_type")),
                        clean_int(obd.get("AccState")),
                        clean_int(obd.get("statusFlags")),
                        lat, lng,
                        *_make_geom_params(lat, lng),
                        json.dumps(obd),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process OBD item for %s", item.get("deviceImei"), exc_info=True)

            # Observability row in the same transaction as the data.
            log_ingestion(cur, "webhook/pushobd", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushobd: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 2. DTC Fault Codes (Priority 1) ──────────────────────────────────────────

@app.post("/pushfaultinfo")
async def push_fault_info(request: Request):
    """Ingest DTC fault-code pushes into tracksolid.fault_codes.

    Each push item can carry several fault codes; one row per
    (imei, reported_at, fault_code), duplicates dropped via ON CONFLICT.
    Always ACKs SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    # Counts attempted fault-code inserts; ON CONFLICT duplicates are still
    # counted here (rowcount is not consulted).
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT isolates failures.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    gate_time = clean_ts(item.get("gateTime"))
                    if not imei or not gate_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    fault_codes = item.get("faultCodeList", [])
                    # faultCodeList may arrive double-encoded as a JSON string.
                    if isinstance(fault_codes, str):
                        try:
                            fault_codes = json.loads(fault_codes)
                        except json.JSONDecodeError:
                            fault_codes = []

                    lat = clean_num(item.get("lat"))
                    lng = clean_num(item.get("lng"))
                    evt_time = unix_to_ts(item.get("eventTime")) or clean_ts(item.get("eventTime"))

                    for code in fault_codes:
                        fault_code = clean(code)
                        # Guard NULL: ON CONFLICT won't deduplicate NULL fault_codes
                        # because NULL != NULL in Postgres unique constraints.
                        if not fault_code:
                            continue
                        cur.execute("""
                            INSERT INTO tracksolid.fault_codes (
                                imei, reported_at, fault_code, status_flags,
                                lat, lng, geom, event_time
                            ) VALUES (
                                %s, %s, %s, %s, %s, %s,
                                CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                     THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                     ELSE NULL END,
                                %s
                            ) ON CONFLICT (imei, reported_at, fault_code) DO NOTHING
                        """, (
                            imei, gate_time, fault_code,
                            clean_int(item.get("statusFlags")),
                            lat, lng,
                            *_make_geom_params(lat, lng),
                            evt_time,
                        ))
                        inserted += 1
                    cur.execute("RELEASE SAVEPOINT sp")
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process fault item for %s", item.get("deviceImei"), exc_info=True)

            log_ingestion(cur, "webhook/pushfaultinfo", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushfaultinfo: %d fault codes from %d items.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 3. Alarm Events (Priority 2) ─────────────────────────────────────────────

@app.post("/pushalarm")
async def push_alarm(request: Request):
    """Ingest alarm pushes into tracksolid.alarms.

    Dedupes on (imei, alarm_type, alarm_time) with ON CONFLICT DO NOTHING.
    Always ACKs SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT isolates failures.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    alarm_type = clean(item.get("alarmType"))
                    alarm_time = clean_ts(item.get("gateTime"))
                    # [BUG-02] Also guard alarm_type — NULL alarm_type violates NOT NULL constraint.
                    if not imei or not alarm_time or not alarm_type:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    lat = clean_num(item.get("lat"))
                    lng = clean_num(item.get("lng"))

                    # Placeholder order: here the geom 4-tuple comes BEFORE
                    # lat/lng (the column list puts geom before lat/lng) —
                    # the opposite of push_obd. Keep in sync with the SQL.
                    cur.execute("""
                        INSERT INTO tracksolid.alarms (
                            imei, alarm_type, alarm_name, alarm_time, geom,
                            lat, lng, speed, source, updated_at
                        ) VALUES (
                            %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, %s, %s, 'push', NOW()
                        ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
                    """, (
                        imei, alarm_type, clean(item.get("alarmName")), alarm_time,
                        *_make_geom_params(lat, lng),
                        lat, lng,
                        clean_num(item.get("speed")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process alarm for %s", item.get("deviceImei"), exc_info=True)

            log_ingestion(cur, "webhook/pushalarm", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushalarm: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 4. GPS Positions (Priority 2) ────────────────────────────────────────────

@app.post("/pushgps")
async def push_gps(request: Request):
    """Ingest GPS position pushes into tracksolid.position_history.

    Unlike the other endpoints this pre-validates all items in Python, then
    does ONE batched execute_values insert. Dedupe is on (imei, gps_time);
    inserted reflects actual rows written (cur.rowcount). Always ACKs SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    # Validation phase — pre-clean and filter without touching the DB.
    # Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume;
    # one batched execute_values is 10-50× faster for the same rows.
    rows = []
    for item in items:
        imei = clean(item.get("deviceImei"))
        gps_time = clean_ts(item.get("gpsTime"))
        lat = clean_num(item.get("lat"))
        lng = clean_num(item.get("lng"))
        if not imei or not gps_time or not is_valid_fix(lat, lng):
            continue
        # Row order must match the template below: (imei, gps_time,
        # lng, lat) for ST_MakePoint(x=lng, y=lat), then (lat, lng) for
        # the plain lat/lng columns, then the remaining scalars.
        rows.append((
            imei, gps_time, lng, lat, lat, lng,
            clean_num(item.get("gpsSpeed")),
            clean_num(item.get("direction")),
            str(item.get("acc")) if item.get("acc") is not None else None,
            clean_int(item.get("satelliteNum")),
            clean_num(item.get("distance")),
            clean_num(item.get("altitude")),
            clean_int(item.get("postType")),
        ))

    inserted = 0
    if rows:
        with get_conn() as conn:
            with conn.cursor() as cur:
                execute_values(
                    cur,
                    """
                    INSERT INTO tracksolid.position_history (
                        imei, gps_time, geom, lat, lng, speed, direction,
                        acc_status, satellite, current_mileage,
                        altitude, post_type, source
                    ) VALUES %s
                    ON CONFLICT (imei, gps_time) DO NOTHING
                    """,
                    rows,
                    template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
                             " %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')",
                    page_size=len(rows),
                )
                # rowcount after a single-page execute_values = rows inserted.
                inserted = cur.rowcount
                log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
                              int((time.time() - t0) * 1000), True)
    else:
        # No valid rows, still record the call for observability.
        with get_conn() as conn:
            with conn.cursor() as cur:
                log_ingestion(cur, "webhook/pushgps", len(items), 0, 0,
                              int((time.time() - t0) * 1000), True)

    log.info("pushgps: %d/%d items inserted.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────

@app.post("/pushhb")
async def push_heartbeat(request: Request):
    """Ingest heartbeat pushes into tracksolid.heartbeats.

    Dedupes on (imei, gate_time) with ON CONFLICT DO NOTHING. Always ACKs
    SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT isolates failures.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    gate_time = clean_ts(item.get("gateTime"))
                    if not imei or not gate_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    cur.execute("""
                        INSERT INTO tracksolid.heartbeats (
                            imei, gate_time, power_level, gsm_signal,
                            acc_status, power_status, fortify
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (imei, gate_time) DO NOTHING
                    """, (
                        imei, gate_time,
                        clean_int(item.get("powerLevel")),
                        clean_int(item.get("gsmSign")),
                        clean_int(item.get("acc")),
                        clean_int(item.get("powerStatus")),
                        clean_int(item.get("fortify")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process heartbeat for %s", item.get("deviceImei"), exc_info=True)

            log_ingestion(cur, "webhook/pushhb", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushhb: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 6. Trip Reports (Priority 2) ─────────────────────────────────────────────

@app.post("/pushtripreport")
async def push_trip_report(request: Request):
    """Ingest trip-report pushes into tracksolid.trips.

    Upserts on (imei, start_time): a re-pushed trip updates its end point,
    distance, fuel and idle figures. end_time may legitimately be NULL for
    an in-progress trip (only begin_time is required). Always ACKs SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT isolates failures.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    # [BUG-03] Use _parse_trip_ts to handle Jimi BCD format YYMMDDHHmmss.
                    begin_time = _parse_trip_ts(item.get("beginTime"))
                    end_time = _parse_trip_ts(item.get("endTime"))
                    if not imei or not begin_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    # [FIX-M11] API sends km. Store directly as distance_km.
                    distance_km = clean_num(item.get("miles"))

                    begin_lat = clean_num(item.get("beginLat"))
                    begin_lng = clean_num(item.get("beginLng"))
                    end_lat = clean_num(item.get("endLat"))
                    end_lng = clean_num(item.get("endLng"))

                    # Two geom CASE blocks: start point params first
                    # (begin_lng/begin_lat ×2), then end point (end_lng/
                    # end_lat ×2) — ST_MakePoint takes (x=lng, y=lat).
                    cur.execute("""
                        INSERT INTO tracksolid.trips (
                            imei, start_time, end_time, distance_km,
                            start_geom, end_geom,
                            fuel_consumed_l, idle_time_s, trip_seq, source,
                            updated_at
                        ) VALUES (
                            %s, %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s, %s, %s, 'push', NOW()
                        ) ON CONFLICT (imei, start_time) DO UPDATE SET
                            end_time = EXCLUDED.end_time,
                            distance_km = EXCLUDED.distance_km,
                            end_geom = EXCLUDED.end_geom,
                            fuel_consumed_l = EXCLUDED.fuel_consumed_l,
                            idle_time_s = EXCLUDED.idle_time_s,
                            updated_at = NOW()
                    """, (
                        imei, begin_time, end_time, distance_km,
                        begin_lng, begin_lat, begin_lng, begin_lat,
                        end_lng, end_lat, end_lng, end_lat,
                        clean_num(item.get("oils")),
                        clean_int(item.get("idleTimes")),
                        clean_int(item.get("tripSeq")),
                    ))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process trip for %s", item.get("deviceImei"), exc_info=True)

            log_ingestion(cur, "webhook/pushtripreport", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushtripreport: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|
||
|
||
# ── 7. Device Events (LOGIN / LOGOUT) ────────────────────────────────────────

@app.post("/pushevent")
async def push_event(request: Request):
    """[BUG-05] Ingest device LOGIN/LOGOUT event pushes into
    tracksolid.device_events.

    Dedupes on (imei, event_type, event_time) with ON CONFLICT DO NOTHING.
    Always ACKs SUCCESS.
    """
    token, items = await _parse_request(request)
    _validate_token(token)
    if not items:
        return JSONResponse(content=SUCCESS)

    t0 = time.time()
    inserted = 0

    with get_conn() as conn:
        with conn.cursor() as cur:
            for item in items:
                try:
                    # [BUG-04] per-item SAVEPOINT isolates failures.
                    cur.execute("SAVEPOINT sp")
                    imei = clean(item.get("deviceImei"))
                    event_type = clean(item.get("type"))
                    event_time = clean_ts(item.get("gateTime"))
                    if not imei or not event_type or not event_time:
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue

                    cur.execute("""
                        INSERT INTO tracksolid.device_events
                            (imei, event_type, event_time, timezone)
                        VALUES (%s, %s, %s, %s)
                        ON CONFLICT (imei, event_type, event_time) DO NOTHING
                    """, (imei, event_type, event_time, clean(item.get("timezone"))))
                    cur.execute("RELEASE SAVEPOINT sp")
                    inserted += 1
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process event for %s", item.get("deviceImei"), exc_info=True)

            log_ingestion(cur, "webhook/pushevent", len(items), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("pushevent: %d/%d items processed.", inserted, len(items))
    return JSONResponse(content=SUCCESS)
|