feat: [FIX-M19] multi-account ingest across fireside sub-accounts
Some checks failed
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
Static Analysis / static (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled

Fleet lives across three Tracksolid sub-accounts:
  fireside         —  63 devices
  Fireside@HQ      —  52 devices
  Fireside_MSA     —  41 devices

Previously sync_devices / poll_live_positions / poll_parking only
queried a single TARGET_ACCOUNT, so ~64% of the fleet was invisible to
the pipeline.

Changes:
  - ts_shared_rev.py: new TARGETS list (env TRACKSOLID_TARGETS,
    comma-separated; falls back to the single TARGET_ACCOUNT).
  - ts_shared_rev.py: new get_active_imeis_by_target() helper that
    groups active IMEIs by their stored account so parking calls can
    pass the right account param per batch.
  - ingest_movement_rev.py: sync_devices and poll_live_positions loop
    over every target and dedupe by IMEI before upserting. poll_parking
    loops over imeis_by_target so each batch carries the matching
    account.
  - CLAUDE.md: FIX-M19 entry.

Requires new env var TRACKSOLID_TARGETS="fireside,Fireside@HQ,Fireside_MSA"
on the ingest services in Coolify.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
David Kiania 2026-04-24 10:43:07 +03:00
parent 417627675e
commit fa110f4313
3 changed files with 109 additions and 53 deletions

View file

@ -172,6 +172,7 @@ All views carry a `COMMENT ON VIEW` referencing their spec — `\d+ tracksolid.v
| FIX-M16 | `ingest_movement_rev.py` | `distance` from API is metres → divide by 1000 before storing | | FIX-M16 | `ingest_movement_rev.py` | `distance` from API is metres → divide by 1000 before storing |
| FIX-M17 | `ingest_movement_rev.py` | `sync_devices()` ON CONFLICT now updates all 26 fields (was 5) | | FIX-M17 | `ingest_movement_rev.py` | `sync_devices()` ON CONFLICT now updates all 26 fields (was 5) |
| FIX-M18 | `ingest_movement_rev.py` | `sync_devices()` pulls `vehicleName`/`vehicleNumber`/`driverName`/`driverPhone`/`sim` from `jimi.track.device.detail` — list endpoint returns null for these even when set | | FIX-M18 | `ingest_movement_rev.py` | `sync_devices()` pulls `vehicleName`/`vehicleNumber`/`driverName`/`driverPhone`/`sim` from `jimi.track.device.detail` — list endpoint returns null for these even when set |
| FIX-M19 | `ts_shared_rev.py`, `ingest_movement_rev.py` | Multi-account support: fleet spans `fireside`, `Fireside@HQ`, `Fireside_MSA` (156 devices total). `sync_devices`, `poll_live_positions`, `poll_parking` iterate `TRACKSOLID_TARGETS` (comma-separated env var). New helper `get_active_imeis_by_target()` scopes parking calls to the right account |
| FIX-E06 | `ingest_events_rev.py` | Alarm field mapping: `alertTypeId`/`alarmTypeName`/`alertTime` | | FIX-E06 | `ingest_events_rev.py` | Alarm field mapping: `alertTypeId`/`alarmTypeName`/`alertTime` |
| BUG-02 | Migration 04 | Historical `distance_m` rows ÷1,000,000 → renamed to `distance_km` | | BUG-02 | Migration 04 | Historical `distance_m` rows ÷1,000,000 → renamed to `distance_km` |

View file

@ -34,6 +34,10 @@ REVISIONS (QA-Verified):
driverPhone / sim from jimi.track.device.detail first (list driverPhone / sim from jimi.track.device.detail first (list
endpoint returns null for these even when populated via endpoint returns null for these even when populated via
jimi.open.device.update). jimi.open.device.update).
[FIX-M19] Multi-account support: the fleet is split across multiple
Tracksolid sub-accounts. sync_devices, poll_live_positions
and poll_parking now iterate every target in TRACKSOLID_TARGETS
and dedupe/scope per-target before writing.
""" """
@ -46,8 +50,10 @@ from psycopg2.extras import execute_values
from ts_shared_rev import ( from ts_shared_rev import (
TARGET_ACCOUNT, TARGET_ACCOUNT,
TARGETS,
api_post, api_post,
get_active_imeis, get_active_imeis,
get_active_imeis_by_target,
get_conn, get_conn,
get_token, get_token,
is_valid_fix, is_valid_fix,
@ -67,14 +73,26 @@ setup_shutdown(log)
# ── 1. Device Registry Sync (Daily) ────────────────────────────────────────── # ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices(): def sync_devices():
log.info("Syncing device registry...") log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS)
t0, token = time.time(), get_token() t0, token = time.time(), get_token()
if not token: return if not token: return
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token) # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the
if resp.get("code") != 0: return # device list from every configured target and dedupe by IMEI.
devices_by_imei: dict[str, dict] = {}
for target in TARGETS:
resp = api_post("jimi.user.device.list", {"target": target}, token)
if resp.get("code") != 0:
log.warning("device.list failed for target=%s: code=%s msg=%s",
target, resp.get("code"), resp.get("message"))
continue
for d in (resp.get("result") or []):
imei = d.get("imei")
if imei:
devices_by_imei[imei] = d
devices = resp.get("result") or [] devices = list(devices_by_imei.values())
log.info("Aggregated %d unique devices across targets.", len(devices))
upserted = 0 upserted = 0
# Fetch per-device detail in parallel — previously an N+1 blocker where # Fetch per-device detail in parallel — previously an N+1 blocker where
@ -165,10 +183,21 @@ def poll_live_positions():
t0, token = time.time(), get_token() t0, token = time.time(), get_token()
if not token: return if not token: return
resp = api_post("jimi.user.device.location.list", {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token) # [FIX-M19] Iterate every target and dedupe by IMEI (keep last).
if resp.get("code") != 0: return positions_by_imei: dict[str, dict] = {}
for target in TARGETS:
resp = api_post("jimi.user.device.location.list",
{"target": target, "map_type": "GOOGLE"}, token)
if resp.get("code") != 0:
log.warning("location.list failed for target=%s: code=%s msg=%s",
target, resp.get("code"), resp.get("message"))
continue
for p in (resp.get("result") or []):
imei = p.get("imei")
if imei:
positions_by_imei[imei] = p
positions = resp.get("result") or [] positions = list(positions_by_imei.values())
upserted, inserted = 0, 0 upserted, inserted = 0, 0
with get_conn() as conn: with get_conn() as conn:
@ -288,63 +317,66 @@ def poll_trips():
def poll_parking(): def poll_parking():
t0 = time.time() t0 = time.time()
token, imeis = get_token(), get_active_imeis() # [FIX-M19] Parking requires an `account` param tied to the IMEI's
if not token or not imeis: return # sub-account — call per target with that target's IMEIs only.
token, imeis_by_target = get_token(), get_active_imeis_by_target()
if not token or not imeis_by_target: return
end_ts = datetime.now(timezone.utc) end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(hours=1) start_ts = end_ts - timedelta(hours=1)
total_imei = sum(len(v) for v in imeis_by_target.values())
inserted = 0 inserted = 0
with get_conn() as conn: with get_conn() as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
for i in range(0, len(imeis), 50): for target, target_imeis in imeis_by_target.items():
batch = imeis[i:i+50] for i in range(0, len(target_imeis), 50):
# [FIX-M13] Added account + acc_type=0 (all stop types). Without these batch = target_imeis[i:i+50]
# the API returns empty results even when parking events exist. # [FIX-M13] account + acc_type=0 required for non-empty results.
resp = api_post("jimi.open.platform.report.parking", { resp = api_post("jimi.open.platform.report.parking", {
"account": TARGET_ACCOUNT, "account": target,
"imeis": ",".join(batch), "imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"acc_type": 0, "acc_type": 0,
}, token) }, token)
events = resp.get("result") or [] events = resp.get("result") or []
for p in events: for p in events:
try: try:
cur.execute("SAVEPOINT sp") cur.execute("SAVEPOINT sp")
imei = p.get("imei") imei = p.get("imei")
start_time = clean_ts(p.get("startTime")) start_time = clean_ts(p.get("startTime"))
if not imei or not start_time: if not imei or not start_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
cur.execute("""
INSERT INTO tracksolid.parking_events (
imei, event_type, start_time, end_time,
duration_seconds, geom, address
) VALUES (
%s, 'parking', %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
""", (
imei, start_time, clean_ts(p.get("endTime")),
clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds
lng, lat, lng, lat,
clean(p.get("address"))
))
cur.execute("RELEASE SAVEPOINT sp") cur.execute("RELEASE SAVEPOINT sp")
continue inserted += cur.rowcount
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) except Exception:
cur.execute(""" cur.execute("ROLLBACK TO SAVEPOINT sp")
INSERT INTO tracksolid.parking_events ( log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
imei, event_type, start_time, end_time,
duration_seconds, geom, address
) VALUES (
%s, 'parking', %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
""", (
imei, start_time, clean_ts(p.get("endTime")),
clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds
lng, lat, lng, lat,
clean(p.get("address"))
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted, log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted,
int((time.time() - t0) * 1000), True) int((time.time() - t0) * 1000), True)
log.info("Parking: %d events processed.", inserted) log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target))
# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ─────────────────────── # ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────

View file

@ -48,6 +48,12 @@ APP_KEY = _require_env("TRACKSOLID_APP_KEY")
APP_SECRET = _require_env("TRACKSOLID_APP_SECRET") APP_SECRET = _require_env("TRACKSOLID_APP_SECRET")
USER_ID = _require_env("TRACKSOLID_USER_ID") USER_ID = _require_env("TRACKSOLID_USER_ID")
TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID) TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID)
# [FIX-M19] Multi-account support: the fleet is split across multiple
# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA).
# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT.
TARGETS = [
t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip()
] or [TARGET_ACCOUNT]
PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5") PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5")
DATABASE_URL = _require_env("DATABASE_URL") DATABASE_URL = _require_env("DATABASE_URL")
API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest") API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest")
@ -229,6 +235,23 @@ def get_active_imeis() -> list[str]:
cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1") cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1")
return [r[0] for r in cur.fetchall()] return [r[0] for r in cur.fetchall()]
def get_active_imeis_by_target() -> dict[str, list[str]]:
"""[FIX-M19] Group active IMEIs by their Tracksolid sub-account so
endpoints that require an `account`/`target` param (e.g. parking) can
scope per-target calls. IMEIs with a NULL account are bucketed under
the primary TARGET_ACCOUNT as a safe default."""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT COALESCE(account, %s) AS target, imei
FROM tracksolid.devices
WHERE enabled_flag = 1
""", (TARGET_ACCOUNT,))
out: dict[str, list[str]] = {}
for target, imei in cur.fetchall():
out.setdefault(target, []).append(imei)
return out
def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None): def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None):
cur.execute(""" cur.execute("""
INSERT INTO tracksolid.ingestion_log INSERT INTO tracksolid.ingestion_log