""" ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking. REVISIONS (QA-Verified): [FIX-M01] sync_devices: Atomic transaction for registry sync. [FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug). [FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance). [FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT. [FIX-M08] Atomic Logging: log_ingestion happens within the data transaction. [FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY). [FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API. [FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking. [FIX-M11] BUG-02: distance_m was stored in millimetres due to erroneous * 1000 on an already-metric API value. Removed multiplication; column renamed to distance_km in migration 04. Both poll_trips and push_trip_report (webhook) corrected. [FIX-M12] BUG-02: Renamed distance_m → distance_km in all SQL to match migration 04 schema change. [FIX-M13] POLL-02: Parking poll was returning 0 rows — added missing acc_type=0 and account params; fixed response field durSecond (was mapped as 'seconds'). [FIX-M14] POLL-01: New poll_track_list() — calls jimi.device.track.list per device every 30 minutes to capture high-resolution GPS waypoints between the 60-second fleet sweep snapshots. Writes to position_history with source='track_list'. Fills gaps in route reconstruction and enables accurate path drawing on maps. [FIX-M15] POLL-03: New get_device_locations() utility — calls jimi.device.location.get for up to 50 specific IMEIs on demand. Used for precision refreshes (alarm enrichment, stale device recovery) without waiting for the next full fleet sweep. [FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName / driverPhone / sim from jimi.track.device.detail first (list endpoint returns null for these even when populated via jimi.open.device.update). [FIX-M19] Multi-account support: the fleet is split across multiple Tracksolid sub-accounts. sync_devices, poll_live_positions and poll_parking now iterate every target in TRACKSOLID_TARGETS and dedupe/scope per-target before writing. [FIX-M20] Trip enrichment: poll_trips now backfills start_geom/end_geom/ route_geom/waypoints_count from position_history at insert time, extracts idleSecond, reverse-geocodes start/end addresses (Nominatim), and caches vehicle_plate from devices. Closes the NULL-column gaps that were inherent to jimi.device.track.mileage (it does not return coordinates, idle, or trip sequence). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import time from concurrent.futures import ThreadPoolExecutor import schedule from datetime import datetime, timezone, timedelta from psycopg2.extras import execute_values from ts_shared_rev import ( TARGET_ACCOUNT, TARGETS, api_post, get_active_imeis, get_active_imeis_by_target, get_conn, get_token, is_valid_fix, log_ingestion, clean, clean_num, clean_int, clean_ts, get_logger, reverse_geocode, safe_task, setup_shutdown, ) log = get_logger("movement") setup_shutdown(log) # ── 1. Device Registry Sync (Daily) ────────────────────────────────────────── def sync_devices(): log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS) t0, token = time.time(), get_token() if not token: return # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the # device list from every configured target and dedupe by IMEI. devices_by_imei: dict[str, dict] = {} for target in TARGETS: resp = api_post("jimi.user.device.list", {"target": target}, token) if resp.get("code") != 0: log.warning("device.list failed for target=%s: code=%s msg=%s", target, resp.get("code"), resp.get("message")) continue for d in (resp.get("result") or []): imei = d.get("imei") if imei: devices_by_imei[imei] = d devices = list(devices_by_imei.values()) log.info("Aggregated %d unique devices across targets.", len(devices)) upserted = 0 # Fetch per-device detail in parallel — previously an N+1 blocker where # 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s. # Gated at 8 to stay under API rate-limit (1006) headroom. def _fetch_detail(imei: str) -> dict: detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {} imeis = [d.get("imei") for d in devices if d.get("imei")] with ThreadPoolExecutor(max_workers=8) as pool: details = dict(zip(imeis, pool.map(_fetch_detail, imeis))) with get_conn() as conn: with conn.cursor() as cur: for d in devices: imei = d.get("imei") if not imei: continue dtl = details.get(imei, {}) cur.execute(""" INSERT INTO tracksolid.devices ( imei, device_name, mc_type, mc_type_use_scope, vehicle_name, vehicle_number, vehicle_models, vehicle_icon, vin, engine_number, vehicle_brand, fuel_100km, driver_name, driver_phone, sim, iccid, imsi, account, customer_name, device_group_id, device_group, activation_time, expiration, enabled_flag, status, current_mileage_km, last_synced_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() ) ON CONFLICT (imei) DO UPDATE SET device_name = EXCLUDED.device_name, mc_type = EXCLUDED.mc_type, mc_type_use_scope = EXCLUDED.mc_type_use_scope, vehicle_name = EXCLUDED.vehicle_name, vehicle_number = EXCLUDED.vehicle_number, vehicle_models = EXCLUDED.vehicle_models, vehicle_icon = EXCLUDED.vehicle_icon, vin = EXCLUDED.vin, engine_number = EXCLUDED.engine_number, vehicle_brand = EXCLUDED.vehicle_brand, fuel_100km = EXCLUDED.fuel_100km, driver_name = EXCLUDED.driver_name, driver_phone = EXCLUDED.driver_phone, sim = EXCLUDED.sim, iccid = EXCLUDED.iccid, imsi = EXCLUDED.imsi, account = EXCLUDED.account, customer_name = EXCLUDED.customer_name, device_group_id = EXCLUDED.device_group_id, device_group = EXCLUDED.device_group, activation_time = EXCLUDED.activation_time, expiration = EXCLUDED.expiration, enabled_flag = EXCLUDED.enabled_flag, status = EXCLUDED.status, current_mileage_km = EXCLUDED.current_mileage_km, last_synced_at = NOW(), updated_at = NOW() """, ( # [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim # only surface via jimi.track.device.detail — list returns null. imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")), clean(dtl.get("vehicleName") or d.get("vehicleName")), clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")), clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")), clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")), clean(dtl.get("driverName") or d.get("driverName")), clean(dtl.get("driverPhone") or d.get("driverPhone")), clean(dtl.get("sim") or d.get("sim")), clean(dtl.get("iccid")), clean(dtl.get("imsi")), clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")), clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)), clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage")) )) upserted += 1 log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True) conn.commit() log.info("Registry sync: %d devices updated.", upserted) # ── 2. Live Positions (Every 60s) ───────────────────────────────────────────── def poll_live_positions(): t0, token = time.time(), get_token() if not token: return # [FIX-M19] Iterate every target and dedupe by IMEI (keep last). positions_by_imei: dict[str, dict] = {} for target in TARGETS: resp = api_post("jimi.user.device.location.list", {"target": target, "map_type": "GOOGLE"}, token) if resp.get("code") != 0: log.warning("location.list failed for target=%s: code=%s msg=%s", target, resp.get("code"), resp.get("message")) continue for p in (resp.get("result") or []): imei = p.get("imei") if imei: positions_by_imei[imei] = p positions = list(positions_by_imei.values()) upserted, inserted = 0, 0 with get_conn() as conn: with conn.cursor() as cur: for p in positions: try: cur.execute("SAVEPOINT sp") imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng")) if not imei or not is_valid_fix(lat, lng): cur.execute("RELEASE SAVEPOINT sp") continue gps_time = clean_ts(p.get("gpsTime")) speed = clean_num(p.get("speed")) direction = clean_num(p.get("direction")) acc_status = clean(p.get("accStatus")) gps_num = clean_int(p.get("gpsNum")) current_mileage = clean_num(p.get("currentMileage")) cur.execute(""" INSERT INTO tracksolid.live_positions ( imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time, speed, direction, acc_status, gps_signal, gps_num, elec_quantity, power_value, battery_power_val, tracker_oil, temperature, current_mileage, device_status, loc_desc, recorded_at ) VALUES ( %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() ) ON CONFLICT (imei) DO UPDATE SET geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng, gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction, acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage, updated_at=NOW() """, ( imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")), gps_time, clean_ts(p.get("hbTime")), speed, direction, acc_status, clean_int(p.get("gpsSignal")), gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")), clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), current_mileage, clean(p.get("status")), clean(p.get("locDesc")) )) upserted += cur.rowcount # History (Hypertable Source) if gps_time: cur.execute(""" INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage) VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (imei, gps_time) DO NOTHING """, (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage)) inserted += cur.rowcount cur.execute("RELEASE SAVEPOINT sp") except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True) log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True) # ── 3. Trip Reports (Every 15m) ─────────────────────────────────────────────── # [FIX-M20] Migration 09 added route_geom, start/end_address, vehicle_plate, # waypoints_count to tracksolid.trips. poll_trips now enriches every poll- # ingested trip from position_history (start/end fix + LineString polyline) # and reverse-geocodes the endpoints, since jimi.device.track.mileage does # not return coordinates. ON CONFLICT preserves webhook-supplied data when # /pushtripreport later delivers native coords. # Per-trip enrichment from position_history. Four readable scalar subqueries # rather than a tighter CTE — runs sub-ms each given the (imei, gps_time) PK, # and the readable form survives the edge case where start_time is just # before the first available fix in the window (single CTE bounded by # BETWEEN would return NULL there). _ENRICH_QUERY = """ SELECT (SELECT geom FROM tracksolid.position_history WHERE imei = %s AND gps_time >= %s ORDER BY gps_time ASC LIMIT 1) AS start_geom, (SELECT ST_Y(geom) FROM tracksolid.position_history WHERE imei = %s AND gps_time >= %s ORDER BY gps_time ASC LIMIT 1) AS start_lat, (SELECT ST_X(geom) FROM tracksolid.position_history WHERE imei = %s AND gps_time >= %s ORDER BY gps_time ASC LIMIT 1) AS start_lng, (SELECT geom FROM tracksolid.position_history WHERE imei = %s AND gps_time <= %s ORDER BY gps_time DESC LIMIT 1) AS end_geom, (SELECT ST_Y(geom) FROM tracksolid.position_history WHERE imei = %s AND gps_time <= %s ORDER BY gps_time DESC LIMIT 1) AS end_lat, (SELECT ST_X(geom) FROM tracksolid.position_history WHERE imei = %s AND gps_time <= %s ORDER BY gps_time DESC LIMIT 1) AS end_lng, (SELECT ST_MakeLine(geom ORDER BY gps_time) FROM tracksolid.position_history WHERE imei = %s AND gps_time BETWEEN %s AND %s AND geom IS NOT NULL) AS route_geom, (SELECT COUNT(*) FROM tracksolid.position_history WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count """ def _load_plates_cache(cur) -> dict[str, str]: """Build {imei: vehicle_number} for active devices once per poll cycle.""" cur.execute(""" SELECT imei, vehicle_number FROM tracksolid.devices WHERE enabled_flag = 1 AND vehicle_number IS NOT NULL """) return {imei: plate for imei, plate in cur.fetchall()} def poll_trips(): t0 = time.time() token, imeis = get_token(), get_active_imeis() if not token or not imeis: return end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(hours=1) inserted = 0 with get_conn() as conn: with conn.cursor() as cur: plates = _load_plates_cache(cur) for i in range(0, len(imeis), 50): batch = imeis[i:i+50] resp = api_post("jimi.device.track.mileage", { "imeis": ",".join(batch), "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S") }, token) trips = resp.get("result") or [] for t in trips: try: cur.execute("SAVEPOINT sp") # [FIX-M16] API returns distance in METRES despite documentation saying km. # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000. # startMileage/endMileage are cumulative odometer in metres (same unit). # Divide by 1000 to store as distance_km. raw_dist = clean_num(t.get("distance")) dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None imei = t.get("imei") trip_start = clean_ts(t.get("startTime")) trip_end = clean_ts(t.get("endTime")) idle_s = clean_int(t.get("idleSecond")) # [FIX-M20] Enrich from position_history. trip_start/end # may be None (rare malformed payload) — skip enrichment # in that case so we still capture the row. start_geom = end_geom = route_geom = None start_lat = start_lng = end_lat = end_lng = None waypoints_count = 0 if trip_start and trip_end: cur.execute(_ENRICH_QUERY, ( imei, trip_start, # start_geom imei, trip_start, # start_lat imei, trip_start, # start_lng imei, trip_end, # end_geom imei, trip_end, # end_lat imei, trip_end, # end_lng imei, trip_start, trip_end, # route_geom imei, trip_start, trip_end, # waypoints_count )) (start_geom, start_lat, start_lng, end_geom, end_lat, end_lng, route_geom, waypoints_count) = cur.fetchone() start_address = reverse_geocode(start_lat, start_lng) end_address = reverse_geocode(end_lat, end_lng) vehicle_plate = plates.get(imei) cur.execute(""" INSERT INTO tracksolid.trips ( imei, start_time, end_time, distance_km, avg_speed_kmh, max_speed_kmh, driving_time_s, idle_time_s, start_geom, end_geom, route_geom, waypoints_count, start_address, end_address, vehicle_plate, source ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'poll') ON CONFLICT (imei, start_time) DO UPDATE SET end_time = EXCLUDED.end_time, distance_km = EXCLUDED.distance_km, max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh), driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s), idle_time_s = COALESCE(EXCLUDED.idle_time_s, tracksolid.trips.idle_time_s), start_geom = COALESCE(tracksolid.trips.start_geom, EXCLUDED.start_geom), end_geom = COALESCE(EXCLUDED.end_geom, tracksolid.trips.end_geom), route_geom = COALESCE(EXCLUDED.route_geom, tracksolid.trips.route_geom), waypoints_count = EXCLUDED.waypoints_count, start_address = COALESCE(tracksolid.trips.start_address, EXCLUDED.start_address), end_address = COALESCE(EXCLUDED.end_address, tracksolid.trips.end_address), vehicle_plate = COALESCE(EXCLUDED.vehicle_plate, tracksolid.trips.vehicle_plate) """, ( imei, trip_start, trip_end, dist_km, clean_num(t.get("avgSpeed")), clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")), idle_s, start_geom, end_geom, route_geom, waypoints_count, start_address, end_address, vehicle_plate, )) cur.execute("RELEASE SAVEPOINT sp") inserted += cur.rowcount except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True) log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted, int((time.time() - t0) * 1000), True) log.info("Trips: %d records processed.", inserted) # ── 4. Parking Events (Every 15m) ───────────────────────────────────────────── def poll_parking(): t0 = time.time() # [FIX-M19] Parking requires an `account` param tied to the IMEI's # sub-account — call per target with that target's IMEIs only. token, imeis_by_target = get_token(), get_active_imeis_by_target() if not token or not imeis_by_target: return end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(hours=1) total_imei = sum(len(v) for v in imeis_by_target.values()) inserted = 0 with get_conn() as conn: with conn.cursor() as cur: for target, target_imeis in imeis_by_target.items(): for i in range(0, len(target_imeis), 50): batch = target_imeis[i:i+50] # [FIX-M13] account + acc_type=0 required for non-empty results. resp = api_post("jimi.open.platform.report.parking", { "account": target, "imeis": ",".join(batch), "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), "acc_type": 0, }, token) events = resp.get("result") or [] for p in events: try: cur.execute("SAVEPOINT sp") imei = p.get("imei") start_time = clean_ts(p.get("startTime")) if not imei or not start_time: cur.execute("RELEASE SAVEPOINT sp") continue lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) cur.execute(""" INSERT INTO tracksolid.parking_events ( imei, event_type, start_time, end_time, duration_seconds, geom, address ) VALUES ( %s, 'parking', %s, %s, %s, CASE WHEN %s IS NOT NULL AND %s IS NOT NULL THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) ELSE NULL END, %s ) ON CONFLICT (imei, start_time, event_type) DO NOTHING """, ( imei, start_time, clean_ts(p.get("endTime")), clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds lng, lat, lng, lat, clean(p.get("address")) )) cur.execute("RELEASE SAVEPOINT sp") inserted += cur.rowcount except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted, int((time.time() - t0) * 1000), True) log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target)) # ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ─────────────────────── def poll_track_list(): """[FIX-M14] Fetch per-device GPS waypoint trails via jimi.device.track.list. The 60-second fleet sweep (poll_live_positions) captures only the most recent fix per vehicle — all motion between sweeps is invisible. This function retrieves every waypoint the device logged in the last 35 minutes (5-min overlap ensures no gaps at scheduling boundaries) and inserts them into position_history with source='track_list'. Impact on reporting: - position_history row density increases from ~1/min to ~1–4/min per device - Route traces in Grafana become accurate continuous paths - Speed profile queries gain meaningful resolution (avg over 10s intervals vs 60s intervals) — enables hard-braking / harsh-acceleration detection - v_mileage_daily_cagg continuous aggregate gains finer odometer deltas """ t0 = time.time() token, imeis = get_token(), get_active_imeis() if not token or not imeis: return end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(minutes=35) # 5-min overlap avoids boundary gaps begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S") end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S") # Phase 1: fetch waypoints from API without holding a DB connection. # jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed # up the 30 min sweep without tripping the 1006 rate limit. def _fetch(imei: str): resp = api_post("jimi.device.track.list", { "imei": imei, "begin_time": begin_str, "end_time": end_str, "map_type": "GOOGLE", }, token) return imei, resp.get("result") or [] with ThreadPoolExecutor(max_workers=4) as pool: fetched = list(pool.map(_fetch, imeis)) # Phase 2: write rows in one DB transaction. total_inserted = 0 devices_with_data = 0 rows = [] for imei, waypoints in fetched: device_rows = 0 for wp in waypoints: lat = clean_num(wp.get("lat")) lng = clean_num(wp.get("lng")) gps_time = clean_ts(wp.get("gpsTime")) if not is_valid_fix(lat, lng) or not gps_time: continue rows.append(( imei, gps_time, lng, lat, # ST_MakePoint(lng, lat) lat, lng, # lat, lng columns clean_num(wp.get("gpsSpeed")), clean_num(wp.get("direction")), clean(wp.get("accStatus")), )) device_rows += 1 if device_rows: devices_with_data += 1 if rows: with get_conn() as conn: with conn.cursor() as cur: execute_values( cur, """ INSERT INTO tracksolid.position_history ( imei, gps_time, geom, lat, lng, speed, direction, acc_status, source ) VALUES %s ON CONFLICT (imei, gps_time) DO NOTHING """, rows, template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326)," " %s, %s, %s, %s, %s, 'track_list')", page_size=500, ) total_inserted = cur.rowcount log_ingestion(cur, "jimi.device.track.list", len(imeis), 0, total_inserted, int((time.time() - t0) * 1000), True) else: with get_conn() as conn: with conn.cursor() as cur: log_ingestion(cur, "jimi.device.track.list", len(imeis), 0, 0, int((time.time() - t0) * 1000), True) log.info("Track list: %d waypoints inserted across %d/%d devices.", total_inserted, devices_with_data, len(imeis)) # ── 6. On-Demand Device Location Refresh — POLL-03 ─────────────────────────── def get_device_locations(imeis: list) -> int: """[FIX-M15] Precision position refresh for a specific list of IMEIs. Calls jimi.device.location.get (up to 50 IMEIs per call) and upserts results into live_positions. Use this for: - Alarm enrichment: get exact position immediately after an alarm fires - Stale device recovery: force-refresh a vehicle that has been offline - Dashboard on-demand refresh without waiting for the 60s fleet sweep Returns the number of positions successfully upserted. """ token = get_token() if not token or not imeis: return 0 upserted = 0 with get_conn() as conn: with conn.cursor() as cur: for i in range(0, len(imeis), 50): batch = imeis[i:i + 50] resp = api_post("jimi.device.location.get", { "imeis": ",".join(batch), "map_type": "GOOGLE", }, token) positions = resp.get("result") or [] for p in positions: imei = p.get("imei") lat = clean_num(p.get("lat")) lng = clean_num(p.get("lng")) if not imei or not is_valid_fix(lat, lng): continue cur.execute(""" INSERT INTO tracksolid.live_positions ( imei, geom, lat, lng, speed, direction, gps_time, acc_status, current_mileage, recorded_at ) VALUES ( %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s, NOW() ) ON CONFLICT (imei) DO UPDATE SET geom = EXCLUDED.geom, lat = EXCLUDED.lat, lng = EXCLUDED.lng, speed = EXCLUDED.speed, direction = EXCLUDED.direction, gps_time = EXCLUDED.gps_time, acc_status = EXCLUDED.acc_status, current_mileage = EXCLUDED.current_mileage, updated_at = NOW() """, ( imei, lng, lat, lat, lng, clean_num(p.get("speed")), clean_num(p.get("direction")), clean_ts(p.get("gpsTime")), clean(p.get("accStatus")), clean_num(p.get("currentMileage")), )) upserted += 1 conn.commit() log.info("get_device_locations: %d positions refreshed.", upserted) return upserted # ── Main Loop ───────────────────────────────────────────────────────────────── def main(): log.info("Starting MOVEMENT PIPELINE (v2.2)...") # Startup catch-up safe_task(sync_devices, log)() safe_task(poll_live_positions, log)() safe_task(poll_trips, log)() safe_task(poll_parking, log)() safe_task(poll_track_list, log)() # Schedule schedule.every(60).seconds.do(safe_task(poll_live_positions, log)) schedule.every(15).minutes.do(safe_task(poll_trips, log)) schedule.every(15).minutes.do(safe_task(poll_parking, log)) schedule.every(30).minutes.do(safe_task(poll_track_list, log)) # [FIX-M14] schedule.every().day.at("02:00").do(safe_task(sync_devices, log)) while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": main()