tracksolid_timescale_grafan.../ingest_events_rev.py

108 lines
4.2 KiB
Python
Raw Normal View History

2026-04-07 18:34:40 +00:00
"""
ingest_events_rev.py Fireside Communications · Tracksolid Events Pipeline
RESPONSIBILITY: Alarm event polling (catch-up/fallback for webhook push data).
OBD diagnostics are received via the webhook_receiver_rev.py push service
jimi.device.obd.list does not exist in the Tracksolid Pro API.
2026-04-07 18:34:40 +00:00
REVISIONS (QA-Verified):
[FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits.
[FIX-E03] Atomic Logging: One log row per batch per endpoint.
[FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT.
[FIX-E05] Removed poll_obd: OBD data is push-only via /pushobd webhook.
[FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
2026-04-07 18:34:40 +00:00
"""
import time
import schedule
from datetime import datetime, timezone, timedelta
from ts_shared_rev import (
api_post,
get_active_imeis,
get_conn,
get_token,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
safe_task,
setup_shutdown,
2026-04-07 18:34:40 +00:00
)
log = get_logger("events")
setup_shutdown(log)
2026-04-07 18:34:40 +00:00
# ── 1. Alarms & Geofence Events (Every 5m) ────────────────────────────────────
def poll_alarms():
log.info("Polling device alarms...")
t0, token, imeis = time.time(), get_token(), get_active_imeis()
if not token or not imeis: return
end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(minutes=30) # Look back 30m to ensure coverage
inserted = 0
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.alarm.list", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"page_size": 100
}, token)
alarms = resp.get("result") or []
2026-04-07 18:34:40 +00:00
if not alarms: continue
with get_conn() as conn:
with conn.cursor() as cur:
for a in alarms:
lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng"))
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_time, geom, lat, lng,
2026-04-07 18:34:40 +00:00
speed, acc_status, updated_at
) VALUES (
%s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
2026-04-07 18:34:40 +00:00
%s, %s, %s, %s, NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
2026-04-07 18:34:40 +00:00
""", (
a.get("imei"), clean(a.get("alarmType")), clean_ts(a.get("alarmTime")),
lng, lat, lng, lat, lat, lng,
clean_num(a.get("speed")), clean(a.get("accStatus"))
))
inserted += 1
log_ingestion(cur, "jimi.device.alarm.list", len(batch), 0, inserted, int((time.time()-t0)*1000), True)
conn.commit()
log.info("Alarms: %d new events inserted.", inserted)
# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
log.info("Starting EVENTS PIPELINE (v2.1)...")
# OBD removed: Data arrives via webhook push (/pushobd), not polling.
2026-04-07 18:34:40 +00:00
# Startup catch-up
safe_task(poll_alarms, log)()
2026-04-07 18:34:40 +00:00
# Schedule
schedule.every(5).minutes.do(safe_task(poll_alarms, log))
2026-04-07 18:34:40 +00:00
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
main()