""" ingest_events_rev.py — Fireside Communications · Tracksolid Events Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: Alarm event polling (catch-up/fallback for webhook push data). OBD diagnostics are received via the webhook_receiver_rev.py push service — jimi.device.obd.list does not exist in the Tracksolid Pro API. REVISIONS (QA-Verified): [FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits. [FIX-E03] Atomic Logging: One log row per batch per endpoint. [FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT. [FIX-E05] Removed poll_obd: OBD data is push-only via /pushobd webhook. [FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY). [FIX-E06] BUG-01: jimi.device.alarm.list returns alertTypeId/alarmTypeName/ alertTime — not alarmType/alarmName/alarmTime (those are webhook field names). Corrected field mapping so alarm_type and alarm_name are no longer silently stored as NULL. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import time import schedule from datetime import datetime, timezone, timedelta from ts_shared_rev import ( api_post, get_active_imeis, get_conn, get_token, log_ingestion, clean, clean_num, clean_int, clean_ts, get_logger, safe_task, setup_shutdown, ) log = get_logger("events") setup_shutdown(log) # ── 1. 
# ── 1. Alarms & Geofence Events (Every 5m) ────────────────────────────────────
def _insert_alarm(cur, alarm):
    """Insert one polled alarm into tracksolid.alarms.

    Args:
        cur: Open database cursor; the caller owns the surrounding savepoint.
        alarm: One element of the ``result`` list of jimi.device.alarm.list.

    Returns:
        ``cur.rowcount`` of the INSERT itself: 0 when ON CONFLICT skipped an
        already-stored alarm, 1 when a new row was written.
    """
    lat, lng = clean_num(alarm.get("lat")), clean_num(alarm.get("lng"))

    # [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime,
    # not alarmType/alarmName/alarmTime (those are webhook push field names).
    alarm_type = clean(alarm.get("alertTypeId"))
    alarm_name = clean(alarm.get("alarmTypeName"))
    alarm_time = clean_ts(alarm.get("alertTime"))

    cur.execute("""
        INSERT INTO tracksolid.alarms (
            imei, alarm_type, alarm_name, alarm_time, geom,
            lat, lng, speed, acc_status, source, updated_at
        ) VALUES (
            %s, %s, %s, %s,
            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                 ELSE NULL END,
            %s, %s, %s, %s, 'poll', NOW()
        )
        ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
    """, (
        alarm.get("imei"), alarm_type, alarm_name, alarm_time,
        lng, lat,            # CASE guard: both coordinates must be present
        lng, lat,            # ST_MakePoint takes (x=lng, y=lat)
        lat, lng,
        clean_num(alarm.get("speed")), clean(alarm.get("accStatus")),
    ))
    # Must be read before any further statement runs on this cursor.
    return cur.rowcount


def poll_alarms():
    """Poll jimi.device.alarm.list and store alarms not yet in the database.

    The IMEIs returned by get_active_imeis() are queried in batches over a
    30-minute look-back window ending now (UTC). Each returned alarm is
    inserted inside its own savepoint, so one bad record is rolled back and
    logged without discarding the rest of the run. After all batches, one
    ingestion log row is written via log_ingestion().

    Returns:
        None. Returns early, before touching the database, when get_token()
        or get_active_imeis() yields nothing.
    """
    batch_size = 50                    # [FIX-E01] IMEIs per API call
    lookback = timedelta(minutes=30)   # Look back 30m to ensure coverage
    time_fmt = "%Y-%m-%d %H:%M:%S"

    log.info("Polling device alarms...")
    t0, token, imeis = time.time(), get_token(), get_active_imeis()
    if not token or not imeis:
        return

    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - lookback
    # The window is the same for every batch, so format it once.
    begin_time = start_ts.strftime(time_fmt)
    end_time = end_ts.strftime(time_fmt)

    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), batch_size):
                batch = imeis[i:i + batch_size]
                # NOTE(review): only a single page of up to 100 alarms is
                # requested per batch — confirm whether the API paginates.
                resp = api_post("jimi.device.alarm.list", {
                    "imeis": ",".join(batch),
                    "begin_time": begin_time,
                    "end_time": end_time,
                    "page_size": 100,
                }, token)

                # A non-dict response (e.g. None) would otherwise raise here
                # and abort every remaining batch of this run.
                if not isinstance(resp, dict):
                    log.warning(
                        "Unexpected alarm.list response for batch at offset %d; skipping.",
                        i,
                    )
                    continue

                alarms = resp.get("result") or []
                for a in alarms:
                    try:
                        cur.execute("SAVEPOINT sp")
                        new_rows = _insert_alarm(cur, a)
                        cur.execute("RELEASE SAVEPOINT sp")
                        # Count the INSERT's rows, not the RELEASE statement's:
                        # reading cur.rowcount after RELEASE reports the
                        # RELEASE (e.g. -1) and corrupts the total.
                        if new_rows > 0:
                            inserted += new_rows
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process alarm for %s", a.get("imei"), exc_info=True)

            log_ingestion(cur, "jimi.device.alarm.list", len(imeis), 0, inserted,
                          int((time.time() - t0) * 1000), True)

    log.info("Alarms: %d new events inserted.", inserted)
# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
    """Run one alarm poll at startup, register the 5-minute job, then loop.

    The loop never exits on its own: it asks ``schedule`` to run whatever is
    due and then sleeps for one second.
    """
    log.info("Starting EVENTS PIPELINE (v2.1)...")

    # No OBD job is registered here: OBD data arrives via webhook push
    # (/pushobd), not polling.

    # Startup catch-up: poll immediately rather than waiting for the first tick.
    startup_poll = safe_task(poll_alarms, log)
    startup_poll()

    # Recurring job, wrapped by safe_task like the startup run.
    five_minute_job = schedule.every(5).minutes
    five_minute_job.do(safe_task(poll_alarms, log))

    # Scheduler pump.
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()