tracksolid_timescale_grafan.../ingest_movement_rev.py

677 lines
34 KiB
Python
Raw Permalink Normal View History

2026-04-07 18:34:40 +00:00
"""
ingest_movement_rev.py Fireside Communications · Tracksolid Movement Pipeline
RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking.
REVISIONS (QA-Verified):
[FIX-M01] sync_devices: Atomic transaction for registry sync.
[FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug).
[FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance).
[FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT.
[FIX-M08] Atomic Logging: log_ingestion happens within the data transaction.
[FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
[FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API.
[FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking.
[FIX-M11] BUG-02: distance_m was stored in millimetres due to erroneous
* 1000 on an already-metric API value. Removed multiplication;
column renamed to distance_km in migration 04. Both poll_trips
and push_trip_report (webhook) corrected.
[FIX-M12] BUG-02: Renamed distance_m distance_km in all SQL to match
migration 04 schema change.
[FIX-M13] POLL-02: Parking poll was returning 0 rows added missing
acc_type=0 and account params; fixed response field durSecond
(was mapped as 'seconds').
[FIX-M14] POLL-01: New poll_track_list() calls jimi.device.track.list
per device every 30 minutes to capture high-resolution GPS
waypoints between the 60-second fleet sweep snapshots. Writes to
position_history with source='track_list'. Fills gaps in route
reconstruction and enables accurate path drawing on maps.
[FIX-M15] POLL-03: New get_device_locations() utility calls
jimi.device.location.get for up to 50 specific IMEIs on demand.
Used for precision refreshes (alarm enrichment, stale device
recovery) without waiting for the next full fleet sweep.
[FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName /
driverPhone / sim from jimi.track.device.detail first (list
endpoint returns null for these even when populated via
jimi.open.device.update).
[FIX-M19] Multi-account support: the fleet is split across multiple
Tracksolid sub-accounts. sync_devices, poll_live_positions
and poll_parking now iterate every target in TRACKSOLID_TARGETS
and dedupe/scope per-target before writing.
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
[FIX-M20] Trip enrichment: poll_trips now backfills start_geom/end_geom/
route_geom/waypoints_count from position_history at insert
time, extracts idleSecond, reverse-geocodes start/end addresses
(Nominatim), and caches vehicle_plate from devices. Closes the
NULL-column gaps that were inherent to jimi.device.track.mileage
(it does not return coordinates, idle, or trip sequence).
2026-04-07 18:34:40 +00:00
"""
import time
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
from concurrent.futures import ThreadPoolExecutor
2026-04-07 18:34:40 +00:00
import schedule
from datetime import datetime, timezone, timedelta
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
from psycopg2.extras import execute_values
2026-04-07 18:34:40 +00:00
from ts_shared_rev import (
TARGET_ACCOUNT,
TARGETS,
2026-04-07 18:34:40 +00:00
api_post,
get_active_imeis,
get_active_imeis_by_target,
2026-04-07 18:34:40 +00:00
get_conn,
get_token,
is_valid_fix,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
reverse_geocode,
safe_task,
setup_shutdown,
2026-04-07 18:34:40 +00:00
)
log = get_logger("movement")
setup_shutdown(log)
2026-04-07 18:34:40 +00:00
# ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices():
log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS)
2026-04-07 18:34:40 +00:00
t0, token = time.time(), get_token()
if not token: return
# [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the
# device list from every configured target and dedupe by IMEI.
devices_by_imei: dict[str, dict] = {}
for target in TARGETS:
resp = api_post("jimi.user.device.list", {"target": target}, token)
if resp.get("code") != 0:
log.warning("device.list failed for target=%s: code=%s msg=%s",
target, resp.get("code"), resp.get("message"))
continue
for d in (resp.get("result") or []):
imei = d.get("imei")
if imei:
devices_by_imei[imei] = d
devices = list(devices_by_imei.values())
log.info("Aggregated %d unique devices across targets.", len(devices))
2026-04-07 18:34:40 +00:00
upserted = 0
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
# Fetch per-device detail in parallel — previously an N+1 blocker where
# 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s.
# Gated at 8 to stay under API rate-limit (1006) headroom.
def _fetch_detail(imei: str) -> dict:
detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {}
imeis = [d.get("imei") for d in devices if d.get("imei")]
with ThreadPoolExecutor(max_workers=8) as pool:
details = dict(zip(imeis, pool.map(_fetch_detail, imeis)))
2026-04-07 18:34:40 +00:00
with get_conn() as conn:
with conn.cursor() as cur:
for d in devices:
imei = d.get("imei")
if not imei: continue
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
dtl = details.get(imei, {})
2026-04-07 18:34:40 +00:00
cur.execute("""
INSERT INTO tracksolid.devices (
imei, device_name, mc_type, mc_type_use_scope,
vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
vin, engine_number, vehicle_brand, fuel_100km,
driver_name, driver_phone, sim, iccid, imsi,
account, customer_name, device_group_id, device_group,
activation_time, expiration, enabled_flag, status,
current_mileage_km, last_synced_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
device_name = EXCLUDED.device_name,
mc_type = EXCLUDED.mc_type,
mc_type_use_scope = EXCLUDED.mc_type_use_scope,
vehicle_name = EXCLUDED.vehicle_name,
vehicle_number = EXCLUDED.vehicle_number,
vehicle_models = EXCLUDED.vehicle_models,
vehicle_icon = EXCLUDED.vehicle_icon,
vin = EXCLUDED.vin,
engine_number = EXCLUDED.engine_number,
vehicle_brand = EXCLUDED.vehicle_brand,
fuel_100km = EXCLUDED.fuel_100km,
driver_name = EXCLUDED.driver_name,
driver_phone = EXCLUDED.driver_phone,
sim = EXCLUDED.sim,
iccid = EXCLUDED.iccid,
imsi = EXCLUDED.imsi,
account = EXCLUDED.account,
customer_name = EXCLUDED.customer_name,
device_group_id = EXCLUDED.device_group_id,
device_group = EXCLUDED.device_group,
activation_time = EXCLUDED.activation_time,
expiration = EXCLUDED.expiration,
enabled_flag = EXCLUDED.enabled_flag,
status = EXCLUDED.status,
2026-04-07 18:34:40 +00:00
current_mileage_km = EXCLUDED.current_mileage_km,
last_synced_at = NOW(),
updated_at = NOW()
2026-04-07 18:34:40 +00:00
""", (
# [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim
# only surface via jimi.track.device.detail — list returns null.
2026-04-07 18:34:40 +00:00
imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")),
clean(dtl.get("vehicleName") or d.get("vehicleName")),
clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")),
clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")),
2026-04-07 18:34:40 +00:00
clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
clean(dtl.get("driverName") or d.get("driverName")),
clean(dtl.get("driverPhone") or d.get("driverPhone")),
clean(dtl.get("sim") or d.get("sim")),
clean(dtl.get("iccid")), clean(dtl.get("imsi")),
2026-04-07 18:34:40 +00:00
clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)),
clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage"))
))
upserted += 1
log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True)
conn.commit()
log.info("Registry sync: %d devices updated.", upserted)
# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
def poll_live_positions():
t0, token = time.time(), get_token()
if not token: return
# [FIX-M19] Iterate every target and dedupe by IMEI (keep last).
positions_by_imei: dict[str, dict] = {}
for target in TARGETS:
resp = api_post("jimi.user.device.location.list",
{"target": target, "map_type": "GOOGLE"}, token)
if resp.get("code") != 0:
log.warning("location.list failed for target=%s: code=%s msg=%s",
target, resp.get("code"), resp.get("message"))
continue
for p in (resp.get("result") or []):
imei = p.get("imei")
if imei:
positions_by_imei[imei] = p
positions = list(positions_by_imei.values())
2026-04-07 18:34:40 +00:00
upserted, inserted = 0, 0
with get_conn() as conn:
with conn.cursor() as cur:
for p in positions:
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
try:
cur.execute("SAVEPOINT sp")
imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
if not imei or not is_valid_fix(lat, lng):
cur.execute("RELEASE SAVEPOINT sp")
continue
2026-04-07 18:34:40 +00:00
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
gps_time = clean_ts(p.get("gpsTime"))
speed = clean_num(p.get("speed"))
direction = clean_num(p.get("direction"))
acc_status = clean(p.get("accStatus"))
gps_num = clean_int(p.get("gpsNum"))
current_mileage = clean_num(p.get("currentMileage"))
2026-04-07 18:34:40 +00:00
cur.execute("""
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
speed, direction, acc_status, gps_signal, gps_num,
elec_quantity, power_value, battery_power_val, tracker_oil,
temperature, current_mileage, device_status, loc_desc, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
updated_at=NOW()
""", (
imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
gps_time, clean_ts(p.get("hbTime")), speed,
direction, acc_status, clean_int(p.get("gpsSignal")),
gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
current_mileage, clean(p.get("status")), clean(p.get("locDesc"))
))
upserted += cur.rowcount
# History (Hypertable Source)
if gps_time:
cur.execute("""
INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gps_time) DO NOTHING
""", (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage))
inserted += cur.rowcount
cur.execute("RELEASE SAVEPOINT sp")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True)
2026-04-07 18:34:40 +00:00
log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True)
# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
# [FIX-M20] Migration 09 added route_geom, start/end_address, vehicle_plate,
# waypoints_count to tracksolid.trips. poll_trips now enriches every poll-
# ingested trip from position_history (start/end fix + LineString polyline)
# and reverse-geocodes the endpoints, since jimi.device.track.mileage does
# not return coordinates. ON CONFLICT preserves webhook-supplied data when
# /pushtripreport later delivers native coords.
# Per-trip enrichment from position_history. Four readable scalar subqueries
# rather than a tighter CTE — runs sub-ms each given the (imei, gps_time) PK,
# and the readable form survives the edge case where start_time is just
# before the first available fix in the window (single CTE bounded by
# BETWEEN would return NULL there).
_ENRICH_QUERY = """
SELECT
(SELECT geom FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_geom,
(SELECT ST_Y(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_lat,
(SELECT ST_X(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time >= %s
ORDER BY gps_time ASC LIMIT 1) AS start_lng,
(SELECT geom FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_geom,
(SELECT ST_Y(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_lat,
(SELECT ST_X(geom) FROM tracksolid.position_history
WHERE imei = %s AND gps_time <= %s
ORDER BY gps_time DESC LIMIT 1) AS end_lng,
(SELECT ST_MakeLine(geom ORDER BY gps_time)
FROM tracksolid.position_history
WHERE imei = %s AND gps_time BETWEEN %s AND %s
AND geom IS NOT NULL) AS route_geom,
(SELECT COUNT(*) FROM tracksolid.position_history
WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count
"""
def _load_plates_cache(cur) -> dict[str, str]:
"""Build {imei: vehicle_number} for active devices once per poll cycle."""
cur.execute("""
SELECT imei, vehicle_number
FROM tracksolid.devices
WHERE enabled_flag = 1 AND vehicle_number IS NOT NULL
""")
return {imei: plate for imei, plate in cur.fetchall()}
2026-04-07 18:34:40 +00:00
def poll_trips():
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
t0 = time.time()
2026-04-07 18:34:40 +00:00
token, imeis = get_token(), get_active_imeis()
if not token or not imeis: return
end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(hours=1)
inserted = 0
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
with get_conn() as conn:
with conn.cursor() as cur:
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
plates = _load_plates_cache(cur)
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.track.mileage", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
}, token)
2026-04-07 18:34:40 +00:00
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
trips = resp.get("result") or []
2026-04-07 18:34:40 +00:00
for t in trips:
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
try:
cur.execute("SAVEPOINT sp")
# [FIX-M16] API returns distance in METRES despite documentation saying km.
# Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000.
# startMileage/endMileage are cumulative odometer in metres (same unit).
# Divide by 1000 to store as distance_km.
raw_dist = clean_num(t.get("distance"))
dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
imei = t.get("imei")
trip_start = clean_ts(t.get("startTime"))
trip_end = clean_ts(t.get("endTime"))
idle_s = clean_int(t.get("idleSecond"))
# [FIX-M20] Enrich from position_history. trip_start/end
# may be None (rare malformed payload) — skip enrichment
# in that case so we still capture the row.
start_geom = end_geom = route_geom = None
start_lat = start_lng = end_lat = end_lng = None
waypoints_count = 0
if trip_start and trip_end:
cur.execute(_ENRICH_QUERY, (
imei, trip_start, # start_geom
imei, trip_start, # start_lat
imei, trip_start, # start_lng
imei, trip_end, # end_geom
imei, trip_end, # end_lat
imei, trip_end, # end_lng
imei, trip_start, trip_end, # route_geom
imei, trip_start, trip_end, # waypoints_count
))
(start_geom, start_lat, start_lng,
end_geom, end_lat, end_lng,
route_geom, waypoints_count) = cur.fetchone()
start_address = reverse_geocode(start_lat, start_lng)
end_address = reverse_geocode(end_lat, end_lng)
vehicle_plate = plates.get(imei)
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
avg_speed_kmh, max_speed_kmh, driving_time_s, idle_time_s,
start_geom, end_geom, route_geom, waypoints_count,
start_address, end_address, vehicle_plate, source
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, 'poll')
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
ON CONFLICT (imei, start_time) DO UPDATE SET
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s),
idle_time_s = COALESCE(EXCLUDED.idle_time_s, tracksolid.trips.idle_time_s),
start_geom = COALESCE(tracksolid.trips.start_geom, EXCLUDED.start_geom),
end_geom = COALESCE(EXCLUDED.end_geom, tracksolid.trips.end_geom),
route_geom = COALESCE(EXCLUDED.route_geom, tracksolid.trips.route_geom),
waypoints_count = EXCLUDED.waypoints_count,
start_address = COALESCE(tracksolid.trips.start_address, EXCLUDED.start_address),
end_address = COALESCE(EXCLUDED.end_address, tracksolid.trips.end_address),
vehicle_plate = COALESCE(EXCLUDED.vehicle_plate, tracksolid.trips.vehicle_plate)
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
""", (
feat(trips): [FIX-M20] enrich tracksolid.trips with coords, route polyline, addresses, plate Polling jimi.device.track.mileage does not return start/end coordinates, fuel, idle, or trip sequence — leaving most trip columns NULL. This change closes those gaps using data we already have in position_history plus a best-effort Nominatim lookup. Migration 09_trips_enrichment.sql adds: • route_geom (LineString), start_address, end_address, vehicle_plate, waypoints_count on tracksolid.trips • GIST indexes on the three geometry columns • view tracksolid.v_trips_enriched exposing daily_seq + trip_date_eat (replaces reliance on the device-supplied trip_seq, which is only populated when /pushtripreport fires) ingest_movement_rev.py::poll_trips now: • extracts idleSecond from the poll response (was previously dropped) • per-trip: SELECTs start fix, end fix, ST_MakeLine route, and waypoint count from position_history within (start_time, end_time) • reverse-geocodes start/end via the new ts_shared_rev.reverse_geocode helper (Nominatim, LRU-cached at ~11m precision, 1 req/sec, never raises) • caches vehicle_plate from a per-cycle plates dict • ON CONFLICT preserves webhook-supplied data when /pushtripreport later delivers native coords/fuel/trip_seq backfill_trips_enrichment.py is a one-shot script (dry-run by default, --apply to commit, --imei / --since flags) that runs the same enrichment against historical NULL rows and COALESCEs only — never overwrites. DWH bronze mirrors and Grafana panels intentionally not touched (frozen on this branch until the schema work lands). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 18:30:20 +00:00
imei, trip_start, trip_end, dist_km,
clean_num(t.get("avgSpeed")),
clean_num(t.get("maxSpeed")),
clean_int(t.get("runTimeSecond")),
idle_s,
start_geom, end_geom, route_geom, waypoints_count,
start_address, end_address, vehicle_plate,
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted,
int((time.time() - t0) * 1000), True)
2026-04-07 18:34:40 +00:00
log.info("Trips: %d records processed.", inserted)
# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
def poll_parking():
t0 = time.time()
# [FIX-M19] Parking requires an `account` param tied to the IMEI's
# sub-account — call per target with that target's IMEIs only.
token, imeis_by_target = get_token(), get_active_imeis_by_target()
if not token or not imeis_by_target: return
end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(hours=1)
total_imei = sum(len(v) for v in imeis_by_target.values())
inserted = 0
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
with get_conn() as conn:
with conn.cursor() as cur:
for target, target_imeis in imeis_by_target.items():
for i in range(0, len(target_imeis), 50):
batch = target_imeis[i:i+50]
# [FIX-M13] account + acc_type=0 required for non-empty results.
resp = api_post("jimi.open.platform.report.parking", {
"account": target,
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"acc_type": 0,
}, token)
events = resp.get("result") or []
for p in events:
try:
cur.execute("SAVEPOINT sp")
imei = p.get("imei")
start_time = clean_ts(p.get("startTime"))
if not imei or not start_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
cur.execute("""
INSERT INTO tracksolid.parking_events (
imei, event_type, start_time, end_time,
duration_seconds, geom, address
) VALUES (
%s, 'parking', %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
""", (
imei, start_time, clean_ts(p.get("endTime")),
clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds
lng, lat, lng, lat,
clean(p.get("address"))
))
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted,
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
int((time.time() - t0) * 1000), True)
log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target))
# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────
def poll_track_list():
"""[FIX-M14] Fetch per-device GPS waypoint trails via jimi.device.track.list.
The 60-second fleet sweep (poll_live_positions) captures only the most
recent fix per vehicle all motion between sweeps is invisible. This
function retrieves every waypoint the device logged in the last 35 minutes
(5-min overlap ensures no gaps at scheduling boundaries) and inserts them
into position_history with source='track_list'.
Impact on reporting:
- position_history row density increases from ~1/min to ~14/min per device
- Route traces in Grafana become accurate continuous paths
- Speed profile queries gain meaningful resolution (avg over 10s intervals
vs 60s intervals) enables hard-braking / harsh-acceleration detection
- v_mileage_daily_cagg continuous aggregate gains finer odometer deltas
"""
t0 = time.time()
token, imeis = get_token(), get_active_imeis()
if not token or not imeis:
return
end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(minutes=35) # 5-min overlap avoids boundary gaps
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")
# Phase 1: fetch waypoints from API without holding a DB connection.
# jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed
# up the 30 min sweep without tripping the 1006 rate limit.
def _fetch(imei: str):
resp = api_post("jimi.device.track.list", {
"imei": imei,
"begin_time": begin_str,
"end_time": end_str,
"map_type": "GOOGLE",
}, token)
return imei, resp.get("result") or []
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
with ThreadPoolExecutor(max_workers=4) as pool:
fetched = list(pool.map(_fetch, imeis))
perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch Audit fixes across the ingestion stack: Observability - Move log_ingestion out of batch loops in poll_alarms and poll_parking (was emitting N cumulative log rows per run instead of one). - Add missing log_ingestion + t0 to poll_trips. - Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT DO NOTHING no longer inflates the metric. Resilience - SAVEPOINT-per-item added to poll_alarms, poll_live_positions, poll_trips, poll_parking so one bad row no longer aborts the batch (webhook handlers already had this; pollers were inconsistent). Performance - /pushgps and poll_track_list now use psycopg2.extras.execute_values with ON CONFLICT DO NOTHING — 10-50x write throughput on larger batches. - sync_devices and sync_driver_audit fetch jimi.track.device.detail concurrently via ThreadPoolExecutor(max_workers=8), cutting the daily registry sync from ~24s to ~3s for an 80-device fleet. - poll_track_list split into two phases: parallel API fetch (4 workers, no DB connection held) then one batched write. Previously the DB connection was held across every per-IMEI HTTP call, risking pool starvation. Security - _validate_token uses hmac.compare_digest for constant-time token comparison (closes timing side-channel). - _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default 5000) so a pathological push cannot blow memory. Tests - Fix test_null_alarm_type_skipped: its INSERT-count assertion was catching the ingestion_log insert written by log_ingestion. Filter that out so the test checks only data-table inserts. - Full suite: 66 passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 21:33:55 +00:00
# Phase 2: write rows in one DB transaction.
total_inserted = 0
devices_with_data = 0
rows = []
for imei, waypoints in fetched:
device_rows = 0
for wp in waypoints:
lat = clean_num(wp.get("lat"))
lng = clean_num(wp.get("lng"))
gps_time = clean_ts(wp.get("gpsTime"))
if not is_valid_fix(lat, lng) or not gps_time:
continue
rows.append((
imei, gps_time,
lng, lat, # ST_MakePoint(lng, lat)
lat, lng, # lat, lng columns
clean_num(wp.get("gpsSpeed")),
clean_num(wp.get("direction")),
clean(wp.get("accStatus")),
))
device_rows += 1
if device_rows:
devices_with_data += 1
if rows:
with get_conn() as conn:
with conn.cursor() as cur:
execute_values(
cur,
"""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng,
speed, direction, acc_status, source
) VALUES %s
ON CONFLICT (imei, gps_time) DO NOTHING
""",
rows,
template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
" %s, %s, %s, %s, %s, 'track_list')",
page_size=500,
)
total_inserted = cur.rowcount
log_ingestion(cur, "jimi.device.track.list", len(imeis),
0, total_inserted, int((time.time() - t0) * 1000), True)
else:
with get_conn() as conn:
with conn.cursor() as cur:
log_ingestion(cur, "jimi.device.track.list", len(imeis),
0, 0, int((time.time() - t0) * 1000), True)
log.info("Track list: %d waypoints inserted across %d/%d devices.",
total_inserted, devices_with_data, len(imeis))
# ── 6. On-Demand Device Location Refresh — POLL-03 ───────────────────────────
def get_device_locations(imeis: list) -> int:
"""[FIX-M15] Precision position refresh for a specific list of IMEIs.
Calls jimi.device.location.get (up to 50 IMEIs per call) and upserts
results into live_positions. Use this for:
- Alarm enrichment: get exact position immediately after an alarm fires
- Stale device recovery: force-refresh a vehicle that has been offline
- Dashboard on-demand refresh without waiting for the 60s fleet sweep
Returns the number of positions successfully upserted.
"""
token = get_token()
if not token or not imeis:
return 0
upserted = 0
with get_conn() as conn:
with conn.cursor() as cur:
for i in range(0, len(imeis), 50):
batch = imeis[i:i + 50]
resp = api_post("jimi.device.location.get", {
"imeis": ",".join(batch),
"map_type": "GOOGLE",
}, token)
positions = resp.get("result") or []
for p in positions:
imei = p.get("imei")
lat = clean_num(p.get("lat"))
lng = clean_num(p.get("lng"))
if not imei or not is_valid_fix(lat, lng):
continue
cur.execute("""
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, speed, direction,
gps_time, acc_status, current_mileage, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom = EXCLUDED.geom,
lat = EXCLUDED.lat,
lng = EXCLUDED.lng,
speed = EXCLUDED.speed,
direction = EXCLUDED.direction,
gps_time = EXCLUDED.gps_time,
acc_status = EXCLUDED.acc_status,
current_mileage = EXCLUDED.current_mileage,
updated_at = NOW()
""", (
imei, lng, lat, lat, lng,
clean_num(p.get("speed")),
clean_num(p.get("direction")),
clean_ts(p.get("gpsTime")),
clean(p.get("accStatus")),
clean_num(p.get("currentMileage")),
))
upserted += 1
conn.commit()
log.info("get_device_locations: %d positions refreshed.", upserted)
return upserted
2026-04-07 18:34:40 +00:00
# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
log.info("Starting MOVEMENT PIPELINE (v2.2)...")
2026-04-07 18:34:40 +00:00
# Startup catch-up
safe_task(sync_devices, log)()
safe_task(poll_live_positions, log)()
safe_task(poll_trips, log)()
safe_task(poll_parking, log)()
safe_task(poll_track_list, log)()
2026-04-07 18:34:40 +00:00
# Schedule
schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
schedule.every(15).minutes.do(safe_task(poll_trips, log))
schedule.every(15).minutes.do(safe_task(poll_parking, log))
schedule.every(30).minutes.do(safe_task(poll_track_list, log)) # [FIX-M14]
schedule.every().day.at("02:00").do(safe_task(sync_devices, log))
2026-04-07 18:34:40 +00:00
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
main()