tracksolid_timescale_grafan.../ts_shared_rev.py
David Kiania de70972d6a Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
  OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
  fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
  in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 16:31:17 +03:00

285 lines
No EOL
12 KiB
Python

"""
ts_shared_rev.py — Fireside Communications · Tracksolid Pro Ingestion Stack
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers.
Imported by ingest_movement_rev.py, ingest_events_rev.py, and webhook_receiver_rev.py.
REVISIONS (QA-Verified):
[FIX-01] Secrets exclusively from env (Security).
[FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance).
[FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency).
[FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency).
[FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency).
[FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity).
[FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness).
[FIX-09] get_conn auto-commits on success (Data Integrity).
[FIX-11] Consolidated safe_task/setup_shutdown (DRY).
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations

import functools
import hashlib
import logging
import os
import signal
import sys
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from typing import Optional, Any

import psycopg2
import psycopg2.extras
import psycopg2.pool
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# ── Configuration ─────────────────────────────────────────────────────────────
def _require_env(key: str) -> str:
v = os.getenv(key)
if not v:
raise EnvironmentError(f"Required environment variable '{key}' is missing.")
return v
# [FIX-01] All credentials come strictly from the environment — no hard-coded fallbacks.
APP_KEY = _require_env("TRACKSOLID_APP_KEY")
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
USER_ID = _require_env("TRACKSOLID_USER_ID")
# Account whose devices are queried; defaults to the authenticating user itself.
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
# MD5 digest of the account password, as required by jimi.oauth.token.get.
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
DATABASE_URL = _require_env("DATABASE_URL")
# Tracksolid Pro REST gateway; overridable for other regional endpoints.
API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest")
# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry
_POOL_MIN = 2
_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12"))
# ── Logging ───────────────────────────────────────────────────────────────────
def get_logger(name: str) -> logging.Logger:
    """Return a child of the shared 'tracksolid' logger for systemd/journald ingestion.

    The first call attaches a single StreamHandler to the 'tracksolid' root;
    subsequent calls reuse it, so no duplicate handlers (and no duplicate log
    lines) are ever created.
    """
    root = logging.getLogger("tracksolid")
    if not root.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            # FIX: insert a separator between logger name and message — the
            # previous "%(name)s%(message)s" ran them together, producing
            # output like "tracksolid.sharedDB Pool closed."
            "%(asctime)s [%(levelname)s] %(name)s %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        ))
        root.addHandler(handler)
        root.setLevel(logging.INFO)
    return root.getChild(name)
_log = get_logger("shared")  # module-level logger shared by all helpers below
# ── Connection Pool (psycopg2) ───────────────────────────────────────────────
_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None  # lazily-built singleton
def _get_pool() -> psycopg2.pool.ThreadedConnectionPool:
    """Return the process-wide connection pool, (re)creating it lazily.

    A pool that was closed (e.g. by close_pool) is rebuilt on next use.
    """
    global _pool
    if _pool is not None and not _pool.closed:
        return _pool
    _pool = psycopg2.pool.ThreadedConnectionPool(
        _POOL_MIN, _POOL_MAX, DATABASE_URL,
        options="-c client_encoding=UTF8",
    )
    _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX)
    return _pool
@contextmanager
def get_conn():
    """Thread-safe DB connection context manager. Auto-commits on success, rolls back on error."""
    pool = _get_pool()
    conn = pool.getconn()
    try:
        conn.autocommit = False  # explicit transaction; outcome decided below
        yield conn
        # [FIX-09] commit only if the caller's block finished without raising.
        # A failing commit is itself caught below and rolled back.
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Always return the connection to the pool, committed or rolled back.
        pool.putconn(conn)
def close_pool():
    """Close every pooled connection; a no-op when the pool was never created."""
    global _pool
    if _pool is not None:
        _pool.closeall()
        _log.info("DB Pool closed.")
# ── Scheduler / Signal Utilities ─────────────────────────────────────────────
def safe_task(fn, logger=None):
    """Wrap *fn* so any exception is logged instead of propagating.

    Prevents a single failing job from killing the scheduler loop.

    Args:
        fn: zero-argument callable to protect.
        logger: optional logger; defaults to the module logger.
    Returns:
        A wrapper that forwards fn()'s return value on success and returns
        None after logging on failure. Uses functools.wraps so __name__,
        __doc__, etc. are preserved (previously only __name__ was copied).
    """
    _logger = logger or _log
    @functools.wraps(fn)
    def wrapper():
        try:
            return fn()
        except Exception:
            _logger.exception("Task %s failed. Scheduler continuing...", fn.__name__)
    return wrapper
def setup_shutdown(logger=None):
    """Install SIGTERM/SIGINT handlers that drain the DB pool before exiting."""
    log = logger or _log
    def _on_signal(signum, frame):
        log.info("Signal %s received. Closing DB pool...", signum)
        close_pool()
        sys.exit(0)
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _on_signal)
# ── Value Cleaning (QA Fixes) ─────────────────────────────────────────────────
def clean(v: Any) -> Optional[str]:
    """Stringify and strip *v*; None input and all-whitespace text collapse to None."""
    if v is None:
        return None
    stripped = str(v).strip()
    return stripped or None
def clean_num(v: Any) -> Optional[float]:
    """QA-01: parse *v* as a float, returning None for blanks and non-numeric text."""
    text = clean(v)
    if text is None:
        return None
    try:
        return float(text)
    except (ValueError, TypeError):
        return None
def clean_int(v: Any) -> Optional[int]:
    """Parse *v* as an int (via float, so '5.9' -> 5); None for anything non-numeric.

    FIX: also catches OverflowError so inputs like 'inf' return None instead
    of raising — float('inf') parses fine but int(float('inf')) overflows,
    which previously escaped the except clause and crashed the caller.
    (Parsing is inlined rather than delegated to clean() for the same result.)
    """
    if v is None:
        return None
    text = str(v).strip()
    if not text:
        return None
    try:
        return int(float(text))
    except (ValueError, TypeError, OverflowError):
        return None
def clean_ts(v: Any) -> Optional[str]:
    """Validate *v* as ISO-8601 ('Z' suffix tolerated); return the original text or None."""
    text = clean(v)
    if text is None:
        return None
    try:
        # Parse only to validate; the untouched string is what gets inserted.
        datetime.fromisoformat(text.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return None
    return text
def is_valid_fix(lat: Any, lng: Any) -> bool:
    """Reject unparseable coordinates, the 0,0 'Zero Island' marker, and out-of-range fixes."""
    lat_f = clean_num(lat)
    lng_f = clean_num(lng)
    if lat_f is None or lng_f is None:
        return False
    if lat_f == 0.0 and lng_f == 0.0:
        return False
    lat_ok = -90 <= lat_f <= 90
    lng_ok = -180 <= lng_f <= 180
    return lat_ok and lng_ok
# ── API Signature & HTTP ──────────────────────────────────────────────────────
def build_sign(params: dict, secret: str) -> str:
    """Tracksolid MD5 signature: MD5(secret + k1v1k2v2... + secret), uppercase hex.

    The 'sign' key itself and None-valued parameters are excluded; remaining
    keys are concatenated in sorted order.
    """
    pieces = [secret]
    for key in sorted(params):
        if key == "sign" or params[key] is None:
            continue
        pieces.append(f"{key}{params[key]}")
    pieces.append(secret)
    digest = hashlib.md5("".join(pieces).encode("utf-8"))
    return digest.hexdigest().upper()
# Shared HTTP session: connection reuse plus transport-level retries
# (3 attempts, exponential back-off) on the HTTPS adapter for transient
# network failures, complementing the application-level retries in api_post.
_session = requests.Session()
_session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)))
def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict:
    """
    Production-grade API caller.
    Handles: Retries, Signing, and Rate Limiting (1006).
    NOTE(review): the original docstring also claimed Token Expiry (1004)
    handling, but no 1004 branch exists here — callers receive the raw 1004
    response and must re-authenticate themselves (see get_token). Confirm intent.
    Returns the decoded JSON dict, or {"code": -1, "message": <error>} once
    transport/JSON-decode retries are exhausted. `_retry_count` is internal
    recursion state; callers must not pass it.
    """
    # Common Tracksolid envelope; 'sign' is computed after all params are known.
    params = {
        "method": method,
        "app_key": APP_KEY,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "sign_method": "md5",
        "v": "1.0",
        "format": "json",
    }
    if access_token: params["access_token"] = access_token
    params.update(extra)
    params["sign"] = build_sign(params, APP_SECRET)
    try:
        r = _session.post(API_BASE_URL, data=params, timeout=25)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        # [FIX-03]/[QA-02] transient transport or JSON-decode failure:
        # exponential back-off (1s, 2s, 4s), then recurse; give up after 3 retries.
        if _retry_count < 3:
            time.sleep(2 ** _retry_count)
            return api_post(method, extra, access_token, _retry_count + 1)
        return {"code": -1, "message": str(e)}
    code = data.get("code")
    # [FIX-05] Handle Rate Limit (1006): linear back-off (10s, 20s, 30s); the
    # recursive call also rebuilds the timestamp and re-signs the request.
    if code == 1006 and _retry_count < 3:
        wait = 10 * (_retry_count + 1)
        _log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait)
        time.sleep(wait)
        return api_post(method, extra, access_token, _retry_count + 1)
    return data
# ── Database Operations ───────────────────────────────────────────────────────
def get_active_imeis() -> list[str]:
    """Fetch the IMEIs of all devices flagged enabled in tracksolid.devices."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1")
            rows = cur.fetchall()
    return [imei for (imei,) in rows]
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: Optional[str] = None, error_msg: Optional[str] = None):
    """Insert one audit row into tracksolid.ingestion_log using the caller's cursor.

    Text fields are truncated to their column limits (endpoint 100,
    error_code 50, error_message 500).
    FIX: error_code/error_msg now stay SQL NULL when not provided —
    previously str(None) wrote the literal string 'None' into the table.
    """
    cur.execute("""
        INSERT INTO tracksolid.ingestion_log
        (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        endpoint[:100], imei_count, upserted, inserted, duration_ms, success,
        str(error_code)[:50] if error_code is not None else None,
        str(error_msg)[:500] if error_msg is not None else None,
    ))
# ── Token Management ──────────────────────────────────────────────────────────
def get_token() -> Optional[str]:
    """Cache-aware token fetcher with auto-refresh.

    Strategy: reuse the cached token while more than 30 minutes of validity
    remain; refresh via the stored refresh_token when close to (but not past)
    expiry; otherwise fall back to a full re-authentication.
    """
    with get_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s", (USER_ID,))
            row = cur.fetchone()
    now = datetime.now(timezone.utc)
    if row:
        # NOTE(review): assumes expires_at is stored as a naive UTC timestamp;
        # if the column is timestamptz it is already aware and this replace()
        # is redundant (or wrong for non-UTC storage) — confirm the schema.
        expires_at = row['expires_at'].replace(tzinfo=timezone.utc)
        diff = (expires_at - now).total_seconds()
        if diff > 1800: return row['access_token']  # >30 min left: reuse as-is
        if diff > 0 and row['refresh_token']: return _refresh_token(row['refresh_token'])
    return _fetch_new_token()
def _fetch_new_token() -> Optional[str]:
    """Perform a full credential authentication and cache the resulting token."""
    _log.info("Requesting new access token (Full Auth)...")
    payload = {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200}
    res = api_post("jimi.oauth.token.get", payload)
    if res.get("code") != 0:
        return None
    return _update_token_cache(res["result"])
def _refresh_token(refresh_token: str) -> Optional[str]:
    """Exchange a refresh token for a new access token; full auth on failure."""
    _log.info("Refreshing access token...")
    res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token})
    if res.get("code") != 0:
        # Refresh rejected (expired/revoked) — fall back to full authentication.
        return _fetch_new_token()
    return _update_token_cache(res["result"])
def _update_token_cache(r: dict) -> str:
    """Upsert the fresh token payload into the cache table and return the token."""
    token = r["accessToken"]
    ttl_seconds = int(r.get("expiresIn", 7200))
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)
    upsert_sql = """
        INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (account) DO UPDATE SET
        access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token,
        expires_at=EXCLUDED.expires_at, obtained_at=NOW()
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(upsert_sql, (USER_ID, token, r.get("refreshToken"), expires_at))
        conn.commit()  # explicit commit kept for parity; get_conn also commits on exit
    return token