Cherry-pick of c8f5907 (originally FIX-M20 on main) onto
quality-program-2026-04-12, renamed to FIX-M21 here to avoid clashing
with this branch's existing [FIX-M20] (trip enrichment, commit 144dede).
Behaviour and code are unchanged from the main-branch original; the
annotation tag is the only difference.

Background
----------
A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two
freshness gaps that share a single root cause: tracksolid.live_positions
was being written by only one path (the 60s polled sweep), and that path
silently omits devices that don't have a "current" fix in Jimi's
location.list response.

Effect on the dashboard:
  * 18 vehicles show OFFLINE for days-to-months: the last fix is whatever
    the sweep wrote before Jimi dropped them.
  * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback
    because their dedicated tracker has been silent; the camera's lat/lng
    arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we
    discard it after writing to tracksolid.alarms.

Verified upstream subscription state: only /pushalarm is registered with
Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are
inactive. This change uses only data that already arrives.

What's in this commit
---------------------
ts_shared_rev.py
  * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None):
    a single time-guarded upsert that all three writers share. Guards on
    is_valid_fix() (filters Zero-Island and out-of-range) and on
    EXCLUDED.gps_time > stored.gps_time, so late-arriving alarms or
    webhook retries can't rewind a fresher marker. COALESCE on optional
    columns so sparse callers don't blank dense callers' values.
  * get_stale_imeis(stale_minutes=30): SELECTs enabled_flag=1 devices
    whose live_positions.gps_time is NULL or older than the threshold,
    ordered NULLS FIRST so the worst offenders are in batch #1.
  * ensure_device(cur, imei, device_name=None): relocated from
    webhook_receiver_rev so every live_positions writer can satisfy the
    FK without re-defining the helper. The original underscore-prefixed
    name in webhook_receiver_rev becomes a backwards-compat alias.

webhook_receiver_rev.py
  * /pushalarm: after the alarm row insert, call upsert_live_position
    with the alarm's lat/lng and alarmTime. Sits inside the existing
    per-item SAVEPOINT, so a cross-feed failure rolls back only that one
    alarm's cross-feed, not the alarm row.

ingest_movement_rev.py
  * poll_live_positions: inline INSERT replaced with
    upsert_live_position (the extras dict carries the sweep-only
    columns). Same data, time-guarded.
  * get_device_locations: inline INSERT replaced; also gains an
    ensure_device call so it can be safely fed arbitrary IMEIs.
  * poll_stale_locations(): new wrapper. Pulls get_stale_imeis() and
    hands it to get_device_locations. Scheduled every 10 minutes plus a
    startup catch-up call. Uses jimi.device.location.get, which returns
    the *last-known* fix, so devices the 60s sweep drops can be
    re-warmed.

Expected post-deploy effect
(estimates, see 06_live_location/260521_timescale_location_upgrade_major.md §4)
  * ~1,100-1,600 additional live_positions upserts/day from the alarm
    cross-feed, after the time-guard rejects ~70-80% of races vs the
    fresher 60s sweep.
  * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence
    (JC400P emits ~107 alarms/day per device).
  * 8-14 of the 24 OFFLINE plates expected to recover via location.get's
    last-known-fix path within the first 30 minutes.
  * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour.
  * No 06_live_location code changes required: it reads through
    reporting.v_live_positions transparently.

Tests
-----
12 webhook integration tests pass (3 new: cross-feed fires on valid fix;
skips without lat/lng; skips Zero-Island).
8 new unit tests in test_stale_imeis.py cover the stale selector, the
poll wrapper, and the time-guard contract on upsert_live_position.
Full suite: 77 passed.

Deployment
----------
No schema migration. Both webhook_receiver and ingest_movement containers
must be rebuilt, because the source is image-baked, not bind-mounted.
Rollback is git revert + rebuild.

Plan & monitoring SQL:
  06_live_location/260521_timescale_location_upgrade_major.md
Verification playbook:
  06_live_location/260521_timescale_location_upgrade_verification.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
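
The time-guard and COALESCE contract described for upsert_live_position can be illustrated without a database. The block below is a minimal sketch under stated assumptions, not the commit's SQL: `merge_position`, the dict-based row, and the sample coordinates and timestamps are all hypothetical stand-ins chosen for illustration.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple

# Columns the upsert always overwrites when the incoming fix wins.
ALWAYS_OVERWRITE = ("lat", "lng", "gps_time")


def merge_position(stored: Optional[dict], incoming: dict) -> Tuple[Optional[dict], int]:
    """Hypothetical in-memory model of the time-guarded upsert.

    Returns (row, written), where written is 1 if the row changed, else 0.
    """
    if incoming.get("gps_time") is None:
        return stored, 0                      # no timestamp: nothing to compare
    if stored is None:
        return dict(incoming), 1              # first fix for this IMEI: plain insert
    stored_time = stored.get("gps_time")
    if stored_time is not None and incoming["gps_time"] <= stored_time:
        return stored, 0                      # stale or duplicate fix: rejected
    merged = dict(stored)
    for key, value in incoming.items():
        # COALESCE-like rule: optional columns keep the stored value
        # when the incoming caller supplies None.
        if key in ALWAYS_OVERWRITE or value is not None:
            merged[key] = value
    return merged, 1


# Made-up sample data: a dense sweep row followed by two sparse alarm fixes.
t_old = datetime(2026, 5, 21, 12, 0, 0, tzinfo=timezone.utc)
t_mid = datetime(2026, 5, 21, 12, 0, 30, tzinfo=timezone.utc)
t_new = datetime(2026, 5, 21, 12, 1, 0, tzinfo=timezone.utc)

row, n_first = merge_position(
    None, {"lat": -1.29, "lng": 36.82, "gps_time": t_mid, "speed": 42.0})
late, n_late = merge_position(
    row, {"lat": -1.30, "lng": 36.80, "gps_time": t_old, "speed": None})
newer, n_new = merge_position(
    row, {"lat": -1.31, "lng": 36.83, "gps_time": t_new, "speed": None})
```

In this sketch the late alarm leaves the stored row untouched, while the newer alarm moves the marker but keeps the sweep's `speed`, which mirrors the behaviour the commit message attributes to the guard and the COALESCE columns.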
"""
ts_shared_rev.py - Fireside Communications · Tracksolid Pro Ingestion Stack
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers.
Imported by ingest_movement_rev.py, ingest_events_rev.py, and webhook_receiver_rev.py.

REVISIONS (QA-Verified):
  [FIX-01] Secrets exclusively from env (Security).
  [FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance).
  [FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency).
  [FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency).
  [FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency).
  [FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity).
  [FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness).
  [FIX-09] get_conn auto-commits on success (Data Integrity).
  [FIX-11] Consolidated safe_task/setup_shutdown (DRY).
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import hashlib
import logging
import os
import signal
import sys
import threading
import time
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from functools import lru_cache
from typing import Optional, Any

import psycopg2
import psycopg2.extras
import psycopg2.pool
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ── Configuration ─────────────────────────────────────────────────────────────

def _require_env(key: str) -> str:
    v = os.getenv(key)
    if not v:
        raise EnvironmentError(f"Required environment variable '{key}' is missing.")
    return v

APP_KEY = _require_env("TRACKSOLID_APP_KEY")
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
USER_ID = _require_env("TRACKSOLID_USER_ID")
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
# [FIX-M19] Multi-account support: the fleet is split across multiple
# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA).
# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT.
TARGETS = [
    t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip()
] or [TARGET_ACCOUNT]
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
DATABASE_URL = _require_env("DATABASE_URL")
API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest")

# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry
_POOL_MIN = 2
_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12"))

# ── Logging ───────────────────────────────────────────────────────────────────

def get_logger(name: str) -> logging.Logger:
    """Standardized logger for systemd/journald ingestion."""
    root = logging.getLogger("tracksolid")
    if not root.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s — %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        ))
        root.addHandler(handler)
        root.setLevel(logging.INFO)
    return root.getChild(name)

_log = get_logger("shared")

# ── Connection Pool (psycopg2) ───────────────────────────────────────────────

_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None

def _get_pool() -> psycopg2.pool.ThreadedConnectionPool:
    global _pool
    if _pool is None or _pool.closed:
        _pool = psycopg2.pool.ThreadedConnectionPool(
            _POOL_MIN, _POOL_MAX, DATABASE_URL,
            options="-c client_encoding=UTF8",
        )
        _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX)
    return _pool

@contextmanager
def get_conn():
    """Thread-safe DB connection context manager. Auto-commits on success, rolls back on error."""
    pool = _get_pool()
    conn = pool.getconn()
    try:
        conn.autocommit = False
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)

def close_pool():
    global _pool
    if _pool:
        _pool.closeall()
        _log.info("DB Pool closed.")

# ── Scheduler / Signal Utilities ─────────────────────────────────────────────

def safe_task(fn, logger=None):
    """Decorator to prevent scheduler death on single function failure."""
    _logger = logger or _log
    def wrapper():
        try:
            fn()
        except Exception:
            _logger.exception("Task %s failed. Scheduler continuing...", fn.__name__)
    wrapper.__name__ = fn.__name__
    return wrapper

def setup_shutdown(logger=None):
    """Register SIGTERM/SIGINT handlers for clean DB pool closure."""
    _logger = logger or _log
    def _handler(signum, frame):
        _logger.info("Signal %s received. Closing DB pool...", signum)
        close_pool()
        sys.exit(0)
    signal.signal(signal.SIGTERM, _handler)
    signal.signal(signal.SIGINT, _handler)

# ── Value Cleaning (QA Fixes) ─────────────────────────────────────────────────

def clean(v: Any) -> Optional[str]:
    if v is None: return None
    s = str(v).strip()
    return s if s != "" else None

def clean_num(v: Any) -> Optional[float]:
    """QA-01: Explicitly returns None for non-numeric strings."""
    s = clean(v)
    if s is None: return None
    try:
        return float(s)
    except (ValueError, TypeError):
        return None

def clean_int(v: Any) -> Optional[int]:
    s = clean(v)
    if s is None: return None
    try:
        return int(float(s))
    except (ValueError, TypeError):
        return None

def clean_ts(v: Any) -> Optional[str]:
    """Clean timestamp string for PostgreSQL insertion."""
    s = clean(v)
    if s is None:
        return None
    try:
        datetime.fromisoformat(s.replace("Z", "+00:00"))
        return s
    except (ValueError, TypeError):
        return None

def is_valid_fix(lat: Any, lng: Any) -> bool:
    """Filters out 0,0 'Zero Island' markers and null positions."""
    flat, flng = clean_num(lat), clean_num(lng)
    if flat is None or flng is None: return False
    if flat == 0.0 and flng == 0.0: return False
    return (-90 <= flat <= 90) and (-180 <= flng <= 180)

# ── API Signature & HTTP ──────────────────────────────────────────────────────

def build_sign(params: dict, secret: str) -> str:
    """Tracksolid MD5 Signature: secret + k1v1k2v2... + secret."""
    sorted_keys = sorted(k for k in params if k != "sign" and params[k] is not None)
    raw = secret + "".join(f"{k}{params[k]}" for k in sorted_keys) + secret
    return hashlib.md5(raw.encode("utf-8")).hexdigest().upper()

_session = requests.Session()
_session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1)))

def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict:
    """
    Production-grade API caller.
    Handles: signing, retries on transport/JSON errors, and rate limiting (1006).
    Any other API error code, including token expiry (1004), is returned to
    the caller unchanged.
    """
    params = {
        "method": method,
        "app_key": APP_KEY,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "sign_method": "md5",
        "v": "1.0",
        "format": "json",
    }
    if access_token: params["access_token"] = access_token
    params.update(extra)
    params["sign"] = build_sign(params, APP_SECRET)

    try:
        r = _session.post(API_BASE_URL, data=params, timeout=25)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError) as e:
        # Exponential back-off (1s, 2s, 4s); each retry re-signs with a fresh timestamp.
        if _retry_count < 3:
            time.sleep(2 ** _retry_count)
            return api_post(method, extra, access_token, _retry_count + 1)
        return {"code": -1, "message": str(e)}

    code = data.get("code")
    # Handle Rate Limit (1006)
    if code == 1006 and _retry_count < 3:
        wait = 10 * (_retry_count + 1)
        _log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait)
        time.sleep(wait)
        return api_post(method, extra, access_token, _retry_count + 1)

    return data

# ── Database Operations ───────────────────────────────────────────────────────

def get_active_imeis() -> list[str]:
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1")
            return [r[0] for r in cur.fetchall()]

def get_stale_imeis(stale_minutes: int = 30) -> list[str]:
    """[FIX-M21] IMEIs whose live_positions fix is missing or older than N minutes.

    Used by poll_stale_locations() to feed get_device_locations() with the
    set the 60s sweep silently dropped. Ordered oldest-first (NULLs first)
    so worst-offenders get the first seats in each 50-IMEI batch.
    """
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT d.imei
                FROM tracksolid.devices d
                LEFT JOIN tracksolid.live_positions lp USING (imei)
                WHERE d.enabled_flag = 1
                  AND (lp.gps_time IS NULL
                       OR lp.gps_time < NOW() - (%s || ' minutes')::interval)
                ORDER BY lp.gps_time ASC NULLS FIRST
            """, (str(stale_minutes),))
            return [r[0] for r in cur.fetchall()]

def ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
    """[FIX-M21] Upsert a stub row into tracksolid.devices so FK-constrained
    inserts don't fail when ingest paths see an IMEI before sync_devices does.

    Lifted out of webhook_receiver_rev.py to be shareable by every writer
    of live_positions / alarms / position_history. Idempotent.
    """
    cur.execute(
        """
        INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
        VALUES (%s, %s, 'unknown', NOW(), NOW())
        ON CONFLICT (imei) DO NOTHING
        """,
        (imei, device_name),
    )

def upsert_live_position(
    cur,
    imei: str,
    lat,
    lng,
    gps_time,
    speed=None,
    direction=None,
    acc_status=None,
    current_mileage=None,
    extras: Optional[dict] = None,
) -> int:
    """[FIX-M21] Time-guarded upsert into tracksolid.live_positions.

    Only overwrites the stored row when the incoming gps_time is strictly
    newer than what's already there. NULL stored gps_time always loses
    (any fix beats no fix). Returns 1 if a row was written/updated, else 0.

    `extras` carries the columns only the 60s sweep emits
    (pos_type, confidence, hb_time, gps_signal, gps_num, elec_quantity,
    power_value, battery_power_val, tracker_oil, temperature,
    device_status, loc_desc). When omitted, those columns are left alone
    on update via COALESCE so a sparse caller (e.g. alarm cross-feed)
    doesn't blank them out.
    """
    if not imei or not gps_time or not is_valid_fix(lat, lng):
        return 0

    extras = extras or {}
    cur.execute("""
        INSERT INTO tracksolid.live_positions (
            imei, geom, lat, lng, gps_time, speed, direction,
            acc_status, current_mileage,
            pos_type, confidence, hb_time, gps_signal, gps_num,
            elec_quantity, power_value, battery_power_val,
            tracker_oil, temperature, device_status, loc_desc,
            recorded_at
        ) VALUES (
            %(imei)s,
            ST_SetSRID(ST_MakePoint(%(lng)s, %(lat)s), 4326),
            %(lat)s, %(lng)s, %(gps_time)s, %(speed)s, %(direction)s,
            %(acc_status)s, %(current_mileage)s,
            %(pos_type)s, %(confidence)s, %(hb_time)s, %(gps_signal)s, %(gps_num)s,
            %(elec_quantity)s, %(power_value)s, %(battery_power_val)s,
            %(tracker_oil)s, %(temperature)s, %(device_status)s, %(loc_desc)s,
            NOW()
        )
        ON CONFLICT (imei) DO UPDATE SET
            geom = EXCLUDED.geom,
            lat = EXCLUDED.lat,
            lng = EXCLUDED.lng,
            gps_time = EXCLUDED.gps_time,
            speed = COALESCE(EXCLUDED.speed, tracksolid.live_positions.speed),
            direction = COALESCE(EXCLUDED.direction, tracksolid.live_positions.direction),
            acc_status = COALESCE(EXCLUDED.acc_status, tracksolid.live_positions.acc_status),
            current_mileage = COALESCE(EXCLUDED.current_mileage, tracksolid.live_positions.current_mileage),
            pos_type = COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type),
            confidence = COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence),
            hb_time = COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time),
            gps_signal = COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal),
            gps_num = COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num),
            elec_quantity = COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity),
            power_value = COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value),
            battery_power_val = COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val),
            tracker_oil = COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil),
            temperature = COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature),
            device_status = COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status),
            loc_desc = COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc),
            updated_at = NOW()
        WHERE EXCLUDED.gps_time IS NOT NULL
          AND (tracksolid.live_positions.gps_time IS NULL
               OR EXCLUDED.gps_time > tracksolid.live_positions.gps_time)
    """, {
        "imei": imei,
        "lat": lat,
        "lng": lng,
        "gps_time": gps_time,
        "speed": speed,
        "direction": direction,
        "acc_status": acc_status,
        "current_mileage": current_mileage,
        "pos_type": extras.get("pos_type"),
        "confidence": extras.get("confidence"),
        "hb_time": extras.get("hb_time"),
        "gps_signal": extras.get("gps_signal"),
        "gps_num": extras.get("gps_num"),
        "elec_quantity": extras.get("elec_quantity"),
        "power_value": extras.get("power_value"),
        "battery_power_val": extras.get("battery_power_val"),
        "tracker_oil": extras.get("tracker_oil"),
        "temperature": extras.get("temperature"),
        "device_status": extras.get("device_status"),
        "loc_desc": extras.get("loc_desc"),
    })
    return cur.rowcount

def get_active_imeis_by_target() -> dict[str, list[str]]:
    """[FIX-M19] Group active IMEIs by their Tracksolid sub-account so
    endpoints that require an `account`/`target` param (e.g. parking) can
    scope per-target calls. IMEIs with a NULL account are bucketed under
    the primary TARGET_ACCOUNT as a safe default."""
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT COALESCE(account, %s) AS target, imei
                FROM tracksolid.devices
                WHERE enabled_flag = 1
            """, (TARGET_ACCOUNT,))
            out: dict[str, list[str]] = {}
            for target, imei in cur.fetchall():
                out.setdefault(target, []).append(imei)
            return out

def log_ingestion(
    cur,
    endpoint: str,
    imei_count: int,
    upserted: int,
    inserted: int,
    duration_ms: int,
    success: bool,
    error_code: Optional[str] = None,
    error_msg: Optional[str] = None,
):
    """Write one audit row to tracksolid.ingestion_log (long fields are truncated)."""
    cur.execute("""
        INSERT INTO tracksolid.ingestion_log
            (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        endpoint[:100], imei_count, upserted, inserted, duration_ms, success,
        str(error_code)[:50] if error_code is not None else None,
        str(error_msg)[:500] if error_msg is not None else None,
    ))

# ── Token Management ──────────────────────────────────────────────────────────

def get_token() -> Optional[str]:
    """Cache-aware token fetcher with auto-refresh."""
    with get_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute("SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s", (USER_ID,))
            row = cur.fetchone()

    now = datetime.now(timezone.utc)
    if row:
        expires_at = row['expires_at'].replace(tzinfo=timezone.utc)
        diff = (expires_at - now).total_seconds()

        if diff > 1800: return row['access_token']
        if diff > 0 and row['refresh_token']: return _refresh_token(row['refresh_token'])

    return _fetch_new_token()

def _fetch_new_token() -> Optional[str]:
    _log.info("Requesting new access token (Full Auth)...")
    res = api_post("jimi.oauth.token.get", {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200})
    if res.get("code") == 0:
        return _update_token_cache(res["result"])
    return None

def _refresh_token(refresh_token: str) -> Optional[str]:
    _log.info("Refreshing access token...")
    res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token})
    if res.get("code") == 0:
        return _update_token_cache(res["result"])
    return _fetch_new_token()

def _update_token_cache(r: dict) -> str:
    token, expires_in = r["accessToken"], int(r.get("expiresIn", 7200))
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    with get_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (account) DO UPDATE SET
                    access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token,
                    expires_at=EXCLUDED.expires_at, obtained_at=NOW()
            """, (USER_ID, token, r.get("refreshToken"), expires_at))
        conn.commit()
    return token

# ── Reverse Geocoding (Nominatim) ────────────────────────────────────────────
# Best-effort lookup used by poll_trips() to populate trips.start_address /
# end_address. Must NEVER raise: failure returns None and the trip insert
# proceeds without the address.

_NOMINATIM_URL = os.getenv(
    "NOMINATIM_URL",
    "https://nominatim.openstreetmap.org/reverse",
)
_NOMINATIM_USER_AGENT = os.getenv(
    "NOMINATIM_USER_AGENT",
    "fireside-tracksolid/1.0 (kianiadee@gmail.com)",
)
_GEOCODE_LOCK = threading.Lock()
_GEOCODE_LAST_CALL_AT: float = 0.0
_GEOCODE_MIN_INTERVAL_S: float = 1.0  # Nominatim TOS: 1 req/sec absolute max


def _geocode_throttle() -> None:
    """Sleep just long enough since the previous call to honour 1 req/sec."""
    global _GEOCODE_LAST_CALL_AT
    with _GEOCODE_LOCK:
        elapsed = time.monotonic() - _GEOCODE_LAST_CALL_AT
        if elapsed < _GEOCODE_MIN_INTERVAL_S:
            time.sleep(_GEOCODE_MIN_INTERVAL_S - elapsed)
        _GEOCODE_LAST_CALL_AT = time.monotonic()


@lru_cache(maxsize=2048)
def _reverse_geocode_cached(lat_round: float, lng_round: float) -> Optional[str]:
    """Cached HTTP call. Key is lat/lng rounded to 4 dp (~11 m precision)."""
    _geocode_throttle()
    try:
        r = _session.get(
            _NOMINATIM_URL,
            params={
                "lat": lat_round,
                "lon": lng_round,
                "format": "json",
                "zoom": 18,
                "addressdetails": 0,
            },
            headers={"User-Agent": _NOMINATIM_USER_AGENT},
            timeout=10,
        )
        r.raise_for_status()
        data = r.json()
        addr = data.get("display_name")
        if addr:
            return addr.strip()
        return None
    except (requests.RequestException, ValueError) as e:
        _log.warning("reverse_geocode failed lat=%s lng=%s: %s",
                     lat_round, lng_round, e)
        return None


def reverse_geocode(lat: Any, lng: Any) -> Optional[str]:
    """
    Reverse-geocode a coordinate to a human-readable address via Nominatim.

    Best-effort. Never raises. Returns None on:
      • missing / invalid lat or lng
      • HTTP/timeout/JSON failure
      • Nominatim returns no display_name

    Cached on lat/lng rounded to 4 decimal places (~11 m), which keeps
    repeated visits to the same depot/site from re-querying.
    """
    flat, flng = clean_num(lat), clean_num(lng)
    if flat is None or flng is None:
        return None
    if flat == 0.0 and flng == 0.0:
        return None
    if not (-90 <= flat <= 90 and -180 <= flng <= 180):
        return None
    return _reverse_geocode_cached(round(flat, 4), round(flng, 4))
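
The rounding-based cache key used by reverse_geocode() can be tried in isolation. The block below is a minimal sketch, assuming a stubbed lookup in place of the Nominatim HTTP call: `lookup`, `lookup_cached`, the `calls` list, and the sample coordinates are hypothetical names and values introduced only for illustration.

```python
from functools import lru_cache
from typing import List, Optional, Tuple

# Records every simulated upstream lookup so cache behaviour is observable.
calls: List[Tuple[float, float]] = []


@lru_cache(maxsize=2048)
def lookup_cached(lat_round: float, lng_round: float) -> Optional[str]:
    """Stub for the cached HTTP call; the key is the already-rounded pair."""
    calls.append((lat_round, lng_round))
    return f"address near {lat_round:.4f},{lng_round:.4f}"


def lookup(lat: float, lng: float) -> Optional[str]:
    """Round to 4 decimal places before hitting the cache, as the file does."""
    return lookup_cached(round(lat, 4), round(lng, 4))


# Two fixes a few metres apart share one cache entry; a third, roughly
# 100 m further south, rounds to a different key and triggers a new lookup.
a = lookup(-1.292066, 36.821945)
b = lookup(-1.292101, 36.821912)
c = lookup(-1.293000, 36.821945)
```

Rounding before the cached call, rather than inside it, is what lets nearby fixes collapse to the same key. One consequence visible in the file itself is that a failed lookup returning None is cached too, so a coordinate that failed once is not retried until its entry is evicted.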