From 8867be9d3df68168c49e6e7fc17b452e0474e51d Mon Sep 17 00:00:00 2001 From: David Kiania Date: Sat, 18 Apr 2026 00:33:55 +0300 Subject: [PATCH] perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 --- ingest_events_rev.py | 82 +++-- ingest_movement_rev.py | 373 ++++++++++++-------- sync_driver_audit.py | 15 +- tests/integration/test_webhook_endpoints.py | 11 +- webhook_receiver_rev.py | 111 +++--- 5 files changed, 342 insertions(+), 250 deletions(-) diff --git a/ingest_events_rev.py b/ingest_events_rev.py index 51f15e4..627b932 100644 --- a/ingest_events_rev.py +++ b/ingest_events_rev.py @@ -52,48 +52,54 @@ def poll_alarms(): start_ts = end_ts - timedelta(minutes=30) # Look back 30m to ensure coverage inserted = 0 - for i in range(0, len(imeis), 50): - batch = imeis[i:i+50] - resp = api_post("jimi.device.alarm.list", { - "imeis": ",".join(batch), - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), - "page_size": 100 - }, token) + with get_conn() as conn: + with conn.cursor() as cur: + for i in range(0, len(imeis), 50): + batch = imeis[i:i+50] + resp = api_post("jimi.device.alarm.list", { + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), + "page_size": 100 + }, token) - alarms = resp.get("result") or [] - if not alarms: continue + alarms = resp.get("result") or [] + if not alarms: continue - with get_conn() as conn: - with conn.cursor() as cur: for a in alarms: - lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng")) - # [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime, - # not alarmType/alarmName/alarmTime (those are webhook push field names). - alarm_type = clean(a.get("alertTypeId")) - alarm_name = clean(a.get("alarmTypeName")) - alarm_time = clean_ts(a.get("alertTime")) + try: + cur.execute("SAVEPOINT sp") + lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng")) + # [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime, + # not alarmType/alarmName/alarmTime (those are webhook push field names). + alarm_type = clean(a.get("alertTypeId")) + alarm_name = clean(a.get("alarmTypeName")) + alarm_time = clean_ts(a.get("alertTime")) - cur.execute(""" - INSERT INTO tracksolid.alarms ( - imei, alarm_type, alarm_name, alarm_time, geom, lat, lng, - speed, acc_status, source, updated_at - ) VALUES ( - %s, %s, %s, %s, - CASE WHEN %s IS NOT NULL AND %s IS NOT NULL - THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) - ELSE NULL END, - %s, %s, %s, %s, 'poll', NOW() - ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING - """, ( - a.get("imei"), alarm_type, alarm_name, alarm_time, - lng, lat, lng, lat, lat, lng, - clean_num(a.get("speed")), clean(a.get("accStatus")) - )) - inserted += 1 - - log_ingestion(cur, "jimi.device.alarm.list", len(batch), 0, inserted, int((time.time()-t0)*1000), True) - conn.commit() + cur.execute(""" + INSERT INTO tracksolid.alarms ( + imei, alarm_type, alarm_name, alarm_time, geom, lat, lng, + speed, acc_status, source, updated_at + ) VALUES ( + %s, %s, %s, %s, + CASE WHEN %s IS NOT NULL AND %s IS NOT NULL + THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) + ELSE NULL END, + %s, %s, %s, %s, 'poll', NOW() + ) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING + """, ( + a.get("imei"), alarm_type, alarm_name, alarm_time, + lng, lat, lng, lat, lat, lng, + clean_num(a.get("speed")), clean(a.get("accStatus")) + )) + cur.execute("RELEASE SAVEPOINT sp") + inserted += cur.rowcount + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process alarm for %s", a.get("imei"), exc_info=True) + + log_ingestion(cur, "jimi.device.alarm.list", len(imeis), 0, inserted, + int((time.time()-t0)*1000), True) log.info("Alarms: %d new events inserted.", inserted) diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 54af9c4..67ad1b2 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -34,8 +34,11 @@ REVISIONS (QA-Verified): """ import time +from concurrent.futures import ThreadPoolExecutor + import schedule from datetime import datetime, timezone, timedelta +from psycopg2.extras import execute_values from ts_shared_rev import ( TARGET_ACCOUNT, @@ -70,14 +73,24 @@ def sync_devices(): devices = resp.get("result") or [] upserted = 0 + # Fetch per-device detail in parallel — previously an N+1 blocker where + # 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s. + # Gated at 8 to stay under API rate-limit (1006) headroom. + def _fetch_detail(imei: str) -> dict: + detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) + return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {} + + imeis = [d.get("imei") for d in devices if d.get("imei")] + with ThreadPoolExecutor(max_workers=8) as pool: + details = dict(zip(imeis, pool.map(_fetch_detail, imeis))) + with get_conn() as conn: with conn.cursor() as cur: for d in devices: imei = d.get("imei") if not imei: continue - detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) - dtl = detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {} + dtl = details.get(imei, {}) cur.execute(""" INSERT INTO tracksolid.devices ( @@ -150,49 +163,64 @@ def poll_live_positions(): with get_conn() as conn: with conn.cursor() as cur: for p in positions: - imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng")) - if not imei or not is_valid_fix(lat, lng): continue + try: + cur.execute("SAVEPOINT sp") + imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng")) + if not imei or not is_valid_fix(lat, lng): + cur.execute("RELEASE SAVEPOINT sp") + continue - cur.execute(""" - INSERT INTO tracksolid.live_positions ( - imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time, - speed, direction, acc_status, gps_signal, gps_num, - elec_quantity, power_value, battery_power_val, tracker_oil, - temperature, current_mileage, device_status, loc_desc, recorded_at - ) VALUES ( - %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() - ) - ON CONFLICT (imei) DO UPDATE SET - geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng, - gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction, - acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage, - updated_at=NOW() - """, ( - imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")), - clean_ts(p.get("gpsTime")), clean_ts(p.get("hbTime")), clean_num(p.get("speed")), - clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsSignal")), - clean_int(p.get("gpsNum")), clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")), - clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), - clean_num(p.get("currentMileage")), clean(p.get("status")), clean(p.get("locDesc")) - )) - upserted += 1 + gps_time = clean_ts(p.get("gpsTime")) + speed = clean_num(p.get("speed")) + direction = clean_num(p.get("direction")) + acc_status = clean(p.get("accStatus")) + gps_num = clean_int(p.get("gpsNum")) + current_mileage = clean_num(p.get("currentMileage")) - # History (Hypertable Source) - if clean_ts(p.get("gpsTime")): cur.execute(""" - INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage) - VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (imei, gps_time) DO NOTHING - """, (imei, clean_ts(p.get("gpsTime")), lng, lat, lat, lng, clean_num(p.get("speed")), clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsNum")), clean_num(p.get("currentMileage")))) - inserted += 1 + INSERT INTO tracksolid.live_positions ( + imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time, + speed, direction, acc_status, gps_signal, gps_num, + elec_quantity, power_value, battery_power_val, tracker_oil, + temperature, current_mileage, device_status, loc_desc, recorded_at + ) VALUES ( + %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() + ) + ON CONFLICT (imei) DO UPDATE SET + geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng, + gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction, + acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage, + updated_at=NOW() + """, ( + imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")), + gps_time, clean_ts(p.get("hbTime")), speed, + direction, acc_status, clean_int(p.get("gpsSignal")), + gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")), + clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), + current_mileage, clean(p.get("status")), clean(p.get("locDesc")) + )) + upserted += cur.rowcount + + # History (Hypertable Source) + if gps_time: + cur.execute(""" + INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage) + VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (imei, gps_time) DO NOTHING + """, (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage)) + inserted += cur.rowcount + cur.execute("RELEASE SAVEPOINT sp") + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True) log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True) - conn.commit() # ── 3. Trip Reports (Every 15m) ─────────────────────────────────────────────── def poll_trips(): + t0 = time.time() token, imeis = get_token(), get_active_imeis() if not token or not imeis: return @@ -200,41 +228,49 @@ def poll_trips(): start_ts = end_ts - timedelta(hours=1) inserted = 0 - for i in range(0, len(imeis), 50): - batch = imeis[i:i+50] - resp = api_post("jimi.device.track.mileage", { - "imeis": ",".join(batch), - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S") - }, token) + with get_conn() as conn: + with conn.cursor() as cur: + for i in range(0, len(imeis), 50): + batch = imeis[i:i+50] + resp = api_post("jimi.device.track.mileage", { + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S") + }, token) - trips = resp.get("result") or [] - with get_conn() as conn: - with conn.cursor() as cur: + trips = resp.get("result") or [] for t in trips: - # [FIX-M16] API returns distance in METRES despite documentation saying km. - # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000. - # startMileage/endMileage are cumulative odometer in metres (same unit). - # Divide by 1000 to store as distance_km. - raw_dist = clean_num(t.get("distance")) - dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None - cur.execute(""" - INSERT INTO tracksolid.trips ( - imei, start_time, end_time, distance_km, - avg_speed_kmh, max_speed_kmh, driving_time_s, source - ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll') - ON CONFLICT (imei, start_time) DO UPDATE SET - end_time = EXCLUDED.end_time, - distance_km = EXCLUDED.distance_km, - max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh), - driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s) - """, ( - t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")), - dist_km, clean_num(t.get("avgSpeed")), - clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")) - )) - inserted += 1 - conn.commit() + try: + cur.execute("SAVEPOINT sp") + # [FIX-M16] API returns distance in METRES despite documentation saying km. + # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000. + # startMileage/endMileage are cumulative odometer in metres (same unit). + # Divide by 1000 to store as distance_km. + raw_dist = clean_num(t.get("distance")) + dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None + cur.execute(""" + INSERT INTO tracksolid.trips ( + imei, start_time, end_time, distance_km, + avg_speed_kmh, max_speed_kmh, driving_time_s, source + ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll') + ON CONFLICT (imei, start_time) DO UPDATE SET + end_time = EXCLUDED.end_time, + distance_km = EXCLUDED.distance_km, + max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh), + driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s) + """, ( + t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")), + dist_km, clean_num(t.get("avgSpeed")), + clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")) + )) + cur.execute("RELEASE SAVEPOINT sp") + inserted += cur.rowcount + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True) + + log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted, + int((time.time() - t0) * 1000), True) log.info("Trips: %d records processed.", inserted) # ── 4. Parking Events (Every 15m) ───────────────────────────────────────────── @@ -248,47 +284,55 @@ def poll_parking(): start_ts = end_ts - timedelta(hours=1) inserted = 0 - for i in range(0, len(imeis), 50): - batch = imeis[i:i+50] - # [FIX-M13] Added account + acc_type=0 (all stop types). Without these - # the API returns empty results even when parking events exist. - resp = api_post("jimi.open.platform.report.parking", { - "account": TARGET_ACCOUNT, - "imeis": ",".join(batch), - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), - "acc_type": 0, - }, token) + with get_conn() as conn: + with conn.cursor() as cur: + for i in range(0, len(imeis), 50): + batch = imeis[i:i+50] + # [FIX-M13] Added account + acc_type=0 (all stop types). Without these + # the API returns empty results even when parking events exist. + resp = api_post("jimi.open.platform.report.parking", { + "account": TARGET_ACCOUNT, + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), + "acc_type": 0, + }, token) - events = resp.get("result") or [] - with get_conn() as conn: - with conn.cursor() as cur: + events = resp.get("result") or [] for p in events: - imei = p.get("imei") - start_time = clean_ts(p.get("startTime")) - if not imei or not start_time: - continue - lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) - cur.execute(""" - INSERT INTO tracksolid.parking_events ( - imei, event_type, start_time, end_time, - duration_seconds, geom, address - ) VALUES ( - %s, 'parking', %s, %s, %s, - CASE WHEN %s IS NOT NULL AND %s IS NOT NULL - THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) - ELSE NULL END, - %s - ) ON CONFLICT (imei, start_time, event_type) DO NOTHING - """, ( - imei, start_time, clean_ts(p.get("endTime")), - clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds - lng, lat, lng, lat, - clean(p.get("address")) - )) - inserted += 1 - log_ingestion(cur, "jimi.open.platform.report.parking", len(batch), 0, inserted, - int((time.time() - t0) * 1000), True) + try: + cur.execute("SAVEPOINT sp") + imei = p.get("imei") + start_time = clean_ts(p.get("startTime")) + if not imei or not start_time: + cur.execute("RELEASE SAVEPOINT sp") + continue + lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) + cur.execute(""" + INSERT INTO tracksolid.parking_events ( + imei, event_type, start_time, end_time, + duration_seconds, geom, address + ) VALUES ( + %s, 'parking', %s, %s, %s, + CASE WHEN %s IS NOT NULL AND %s IS NOT NULL + THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) + ELSE NULL END, + %s + ) ON CONFLICT (imei, start_time, event_type) DO NOTHING + """, ( + imei, start_time, clean_ts(p.get("endTime")), + clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds + lng, lat, lng, lat, + clean(p.get("address")) + )) + cur.execute("RELEASE SAVEPOINT sp") + inserted += cur.rowcount + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) + + log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted, + int((time.time() - t0) * 1000), True) log.info("Parking: %d events processed.", inserted) # ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ─────────────────────── @@ -316,58 +360,73 @@ def poll_track_list(): end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(minutes=35) # 5-min overlap avoids boundary gaps + begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S") + end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S") + + # Phase 1: fetch waypoints from API without holding a DB connection. + # jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed + # up the 30 min sweep without tripping the 1006 rate limit. + def _fetch(imei: str): + resp = api_post("jimi.device.track.list", { + "imei": imei, + "begin_time": begin_str, + "end_time": end_str, + "map_type": "GOOGLE", + }, token) + return imei, resp.get("result") or [] + + with ThreadPoolExecutor(max_workers=4) as pool: + fetched = list(pool.map(_fetch, imeis)) + + # Phase 2: write rows in one DB transaction. total_inserted = 0 devices_with_data = 0 + rows = [] + for imei, waypoints in fetched: + device_rows = 0 + for wp in waypoints: + lat = clean_num(wp.get("lat")) + lng = clean_num(wp.get("lng")) + gps_time = clean_ts(wp.get("gpsTime")) + if not is_valid_fix(lat, lng) or not gps_time: + continue + rows.append(( + imei, gps_time, + lng, lat, # ST_MakePoint(lng, lat) + lat, lng, # lat, lng columns + clean_num(wp.get("gpsSpeed")), + clean_num(wp.get("direction")), + clean(wp.get("accStatus")), + )) + device_rows += 1 + if device_rows: + devices_with_data += 1 - with get_conn() as conn: - with conn.cursor() as cur: - for imei in imeis: - resp = api_post("jimi.device.track.list", { - "imei": imei, - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), - "map_type": "GOOGLE", - }, token) - - waypoints = resp.get("result") or [] - if not waypoints: - continue - - inserted = 0 - for wp in waypoints: - lat = clean_num(wp.get("lat")) - lng = clean_num(wp.get("lng")) - gps_time = clean_ts(wp.get("gpsTime")) - if not is_valid_fix(lat, lng) or not gps_time: - continue - - cur.execute(""" - INSERT INTO tracksolid.position_history ( - imei, gps_time, geom, lat, lng, - speed, direction, acc_status, source - ) VALUES ( - %s, %s, - ST_SetSRID(ST_MakePoint(%s, %s), 4326), - %s, %s, %s, %s, %s, 'track_list' - ) - ON CONFLICT (imei, gps_time) DO NOTHING - """, ( - imei, gps_time, - lng, lat, # ST_MakePoint(lng, lat) - lat, lng, # lat, lng columns - clean_num(wp.get("gpsSpeed")), - clean_num(wp.get("direction")), - clean(wp.get("accStatus")), - )) - inserted += 1 - - if inserted: - total_inserted += inserted - devices_with_data += 1 - - log_ingestion(cur, "jimi.device.track.list", len(imeis), - 0, total_inserted, int((time.time() - t0) * 1000), True) - conn.commit() + if rows: + with get_conn() as conn: + with conn.cursor() as cur: + execute_values( + cur, + """ + INSERT INTO tracksolid.position_history ( + imei, gps_time, geom, lat, lng, + speed, direction, acc_status, source + ) VALUES %s + ON CONFLICT (imei, gps_time) DO NOTHING + """, + rows, + template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326)," + " %s, %s, %s, %s, %s, 'track_list')", + page_size=500, + ) + total_inserted = cur.rowcount + log_ingestion(cur, "jimi.device.track.list", len(imeis), + 0, total_inserted, int((time.time() - t0) * 1000), True) + else: + with get_conn() as conn: + with conn.cursor() as cur: + log_ingestion(cur, "jimi.device.track.list", len(imeis), + 0, 0, int((time.time() - t0) * 1000), True) log.info("Track list: %d waypoints inserted across %d/%d devices.", total_inserted, devices_with_data, len(imeis)) diff --git a/sync_driver_audit.py b/sync_driver_audit.py index 62edc7f..2248346 100644 --- a/sync_driver_audit.py +++ b/sync_driver_audit.py @@ -12,6 +12,8 @@ Or via Coolify terminal with env vars loaded. """ import time +from concurrent.futures import ThreadPoolExecutor + from ts_shared_rev import ( TARGET_ACCOUNT, api_post, @@ -122,6 +124,15 @@ def run_audit(): log.info("Starting full upsert of %d devices...", len(api_devices)) upserted = 0 + # Parallelize the per-device detail lookups (see ingest_movement.sync_devices). + def _fetch_detail(imei: str) -> dict: + detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) + return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {} + + imeis_to_fetch = [d.get("imei") for d in api_devices if d.get("imei")] + with ThreadPoolExecutor(max_workers=8) as pool: + details = dict(zip(imeis_to_fetch, pool.map(_fetch_detail, imeis_to_fetch))) + with get_conn() as conn: with conn.cursor() as cur: for d in api_devices: @@ -129,9 +140,7 @@ def run_audit(): if not imei: continue - # Fetch detailed info for driver phone, SIM, ICCID etc. - detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) - dtl = detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {} + dtl = details.get(imei, {}) cur.execute(""" INSERT INTO tracksolid.devices ( diff --git a/tests/integration/test_webhook_endpoints.py b/tests/integration/test_webhook_endpoints.py index 2ca474a..dd87e25 100644 --- a/tests/integration/test_webhook_endpoints.py +++ b/tests/integration/test_webhook_endpoints.py @@ -74,10 +74,13 @@ class TestPushAlarm: data_list = json.dumps([WEBHOOK_ALARM_NULL_TYPE]) response = client.post("/pushalarm", data={"token": "", "data_list": data_list}) assert response.status_code == 200 - # Verify no INSERT was executed (only SAVEPOINT + RELEASE calls) - insert_calls = [c for c in mock_cur.execute.call_args_list - if "INSERT" in str(c)] - assert len(insert_calls) == 0, "NULL alarm_type must not be inserted" + # Verify no data INSERT was executed. log_ingestion always writes one + # row to tracksolid.ingestion_log — exclude it from the assertion. + data_inserts = [ + c for c in mock_cur.execute.call_args_list + if "INSERT" in str(c) and "ingestion_log" not in str(c) + ] + assert len(data_inserts) == 0, "NULL alarm_type must not be inserted" def test_empty_data_list_ok(self, client): response = client.post("/pushalarm", data={"token": "", "data_list": ""}) diff --git a/webhook_receiver_rev.py b/webhook_receiver_rev.py index 1b87ffe..4a59067 100644 --- a/webhook_receiver_rev.py +++ b/webhook_receiver_rev.py @@ -29,6 +29,7 @@ REVISIONS (QA-Verified): from __future__ import annotations +import hmac import json import os import time @@ -36,8 +37,13 @@ from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Optional +# Cap on items per webhook POST. Prevents a malformed/malicious push from +# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200. +MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000")) + from fastapi import FastAPI, Form, HTTPException from fastapi.responses import JSONResponse +from psycopg2.extras import execute_values from ts_shared_rev import ( close_pool, @@ -75,7 +81,7 @@ SUCCESS = {"code": 0, "msg": "success"} def _validate_token(token: str) -> None: """Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty.""" - if WEBHOOK_TOKEN and token != WEBHOOK_TOKEN: + if WEBHOOK_TOKEN and not hmac.compare_digest(token, WEBHOOK_TOKEN): raise HTTPException(status_code=403, detail="Invalid token") @@ -83,9 +89,12 @@ def _parse_data_list(raw: str) -> list[dict]: """Parse the JSON string from Jimi's data_list form field.""" try: parsed = json.loads(raw) - if isinstance(parsed, list): - return parsed - return [parsed] + items = parsed if isinstance(parsed, list) else [parsed] + if len(items) > MAX_ITEMS_PER_POST: + log.warning("data_list truncated: %d items exceeded cap of %d", + len(items), MAX_ITEMS_PER_POST) + items = items[:MAX_ITEMS_PER_POST] + return items except (json.JSONDecodeError, TypeError): log.warning("Failed to parse data_list: %.200s", raw) return [] @@ -341,52 +350,58 @@ def push_gps(token: str = Form(""), data_list: str = Form("")): return JSONResponse(content=SUCCESS) t0 = time.time() + # Validation phase — pre-clean and filter without touching the DB. + # Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume; + # one batched execute_values is 10-50× faster for the same rows. + rows = [] + for item in items: + imei = clean(item.get("deviceImei")) + gps_time = clean_ts(item.get("gpsTime")) + lat = clean_num(item.get("lat")) + lng = clean_num(item.get("lng")) + if not imei or not gps_time or not is_valid_fix(lat, lng): + continue + rows.append(( + imei, gps_time, lng, lat, lat, lng, + clean_num(item.get("gpsSpeed")), + clean_num(item.get("direction")), + str(item.get("acc")) if item.get("acc") is not None else None, + clean_int(item.get("satelliteNum")), + clean_num(item.get("distance")), + clean_num(item.get("altitude")), + clean_int(item.get("postType")), + )) + inserted = 0 + if rows: + with get_conn() as conn: + with conn.cursor() as cur: + execute_values( + cur, + """ + INSERT INTO tracksolid.position_history ( + imei, gps_time, geom, lat, lng, speed, direction, + acc_status, satellite, current_mileage, + altitude, post_type, source + ) VALUES %s + ON CONFLICT (imei, gps_time) DO NOTHING + """, + rows, + template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326)," + " %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')", + page_size=len(rows), + ) + inserted = cur.rowcount + log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted, + int((time.time() - t0) * 1000), True) + else: + # No valid rows, still record the call for observability. + with get_conn() as conn: + with conn.cursor() as cur: + log_ingestion(cur, "webhook/pushgps", len(items), 0, 0, + int((time.time() - t0) * 1000), True) - with get_conn() as conn: - with conn.cursor() as cur: - for item in items: - try: - cur.execute("SAVEPOINT sp") - imei = clean(item.get("deviceImei")) - gps_time = clean_ts(item.get("gpsTime")) - lat = clean_num(item.get("lat")) - lng = clean_num(item.get("lng")) - - if not imei or not gps_time or not is_valid_fix(lat, lng): - cur.execute("RELEASE SAVEPOINT sp") - continue - - cur.execute(""" - INSERT INTO tracksolid.position_history ( - imei, gps_time, geom, lat, lng, speed, direction, - acc_status, satellite, current_mileage, - altitude, post_type, source - ) VALUES ( - %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), - %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push' - ) ON CONFLICT (imei, gps_time) DO NOTHING - """, ( - imei, gps_time, lng, lat, - lat, lng, - clean_num(item.get("gpsSpeed")), - clean_num(item.get("direction")), - str(item.get("acc")) if item.get("acc") is not None else None, - clean_int(item.get("satelliteNum")), - clean_num(item.get("distance")), - clean_num(item.get("altitude")), - clean_int(item.get("postType")), - )) - cur.execute("RELEASE SAVEPOINT sp") - inserted += 1 - except Exception: - cur.execute("ROLLBACK TO SAVEPOINT sp") - log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True) - - log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted, - int((time.time() - t0) * 1000), True) - - log.info("pushgps: %d/%d items processed.", inserted, len(items)) + log.info("pushgps: %d/%d items inserted.", inserted, len(items)) return JSONResponse(content=SUCCESS) # ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────