2026-04-07 18:34:40 +00:00
|
|
|
"""
|
|
|
|
|
ts_shared_rev.py — Fireside Communications · Tracksolid Pro Ingestion Stack
|
|
|
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
|
Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers.
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
Imported by ingest_movement_rev.py, ingest_events_rev.py, and webhook_receiver_rev.py.
|
2026-04-07 18:34:40 +00:00
|
|
|
|
|
|
|
|
REVISIONS (QA-Verified):
|
|
|
|
|
[FIX-01] Secrets exclusively from env (Security).
|
|
|
|
|
[FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance).
|
|
|
|
|
[FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency).
|
|
|
|
|
[FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency).
|
|
|
|
|
[FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency).
|
|
|
|
|
[FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity).
|
|
|
|
|
[FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness).
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
[FIX-09] get_conn auto-commits on success (Data Integrity).
|
|
|
|
|
[FIX-11] Consolidated safe_task/setup_shutdown (DRY).
|
2026-04-07 18:34:40 +00:00
|
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
import signal
|
|
|
|
|
import sys
|
2026-04-07 18:34:40 +00:00
|
|
|
import time
|
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
|
from typing import Optional, Any
|
|
|
|
|
|
|
|
|
|
import psycopg2
|
|
|
|
|
import psycopg2.extras
|
|
|
|
|
import psycopg2.pool
|
|
|
|
|
import requests
|
|
|
|
|
from requests.adapters import HTTPAdapter
|
|
|
|
|
from urllib3.util.retry import Retry
|
|
|
|
|
|
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def _require_env(key: str) -> str:
|
|
|
|
|
v = os.getenv(key)
|
|
|
|
|
if not v:
|
|
|
|
|
raise EnvironmentError(f"Required environment variable '{key}' is missing.")
|
|
|
|
|
return v
|
|
|
|
|
|
|
|
|
|
APP_KEY = _require_env("TRACKSOLID_APP_KEY")
|
|
|
|
|
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
|
|
|
|
|
USER_ID = _require_env("TRACKSOLID_USER_ID")
|
|
|
|
|
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
|
2026-04-24 07:43:07 +00:00
|
|
|
# [FIX-M19] Multi-account support: the fleet is split across multiple
|
|
|
|
|
# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA).
|
|
|
|
|
# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT.
|
|
|
|
|
TARGETS = [
|
|
|
|
|
t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip()
|
|
|
|
|
] or [TARGET_ACCOUNT]
|
2026-04-07 18:34:40 +00:00
|
|
|
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
|
|
|
|
|
DATABASE_URL = _require_env("DATABASE_URL")
|
|
|
|
|
API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest")
|
|
|
|
|
|
|
|
|
|
# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry
|
|
|
|
|
_POOL_MIN = 2
|
|
|
|
|
_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12"))
|
|
|
|
|
|
|
|
|
|
# ── Logging ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_logger(name: str) -> logging.Logger:
|
|
|
|
|
"""Standardized logger for systemd/journald ingestion."""
|
|
|
|
|
root = logging.getLogger("tracksolid")
|
|
|
|
|
if not root.handlers:
|
|
|
|
|
handler = logging.StreamHandler()
|
|
|
|
|
handler.setFormatter(logging.Formatter(
|
|
|
|
|
"%(asctime)s [%(levelname)s] %(name)s — %(message)s",
|
|
|
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
|
|
|
))
|
|
|
|
|
root.addHandler(handler)
|
|
|
|
|
root.setLevel(logging.INFO)
|
|
|
|
|
return root.getChild(name)
|
|
|
|
|
|
|
|
|
|
_log = get_logger("shared")
|
|
|
|
|
|
|
|
|
|
# ── Connection Pool (psycopg2) ───────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None
|
|
|
|
|
|
|
|
|
|
def _get_pool() -> psycopg2.pool.ThreadedConnectionPool:
|
|
|
|
|
global _pool
|
|
|
|
|
if _pool is None or _pool.closed:
|
|
|
|
|
_pool = psycopg2.pool.ThreadedConnectionPool(
|
|
|
|
|
_POOL_MIN, _POOL_MAX, DATABASE_URL,
|
|
|
|
|
options="-c client_encoding=UTF8",
|
|
|
|
|
)
|
|
|
|
|
_log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX)
|
|
|
|
|
return _pool
|
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
|
def get_conn():
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
"""Thread-safe DB connection context manager. Auto-commits on success, rolls back on error."""
|
2026-04-07 18:34:40 +00:00
|
|
|
pool = _get_pool()
|
|
|
|
|
conn = pool.getconn()
|
|
|
|
|
try:
|
|
|
|
|
conn.autocommit = False
|
|
|
|
|
yield conn
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
conn.commit()
|
2026-04-07 18:34:40 +00:00
|
|
|
except Exception:
|
|
|
|
|
conn.rollback()
|
|
|
|
|
raise
|
|
|
|
|
finally:
|
|
|
|
|
pool.putconn(conn)
|
|
|
|
|
|
|
|
|
|
def close_pool():
|
|
|
|
|
global _pool
|
|
|
|
|
if _pool:
|
|
|
|
|
_pool.closeall()
|
|
|
|
|
_log.info("DB Pool closed.")
|
|
|
|
|
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
# ── Scheduler / Signal Utilities ─────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def safe_task(fn, logger=None):
|
|
|
|
|
"""Decorator to prevent scheduler death on single function failure."""
|
|
|
|
|
_logger = logger or _log
|
|
|
|
|
def wrapper():
|
|
|
|
|
try:
|
|
|
|
|
fn()
|
|
|
|
|
except Exception:
|
|
|
|
|
_logger.exception("Task %s failed. Scheduler continuing...", fn.__name__)
|
|
|
|
|
wrapper.__name__ = fn.__name__
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
|
def setup_shutdown(logger=None):
|
|
|
|
|
"""Register SIGTERM/SIGINT handlers for clean DB pool closure."""
|
|
|
|
|
_logger = logger or _log
|
|
|
|
|
def _handler(signum, frame):
|
|
|
|
|
_logger.info("Signal %s received. Closing DB pool...", signum)
|
|
|
|
|
close_pool()
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
signal.signal(signal.SIGTERM, _handler)
|
|
|
|
|
signal.signal(signal.SIGINT, _handler)
|
|
|
|
|
|
2026-04-07 18:34:40 +00:00
|
|
|
# ── Value Cleaning (QA Fixes) ─────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def clean(v: Any) -> Optional[str]:
|
|
|
|
|
if v is None: return None
|
|
|
|
|
s = str(v).strip()
|
|
|
|
|
return s if s != "" else None
|
|
|
|
|
|
|
|
|
|
def clean_num(v: Any) -> Optional[float]:
|
|
|
|
|
"""QA-01: Explicitly returns None for non-numeric strings."""
|
|
|
|
|
s = clean(v)
|
|
|
|
|
if s is None: return None
|
|
|
|
|
try:
|
|
|
|
|
return float(s)
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def clean_int(v: Any) -> Optional[int]:
|
|
|
|
|
s = clean(v)
|
|
|
|
|
if s is None: return None
|
|
|
|
|
try:
|
|
|
|
|
return int(float(s))
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return None
|
|
|
|
|
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
def clean_ts(v: Any) -> Optional[str]:
|
|
|
|
|
"""Clean timestamp string for PostgreSQL insertion."""
|
|
|
|
|
s = clean(v)
|
|
|
|
|
if s is None:
|
|
|
|
|
return None
|
|
|
|
|
try:
|
|
|
|
|
datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
|
|
return s
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
return None
|
|
|
|
|
|
2026-04-07 18:34:40 +00:00
|
|
|
def is_valid_fix(lat: Any, lng: Any) -> bool:
|
|
|
|
|
"""Filters out 0,0 'Zero Island' markers and null positions."""
|
|
|
|
|
flat, flng = clean_num(lat), clean_num(lng)
|
|
|
|
|
if flat is None or flng is None: return False
|
|
|
|
|
if flat == 0.0 and flng == 0.0: return False
|
|
|
|
|
return (-90 <= flat <= 90) and (-180 <= flng <= 180)
|
|
|
|
|
|
|
|
|
|
# ── API Signature & HTTP ──────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def build_sign(params: dict, secret: str) -> str:
|
|
|
|
|
"""Tracksolid MD5 Signature: secret + k1v1k2v2... + secret."""
|
|
|
|
|
sorted_keys = sorted(k for k in params if k != "sign" and params[k] is not None)
|
|
|
|
|
raw = secret + "".join(f"{k}{params[k]}" for k in sorted_keys) + secret
|
|
|
|
|
return hashlib.md5(raw.encode("utf-8")).hexdigest().upper()
|
|
|
|
|
|
|
|
|
|
_session = requests.Session()
|
|
|
|
|
_session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)))
|
|
|
|
|
|
|
|
|
|
def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Production-grade API caller.
|
|
|
|
|
Handles: Retries, Signing, Rate Limiting (1006), and Token Expiry (1004).
|
|
|
|
|
"""
|
|
|
|
|
params = {
|
|
|
|
|
"method": method,
|
|
|
|
|
"app_key": APP_KEY,
|
|
|
|
|
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
|
"sign_method": "md5",
|
|
|
|
|
"v": "1.0",
|
|
|
|
|
"format": "json",
|
|
|
|
|
}
|
|
|
|
|
if access_token: params["access_token"] = access_token
|
|
|
|
|
params.update(extra)
|
|
|
|
|
params["sign"] = build_sign(params, APP_SECRET)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
r = _session.post(API_BASE_URL, data=params, timeout=25)
|
|
|
|
|
r.raise_for_status()
|
|
|
|
|
data = r.json()
|
Add webhook receiver, consolidate shared utilities, expand telemetry coverage
- Add FastAPI webhook receiver (webhook_receiver_rev.py) for Jimi push data:
OBD diagnostics, DTC fault codes, alarms, GPS, heartbeats, trip reports
- Add schema migration (03_webhook_schema_migration.sql) for webhook tables:
fault_codes, heartbeats, expanded obd_readings/trips/position_history/alarms
- Consolidate duplicated _safe/_shutdown into shared safe_task/setup_shutdown
in ts_shared_rev.py (DRY refactor)
- Add auto-commit to get_conn() context manager (prevents forgotten commits)
- Fix poll_trips to capture runTimeSecond and maxSpeed from API
- Add poll_parking via jimi.open.platform.report.parking
- Remove broken poll_obd (OBD is push-only, no polling endpoint exists)
- Fix alarms schema: add lat/lng/acc_status columns + dedup constraint
- Fix obd_readings schema: add dedup constraint
- Fix trigger DO block: replace nonexistent has_column with information_schema
- Narrow api_post exception handling to RequestException/ValueError
- Add webhook_receiver service to docker-compose.yaml
- Add fastapi/uvicorn/python-multipart to pyproject.toml
- Add clean_ts timestamp validator to ts_shared_rev.py
- Add Tracksolid Pro API documentation (tracksolidApiDocumentation.md)
- Populate .gitignore with Python/OS/secrets patterns
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 13:28:45 +00:00
|
|
|
except (requests.RequestException, ValueError) as e:
|
2026-04-07 18:34:40 +00:00
|
|
|
if _retry_count < 3:
|
|
|
|
|
time.sleep(2 ** _retry_count)
|
|
|
|
|
return api_post(method, extra, access_token, _retry_count + 1)
|
|
|
|
|
return {"code": -1, "message": str(e)}
|
|
|
|
|
|
|
|
|
|
code = data.get("code")
|
|
|
|
|
# Handle Rate Limit (1006)
|
|
|
|
|
if code == 1006 and _retry_count < 3:
|
|
|
|
|
wait = 10 * (_retry_count + 1)
|
|
|
|
|
_log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait)
|
|
|
|
|
time.sleep(wait)
|
|
|
|
|
return api_post(method, extra, access_token, _retry_count + 1)
|
|
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
# ── Database Operations ───────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_active_imeis() -> list[str]:
|
|
|
|
|
with get_conn() as conn:
|
|
|
|
|
with conn.cursor() as cur:
|
|
|
|
|
cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1")
|
|
|
|
|
return [r[0] for r in cur.fetchall()]
|
|
|
|
|
|
FIX-M20: alarm cross-feed + stale-IMEI recovery for live_positions
Background
----------
A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two
freshness gaps that share a single root cause: tracksolid.live_positions
was being written by only one path (the 60s polled sweep), and that path
silently omits devices that don't have a "current" fix in Jimi's
location.list response. Effect on the dashboard:
* 18 vehicles show OFFLINE for days-to-months — last fix is whatever
the sweep wrote before Jimi dropped them.
* 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback
because their dedicated tracker has been silent; the camera's lat/lng
arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but
we discard it after writing to tracksolid.alarms.
Verified upstream subscription state: only /pushalarm is registered with
Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are
inactive. This change uses only data that already arrives.
What's in this PR
-----------------
ts_shared_rev.py
* upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None)
— single time-guarded upsert all three writers will share. Guards on
is_valid_fix() (filters Zero-Island and out-of-range) and
EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or
webhook retries can't rewind a fresher marker. COALESCE on optional
columns so sparse callers don't blank dense ones' values.
* get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices
whose live_positions.gps_time is NULL or older than the threshold,
ordered NULLS FIRST so worst-offenders are in batch #1.
* ensure_device(cur, imei, device_name=None) — relocated from
webhook_receiver_rev so every live_positions writer can satisfy the
FK without re-defining the helper. The original underscore-prefixed
name in webhook_receiver_rev becomes a backwards-compat alias.
webhook_receiver_rev.py
* /pushalarm — after the alarm row insert, call upsert_live_position
with the alarm's lat/lng and alarmTime. Sits inside the existing
per-item SAVEPOINT, so a cross-feed failure rolls back only that
one alarm's cross-feed, not the alarm row.
ingest_movement_rev.py
* poll_live_positions — inline INSERT replaced with upsert_live_position
(extras dict carries the sweep-only columns). Same data, time-guarded.
* get_device_locations — inline INSERT replaced; also gains an
ensure_device call so it can be safely fed arbitrary IMEIs.
* poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and
hands it to get_device_locations. Scheduled every 10 minutes plus a
startup catch-up call. Uses jimi.device.location.get which returns
*last-known* fix, so devices the 60s sweep drops can be re-warmed.
Expected post-deploy effect (estimates, see
260521_timescale_location_upgrade_major.md §4)
* ~1,100-1,600 additional live_positions upserts/day from the alarm
cross-feed, after the time-guard rejects ~70-80% of races vs the
fresher 60s sweep.
* The 3 camera-fallback plates flip to "seconds-after-alarm" cadence
(JC400P emits ~107 alarms/day per device).
* 8-14 of the 24 OFFLINE plates expected to recover via location.get's
last-known-fix path within the first 30 minutes.
* Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour.
* No 06_live_location code changes required — reads through
reporting.v_live_positions transparently.
Tests
-----
12 webhook integration tests pass (3 new: cross-feed fires on valid fix;
skips without lat/lng; skips Zero-Island). 8 new unit tests in
test_stale_imeis.py cover the stale selector, the poll wrapper, and the
time-guard contract on upsert_live_position. Full suite: 77 passed.
Deployment
----------
No schema migration. Both webhook_receiver and ingest_movement
containers must be rebuilt — source is image-baked, not bind-mounted.
Rollback is git revert + rebuild.
Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 18:05:26 +00:00
|
|
|
def get_stale_imeis(stale_minutes: int = 30) -> list[str]:
|
|
|
|
|
"""[FIX-M20] IMEIs whose live_positions fix is missing or older than N minutes.
|
|
|
|
|
|
|
|
|
|
Used by poll_stale_locations() to feed get_device_locations() with the
|
|
|
|
|
set the 60s sweep silently dropped. Ordered oldest-first (NULLs first)
|
|
|
|
|
so worst-offenders get the first seats in each 50-IMEI batch.
|
|
|
|
|
"""
|
|
|
|
|
with get_conn() as conn:
|
|
|
|
|
with conn.cursor() as cur:
|
|
|
|
|
cur.execute("""
|
|
|
|
|
SELECT d.imei
|
|
|
|
|
FROM tracksolid.devices d
|
|
|
|
|
LEFT JOIN tracksolid.live_positions lp USING (imei)
|
|
|
|
|
WHERE d.enabled_flag = 1
|
|
|
|
|
AND (lp.gps_time IS NULL
|
|
|
|
|
OR lp.gps_time < NOW() - (%s || ' minutes')::interval)
|
|
|
|
|
ORDER BY lp.gps_time ASC NULLS FIRST
|
|
|
|
|
""", (str(stale_minutes),))
|
|
|
|
|
return [r[0] for r in cur.fetchall()]
|
|
|
|
|
|
|
|
|
|
def ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
|
|
|
|
|
"""[FIX-M20] Upsert a stub row into tracksolid.devices so FK-constrained
|
|
|
|
|
inserts don't fail when ingest paths see an IMEI before sync_devices does.
|
|
|
|
|
|
|
|
|
|
Lifted out of webhook_receiver_rev.py to be shareable by every writer
|
|
|
|
|
of live_positions / alarms / position_history. Idempotent.
|
|
|
|
|
"""
|
|
|
|
|
cur.execute(
|
|
|
|
|
"""
|
|
|
|
|
INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
|
|
|
|
|
VALUES (%s, %s, 'unknown', NOW(), NOW())
|
|
|
|
|
ON CONFLICT (imei) DO NOTHING
|
|
|
|
|
""",
|
|
|
|
|
(imei, device_name),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def upsert_live_position(
|
|
|
|
|
cur,
|
|
|
|
|
imei: str,
|
|
|
|
|
lat,
|
|
|
|
|
lng,
|
|
|
|
|
gps_time,
|
|
|
|
|
speed=None,
|
|
|
|
|
direction=None,
|
|
|
|
|
acc_status=None,
|
|
|
|
|
current_mileage=None,
|
|
|
|
|
extras: Optional[dict] = None,
|
|
|
|
|
) -> int:
|
|
|
|
|
"""[FIX-M20] Time-guarded upsert into tracksolid.live_positions.
|
|
|
|
|
|
|
|
|
|
Only overwrites the stored row when the incoming gps_time is strictly
|
|
|
|
|
newer than what's already there. NULL stored gps_time always loses
|
|
|
|
|
(any fix beats no fix). Returns 1 if a row was written/updated, else 0.
|
|
|
|
|
|
|
|
|
|
`extras` carries the columns only the 60s sweep emits
|
|
|
|
|
(pos_type, confidence, hb_time, gps_signal, gps_num, elec_quantity,
|
|
|
|
|
power_value, battery_power_val, tracker_oil, temperature,
|
|
|
|
|
device_status, loc_desc). When omitted, those columns are left alone
|
|
|
|
|
on update via COALESCE so a sparse caller (e.g. alarm cross-feed)
|
|
|
|
|
doesn't blank them out.
|
|
|
|
|
"""
|
|
|
|
|
if not imei or not gps_time or not is_valid_fix(lat, lng):
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
extras = extras or {}
|
|
|
|
|
cur.execute("""
|
|
|
|
|
INSERT INTO tracksolid.live_positions (
|
|
|
|
|
imei, geom, lat, lng, gps_time, speed, direction,
|
|
|
|
|
acc_status, current_mileage,
|
|
|
|
|
pos_type, confidence, hb_time, gps_signal, gps_num,
|
|
|
|
|
elec_quantity, power_value, battery_power_val,
|
|
|
|
|
tracker_oil, temperature, device_status, loc_desc,
|
|
|
|
|
recorded_at
|
|
|
|
|
) VALUES (
|
|
|
|
|
%(imei)s,
|
|
|
|
|
ST_SetSRID(ST_MakePoint(%(lng)s, %(lat)s), 4326),
|
|
|
|
|
%(lat)s, %(lng)s, %(gps_time)s, %(speed)s, %(direction)s,
|
|
|
|
|
%(acc_status)s, %(current_mileage)s,
|
|
|
|
|
%(pos_type)s, %(confidence)s, %(hb_time)s, %(gps_signal)s, %(gps_num)s,
|
|
|
|
|
%(elec_quantity)s, %(power_value)s, %(battery_power_val)s,
|
|
|
|
|
%(tracker_oil)s, %(temperature)s, %(device_status)s, %(loc_desc)s,
|
|
|
|
|
NOW()
|
|
|
|
|
)
|
|
|
|
|
ON CONFLICT (imei) DO UPDATE SET
|
|
|
|
|
geom = EXCLUDED.geom,
|
|
|
|
|
lat = EXCLUDED.lat,
|
|
|
|
|
lng = EXCLUDED.lng,
|
|
|
|
|
gps_time = EXCLUDED.gps_time,
|
|
|
|
|
speed = COALESCE(EXCLUDED.speed, tracksolid.live_positions.speed),
|
|
|
|
|
direction = COALESCE(EXCLUDED.direction, tracksolid.live_positions.direction),
|
|
|
|
|
acc_status = COALESCE(EXCLUDED.acc_status, tracksolid.live_positions.acc_status),
|
|
|
|
|
current_mileage = COALESCE(EXCLUDED.current_mileage, tracksolid.live_positions.current_mileage),
|
|
|
|
|
pos_type = COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type),
|
|
|
|
|
confidence = COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence),
|
|
|
|
|
hb_time = COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time),
|
|
|
|
|
gps_signal = COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal),
|
|
|
|
|
gps_num = COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num),
|
|
|
|
|
elec_quantity = COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity),
|
|
|
|
|
power_value = COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value),
|
|
|
|
|
battery_power_val = COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val),
|
|
|
|
|
tracker_oil = COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil),
|
|
|
|
|
temperature = COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature),
|
|
|
|
|
device_status = COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status),
|
|
|
|
|
loc_desc = COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc),
|
|
|
|
|
updated_at = NOW()
|
|
|
|
|
WHERE EXCLUDED.gps_time IS NOT NULL
|
|
|
|
|
AND (tracksolid.live_positions.gps_time IS NULL
|
|
|
|
|
OR EXCLUDED.gps_time > tracksolid.live_positions.gps_time)
|
|
|
|
|
""", {
|
|
|
|
|
"imei": imei,
|
|
|
|
|
"lat": lat,
|
|
|
|
|
"lng": lng,
|
|
|
|
|
"gps_time": gps_time,
|
|
|
|
|
"speed": speed,
|
|
|
|
|
"direction": direction,
|
|
|
|
|
"acc_status": acc_status,
|
|
|
|
|
"current_mileage": current_mileage,
|
|
|
|
|
"pos_type": extras.get("pos_type"),
|
|
|
|
|
"confidence": extras.get("confidence"),
|
|
|
|
|
"hb_time": extras.get("hb_time"),
|
|
|
|
|
"gps_signal": extras.get("gps_signal"),
|
|
|
|
|
"gps_num": extras.get("gps_num"),
|
|
|
|
|
"elec_quantity": extras.get("elec_quantity"),
|
|
|
|
|
"power_value": extras.get("power_value"),
|
|
|
|
|
"battery_power_val": extras.get("battery_power_val"),
|
|
|
|
|
"tracker_oil": extras.get("tracker_oil"),
|
|
|
|
|
"temperature": extras.get("temperature"),
|
|
|
|
|
"device_status": extras.get("device_status"),
|
|
|
|
|
"loc_desc": extras.get("loc_desc"),
|
|
|
|
|
})
|
|
|
|
|
return cur.rowcount
|
|
|
|
|
|
2026-04-24 07:43:07 +00:00
|
|
|
def get_active_imeis_by_target() -> dict[str, list[str]]:
|
|
|
|
|
"""[FIX-M19] Group active IMEIs by their Tracksolid sub-account so
|
|
|
|
|
endpoints that require an `account`/`target` param (e.g. parking) can
|
|
|
|
|
scope per-target calls. IMEIs with a NULL account are bucketed under
|
|
|
|
|
the primary TARGET_ACCOUNT as a safe default."""
|
|
|
|
|
with get_conn() as conn:
|
|
|
|
|
with conn.cursor() as cur:
|
|
|
|
|
cur.execute("""
|
|
|
|
|
SELECT COALESCE(account, %s) AS target, imei
|
|
|
|
|
FROM tracksolid.devices
|
|
|
|
|
WHERE enabled_flag = 1
|
|
|
|
|
""", (TARGET_ACCOUNT,))
|
|
|
|
|
out: dict[str, list[str]] = {}
|
|
|
|
|
for target, imei in cur.fetchall():
|
|
|
|
|
out.setdefault(target, []).append(imei)
|
|
|
|
|
return out
|
|
|
|
|
|
2026-04-07 18:34:40 +00:00
|
|
|
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None):
|
|
|
|
|
cur.execute("""
|
2026-04-11 15:19:13 +00:00
|
|
|
INSERT INTO tracksolid.ingestion_log
|
2026-04-07 18:34:40 +00:00
|
|
|
(endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
|
|
|
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
2026-04-11 15:19:13 +00:00
|
|
|
""", (
|
|
|
|
|
endpoint[:100], imei_count, upserted, inserted, duration_ms, success,
|
|
|
|
|
str(error_code)[:50] if error_code is not None else None,
|
|
|
|
|
str(error_msg)[:500] if error_msg is not None else None,
|
|
|
|
|
))
|
2026-04-07 18:34:40 +00:00
|
|
|
|
|
|
|
|
# ── Token Management ──────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def get_token() -> Optional[str]:
|
|
|
|
|
"""Cache-aware token fetcher with auto-refresh."""
|
|
|
|
|
with get_conn() as conn:
|
|
|
|
|
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
|
|
|
|
|
cur.execute("SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s", (USER_ID,))
|
|
|
|
|
row = cur.fetchone()
|
|
|
|
|
|
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
|
if row:
|
|
|
|
|
expires_at = row['expires_at'].replace(tzinfo=timezone.utc)
|
|
|
|
|
diff = (expires_at - now).total_seconds()
|
|
|
|
|
|
|
|
|
|
if diff > 1800: return row['access_token']
|
|
|
|
|
if diff > 0 and row['refresh_token']: return _refresh_token(row['refresh_token'])
|
|
|
|
|
|
|
|
|
|
return _fetch_new_token()
|
|
|
|
|
|
|
|
|
|
def _fetch_new_token() -> Optional[str]:
|
|
|
|
|
_log.info("Requesting new access token (Full Auth)...")
|
|
|
|
|
res = api_post("jimi.oauth.token.get", {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200})
|
|
|
|
|
if res.get("code") == 0:
|
|
|
|
|
return _update_token_cache(res["result"])
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _refresh_token(refresh_token: str) -> Optional[str]:
|
|
|
|
|
_log.info("Refreshing access token...")
|
|
|
|
|
res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token})
|
|
|
|
|
if res.get("code") == 0:
|
|
|
|
|
return _update_token_cache(res["result"])
|
|
|
|
|
return _fetch_new_token()
|
|
|
|
|
|
|
|
|
|
def _update_token_cache(r: dict) -> str:
|
|
|
|
|
token, expires_in = r["accessToken"], int(r.get("expiresIn", 7200))
|
|
|
|
|
expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
|
|
|
|
|
with get_conn() as conn:
|
|
|
|
|
with conn.cursor() as cur:
|
|
|
|
|
cur.execute("""
|
|
|
|
|
INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at)
|
|
|
|
|
VALUES (%s, %s, %s, %s)
|
|
|
|
|
ON CONFLICT (account) DO UPDATE SET
|
|
|
|
|
access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token,
|
|
|
|
|
expires_at=EXCLUDED.expires_at, obtained_at=NOW()
|
|
|
|
|
""", (USER_ID, token, r.get("refreshToken"), expires_at))
|
|
|
|
|
conn.commit()
|
|
|
|
|
return token
|