tracksolid_timescale_grafan.../ts_shared_rev.py

444 lines
19 KiB
Python
Raw Normal View History

2026-04-07 18:34:40 +00:00
"""
ts_shared_rev.py — Fireside Communications · Tracksolid Pro Ingestion Stack
Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers.
Imported by ingest_movement_rev.py, ingest_events_rev.py, and webhook_receiver_rev.py.
2026-04-07 18:34:40 +00:00
REVISIONS (QA-Verified):
[FIX-01] Secrets exclusively from env (Security).
[FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance).
[FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency).
[FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency).
[FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency).
[FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity).
[FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness).
[FIX-09] get_conn auto-commits on success (Data Integrity).
[FIX-11] Consolidated safe_task/setup_shutdown (DRY).
2026-04-07 18:34:40 +00:00
"""
from __future__ import annotations
import hashlib
import logging
import os
import signal
import sys
2026-04-07 18:34:40 +00:00
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
import psycopg2
import psycopg2.extras
import psycopg2.pool
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# ── Configuration ─────────────────────────────────────────────────────────────
def _require_env(key: str) -> str:
v = os.getenv(key)
if not v:
raise EnvironmentError(f"Required environment variable '{key}' is missing.")
return v
# Required Tracksolid credentials: _require_env raises at import time if any
# of these is unset or empty.
APP_KEY = _require_env("TRACKSOLID_APP_KEY")
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
USER_ID = _require_env("TRACKSOLID_USER_ID")
# Optional override; defaults to USER_ID when TRACKSOLID_TARGET_ACCOUNT is unset.
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
# [FIX-M19] Multi-account support: the fleet is split across multiple
# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA).
# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT.
# Entries are whitespace-stripped and blank entries are dropped; if nothing
# remains, the list is [TARGET_ACCOUNT].
TARGETS = [
    t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip()
] or [TARGET_ACCOUNT]
2026-04-07 18:34:40 +00:00
# Sent as `user_pwd_md5` when requesting a token; presumably the MD5 hex digest
# of the account password (TODO confirm against the Tracksolid API docs).
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
# Connection string passed to the psycopg2 connection pool.
DATABASE_URL = _require_env("DATABASE_URL")
# API endpoint used by api_post; override with TRACKSOLID_API_URL.
API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest")
# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry
_POOL_MIN = 2
# Upper bound is configurable via DB_POOL_MAX; a non-integer value raises
# ValueError at import time.
_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12"))
# ── Logging ───────────────────────────────────────────────────────────────────
def get_logger(name: str) -> logging.Logger:
"""Standardized logger for systemd/journald ingestion."""
root = logging.getLogger("tracksolid")
if not root.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
root.addHandler(handler)
root.setLevel(logging.INFO)
return root.getChild(name)
_log = get_logger("shared")
# ── Connection Pool (psycopg2) ───────────────────────────────────────────────
# Lazily created, module-wide connection pool (see _get_pool).
_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None


def _get_pool() -> psycopg2.pool.ThreadedConnectionPool:
    """Return the shared connection pool, building it on first use.

    A pool that has since been closed is replaced by a fresh one.
    """
    global _pool
    # Fast path: a live pool already exists.
    if _pool is not None and not _pool.closed:
        return _pool
    _pool = psycopg2.pool.ThreadedConnectionPool(
        _POOL_MIN,
        _POOL_MAX,
        DATABASE_URL,
        options="-c client_encoding=UTF8",
    )
    _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX)
    return _pool
@contextmanager
def get_conn():
    """Yield a pooled DB connection inside a transaction.

    The connection is taken from the shared pool with autocommit disabled.
    If the ``with`` body finishes normally the transaction is committed; if
    it raises, the transaction is rolled back and the exception re-raised.
    The connection is always handed back to the pool.

    Yields:
        A psycopg2 connection.

    Raises:
        Whatever the ``with`` body (or the commit) raised.
    """
    pool = _get_pool()
    conn = pool.getconn()
    try:
        conn.autocommit = False
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except psycopg2.Error:
            # A dead connection cannot roll back; don't let that secondary
            # failure replace the error the caller actually needs to see.
            _log.warning("Rollback failed after error.", exc_info=True)
        raise
    finally:
        pool.putconn(conn)
def close_pool():
    """Close every connection in the shared pool, if a pool is open.

    Safe to call more than once: a pool that was never created, or that is
    already closed, is left alone instead of raising on a second closeall().
    """
    global _pool
    if _pool is not None and not _pool.closed:
        _pool.closeall()
        _log.info("DB Pool closed.")
# ── Scheduler / Signal Utilities ─────────────────────────────────────────────
def safe_task(fn, logger=None):
    """Wrap ``fn`` so an exception is logged instead of killing the scheduler.

    Args:
        fn: The task callable.
        logger: Logger for failures; defaults to this module's logger.

    Returns:
        A wrapper that forwards its arguments to ``fn`` and returns ``fn``'s
        result, or ``None`` when ``fn`` raised. The wrapper carries ``fn``'s
        name and docstring.
    """
    from functools import wraps  # local import keeps this block self-contained

    _logger = logger or _log
    # Callables such as functools.partial objects have no __name__.
    task_name = getattr(fn, "__name__", repr(fn))

    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            _logger.exception("Task %s failed. Scheduler continuing...", task_name)
            return None

    return wrapper
def setup_shutdown(logger=None):
    """Install SIGTERM and SIGINT handlers that close the DB pool and exit 0.

    Args:
        logger: Logger for the shutdown message; defaults to this module's
            logger.
    """
    active_logger = logger if logger else _log

    def _on_signal(signum, frame):
        active_logger.info("Signal %s received. Closing DB pool...", signum)
        close_pool()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _on_signal)
2026-04-07 18:34:40 +00:00
# ── Value Cleaning (QA Fixes) ─────────────────────────────────────────────────
def clean(v: Any) -> Optional[str]:
    """Normalize a raw value to a stripped string.

    Returns None when ``v`` is None or its string form is blank after
    stripping whitespace; otherwise the stripped string.
    """
    if v is None:
        return None
    text = str(v).strip()
    return text or None
def clean_num(v: Any) -> Optional[float]:
    """QA-01: Parse ``v`` as a finite float, or return None.

    Returns None for None, blank values, non-numeric strings, and for
    values that parse to NaN or +/-infinity (e.g. "nan", "inf", "1e999").
    Those are not usable measurements and would otherwise be stored as-is.
    """
    s = clean(v)
    if s is None:
        return None
    try:
        parsed = float(s)
    except (ValueError, TypeError):
        return None
    # The chained comparison is False for NaN and for both infinities.
    if not (float("-inf") < parsed < float("inf")):
        return None
    return parsed
def clean_int(v: Any) -> Optional[int]:
    """Parse ``v`` as an int, or return None when it is not numeric.

    Plain integer strings are converted exactly. Anything else goes through
    float and is truncated toward zero ("12.7" -> 12). Returns None for
    None, blank values, non-numeric strings, NaN, and infinities.
    """
    s = clean(v)
    if s is None:
        return None
    try:
        # Exact path: avoids float rounding for integers above 2**53.
        return int(s)
    except ValueError:
        pass
    try:
        return int(float(s))
    except (ValueError, OverflowError):
        # OverflowError: int() of an infinite float ("inf", "1e999").
        return None
def clean_ts(v: Any) -> Optional[str]:
    """Validate a timestamp string before it is sent to PostgreSQL.

    The cleaned string is returned unchanged when it parses with
    ``datetime.fromisoformat`` (a "Z" is read as "+00:00" for the check
    only). Blank or unparseable input yields None.
    """
    text = clean(v)
    if text is None:
        return None
    candidate = text.replace("Z", "+00:00")
    try:
        datetime.fromisoformat(candidate)
    except (ValueError, TypeError):
        return None
    return text
2026-04-07 18:34:40 +00:00
def is_valid_fix(lat: Any, lng: Any) -> bool:
    """Tell whether (lat, lng) is a usable GPS position.

    Rejects missing or non-numeric coordinates, the exact 0,0 "Zero Island"
    marker, and anything outside -90..90 latitude / -180..180 longitude.
    """
    latitude = clean_num(lat)
    longitude = clean_num(lng)
    if latitude is None or longitude is None:
        return False
    on_zero_island = latitude == 0.0 and longitude == 0.0
    if on_zero_island:
        return False
    lat_in_range = -90 <= latitude <= 90
    lng_in_range = -180 <= longitude <= 180
    return lat_in_range and lng_in_range
# ── API Signature & HTTP ──────────────────────────────────────────────────────
def build_sign(params: dict, secret: str) -> str:
    """Compute the Tracksolid MD5 signature for a request.

    The signed text is ``secret + k1v1k2v2... + secret`` with keys in sorted
    order. The "sign" key and keys whose value is None are left out. The
    digest is returned as upper-case hex.
    """
    signable = [name for name in params if name != "sign" and params[name] is not None]
    signable.sort()
    pieces = [secret]
    for name in signable:
        pieces.append(f"{name}{params[name]}")
    pieces.append(secret)
    payload = "".join(pieces)
    return hashlib.md5(payload.encode("utf-8")).hexdigest().upper()
# Shared HTTP session used by api_post. Only https:// URLs get the adapter
# configured with urllib3 Retry(total=3, backoff_factor=1); this is separate
# from the retry logic inside api_post itself.
_session = requests.Session()
_session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)))
def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict:
    """Call a Tracksolid API method and return the decoded JSON response.

    Each attempt rebuilds the request with a fresh UTC timestamp and
    signature. Two failure classes are retried, sharing one budget of three
    retries:

    * Transport/HTTP/JSON errors: sleep ``2 ** attempt`` seconds, retry.
    * API rate limit (code 1006): sleep ``10 * (attempt + 1)`` seconds, retry.

    Every other API response, including error codes such as token expiry
    (1004), is returned unchanged for the caller to handle.

    Args:
        method: API method name, e.g. ``"jimi.oauth.token.get"``.
        extra: Method-specific parameters merged over the common ones.
        access_token: Included as ``access_token`` when truthy.
        _retry_count: Number of retries already used (internal).

    Returns:
        The response dict. When retries are exhausted on a transport error,
        ``{"code": -1, "message": <error text>}``.
    """
    max_retries = 3
    attempt = _retry_count
    while True:
        params = {
            "method": method,
            "app_key": APP_KEY,
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            "sign_method": "md5",
            "v": "1.0",
            "format": "json",
        }
        if access_token:
            params["access_token"] = access_token
        params.update(extra)
        params["sign"] = build_sign(params, APP_SECRET)

        try:
            r = _session.post(API_BASE_URL, data=params, timeout=25)
            r.raise_for_status()
            data = r.json()
            if not isinstance(data, dict):
                # A list/null body would otherwise crash on data.get() below.
                raise ValueError(f"Unexpected JSON payload type: {type(data).__name__}")
        except (requests.RequestException, ValueError) as e:
            if attempt < max_retries:
                wait = 2 ** attempt
                _log.warning("Request failed [%s]: %s. Retrying in %ds...", method, e, wait)
                time.sleep(wait)
                attempt += 1
                continue
            _log.error("Request failed [%s] after %d retries: %s", method, attempt, e)
            return {"code": -1, "message": str(e)}

        # Handle Rate Limit (1006)
        if data.get("code") == 1006 and attempt < max_retries:
            wait = 10 * (attempt + 1)
            _log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait)
            time.sleep(wait)
            attempt += 1
            continue
        return data
# ── Database Operations ───────────────────────────────────────────────────────
def get_active_imeis() -> list[str]:
    """Return the IMEI of every device row whose enabled_flag is 1."""
    query = "SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return [row[0] for row in rows]
FIX-M20: alarm cross-feed + stale-IMEI recovery for live_positions Background ---------- A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two freshness gaps that share a single root cause: tracksolid.live_positions was being written by only one path (the 60s polled sweep), and that path silently omits devices that don't have a "current" fix in Jimi's location.list response. Effect on the dashboard: * 18 vehicles show OFFLINE for days-to-months — last fix is whatever the sweep wrote before Jimi dropped them. * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback because their dedicated tracker has been silent; the camera's lat/lng arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we discard it after writing to tracksolid.alarms. Verified upstream subscription state: only /pushalarm is registered with Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are inactive. This change uses only data that already arrives. What's in this PR ----------------- ts_shared_rev.py * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None) — single time-guarded upsert all three writers will share. Guards on is_valid_fix() (filters Zero-Island and out-of-range) and EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or webhook retries can't rewind a fresher marker. COALESCE on optional columns so sparse callers don't blank dense ones' values. * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices whose live_positions.gps_time is NULL or older than the threshold, ordered NULLS FIRST so worst-offenders are in batch #1. * ensure_device(cur, imei, device_name=None) — relocated from webhook_receiver_rev so every live_positions writer can satisfy the FK without re-defining the helper. The original underscore-prefixed name in webhook_receiver_rev becomes a backwards-compat alias. 
webhook_receiver_rev.py * /pushalarm — after the alarm row insert, call upsert_live_position with the alarm's lat/lng and alarmTime. Sits inside the existing per-item SAVEPOINT, so a cross-feed failure rolls back only that one alarm's cross-feed, not the alarm row. ingest_movement_rev.py * poll_live_positions — inline INSERT replaced with upsert_live_position (extras dict carries the sweep-only columns). Same data, time-guarded. * get_device_locations — inline INSERT replaced; also gains an ensure_device call so it can be safely fed arbitrary IMEIs. * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and hands it to get_device_locations. Scheduled every 10 minutes plus a startup catch-up call. Uses jimi.device.location.get which returns *last-known* fix, so devices the 60s sweep drops can be re-warmed. Expected post-deploy effect (estimates, see 260521_timescale_location_upgrade_major.md §4) * ~1,100-1,600 additional live_positions upserts/day from the alarm cross-feed, after the time-guard rejects ~70-80% of races vs the fresher 60s sweep. * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence (JC400P emits ~107 alarms/day per device). * 8-14 of the 24 OFFLINE plates expected to recover via location.get's last-known-fix path within the first 30 minutes. * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour. * No 06_live_location code changes required — reads through reporting.v_live_positions transparently. Tests ----- 12 webhook integration tests pass (3 new: cross-feed fires on valid fix; skips without lat/lng; skips Zero-Island). 8 new unit tests in test_stale_imeis.py cover the stale selector, the poll wrapper, and the time-guard contract on upsert_live_position. Full suite: 77 passed. Deployment ---------- No schema migration. Both webhook_receiver and ingest_movement containers must be rebuilt — source is image-baked, not bind-mounted. Rollback is git revert + rebuild. 
Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 18:05:26 +00:00
def get_stale_imeis(stale_minutes: int = 30) -> list[str]:
    """[FIX-M20] IMEIs whose live_positions fix is missing or too old.

    Selects enabled devices (enabled_flag = 1) that either have no
    live_positions.gps_time or whose gps_time is older than ``stale_minutes``
    minutes. Results are ordered oldest first, with missing fixes ahead of
    everything else, so the longest-silent devices lead the list.
    """
    query = """
        SELECT d.imei
        FROM tracksolid.devices d
        LEFT JOIN tracksolid.live_positions lp USING (imei)
        WHERE d.enabled_flag = 1
        AND (lp.gps_time IS NULL
        OR lp.gps_time < NOW() - (%s || ' minutes')::interval)
        ORDER BY lp.gps_time ASC NULLS FIRST
    """
    # The threshold is bound as text because the SQL concatenates it with
    # ' minutes' before casting to interval.
    threshold = (str(stale_minutes),)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, threshold)
        rows = cur.fetchall()
    return [row[0] for row in rows]
def ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
    """[FIX-M20] Make sure a tracksolid.devices row exists for ``imei``.

    Inserts a stub row (status 'unknown') so FK-constrained inserts do not
    fail when an ingest path sees an IMEI before the device sync does. An
    existing row is left untouched (ON CONFLICT DO NOTHING), so repeated
    calls are harmless.

    Args:
        cur: Open DB cursor; the caller owns the transaction.
        imei: Device IMEI.
        device_name: Optional name stored on the stub row.
    """
    stub_insert = """
        INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
        VALUES (%s, %s, 'unknown', NOW(), NOW())
        ON CONFLICT (imei) DO NOTHING
    """
    values = (imei, device_name)
    cur.execute(stub_insert, values)
def upsert_live_position(
    cur,
    imei: str,
    lat,
    lng,
    gps_time,
    speed=None,
    direction=None,
    acc_status=None,
    current_mileage=None,
    extras: Optional[dict] = None,
) -> int:
    """[FIX-M20] Time-guarded upsert into tracksolid.live_positions.

    Only overwrites the stored row when the incoming gps_time is strictly
    newer than what's already there. NULL stored gps_time always loses
    (any fix beats no fix).

    Nothing is written when ``imei`` or ``gps_time`` is falsy, or when
    (lat, lng) fails is_valid_fix().

    `extras` carries the columns only the 60s sweep emits
    (pos_type, confidence, hb_time, gps_signal, gps_num, elec_quantity,
    power_value, battery_power_val, tracker_oil, temperature,
    device_status, loc_desc). When omitted, those columns are left alone
    on update via COALESCE so a sparse caller (e.g. alarm cross-feed)
    doesn't blank them out.

    Args:
        cur: Open DB cursor; the caller owns the transaction.
        imei: Device IMEI (conflict key).
        lat, lng: Coordinates; numeric or numeric strings.
        gps_time: Fix timestamp used by the time guard.
        speed, direction, acc_status, current_mileage: Optional values; a
            None keeps the stored value on update.
        extras: Optional dict of sweep-only columns (see above).

    Returns:
        ``cur.rowcount`` after the statement (1 if a row was written or
        updated, 0 if the guard rejected it), or 0 when input validation
        failed.
    """
    if not imei or not gps_time or not is_valid_fix(lat, lng):
        return 0

    # Bind the same normalised floats that is_valid_fix() validated, so the
    # database never re-parses a raw string the Python check accepted.
    lat_value = clean_num(lat)
    lng_value = clean_num(lng)

    extras = extras or {}
    extra_columns = (
        "pos_type", "confidence", "hb_time", "gps_signal", "gps_num",
        "elec_quantity", "power_value", "battery_power_val",
        "tracker_oil", "temperature", "device_status", "loc_desc",
    )

    params = {
        "imei": imei,
        "lat": lat_value,
        "lng": lng_value,
        "gps_time": gps_time,
        "speed": speed,
        "direction": direction,
        "acc_status": acc_status,
        "current_mileage": current_mileage,
    }
    # Missing extras are bound as NULL; COALESCE then keeps the stored value.
    params.update({column: extras.get(column) for column in extra_columns})

    cur.execute("""
        INSERT INTO tracksolid.live_positions (
            imei, geom, lat, lng, gps_time, speed, direction,
            acc_status, current_mileage,
            pos_type, confidence, hb_time, gps_signal, gps_num,
            elec_quantity, power_value, battery_power_val,
            tracker_oil, temperature, device_status, loc_desc,
            recorded_at
        ) VALUES (
            %(imei)s,
            ST_SetSRID(ST_MakePoint(%(lng)s, %(lat)s), 4326),
            %(lat)s, %(lng)s, %(gps_time)s, %(speed)s, %(direction)s,
            %(acc_status)s, %(current_mileage)s,
            %(pos_type)s, %(confidence)s, %(hb_time)s, %(gps_signal)s, %(gps_num)s,
            %(elec_quantity)s, %(power_value)s, %(battery_power_val)s,
            %(tracker_oil)s, %(temperature)s, %(device_status)s, %(loc_desc)s,
            NOW()
        )
        ON CONFLICT (imei) DO UPDATE SET
            geom = EXCLUDED.geom,
            lat = EXCLUDED.lat,
            lng = EXCLUDED.lng,
            gps_time = EXCLUDED.gps_time,
            speed = COALESCE(EXCLUDED.speed, tracksolid.live_positions.speed),
            direction = COALESCE(EXCLUDED.direction, tracksolid.live_positions.direction),
            acc_status = COALESCE(EXCLUDED.acc_status, tracksolid.live_positions.acc_status),
            current_mileage = COALESCE(EXCLUDED.current_mileage, tracksolid.live_positions.current_mileage),
            pos_type = COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type),
            confidence = COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence),
            hb_time = COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time),
            gps_signal = COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal),
            gps_num = COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num),
            elec_quantity = COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity),
            power_value = COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value),
            battery_power_val = COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val),
            tracker_oil = COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil),
            temperature = COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature),
            device_status = COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status),
            loc_desc = COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc),
            updated_at = NOW()
        WHERE EXCLUDED.gps_time IS NOT NULL
        AND (tracksolid.live_positions.gps_time IS NULL
        OR EXCLUDED.gps_time > tracksolid.live_positions.gps_time)
    """, params)
    return cur.rowcount
def get_active_imeis_by_target() -> dict[str, list[str]]:
    """[FIX-M19] Map each Tracksolid sub-account to its enabled IMEIs.

    Lets endpoints that need an `account`/`target` parameter (e.g. parking)
    be called once per target. Devices whose account is NULL are grouped
    under the primary TARGET_ACCOUNT.
    """
    query = """
        SELECT COALESCE(account, %s) AS target, imei
        FROM tracksolid.devices
        WHERE enabled_flag = 1
    """
    grouped: dict[str, list[str]] = {}
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (TARGET_ACCOUNT,))
        for target, imei in cur.fetchall():
            if target not in grouped:
                grouped[target] = []
            grouped[target].append(imei)
    return grouped
2026-04-07 18:34:40 +00:00
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None):
    """Insert one row into tracksolid.ingestion_log describing an ingest run.

    Text fields are truncated before insert: endpoint to 100 characters,
    error_code to 50 and error_msg to 500. error_code / error_msg are
    stringified first; a None stays NULL.

    Args:
        cur: Open DB cursor; the caller owns the transaction.
        endpoint: Name of the API endpoint / ingest path.
        imei_count: Number of IMEIs processed.
        upserted: Rows upserted.
        inserted: Rows inserted.
        duration_ms: Run duration in milliseconds.
        success: Whether the run succeeded.
        error_code: Optional error code.
        error_msg: Optional error message.
    """
    code_text = None if error_code is None else str(error_code)[:50]
    message_text = None if error_msg is None else str(error_msg)[:500]
    row = (
        endpoint[:100],
        imei_count,
        upserted,
        inserted,
        duration_ms,
        success,
        code_text,
        message_text,
    )
    statement = """
        INSERT INTO tracksolid.ingestion_log
        (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    cur.execute(statement, row)
2026-04-07 18:34:40 +00:00
# ── Token Management ──────────────────────────────────────────────────────────
def get_token() -> Optional[str]:
    """Return a usable access token, refreshing or re-authenticating as needed.

    Reads the cached token row for USER_ID and then decides:

    * more than 1800 s until expiry: return the cached access token;
    * not yet expired and a refresh token is stored: refresh it;
    * otherwise (no row, no expiry, expired): request a brand-new token.

    Returns:
        The access token, or None when authentication failed.
    """
    with get_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s", (USER_ID,))
            row = cur.fetchone()

    # The connection is back in the pool here, so the HTTP calls below never
    # hold a DB connection while waiting on the network.
    if row and row["expires_at"] is not None:
        expires_at = row["expires_at"]
        if expires_at.tzinfo is None:
            # Naive timestamps are treated as UTC; aware ones are used as-is
            # so their real offset is respected.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        remaining = (expires_at - datetime.now(timezone.utc)).total_seconds()
        if remaining > 1800:
            return row["access_token"]
        if remaining > 0 and row["refresh_token"]:
            return _refresh_token(row["refresh_token"])
    return _fetch_new_token()
def _fetch_new_token() -> Optional[str]:
    """Authenticate with user id + password hash and cache the new token.

    Returns:
        The new access token, or None when the API call did not succeed
        (the failure code and message are logged).
    """
    _log.info("Requesting new access token (Full Auth)...")
    res = api_post("jimi.oauth.token.get", {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200})
    result = res.get("result")
    # A code-0 reply without a result payload is treated as a failure too.
    if res.get("code") == 0 and result:
        return _update_token_cache(result)
    _log.error("Token request failed (code=%s): %s", res.get("code"), res.get("message"))
    return None
def _refresh_token(refresh_token: str) -> Optional[str]:
    """Exchange a refresh token for a new access token and cache it.

    Falls back to full authentication when the refresh call does not
    succeed.

    Args:
        refresh_token: The stored refresh token.

    Returns:
        The new access token, or None when the fallback also failed.
    """
    _log.info("Refreshing access token...")
    res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token})
    result = res.get("result")
    # A code-0 reply without a result payload is treated as a failure too.
    if res.get("code") == 0 and result:
        return _update_token_cache(result)
    _log.warning(
        "Token refresh failed (code=%s): %s. Falling back to full auth.",
        res.get("code"), res.get("message"),
    )
    return _fetch_new_token()
def _update_token_cache(r: dict) -> str:
    """Store a freshly issued token for USER_ID and return the access token.

    Args:
        r: Token payload; reads "accessToken" (required), "refreshToken"
            and "expiresIn" (seconds; 7200 is used when missing or null).

    Returns:
        The access token.

    Raises:
        KeyError: if "accessToken" is absent from ``r``.
    """
    token = r["accessToken"]
    # `or` also covers an explicit null, which int() would reject.
    expires_in = int(r.get("expiresIn") or 7200)
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    # get_conn() commits on successful exit, so no explicit commit is needed.
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (account) DO UPDATE SET
                access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token,
                expires_at=EXCLUDED.expires_at, obtained_at=NOW()
            """, (USER_ID, token, r.get("refreshToken"), expires_at))
    return token