246 lines
10 KiB
Python
246 lines
10 KiB
Python
|
|
"""
|
||
|
|
ts_shared_rev.py — Fireside Communications · Tracksolid Pro Ingestion Stack
|
||
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
|
|
Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers.
|
||
|
|
Imported by ingest_movement_rev.py and ingest_events_rev.py.
|
||
|
|
|
||
|
|
REVISIONS (QA-Verified):
|
||
|
|
[FIX-01] Secrets exclusively from env (Security).
|
||
|
|
[FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance).
|
||
|
|
[FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency).
|
||
|
|
[FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency).
|
||
|
|
[FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency).
|
||
|
|
[FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity).
|
||
|
|
[FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness).
|
||
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import hashlib
import logging
import math
import os
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from typing import Optional, Any

import psycopg2
import psycopg2.extras
import psycopg2.pool
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
|
||
|
|
|
||
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def _require_env(key: str) -> str:
|
||
|
|
v = os.getenv(key)
|
||
|
|
if not v:
|
||
|
|
raise EnvironmentError(f"Required environment variable '{key}' is missing.")
|
||
|
|
return v
|
||
|
|
|
||
|
|
# Credentials and endpoints are resolved once, at import time. Every value read
# through _require_env() is mandatory: importing this module raises
# EnvironmentError if it is unset or empty.
APP_KEY = _require_env("TRACKSOLID_APP_KEY")
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
USER_ID = _require_env("TRACKSOLID_USER_ID")
# Optional; defaults to USER_ID when TRACKSOLID_TARGET_ACCOUNT is unset.
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
DATABASE_URL = _require_env("DATABASE_URL")
# Optional; overridable API endpoint.
API_BASE_URL = os.getenv(
    "TRACKSOLID_API_URL",
    "https://eu-open.tracksolidpro.com/route/rest",
)

# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry
# (the maximum is overridable through DB_POOL_MAX).
_POOL_MIN = 2
_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12"))
|
||
|
|
|
||
|
|
# ── Logging ───────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def get_logger(name: str) -> logging.Logger:
    """Return the ``tracksolid.<name>`` child logger, configuring the parent once.

    The first call attaches a single stream handler with a timestamped format
    to the shared ``tracksolid`` logger and sets its level to INFO. Later calls
    reuse that configuration, so no duplicate handlers are added.
    """
    parent = logging.getLogger("tracksolid")
    if parent.handlers:
        # Already configured by an earlier call.
        return parent.getChild(name)

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s — %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    stream = logging.StreamHandler()
    stream.setFormatter(formatter)
    parent.addHandler(stream)
    parent.setLevel(logging.INFO)
    return parent.getChild(name)
|
||
|
|
|
||
|
|
# Logger used by the helpers defined in this module.
_log = get_logger("shared")

# ── Connection Pool (psycopg2) ───────────────────────────────────────────────

# Lazily-created pool singleton; it stays None until _get_pool() builds it.
_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None
|
||
|
|
|
||
|
|
def _get_pool() -> psycopg2.pool.ThreadedConnectionPool:
    """Return the shared connection pool, creating it on first use.

    A pool that has been closed is replaced by a fresh one built from
    DATABASE_URL with the configured minimum and maximum sizes.
    """
    global _pool
    if _pool is not None and not _pool.closed:
        return _pool

    _pool = psycopg2.pool.ThreadedConnectionPool(
        _POOL_MIN,
        _POOL_MAX,
        DATABASE_URL,
        options="-c client_encoding=UTF8",
    )
    _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX)
    return _pool
|
||
|
|
|
||
|
|
@contextmanager
def get_conn():
    """Yield a pooled DB connection with autocommit disabled.

    This context manager never commits: callers must call ``conn.commit()``
    themselves to persist their work. If the managed block raises, the
    transaction is rolled back and the original exception is re-raised. The
    connection is handed back to the pool in every case.

    Yields:
        A psycopg2 connection checked out from the shared pool.
    """
    pool = _get_pool()
    conn = pool.getconn()
    try:
        conn.autocommit = False
        yield conn
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            # A broken connection cannot roll back; do not let that secondary
            # failure replace the error the caller actually needs to see.
            _log.warning("Rollback failed while handling an error.", exc_info=True)
        raise
    finally:
        pool.putconn(conn)
|
||
|
|
|
||
|
|
def close_pool():
    """Close all pooled connections and drop the pool singleton.

    Safe to call repeatedly: a missing or already-closed pool is a no-op, so a
    second call no longer runs ``closeall()`` on a closed pool. The singleton is
    reset so a later _get_pool() call builds a fresh pool.
    """
    global _pool
    pool, _pool = _pool, None
    if pool is None or pool.closed:
        return
    pool.closeall()
    _log.info("DB Pool closed.")
|
||
|
|
|
||
|
|
# ── Value Cleaning (QA Fixes) ─────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def clean(v: Any) -> Optional[str]:
    """Return *v* as a whitespace-stripped string, or None if nothing remains.

    ``None`` and values whose string form is empty or whitespace-only map to
    ``None``; everything else is converted with ``str()`` and stripped.
    """
    if v is None:
        return None
    text = str(v).strip()
    return text or None
|
||
|
|
|
||
|
|
def clean_num(v: Any) -> Optional[float]:
    """QA-01: Convert *v* to a finite float, or return None.

    Returns None for ``None``, blank strings, non-numeric strings, and for
    values that parse to NaN or +/-infinity (e.g. ``"nan"``, ``"inf"``,
    ``"1e999"``), so no non-finite number is passed on to callers.
    """
    s = clean(v)
    if s is None:
        return None
    try:
        value = float(s)
    except (ValueError, TypeError):
        return None
    # float() accepts "nan"/"inf" spellings; those are not usable readings.
    return value if math.isfinite(value) else None
|
||
|
|
|
||
|
|
def clean_int(v: Any) -> Optional[int]:
    """Convert *v* to an int, or return None when it is not numeric.

    Integer-looking strings are parsed directly so large values keep full
    precision. Anything else goes through ``float`` and is truncated toward
    zero (``"12.9"`` -> 12). Blank, non-numeric, NaN and infinite inputs all
    yield None.
    """
    s = clean(v)
    if s is None:
        return None
    try:
        # Direct parse avoids float rounding for integers above 2**53.
        return int(s)
    except ValueError:
        pass
    try:
        return int(float(s))
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric or NaN; OverflowError: +/-infinity.
        return None
|
||
|
|
|
||
|
|
def is_valid_fix(lat: Any, lng: Any) -> bool:
    """Report whether (*lat*, *lng*) is a usable position.

    A fix is rejected when either coordinate is missing or non-numeric, when
    both are exactly zero (the 0,0 'Zero Island' marker), or when either lies
    outside the -90..90 / -180..180 ranges.
    """
    latitude = clean_num(lat)
    longitude = clean_num(lng)
    if latitude is None or longitude is None:
        return False
    at_origin = latitude == 0.0 and longitude == 0.0
    if at_origin:
        return False
    lat_ok = -90 <= latitude <= 90
    lng_ok = -180 <= longitude <= 180
    return lat_ok and lng_ok
|
||
|
|
|
||
|
|
# ── API Signature & HTTP ──────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def build_sign(params: dict, secret: str) -> str:
    """Tracksolid MD5 Signature: secret + k1v1k2v2... + secret.

    Keys are concatenated with their values in sorted key order. The ``sign``
    key and any key whose value is None are left out. The result is the
    upper-case hex MD5 digest of the UTF-8 encoded string.
    """
    signable = [k for k in params if k != "sign" and params[k] is not None]
    signable.sort()
    pieces = [secret]
    for key in signable:
        pieces.append(f"{key}{params[key]}")
    pieces.append(secret)
    digest = hashlib.md5("".join(pieces).encode("utf-8"))
    return digest.hexdigest().upper()
|
||
|
|
|
||
|
|
# Shared HTTP session for all API calls. HTTPS requests go through an adapter
# configured with a urllib3 Retry policy (total=3, backoff_factor=1).
_session = requests.Session()
_session.mount(
    "https://",
    HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)),
)
|
||
|
|
|
||
|
|
def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict:
    """Sign and POST one Tracksolid API call, retrying transient failures.

    Args:
        method: API method name, sent as the ``method`` parameter.
        extra: Method-specific parameters; on key clashes they override the
            common parameters.
        access_token: Sent as ``access_token`` when truthy.
        _retry_count: Retries already consumed. Internal; callers should
            leave the default.

    Returns:
        The decoded JSON object returned by the API, or
        ``{"code": -1, "message": ...}`` when the request keeps failing or the
        response body is not a JSON object.

    Transport, HTTP-status and JSON-decoding errors are retried up to 3 times
    with waits of 1s, 2s and 4s. Rate-limit responses (code 1006) are retried
    with waits of 10s, 20s and 30s and share the same retry budget. Every
    attempt gets a fresh timestamp and signature. Any other API code,
    including token-expiry codes, is returned unchanged for the caller to
    handle.
    """
    max_retries = 3
    attempt = _retry_count

    while True:
        # Rebuilt on every attempt so the timestamp and signature are fresh.
        params = {
            "method": method,
            "app_key": APP_KEY,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            "sign_method": "md5",
            "v": "1.0",
            "format": "json",
        }
        if access_token:
            params["access_token"] = access_token
        params.update(extra)
        params["sign"] = build_sign(params, APP_SECRET)

        try:
            r = _session.post(API_BASE_URL, data=params, timeout=25)
            r.raise_for_status()
            data = r.json()
        except Exception as e:
            # Deliberately broad: this is the API boundary and callers rely on
            # receiving an error dict rather than an exception.
            if attempt < max_retries:
                wait = 2 ** attempt
                _log.warning(
                    "API call [%s] failed (%s). Retry %d/%d in %ds...",
                    method, e, attempt + 1, max_retries, wait,
                )
                time.sleep(wait)
                attempt += 1
                continue
            _log.error("API call [%s] failed after %d retries: %s", method, max_retries, e)
            return {"code": -1, "message": str(e)}

        if not isinstance(data, dict):
            # Valid JSON that is not an object; ``data.get`` would crash.
            _log.error("API call [%s] returned a non-object JSON body.", method)
            return {"code": -1, "message": "Unexpected non-object JSON response"}

        # Handle Rate Limit (1006)
        if data.get("code") == 1006 and attempt < max_retries:
            wait = 10 * (attempt + 1)
            _log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait)
            time.sleep(wait)
            attempt += 1
            continue

        return data
|
||
|
|
|
||
|
|
# ── Database Operations ───────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def get_active_imeis() -> list[str]:
    """Return the ``imei`` of every row in ``tracksolid.devices`` with ``enabled_flag = 1``."""
    query = "SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return [row[0] for row in rows]
|
||
|
|
|
||
|
|
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: Optional[str] = None, error_msg: Optional[str] = None):
    """Insert one row into ``tracksolid.ingestion_log`` using cursor *cur*.

    Args:
        cur: Open DB cursor; the caller owns the transaction and the commit.
        endpoint: Endpoint name, truncated to 100 characters.
        imei_count: Value for the ``imei_count`` column.
        upserted: Value for the ``rows_upserted`` column.
        inserted: Value for the ``rows_inserted`` column.
        duration_ms: Value for the ``duration_ms`` column.
        success: Value for the ``success`` column.
        error_code: Optional error code; stringified and truncated to 50 chars.
        error_msg: Optional error message; stringified and truncated to 500 chars.

    A missing error code or message is stored as SQL NULL rather than the
    literal string ``"None"``.
    """
    code_text = None if error_code is None else str(error_code)[:50]
    msg_text = None if error_msg is None else str(error_msg)[:500]
    cur.execute("""
        INSERT INTO tracksolid.ingestion_log
        (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (endpoint[:100], imei_count, upserted, inserted, duration_ms, success, code_text, msg_text))
|
||
|
|
|
||
|
|
# ── Token Management ──────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def get_token() -> Optional[str]:
    """Cache-aware token fetcher with auto-refresh.

    Reads the cached token row for USER_ID and then:
      * returns the cached access token when it has more than 30 minutes left;
      * refreshes it when it is still valid and a refresh token is cached;
      * otherwise performs a full authentication.

    Returns:
        An access token, or None when authentication fails.
    """
    with get_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute(
                "SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s",
                (USER_ID,),
            )
            row = cur.fetchone()
    # The connection is released before any network call: refreshing can take
    # a while and itself needs a pooled connection to update the cache.

    if row and row["expires_at"] is not None:
        expires_at = row["expires_at"]
        if expires_at.tzinfo is None:
            # Naive timestamps are interpreted as UTC, matching how they are written.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        # Aware timestamps are compared as-is; overwriting their tzinfo would
        # shift the instant they represent.
        remaining = (expires_at - datetime.now(timezone.utc)).total_seconds()

        if remaining > 1800:
            return row["access_token"]
        if remaining > 0 and row["refresh_token"]:
            return _refresh_token(row["refresh_token"])

    return _fetch_new_token()
|
||
|
|
|
||
|
|
def _fetch_new_token() -> Optional[str]:
    """Authenticate with user id and password hash, then cache the new token.

    Returns:
        The new access token, or None when the API reports an error or the
        response carries no usable ``result`` payload.
    """
    _log.info("Requesting new access token (Full Auth)...")
    res = api_post("jimi.oauth.token.get", {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200})
    result = res.get("result")
    if res.get("code") == 0 and isinstance(result, dict) and result.get("accessToken"):
        return _update_token_cache(result)
    # Only the code and message are logged; the payload may contain credentials.
    _log.error("Full auth failed (code=%s): %s", res.get("code"), res.get("message"))
    return None
|
||
|
|
|
||
|
|
def _refresh_token(refresh_token: str) -> Optional[str]:
    """Exchange *refresh_token* for a new access token and cache it.

    Falls back to a full authentication when the refresh call reports an error
    or returns no usable ``result`` payload.

    Returns:
        The new access token, or None when the fallback also fails.
    """
    _log.info("Refreshing access token...")
    res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token})
    result = res.get("result")
    if res.get("code") == 0 and isinstance(result, dict) and result.get("accessToken"):
        return _update_token_cache(result)
    _log.warning(
        "Token refresh failed (code=%s): %s. Falling back to full auth.",
        res.get("code"), res.get("message"),
    )
    return _fetch_new_token()
|
||
|
|
|
||
|
|
def _update_token_cache(r: dict) -> str:
    """Upsert the token payload *r* into ``tracksolid.api_token_cache``.

    Reads ``accessToken``, ``refreshToken`` and ``expiresIn`` (default 7200
    seconds) from *r*, stores them under USER_ID with an expiry computed from
    the current UTC time, commits, and returns the access token.

    Raises:
        KeyError: when *r* has no ``accessToken`` key.
    """
    token = r["accessToken"]
    lifetime = timedelta(seconds=int(r.get("expiresIn", 7200)))
    expires_at = datetime.now(timezone.utc) + lifetime

    upsert_sql = """
        INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (account) DO UPDATE SET
        access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token,
        expires_at=EXCLUDED.expires_at, obtained_at=NOW()
    """
    values = (USER_ID, token, r.get("refreshToken"), expires_at)

    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(upsert_sql, values)
        conn.commit()
    return token
|