""" ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking. REVISIONS (QA-Verified): [FIX-M01] sync_devices: Atomic transaction for registry sync. [FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug). [FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance). [FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT. [FIX-M08] Atomic Logging: log_ingestion happens within the data transaction. [FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY). [FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API. [FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking. [FIX-M11] BUG-02: distance_m was stored in millimetres due to erroneous * 1000 on an already-metric API value. Removed multiplication; column renamed to distance_km in migration 04. Both poll_trips and push_trip_report (webhook) corrected. [FIX-M12] BUG-02: Renamed distance_m → distance_km in all SQL to match migration 04 schema change. [FIX-M13] POLL-02: Parking poll was returning 0 rows — added missing acc_type=0 and account params; fixed response field durSecond (was mapped as 'seconds'). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import time import schedule from datetime import datetime, timezone, timedelta from ts_shared_rev import ( TARGET_ACCOUNT, api_post, get_active_imeis, get_conn, get_token, is_valid_fix, log_ingestion, clean, clean_num, clean_int, clean_ts, get_logger, safe_task, setup_shutdown, ) log = get_logger("movement") setup_shutdown(log) # ── 1. 
# Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices():
    """Upsert the account's devices into tracksolid.devices.

    Calls jimi.user.device.list for TARGET_ACCOUNT, then
    jimi.track.device.detail once per device, and writes one row per IMEI.
    The upserts and the ingestion-log row are committed together
    ([FIX-M01], [FIX-M08]).

    Returns None. Does nothing when no token is available or when the list
    call reports a non-zero code (the latter is logged as a warning).
    """
    log.info("Syncing device registry...")
    t0, token = time.time(), get_token()
    if not token:
        return

    resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
    if resp.get("code") != 0:
        log.warning(
            "Registry sync skipped: jimi.user.device.list returned code=%s",
            resp.get("code"),
        )
        return
    devices = resp.get("result") or []

    # Collect every row (one detail call per device) BEFORE opening the
    # transaction, so no DB connection is held while waiting on HTTP.
    rows = []
    for d in devices:
        imei = d.get("imei")
        if not imei:
            continue
        detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
        # A failed detail call degrades to list-only data for that device.
        dtl = (detail_resp.get("result") or {}) if detail_resp.get("code") == 0 else {}
        rows.append((
            imei,
            clean(d.get("deviceName")),
            clean(d.get("mcType")),
            clean(d.get("mcTypeUseScope")),
            clean(d.get("vehicleName")),
            clean(d.get("vehicleNumber")),
            clean(d.get("vehicleModels")),
            clean(d.get("vehicleIcon")),
            clean(dtl.get("vin")),
            clean(dtl.get("engineNumber")),
            clean(dtl.get("vehicleBrand")),
            clean_num(dtl.get("fuel_100km")),
            clean(d.get("driverName")),
            clean(d.get("driverPhone")),
            clean(d.get("sim")),
            clean(dtl.get("iccid")),
            clean(dtl.get("imsi")),
            clean(dtl.get("account")),
            clean(dtl.get("customerName")),
            clean(d.get("deviceGroupId")),
            clean(d.get("deviceGroup")),
            clean_ts(d.get("activationTime")),
            clean_ts(d.get("expiration")),
            clean_int(d.get("enabledFlag", 1)),
            clean(dtl.get("status", "active")),
            clean_num(dtl.get("currentMileage")),
        ))

    # NOTE(review): on conflict only a subset of columns is refreshed
    # (name, plate, driver, enabled flag, mileage). Confirm that leaving the
    # remaining columns at their first-seen values is intentional.
    with get_conn() as conn:
        with conn.cursor() as cur:
            for row in rows:
                cur.execute("""
                    INSERT INTO tracksolid.devices (
                        imei, device_name, mc_type, mc_type_use_scope,
                        vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
                        vin, engine_number, vehicle_brand, fuel_100km,
                        driver_name, driver_phone, sim, iccid, imsi,
                        account, customer_name, device_group_id, device_group,
                        activation_time, expiration, enabled_flag, status,
                        current_mileage_km, last_synced_at
                    ) VALUES (
                        %s, %s, %s, %s,
                        %s, %s, %s, %s,
                        %s, %s, %s, %s,
                        %s, %s, %s, %s, %s,
                        %s, %s, %s, %s,
                        %s, %s, %s, %s,
                        %s, NOW()
                    )
                    ON CONFLICT (imei) DO UPDATE SET
                        device_name = EXCLUDED.device_name,
                        vehicle_number = EXCLUDED.vehicle_number,
                        driver_name = EXCLUDED.driver_name,
                        enabled_flag = EXCLUDED.enabled_flag,
                        current_mileage_km = EXCLUDED.current_mileage_km,
                        last_synced_at = NOW(),
                        updated_at = NOW()
                """, row)
            upserted = len(rows)
            log_ingestion(
                cur, "jimi.user.device.list+detail",
                len(devices), upserted, 0,
                int((time.time() - t0) * 1000), True,
            )
        conn.commit()
    log.info("Registry sync: %d devices updated.", upserted)


# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
def poll_live_positions():
    """Upsert the latest fix per device and append it to position history.

    Calls jimi.user.device.location.list for TARGET_ACCOUNT. Records without
    an IMEI or failing is_valid_fix ([FIX-M03]) are skipped. Each remaining
    record is upserted into tracksolid.live_positions and, when it carries a
    parseable gpsTime, inserted into tracksolid.position_history (duplicates
    on (imei, gps_time) are ignored by the database).

    Returns None. Does nothing when no token is available or when the API
    reports a non-zero code (the latter is logged as a warning).
    """
    t0, token = time.time(), get_token()
    if not token:
        return

    resp = api_post(
        "jimi.user.device.location.list",
        {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"},
        token,
    )
    if resp.get("code") != 0:
        log.warning(
            "Live positions skipped: jimi.user.device.location.list returned code=%s",
            resp.get("code"),
        )
        return
    positions = resp.get("result") or []

    upserted, inserted = 0, 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for p in positions:
                imei = p.get("imei")
                lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
                if not imei or not is_valid_fix(lat, lng):
                    continue

                # Parsed once; reused by both statements below.
                gps_time = clean_ts(p.get("gpsTime"))
                speed = clean_num(p.get("speed"))
                direction = clean_num(p.get("direction"))
                acc_status = clean(p.get("accStatus"))
                gps_num = clean_int(p.get("gpsNum"))
                mileage = clean_num(p.get("currentMileage"))

                # ST_MakePoint takes (x, y) = (lng, lat), hence the ordering.
                cur.execute("""
                    INSERT INTO tracksolid.live_positions (
                        imei, geom, lat, lng, pos_type, confidence,
                        gps_time, hb_time, speed, direction, acc_status,
                        gps_signal, gps_num, elec_quantity, power_value,
                        battery_power_val, tracker_oil, temperature,
                        current_mileage, device_status, loc_desc, recorded_at
                    ) VALUES (
                        %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s,
                        %s, %s, %s, %s, %s,
                        %s, %s, %s, %s,
                        %s, %s, %s,
                        %s, %s, %s, NOW()
                    )
                    ON CONFLICT (imei) DO UPDATE SET
                        geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
                        gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed,
                        direction=EXCLUDED.direction, acc_status=EXCLUDED.acc_status,
                        current_mileage=EXCLUDED.current_mileage, updated_at=NOW()
                """, (
                    imei, lng, lat, lat, lng,
                    clean(p.get("posType")),
                    clean_int(p.get("confidence")),
                    gps_time,
                    clean_ts(p.get("hbTime")),
                    speed,
                    direction,
                    acc_status,
                    clean_int(p.get("gpsSignal")),
                    gps_num,
                    clean_num(p.get("electQuantity")),
                    clean_num(p.get("powerValue")),
                    clean_num(p.get("batteryPowerVal")),
                    clean(p.get("trackerOil")),
                    clean_num(p.get("temperature")),
                    mileage,
                    clean(p.get("status")),
                    clean(p.get("locDesc")),
                ))
                upserted += 1

                # History (Hypertable Source)
                if gps_time:
                    cur.execute("""
                        INSERT INTO tracksolid.position_history
                            (imei, gps_time, geom, lat, lng, speed, direction,
                             acc_status, satellite, current_mileage)
                        VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
                                %s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (imei, gps_time) DO NOTHING
                    """, (
                        imei, gps_time, lng, lat, lat, lng,
                        speed, direction, acc_status, gps_num, mileage,
                    ))
                    # Counts attempted history inserts; rows dropped by
                    # ON CONFLICT DO NOTHING are still included.
                    inserted += 1

            log_ingestion(
                cur, "jimi.user.device.location.list",
                len(positions), upserted, inserted,
                int((time.time() - t0) * 1000), True,
            )
        conn.commit()


# ── Shared helpers for the report polls ───────────────────────────────────────
# [FIX-M05] Report endpoints are called with at most this many IMEIs per request.
_IMEI_BATCH_SIZE = 50
# Trips and parking are polled every 15 minutes over a trailing one-hour window;
# the overlap is absorbed by the ON CONFLICT clauses.
_REPORT_LOOKBACK = timedelta(hours=1)
_API_TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def _imei_batches(imeis, size=_IMEI_BATCH_SIZE):
    """Yield consecutive slices of `imeis` holding at most `size` items each."""
    for start in range(0, len(imeis), size):
        yield imeis[start:start + size]


def _report_window(lookback=_REPORT_LOOKBACK):
    """Return (begin_time, end_time) strings for the window ending now (UTC)."""
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - lookback
    return start_ts.strftime(_API_TS_FORMAT), end_ts.strftime(_API_TS_FORMAT)


# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
def poll_trips():
    """Upsert trip summaries for the last hour into tracksolid.trips.

    Calls jimi.device.track.mileage once per batch of IMEIs. Each batch is
    written, logged via log_ingestion and committed in its own transaction.
    Records lacking an IMEI or a parseable startTime are skipped because
    (imei, start_time) is the conflict key. A batch whose API call reports a
    non-zero code is logged as a warning and skipped.

    Returns None. Does nothing when no token or no active IMEIs are available.
    """
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return

    begin_time, end_time = _report_window()
    processed = 0

    for batch in _imei_batches(imeis):
        t0 = time.time()
        resp = api_post("jimi.device.track.mileage", {
            "imeis": ",".join(batch),
            "begin_time": begin_time,
            "end_time": end_time,
        }, token)
        if resp.get("code") != 0:
            log.warning(
                "Trips: jimi.device.track.mileage returned code=%s; batch skipped.",
                resp.get("code"),
            )
            continue
        trips = resp.get("result") or []

        batch_upserted = 0
        with get_conn() as conn:
            with conn.cursor() as cur:
                for t in trips:
                    imei = t.get("imei")
                    start_time = clean_ts(t.get("startTime"))
                    if not imei or not start_time:
                        continue

                    # [FIX-M11] The API distance is stored as reported, with no
                    # unit scaling, in distance_km.
                    # NOTE(review): confirm the API unit against the vendor docs.
                    dist_km = clean_num(t.get("distance"))
                    cur.execute("""
                        INSERT INTO tracksolid.trips (
                            imei, start_time, end_time, distance_km,
                            avg_speed_kmh, max_speed_kmh, driving_time_s, source
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
                        ON CONFLICT (imei, start_time) DO UPDATE SET
                            end_time = EXCLUDED.end_time,
                            distance_km = EXCLUDED.distance_km,
                            avg_speed_kmh = COALESCE(EXCLUDED.avg_speed_kmh, tracksolid.trips.avg_speed_kmh),
                            max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
                            driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
                    """, (
                        imei,
                        start_time,
                        clean_ts(t.get("endTime")),
                        dist_km,
                        clean_num(t.get("avgSpeed")),
                        clean_num(t.get("maxSpeed")),
                        clean_int(t.get("runTimeSecond")),
                    ))
                    batch_upserted += 1

                # [FIX-M08] Log inside the same transaction as the data.
                log_ingestion(
                    cur, "jimi.device.track.mileage",
                    len(trips), batch_upserted, 0,
                    int((time.time() - t0) * 1000), True,
                )
            conn.commit()
        processed += batch_upserted

    log.info("Trips: %d records processed.", processed)


# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
def poll_parking():
    """Insert parking events for the last hour into tracksolid.parking_events.

    Calls jimi.open.platform.report.parking once per batch of IMEIs. Each
    batch is written, logged via log_ingestion and committed in its own
    transaction. Events lacking an IMEI or a parseable startTime are skipped.
    Events whose coordinates fail is_valid_fix are stored with a NULL
    geometry. A batch whose API call reports a non-zero code is logged as a
    warning and skipped.

    Returns None. Does nothing when no token or no active IMEIs are available.
    """
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return

    begin_time, end_time = _report_window()
    processed = 0

    for batch in _imei_batches(imeis):
        t0 = time.time()
        # [FIX-M13] Added account + acc_type=0 (all stop types). Without these
        # the API returns empty results even when parking events exist.
        resp = api_post("jimi.open.platform.report.parking", {
            "account": TARGET_ACCOUNT,
            "imeis": ",".join(batch),
            "begin_time": begin_time,
            "end_time": end_time,
            "acc_type": 0,
        }, token)
        if resp.get("code") != 0:
            log.warning(
                "Parking: jimi.open.platform.report.parking returned code=%s; batch skipped.",
                resp.get("code"),
            )
            continue
        events = resp.get("result") or []

        batch_inserted = 0
        with get_conn() as conn:
            with conn.cursor() as cur:
                for p in events:
                    imei = p.get("imei")
                    start_time = clean_ts(p.get("startTime"))
                    if not imei or not start_time:
                        continue

                    lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
                    # [FIX-M03] Keep the event but drop an unusable position so
                    # the CASE below yields a NULL geometry instead of a point
                    # at 0,0.
                    if not is_valid_fix(lat, lng):
                        lat, lng = None, None

                    cur.execute("""
                        INSERT INTO tracksolid.parking_events (
                            imei, event_type, start_time, end_time,
                            duration_seconds, geom, address
                        ) VALUES (
                            %s, 'parking', %s, %s, %s,
                            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                 ELSE NULL END,
                            %s
                        )
                        ON CONFLICT (imei, start_time, event_type) DO NOTHING
                    """, (
                        imei,
                        start_time,
                        clean_ts(p.get("endTime")),
                        clean_int(p.get("durSecond")),  # [FIX-M13] API returns durSecond, not seconds
                        lng, lat, lng, lat,
                        clean(p.get("address")),
                    ))
                    # Counts attempted inserts; rows dropped by
                    # ON CONFLICT DO NOTHING are still included.
                    batch_inserted += 1

                # Report this batch's own numbers: events fetched (not IMEIs
                # requested), rows attempted and elapsed time for this batch.
                log_ingestion(
                    cur, "jimi.open.platform.report.parking",
                    len(events), 0, batch_inserted,
                    int((time.time() - t0) * 1000), True,
                )
            # Explicit commit, matching the other pollers, so persistence does
            # not depend on what get_conn() does when the block exits.
            conn.commit()
        processed += batch_inserted

    log.info("Parking: %d events processed.", processed)


# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
    """Run every job once at startup, then run them on their schedules forever.

    Each job is wrapped with safe_task(job, log) before it is invoked or
    registered. Schedule: live positions every 60 seconds, trips and parking
    every 15 minutes, registry sync daily at 02:00.
    """
    log.info("Starting MOVEMENT PIPELINE (v2.1)...")

    # Startup catch-up
    for job in (sync_devices, poll_live_positions, poll_trips, poll_parking):
        safe_task(job, log)()

    # Schedule
    schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
    schedule.every(15).minutes.do(safe_task(poll_trips, log))
    schedule.every(15).minutes.do(safe_task(poll_parking, log))
    schedule.every().day.at("02:00").do(safe_task(sync_devices, log))

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()