tracksolid_timescale_grafan.../ingest_events_rev.py
david kiania b11294009b
fix(security,ingest): 260702 audit — secure the stack, correct poller counters
Security:
- .dockerignore + Dockerfile: stop baking .env / the 346MB OSM pbf into image
  layers; install pinned from uv.lock (reproducible builds) (SEC-04/05).
- docker-compose: DB port binds ${DB_BIND_ADDR:-127.0.0.1} — loopback-only by
  default; remote tooling moves to an SSH tunnel (SEC-01).
- webhook_receiver: CRITICAL startup warning + WEBHOOK_REQUIRE_TOKEN=1 fail-closed
  when JIMI_WEBHOOK_TOKEN is empty (SEC-02 / FIX-W01).
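
The fail-closed behaviour described for SEC-02 / FIX-W01 could look roughly like the sketch below. The webhook receiver's code is not shown in this file, so the function name `check_webhook_token_config`, the logger name, and the message wording are hypothetical. Only the two environment variable names come from the commit message.

```python
import logging

log = logging.getLogger("webhook")  # hypothetical logger name


def check_webhook_token_config(env):
    """Return True when a token is configured, False when running open.

    Raises RuntimeError when the token is empty and WEBHOOK_REQUIRE_TOKEN=1,
    so the service refuses to start rather than accept unauthenticated pushes.
    """
    token = (env.get("JIMI_WEBHOOK_TOKEN") or "").strip()
    if token:
        return True
    if env.get("WEBHOOK_REQUIRE_TOKEN") == "1":
        # Fail closed: an empty token is a fatal misconfiguration.
        raise RuntimeError(
            "JIMI_WEBHOOK_TOKEN is empty and WEBHOOK_REQUIRE_TOKEN=1"
        )
    # Fail open, but loudly: matches the CRITICAL startup warning.
    log.critical("JIMI_WEBHOOK_TOKEN is empty; pushes are not authenticated")
    return False
```

At startup this would be called once with `os.environ`. Taking the mapping as a parameter keeps the check testable without touching the real environment.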

Correctness:
- FIX-M22/E07: capture cur.rowcount BEFORE RELEASE SAVEPOINT in poll_alarms/
  poll_trips/poll_parking — the RELEASE reported -1, producing "Alarms: -4 new
  events inserted" logs and negative ingestion_log.rows_inserted.
- FIX-W02: parse application/json push bodies (were silently dropped).
- FIX-W03: move webhook DB work off the event loop via asyncio.to_thread.
- FIX-M23: poll_trips phased so no txn/connection is held across Tracksolid +
  Nominatim (1 req/s) network calls.
- FIX-M24: sync_devices disables devices absent from every target (guarded).
- FIX-W04: reject device-clock-garbage alarm_time (2019 timestamps observed).
- get_token(): don't relabel already-aware timestamptz expiries (BUG-P9).
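
A plausibility check in the spirit of FIX-W04 might look like the sketch below. The actual rule used by the webhook receiver is not shown here, so the function name and both thresholds (30 days back, 10 minutes ahead) are assumptions chosen for illustration, not the project's values.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical thresholds; the real limits are not given in the commit message.
MAX_AGE = timedelta(days=30)
MAX_FUTURE_SKEW = timedelta(minutes=10)


def plausible_alarm_time(alarm_time, now=None):
    """Return True if alarm_time is timezone-aware and close enough to now.

    Rejects None, naive datetimes, timestamps older than MAX_AGE (for
    example a device clock reset to 2019), and timestamps too far ahead.
    """
    if alarm_time is None or alarm_time.tzinfo is None:
        return False
    if now is None:
        now = datetime.now(timezone.utc)
    return now - MAX_AGE <= alarm_time <= now + MAX_FUTURE_SKEW
```

A caller would skip or quarantine any alarm for which this returns False, instead of inserting it with a garbage `alarm_time`.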

Observability/lifecycle:
- migration 21: v_ingest_health restricted to active pipeline endpoints so
  one-shot tools stop wedging /health/ingest at 'stale' (dry-run verified).
- FIX-M25: daily purge_audit_logs() trims ingestion_log (90d) + refresh_log (180d).
- remove orphaned duplicate migrations/10_driver_clock_views.sql; ruff lint config.

+5 webhook tests (82 pass). Report/plan/work-log in docs/reports/260702_*.
Local only; not deployed. CLAUDE.md fix-history edits left uncommitted (that file
also carries unrelated in-progress edits).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-02 09:51:02 +03:00


"""
ingest_events_rev.py — Fireside Communications · Tracksolid Events Pipeline
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: Alarm event polling (catch-up/fallback for webhook push data).
OBD diagnostics are received via the webhook_receiver_rev.py push service —
jimi.device.obd.list does not exist in the Tracksolid Pro API.
REVISIONS (QA-Verified):
  [FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits.
  [FIX-E03] Atomic Logging: One log row per batch per endpoint.
  [FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT.
  [FIX-E05] Removed poll_obd: OBD data is push-only via /pushobd webhook.
  [FIX-11]  Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
  [FIX-E06] BUG-01: jimi.device.alarm.list returns alertTypeId/alarmTypeName/
            alertTime, not alarmType/alarmName/alarmTime (those are webhook
            field names). Corrected field mapping so alarm_type and alarm_name
            are no longer silently stored as NULL.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import time
from datetime import datetime, timezone, timedelta

import schedule

from ts_shared_rev import (
    api_post,
    get_active_imeis,
    get_conn,
    get_token,
    log_ingestion,
    clean,
    clean_num,
    clean_ts,
    get_logger,
    safe_task,
    setup_shutdown,
)

log = get_logger("events")
setup_shutdown(log)

# ── 1. Alarms & Geofence Events (Every 5m) ────────────────────────────────────
def poll_alarms():
    log.info("Polling device alarms...")
    t0, token, imeis = time.time(), get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(minutes=30)  # Look back 30m to ensure coverage
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                resp = api_post("jimi.device.alarm.list", {
                    "imeis": ",".join(batch),
                    "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "page_size": 100
                }, token)
                alarms = resp.get("result") or []
                if not alarms:
                    continue
                for a in alarms:
                    try:
                        cur.execute("SAVEPOINT sp")
                        lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng"))
                        # [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime,
                        # not alarmType/alarmName/alarmTime (those are webhook push field names).
                        alarm_type = clean(a.get("alertTypeId"))
                        alarm_name = clean(a.get("alarmTypeName"))
                        alarm_time = clean_ts(a.get("alertTime"))
                        cur.execute("""
                            INSERT INTO tracksolid.alarms (
                                imei, alarm_type, alarm_name, alarm_time, geom, lat, lng,
                                speed, acc_status, source, updated_at
                            ) VALUES (
                                %s, %s, %s, %s,
                                CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                     THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                     ELSE NULL END,
                                %s, %s, %s, %s, 'poll', NOW()
                            ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
                        """, (
                            a.get("imei"), alarm_type, alarm_name, alarm_time,
                            lng, lat, lng, lat, lat, lng,
                            clean_num(a.get("speed")), clean(a.get("accStatus"))
                        ))
                        # [FIX-E07] Capture rowcount BEFORE RELEASE SAVEPOINT:
                        # reading it after counts the RELEASE (-1), which is why
                        # logs showed "Alarms: -4 new events inserted".
                        row_inserted = cur.rowcount
                        cur.execute("RELEASE SAVEPOINT sp")
                        inserted += row_inserted
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process alarm for %s", a.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.device.alarm.list", len(imeis), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Alarms: %d new events inserted.", inserted)

# ── Main Loop ─────────────────────────────────────────────────────────────────
def startup_catchup():
    """Run the alarm poll once on boot. Split out of main() so the merged
    ingest_worker can reuse it (DRY).
    OBD removed: data arrives via webhook push (/pushobd), not polling."""
    safe_task(poll_alarms, log)()


def register_jobs():
    """Register the events jobs on the global `schedule` scheduler.
    Reused by both this module's main() and ingest_worker_rev.main()."""
    schedule.every(5).minutes.do(safe_task(poll_alarms, log))


def main():
    log.info("Starting EVENTS PIPELINE (v2.1)...")
    startup_catchup()
    register_jobs()
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()