""" ingest_events_rev.py — Fireside Communications · Tracksolid Events Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: Alarm event polling (catch-up/fallback for webhook push data). OBD diagnostics are received via the webhook_receiver_rev.py push service — jimi.device.obd.list does not exist in the Tracksolid Pro API. REVISIONS (QA-Verified): [FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits. [FIX-E03] Atomic Logging: One log row per batch per endpoint. [FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT. [FIX-E05] Removed poll_obd: OBD data is push-only via /pushobd webhook. [FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY). ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import time import schedule from datetime import datetime, timezone, timedelta from ts_shared_rev import ( api_post, get_active_imeis, get_conn, get_token, log_ingestion, clean, clean_num, clean_int, clean_ts, get_logger, safe_task, setup_shutdown, ) log = get_logger("events") setup_shutdown(log) # ── 1. 
# Alarms & Geofence Events (Every 5m) ────────────────────────────────────

_ALARM_ENDPOINT = "jimi.device.alarm.list"
_ALARM_BATCH_SIZE = 50                    # IMEIs per API call ([FIX-E01])
_ALARM_PAGE_SIZE = 100                    # page_size sent with every request
_ALARM_LOOKBACK = timedelta(minutes=30)   # Look back 30m to ensure coverage
_ALARM_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

# Placeholder order: imei, alarm_type, alarm_time,
#                    lng, lat (NULL guard), lng, lat (ST_MakePoint),
#                    lat, lng, speed, acc_status.
_ALARM_INSERT_SQL = """
    INSERT INTO tracksolid.alarms (
        imei, alarm_type, alarm_time, geom, lat, lng,
        speed, acc_status, updated_at
    )
    VALUES (
        %s, %s, %s,
        CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
             THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
             ELSE NULL END,
        %s, %s, %s, %s, NOW()
    )
    ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
"""


def _alarm_params(alarm):
    """Build the parameter tuple for ``_ALARM_INSERT_SQL`` from one alarm dict."""
    lat = clean_num(alarm.get("lat"))
    lng = clean_num(alarm.get("lng"))
    return (
        alarm.get("imei"),
        clean(alarm.get("alarmType")),
        clean_ts(alarm.get("alarmTime")),
        lng, lat,            # CASE guard: both coordinates must be non-NULL
        lng, lat,            # ST_MakePoint takes (x=lng, y=lat)
        lat, lng,            # plain lat / lng columns
        clean_num(alarm.get("speed")),
        clean(alarm.get("accStatus")),
    )


def poll_alarms():
    """Poll the alarm-list endpoint for all active IMEIs and store new alarms.

    IMEIs are queried in batches of ``_ALARM_BATCH_SIZE`` over a window
    covering the last ``_ALARM_LOOKBACK``. Because that window is wider than
    the 5-minute schedule, consecutive polls overlap and the same alarm is
    returned several times; ``ON CONFLICT ... DO NOTHING`` drops the replays.

    Only rows the database actually stored are counted as inserted, and each
    ingestion-log row carries the count for its own batch.

    Returns:
        None. Returns early, doing nothing, when no token or no active IMEIs
        are available.
    """
    log.info("Polling device alarms...")
    t0, token, imeis = time.time(), get_token(), get_active_imeis()
    if not token or not imeis:
        return

    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - _ALARM_LOOKBACK
    # The window is identical for every batch, so format it once.
    begin_time = start_ts.strftime(_ALARM_TS_FORMAT)
    end_time = end_ts.strftime(_ALARM_TS_FORMAT)

    inserted = 0
    for offset in range(0, len(imeis), _ALARM_BATCH_SIZE):
        batch = imeis[offset:offset + _ALARM_BATCH_SIZE]
        # NOTE(review): only one page of up to _ALARM_PAGE_SIZE alarms is
        # requested per batch — confirm whether the endpoint paginates and
        # whether a busy batch can exceed this within the look-back window.
        resp = api_post(_ALARM_ENDPOINT, {
            "imeis": ",".join(batch),
            "begin_time": begin_time,
            "end_time": end_time,
            "page_size": _ALARM_PAGE_SIZE,
        }, token)

        # A failed call may not yield a dict; treat that as "no alarms"
        # instead of raising and abandoning the remaining batches.
        alarms = (resp.get("result") if isinstance(resp, dict) else None) or []
        if not alarms:
            continue

        batch_inserted = 0
        with get_conn() as conn:
            with conn.cursor() as cur:
                for alarm in alarms:
                    cur.execute(_ALARM_INSERT_SQL, _alarm_params(alarm))
                    # rowcount is 0 when ON CONFLICT skipped the row, so only
                    # genuinely new alarms are counted. max() guards against
                    # drivers that report -1 for "unknown".
                    batch_inserted += max(cur.rowcount, 0)
                # One log row per batch ([FIX-E03]). The fifth argument is
                # presumably the inserted-row count — verify against
                # ts_shared_rev.log_ingestion. Elapsed time is still measured
                # from the start of the whole poll, as before.
                log_ingestion(
                    cur, _ALARM_ENDPOINT, len(batch), 0, batch_inserted,
                    int((time.time() - t0) * 1000), True,
                )
            conn.commit()
        inserted += batch_inserted

    log.info("Alarms: %d new events inserted.", inserted)


# ── Main Loop ─────────────────────────────────────────────────────────────────

def main():
    """Run one catch-up poll, then poll alarms every 5 minutes forever.

    This function does not return; the process is expected to be stopped by a
    signal.
    """
    log.info("Starting EVENTS PIPELINE (v2.1)...")

    # OBD removed: Data arrives via webhook push (/pushobd), not polling.

    # Startup catch-up. safe_task returns a callable wrapping poll_alarms
    # (presumably logging exceptions rather than propagating them — confirm
    # in ts_shared_rev).
    safe_task(poll_alarms, log)()

    # Schedule
    schedule.every(5).minutes.do(safe_task(poll_alarms, log))

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()