""" ingest_events_rev.py — Fireside Communications · Tracksolid Events Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ RESPONSIBILITY: Alarms, Geofences, and OBD engine diagnostics. REVISIONS (QA-Verified): [FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits. [FIX-E02] JSONB: Stores raw payloads in alarms/obd for future flexibility. [FIX-E03] Atomic Logging: One log row per batch per endpoint. [FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ import signal import sys import time import schedule import json from datetime import datetime, timezone, timedelta from ts_shared_rev import ( api_post, close_pool, get_active_imeis, get_conn, get_token, log_ingestion, clean, clean_num, clean_int, clean_ts, get_logger, ) log = get_logger("events") # ── Graceful Shutdown ───────────────────────────────────────────────────────── def _shutdown(signum, frame): log.info("Signal %s received. Closing DB pool...", signum) close_pool() sys.exit(0) signal.signal(signal.SIGTERM, _shutdown) signal.signal(signal.SIGINT, _shutdown) def _safe(fn): def wrapper(): try: fn() except Exception: log.exception("Task %s failed. Scheduler continuing...", fn.__name__) wrapper.__name__ = fn.__name__ return wrapper # ── 1. 
# Alarms & Geofence Events (Every 5m) ────────────────────────────────────
_ALARM_BATCH_SIZE = 50        # IMEIs sent per API call
_ALARM_PAGE_SIZE = 100        # "page_size" requested from the API
_ALARM_LOOKBACK_MINUTES = 30  # Look back 30m to ensure coverage
_ALARM_TIME_FMT = "%Y-%m-%d %H:%M:%S"

_ALARM_INSERT_SQL = """
    INSERT INTO tracksolid.alarms (
        imei, alarm_type, alarm_time, geom, lat, lng, speed, acc_status, updated_at
    ) VALUES (
        %s, %s, %s,
        CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
             THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
             ELSE NULL END,
        %s, %s, %s, %s, NOW()
    )
    ON CONFLICT DO NOTHING
"""


def poll_alarms():
    """Fetch recent device alarms and insert them into ``tracksolid.alarms``.

    Active IMEIs are sent to ``jimi.device.alarm.list`` in batches, each
    asking for the window ``[now - 30 min, now]`` (UTC). The window is wider
    than the 5-minute schedule, so the same alarm is fetched repeatedly; the
    INSERT uses ``ON CONFLICT DO NOTHING`` to skip rows already stored.

    For every batch that returned alarms, one ``log_ingestion`` row is written
    in the same transaction as the inserts, carrying that batch's own insert
    count and duration. Batches with no alarms write no log row.

    Returns:
        None. Does nothing (beyond a warning) when there is no token or no
        active IMEI.
    """
    log.info("Polling device alarms...")
    token = get_token()
    imeis = get_active_imeis()
    if not token or not imeis:
        log.warning("Alarm poll skipped: no API token or no active IMEIs.")
        return

    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(minutes=_ALARM_LOOKBACK_MINUTES)
    begin_time = start_ts.strftime(_ALARM_TIME_FMT)
    end_time = end_ts.strftime(_ALARM_TIME_FMT)

    total_inserted = 0
    for offset in range(0, len(imeis), _ALARM_BATCH_SIZE):
        batch = imeis[offset:offset + _ALARM_BATCH_SIZE]
        batch_t0 = time.time()

        # NOTE(review): only a single page is requested; if a batch can
        # produce more than page_size alarms in the window, the rest are not
        # fetched here — confirm against the API's paging behaviour.
        resp = api_post("jimi.device.alarm.list", {
            "imeis": ",".join(batch),
            "begin_time": begin_time,
            "end_time": end_time,
            "page_size": _ALARM_PAGE_SIZE,
        }, token)

        # A non-dict response (e.g. None) would raise on .get() and abort the
        # remaining batches; skip just this batch instead.
        if not isinstance(resp, dict):
            log.warning(
                "Alarms: unexpected response for batch at offset %d; skipping.",
                offset,
            )
            continue

        alarms = resp.get("result", [])
        if not alarms:
            continue

        batch_inserted = 0
        with get_conn() as conn:
            with conn.cursor() as cur:
                for a in alarms:
                    lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng"))
                    # NOTE(review): the raw alarm payload is not stored by
                    # this statement — confirm whether a JSONB column should
                    # be populated here.
                    cur.execute(_ALARM_INSERT_SQL, (
                        a.get("imei"),
                        clean(a.get("alarmType")),
                        clean_ts(a.get("alarmTime")),
                        # geom: two NULL checks, then ST_MakePoint(x=lng, y=lat)
                        lng, lat, lng, lat,
                        lat, lng,
                        clean_num(a.get("speed")),
                        clean(a.get("accStatus")),
                    ))
                    # ON CONFLICT DO NOTHING leaves rowcount at 0 for a
                    # duplicate, so count only rows that were really written.
                    if cur.rowcount > 0:
                        batch_inserted += 1

                # Per-batch figures, so the log rows can be summed without
                # double counting earlier batches.
                log_ingestion(
                    cur,
                    "jimi.device.alarm.list",
                    len(batch),
                    0,
                    batch_inserted,
                    int((time.time() - batch_t0) * 1000),
                    True,
                )
            conn.commit()

        total_inserted += batch_inserted

    log.info("Alarms: %d new events inserted.", total_inserted)


# ── 2.
# OBD engine diagnostics (Every 10m) ────────────────────────────────────
_OBD_PAGE_SIZE = 20  # "page_size" requested per device

_OBD_INSERT_SQL = """
    INSERT INTO tracksolid.obd_readings (
        imei, reading_time, engine_rpm, fuel_level_pct, updated_at
    ) VALUES (%s, %s, %s, %s, NOW())
    ON CONFLICT (imei, reading_time) DO NOTHING
"""


def poll_obd():
    """Fetch OBD readings per device and insert them into ``tracksolid.obd_readings``.

    ``jimi.device.obd.list`` is called once per active IMEI. Each reading is
    inserted with ``ON CONFLICT (imei, reading_time) DO NOTHING``, and each
    device's inserts are committed in their own transaction. After all devices
    are polled, a single ``log_ingestion`` summary row is written.

    Returns:
        None. Does nothing (beyond a warning) when there is no token or no
        active IMEI.
    """
    log.info("Polling OBD telemetry...")
    t0, token, imeis = time.time(), get_token(), get_active_imeis()
    if not token or not imeis:
        log.warning("OBD poll skipped: no API token or no active IMEIs.")
        return

    processed = 0  # INSERT statements attempted
    inserted = 0   # rows actually written (conflicts excluded)

    # OBD API often requires per-device polling or specific time ranges
    for imei in imeis:
        resp = api_post("jimi.device.obd.list", {
            "imei": imei,
            "page_size": _OBD_PAGE_SIZE,
        }, token)

        # A non-dict response (e.g. None) would raise on .get() and abort the
        # poll for every remaining device; skip just this device instead.
        if not isinstance(resp, dict):
            log.warning("OBD: unexpected response for IMEI %s; skipping.", imei)
            continue

        readings = resp.get("result", [])
        if not readings:
            continue

        with get_conn() as conn:
            with conn.cursor() as cur:
                for r in readings:
                    reading_time = clean_ts(r.get("readTime"))
                    if reading_time is None:
                        # The conflict target is (imei, reading_time); a NULL
                        # timestamp never matches it, so such a row could not
                        # be de-duplicated. Presumably clean_ts yields None
                        # for missing/unparseable input — verify.
                        continue
                    cur.execute(_OBD_INSERT_SQL, (
                        imei,
                        reading_time,
                        clean_int(r.get("engineRpm")),
                        clean_num(r.get("fuelLevel")),
                    ))
                    processed += 1
                    # rowcount is 0 when ON CONFLICT skipped a duplicate.
                    if cur.rowcount > 0:
                        inserted += 1
            conn.commit()

    # Log summary of OBD poll
    with get_conn() as conn:
        with conn.cursor() as cur:
            log_ingestion(
                cur,
                "jimi.device.obd.list",
                len(imeis),
                0,
                inserted,
                int((time.time() - t0) * 1000),
                True,
            )
        conn.commit()

    log.info("OBD: %d readings processed.", processed)


# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
    """Run both pollers once, then run them on a schedule until the process exits.

    Alarms are polled every 5 minutes and OBD every 10 minutes. Each task is
    wrapped with ``_safe`` so a failure in one run does not stop the loop.
    """
    log.info("Starting EVENTS PIPELINE (v2.0)...")

    # Build each protected task once and reuse it for catch-up and scheduling.
    safe_alarms = _safe(poll_alarms)
    safe_obd = _safe(poll_obd)

    # Startup catch-up
    safe_alarms()
    safe_obd()

    # Schedule
    schedule.every(5).minutes.do(safe_alarms)
    schedule.every(10).minutes.do(safe_obd)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()