From fa110f4313485a34ae6cc9ead165fbb7ad1050a7 Mon Sep 17 00:00:00 2001 From: David Kiania Date: Fri, 24 Apr 2026 10:43:07 +0300 Subject: [PATCH] feat: [FIX-M19] multi-account ingest across fireside sub-accounts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fleet lives across three Tracksolid sub-accounts: fireside — 63 devices Fireside@HQ — 52 devices Fireside_MSA — 41 devices Previously sync_devices / poll_live_positions / poll_parking only queried a single TARGET_ACCOUNT, so ~64% of the fleet was invisible to the pipeline. Changes: - ts_shared_rev.py: new TARGETS list (env TRACKSOLID_TARGETS, comma-separated; falls back to the single TARGET_ACCOUNT). - ts_shared_rev.py: new get_active_imeis_by_target() helper that groups active IMEIs by their stored account so parking calls can pass the right account param per batch. - ingest_movement_rev.py: sync_devices and poll_live_positions loop over every target and dedupe by IMEI before upserting. poll_parking loops over imeis_by_target so each batch carries the matching account. - CLAUDE.md: FIX-M19 entry. Requires new env var TRACKSOLID_TARGETS="fireside,Fireside@HQ,Fireside_MSA" on the ingest services in Coolify. Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 1 + ingest_movement_rev.py | 138 +++++++++++++++++++++++++---------------- ts_shared_rev.py | 23 +++++++ 3 files changed, 109 insertions(+), 53 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 04f917b..f7b27ac 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -172,6 +172,7 @@ All views carry a `COMMENT ON VIEW` referencing their spec — `\d+ tracksolid.v | FIX-M16 | `ingest_movement_rev.py` | `distance` from API is metres → divide by 1000 before storing | | FIX-M17 | `ingest_movement_rev.py` | `sync_devices()` ON CONFLICT now updates all 26 fields (was 5) | | FIX-M18 | `ingest_movement_rev.py` | `sync_devices()` pulls `vehicleName`/`vehicleNumber`/`driverName`/`driverPhone`/`sim` from `jimi.track.device.detail` — list endpoint returns null for these even when set | +| FIX-M19 | `ts_shared_rev.py`, `ingest_movement_rev.py` | Multi-account support: fleet spans `fireside`, `Fireside@HQ`, `Fireside_MSA` (156 devices total). `sync_devices`, `poll_live_positions`, `poll_parking` iterate `TRACKSOLID_TARGETS` (comma-separated env var). New helper `get_active_imeis_by_target()` scopes parking calls to the right account | | FIX-E06 | `ingest_events_rev.py` | Alarm field mapping: `alertTypeId`/`alarmTypeName`/`alertTime` | | BUG-02 | Migration 04 | Historical `distance_m` rows ÷1,000,000 → renamed to `distance_km` | diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 9ccc332..90651d5 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -34,6 +34,10 @@ REVISIONS (QA-Verified): driverPhone / sim from jimi.track.device.detail first (list endpoint returns null for these even when populated via jimi.open.device.update). + [FIX-M19] Multi-account support: the fleet is split across multiple + Tracksolid sub-accounts. sync_devices, poll_live_positions + and poll_parking now iterate every target in TRACKSOLID_TARGETS + and dedupe/scope per-target before writing. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ @@ -46,8 +50,10 @@ from psycopg2.extras import execute_values from ts_shared_rev import ( TARGET_ACCOUNT, + TARGETS, api_post, get_active_imeis, + get_active_imeis_by_target, get_conn, get_token, is_valid_fix, @@ -67,14 +73,26 @@ setup_shutdown(log) # ── 1. Device Registry Sync (Daily) ────────────────────────────────────────── def sync_devices(): - log.info("Syncing device registry...") + log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS) t0, token = time.time(), get_token() if not token: return - resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token) - if resp.get("code") != 0: return + # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the + # device list from every configured target and dedupe by IMEI. + devices_by_imei: dict[str, dict] = {} + for target in TARGETS: + resp = api_post("jimi.user.device.list", {"target": target}, token) + if resp.get("code") != 0: + log.warning("device.list failed for target=%s: code=%s msg=%s", + target, resp.get("code"), resp.get("message")) + continue + for d in (resp.get("result") or []): + imei = d.get("imei") + if imei: + devices_by_imei[imei] = d - devices = resp.get("result") or [] + devices = list(devices_by_imei.values()) + log.info("Aggregated %d unique devices across targets.", len(devices)) upserted = 0 # Fetch per-device detail in parallel — previously an N+1 blocker where @@ -165,10 +183,21 @@ def poll_live_positions(): t0, token = time.time(), get_token() if not token: return - resp = api_post("jimi.user.device.location.list", {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token) - if resp.get("code") != 0: return + # [FIX-M19] Iterate every target and dedupe by IMEI (keep last). + positions_by_imei: dict[str, dict] = {} + for target in TARGETS: + resp = api_post("jimi.user.device.location.list", + {"target": target, "map_type": "GOOGLE"}, token) + if resp.get("code") != 0: + log.warning("location.list failed for target=%s: code=%s msg=%s", + target, resp.get("code"), resp.get("message")) + continue + for p in (resp.get("result") or []): + imei = p.get("imei") + if imei: + positions_by_imei[imei] = p - positions = resp.get("result") or [] + positions = list(positions_by_imei.values()) upserted, inserted = 0, 0 with get_conn() as conn: @@ -288,63 +317,66 @@ def poll_trips(): def poll_parking(): t0 = time.time() - token, imeis = get_token(), get_active_imeis() - if not token or not imeis: return + # [FIX-M19] Parking requires an `account` param tied to the IMEI's + # sub-account — call per target with that target's IMEIs only. + token, imeis_by_target = get_token(), get_active_imeis_by_target() + if not token or not imeis_by_target: return end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(hours=1) + total_imei = sum(len(v) for v in imeis_by_target.values()) inserted = 0 with get_conn() as conn: with conn.cursor() as cur: - for i in range(0, len(imeis), 50): - batch = imeis[i:i+50] - # [FIX-M13] Added account + acc_type=0 (all stop types). Without these - # the API returns empty results even when parking events exist. - resp = api_post("jimi.open.platform.report.parking", { - "account": TARGET_ACCOUNT, - "imeis": ",".join(batch), - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), - "acc_type": 0, - }, token) + for target, target_imeis in imeis_by_target.items(): + for i in range(0, len(target_imeis), 50): + batch = target_imeis[i:i+50] + # [FIX-M13] account + acc_type=0 required for non-empty results. + resp = api_post("jimi.open.platform.report.parking", { + "account": target, + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), + "acc_type": 0, + }, token) - events = resp.get("result") or [] - for p in events: - try: - cur.execute("SAVEPOINT sp") - imei = p.get("imei") - start_time = clean_ts(p.get("startTime")) - if not imei or not start_time: + events = resp.get("result") or [] + for p in events: + try: + cur.execute("SAVEPOINT sp") + imei = p.get("imei") + start_time = clean_ts(p.get("startTime")) + if not imei or not start_time: + cur.execute("RELEASE SAVEPOINT sp") + continue + lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) + cur.execute(""" + INSERT INTO tracksolid.parking_events ( + imei, event_type, start_time, end_time, + duration_seconds, geom, address + ) VALUES ( + %s, 'parking', %s, %s, %s, + CASE WHEN %s IS NOT NULL AND %s IS NOT NULL + THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) + ELSE NULL END, + %s + ) ON CONFLICT (imei, start_time, event_type) DO NOTHING + """, ( + imei, start_time, clean_ts(p.get("endTime")), + clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds + lng, lat, lng, lat, + clean(p.get("address")) + )) cur.execute("RELEASE SAVEPOINT sp") - continue - lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) - cur.execute(""" - INSERT INTO tracksolid.parking_events ( - imei, event_type, start_time, end_time, - duration_seconds, geom, address - ) VALUES ( - %s, 'parking', %s, %s, %s, - CASE WHEN %s IS NOT NULL AND %s IS NOT NULL - THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) - ELSE NULL END, - %s - ) ON CONFLICT (imei, start_time, event_type) DO NOTHING - """, ( - imei, start_time, clean_ts(p.get("endTime")), - clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds - lng, lat, lng, lat, - clean(p.get("address")) - )) - cur.execute("RELEASE SAVEPOINT sp") - inserted += cur.rowcount - except Exception: - cur.execute("ROLLBACK TO SAVEPOINT sp") - log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) + inserted += cur.rowcount + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) - log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted, + log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted, int((time.time() - t0) * 1000), True) - log.info("Parking: %d events processed.", inserted) + log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target)) # ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ─────────────────────── diff --git a/ts_shared_rev.py b/ts_shared_rev.py index 251575f..e782faf 100644 --- a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -48,6 +48,12 @@ APP_KEY = _require_env("TRACKSOLID_APP_KEY") APP_SECRET = _require_env("TRACKSOLID_APP_SECRET") USER_ID = _require_env("TRACKSOLID_USER_ID") TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID) +# [FIX-M19] Multi-account support: the fleet is split across multiple +# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA). +# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT. +TARGETS = [ + t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip() +] or [TARGET_ACCOUNT] PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5") DATABASE_URL = _require_env("DATABASE_URL") API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest") @@ -229,6 +235,23 @@ def get_active_imeis() -> list[str]: cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1") return [r[0] for r in cur.fetchall()] +def get_active_imeis_by_target() -> dict[str, list[str]]: + """[FIX-M19] Group active IMEIs by their Tracksolid sub-account so + endpoints that require an `account`/`target` param (e.g. parking) can + scope per-target calls. IMEIs with a NULL account are bucketed under + the primary TARGET_ACCOUNT as a safe default.""" + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT COALESCE(account, %s) AS target, imei + FROM tracksolid.devices + WHERE enabled_flag = 1 + """, (TARGET_ACCOUNT,)) + out: dict[str, list[str]] = {} + for target, imei in cur.fetchall(): + out.setdefault(target, []).append(imei) + return out + def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None): cur.execute(""" INSERT INTO tracksolid.ingestion_log