From 2309464ab8549a0b50cab79eed9544b4e7e3225a Mon Sep 17 00:00:00 2001 From: david kiania Date: Thu, 21 May 2026 21:05:26 +0300 Subject: [PATCH] FIX-M21: alarm cross-feed + stale-IMEI recovery for live_positions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-pick of c8f5907 (originally FIX-M20 on main) onto quality-program-2026-04-12 — renamed to FIX-M21 here to avoid clashing with this branch's existing [FIX-M20] (trip enrichment, commit 144dede). Behaviour and code are unchanged from the main-branch original; the annotation tag is the only difference. Background ---------- A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two freshness gaps that share a single root cause: tracksolid.live_positions was being written by only one path (the 60s polled sweep), and that path silently omits devices that don't have a "current" fix in Jimi's location.list response. Effect on the dashboard: * 18 vehicles show OFFLINE for days-to-months — last fix is whatever the sweep wrote before Jimi dropped them. * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback because their dedicated tracker has been silent; the camera's lat/lng arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but we discard it after writing to tracksolid.alarms. Verified upstream subscription state: only /pushalarm is registered with Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are inactive. This change uses only data that already arrives. What's in this commit --------------------- ts_shared_rev.py * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None) — single time-guarded upsert all three writers will share. Guards on is_valid_fix() (filters Zero-Island and out-of-range) and EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or webhook retries can't rewind a fresher marker. COALESCE on optional columns so sparse callers don't blank dense ones' values. * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices whose live_positions.gps_time is NULL or older than the threshold, ordered NULLS FIRST so worst-offenders are in batch #1. * ensure_device(cur, imei, device_name=None) — relocated from webhook_receiver_rev so every live_positions writer can satisfy the FK without re-defining the helper. The original underscore-prefixed name in webhook_receiver_rev becomes a backwards-compat alias. webhook_receiver_rev.py * /pushalarm — after the alarm row insert, call upsert_live_position with the alarm's lat/lng and alarmTime. Sits inside the existing per-item SAVEPOINT, so a cross-feed failure rolls back only that one alarm's cross-feed, not the alarm row. ingest_movement_rev.py * poll_live_positions — inline INSERT replaced with upsert_live_position (extras dict carries the sweep-only columns). Same data, time-guarded. * get_device_locations — inline INSERT replaced; also gains an ensure_device call so it can be safely fed arbitrary IMEIs. * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and hands it to get_device_locations. Scheduled every 10 minutes plus a startup catch-up call. Uses jimi.device.location.get which returns *last-known* fix, so devices the 60s sweep drops can be re-warmed. Expected post-deploy effect (estimates, see 06_live_location/260521_timescale_location_upgrade_major.md §4) * ~1,100-1,600 additional live_positions upserts/day from the alarm cross-feed, after the time-guard rejects ~70-80% of races vs the fresher 60s sweep. * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence (JC400P emits ~107 alarms/day per device). * 8-14 of the 24 OFFLINE plates expected to recover via location.get's last-known-fix path within the first 30 minutes. * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour. * No 06_live_location code changes required — reads through reporting.v_live_positions transparently. Tests ----- 12 webhook integration tests pass (3 new: cross-feed fires on valid fix; skips without lat/lng; skips Zero-Island). 8 new unit tests in test_stale_imeis.py cover the stale selector, the poll wrapper, and the time-guard contract on upsert_live_position. Full suite: 77 passed. Deployment ---------- No schema migration. Both webhook_receiver and ingest_movement containers must be rebuilt — source is image-baked, not bind-mounted. Rollback is git revert + rebuild. Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md Verification playbook: 06_live_location/260521_timescale_location_upgrade_verification.md Co-Authored-By: Claude Opus 4.7 --- ingest_movement_rev.py | 121 ++++++++++-------- tests/fixtures/api_responses.py | 24 ++++ tests/integration/test_webhook_endpoints.py | 46 +++++++ tests/unit/test_stale_imeis.py | 112 +++++++++++++++++ ts_shared_rev.py | 132 ++++++++++++++++++++ webhook_receiver_rev.py | 33 ++--- 6 files changed, 402 insertions(+), 66 deletions(-) create mode 100644 tests/unit/test_stale_imeis.py diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 20498e6..a618520 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -44,6 +44,17 @@ REVISIONS (QA-Verified): (Nominatim), and caches vehicle_plate from devices. Closes the NULL-column gaps that were inherent to jimi.device.track.mileage (it does not return coordinates, idle, or trip sequence). + [FIX-M21] Live-position freshness: (a) /pushalarm now cross-feeds its + lat/lng into live_positions via the new shared + upsert_live_position() helper, time-guarded so older + timestamps can't rewind fresher fixes; (b) new + poll_stale_locations() runs every 10 min and calls + get_device_locations() for IMEIs whose live_positions.gps_time + is missing or > 30 min stale, recovering devices the 60s + sweep silently drops. Both the 60s sweep and + get_device_locations() now share the same time-guarded + upsert. _ensure_device → ensure_device relocated to + ts_shared_rev for FK-guard reuse. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ @@ -61,6 +72,7 @@ from ts_shared_rev import ( get_active_imeis, get_active_imeis_by_target, get_conn, + get_stale_imeis, get_token, is_valid_fix, log_ingestion, @@ -69,9 +81,11 @@ from ts_shared_rev import ( clean_int, clean_ts, get_logger, + ensure_device, reverse_geocode, safe_task, setup_shutdown, + upsert_live_position, ) log = get_logger("movement") @@ -224,30 +238,30 @@ def poll_live_positions(): gps_num = clean_int(p.get("gpsNum")) current_mileage = clean_num(p.get("currentMileage")) - cur.execute(""" - INSERT INTO tracksolid.live_positions ( - imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time, - speed, direction, acc_status, gps_signal, gps_num, - elec_quantity, power_value, battery_power_val, tracker_oil, - temperature, current_mileage, device_status, loc_desc, recorded_at - ) VALUES ( - %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() - ) - ON CONFLICT (imei) DO UPDATE SET - geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng, - gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction, - acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage, - updated_at=NOW() - """, ( - imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")), - gps_time, clean_ts(p.get("hbTime")), speed, - direction, acc_status, clean_int(p.get("gpsSignal")), - gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")), - clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), - current_mileage, clean(p.get("status")), clean(p.get("locDesc")) - )) - upserted += cur.rowcount + # [FIX-M21] Time-guarded upsert via shared helper so the + # 60s sweep, the alarm cross-feed, and get_device_locations + # all agree about freshness ordering. The sweep is normally + # the freshest source so the guard rarely rejects its writes. + upserted += upsert_live_position( + cur, imei, lat, lng, gps_time, + speed=speed, direction=direction, + acc_status=acc_status, + current_mileage=current_mileage, + extras={ + "pos_type": clean(p.get("posType")), + "confidence": clean_int(p.get("confidence")), + "hb_time": clean_ts(p.get("hbTime")), + "gps_signal": clean_int(p.get("gpsSignal")), + "gps_num": gps_num, + "elec_quantity": clean_num(p.get("electQuantity")), + "power_value": clean_num(p.get("powerValue")), + "battery_power_val": clean_num(p.get("batteryPowerVal")), + "tracker_oil": clean(p.get("trackerOil")), + "temperature": clean_num(p.get("temperature")), + "device_status": clean(p.get("status")), + "loc_desc": clean(p.get("locDesc")), + }, + ) # History (Hypertable Source) if gps_time: @@ -617,39 +631,44 @@ def get_device_locations(imeis: list) -> int: if not imei or not is_valid_fix(lat, lng): continue - cur.execute(""" - INSERT INTO tracksolid.live_positions ( - imei, geom, lat, lng, speed, direction, - gps_time, acc_status, current_mileage, recorded_at - ) VALUES ( - %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), - %s, %s, %s, %s, %s, %s, %s, NOW() - ) - ON CONFLICT (imei) DO UPDATE SET - geom = EXCLUDED.geom, - lat = EXCLUDED.lat, - lng = EXCLUDED.lng, - speed = EXCLUDED.speed, - direction = EXCLUDED.direction, - gps_time = EXCLUDED.gps_time, - acc_status = EXCLUDED.acc_status, - current_mileage = EXCLUDED.current_mileage, - updated_at = NOW() - """, ( - imei, lng, lat, lat, lng, - clean_num(p.get("speed")), - clean_num(p.get("direction")), + # [FIX-M21] FK guard — this path can see IMEIs the daily + # sync_devices hasn't picked up yet (especially when used + # as the stale-IMEI rescue path). + ensure_device(cur, imei, clean(p.get("deviceName"))) + + upserted += upsert_live_position( + cur, imei, lat, lng, clean_ts(p.get("gpsTime")), - clean(p.get("accStatus")), - clean_num(p.get("currentMileage")), - )) - upserted += 1 + speed=clean_num(p.get("speed")), + direction=clean_num(p.get("direction")), + acc_status=clean(p.get("accStatus")), + current_mileage=clean_num(p.get("currentMileage")), + ) conn.commit() log.info("get_device_locations: %d positions refreshed.", upserted) return upserted +# ── 7. Stale-IMEI Recovery — POLL-04 ───────────────────────────────────────── + +def poll_stale_locations(): + """[FIX-M21] Refresh live_positions for IMEIs whose stored gps_time is + missing or older than 30 minutes. + + Complements poll_live_positions (the 60s sweep), which silently omits + devices Jimi's location.list endpoint doesn't return. jimi.device.location.get + returns *last-known* fix per IMEI, so this path can re-warm devices + the sweep has dropped. + """ + stale = get_stale_imeis(stale_minutes=30) + if not stale: + log.info("poll_stale_locations: no stale IMEIs.") + return + log.info("poll_stale_locations: refreshing %d stale IMEI(s).", len(stale)) + get_device_locations(stale) + + # ── Main Loop ───────────────────────────────────────────────────────────────── def main(): @@ -661,12 +680,14 @@ def main(): safe_task(poll_trips, log)() safe_task(poll_parking, log)() safe_task(poll_track_list, log)() + safe_task(poll_stale_locations, log)() # Schedule schedule.every(60).seconds.do(safe_task(poll_live_positions, log)) schedule.every(15).minutes.do(safe_task(poll_trips, log)) schedule.every(15).minutes.do(safe_task(poll_parking, log)) schedule.every(30).minutes.do(safe_task(poll_track_list, log)) # [FIX-M14] + schedule.every(10).minutes.do(safe_task(poll_stale_locations, log)) # [FIX-M21] schedule.every().day.at("02:00").do(safe_task(sync_devices, log)) while True: diff --git a/tests/fixtures/api_responses.py b/tests/fixtures/api_responses.py index edebe26..c25183d 100644 --- a/tests/fixtures/api_responses.py +++ b/tests/fixtures/api_responses.py @@ -107,3 +107,27 @@ WEBHOOK_ALARM_NULL_TYPE = { "alarmType": None, "gateTime": "2024-04-12 07:30:00", } + +# Alarm with no lat/lng — cross-feed (FIX-M21) must skip live_positions +# but still write the alarm row. +WEBHOOK_ALARM_NO_POSITION = { + "deviceImei": "123456789012345", + "alarmType": "4", + "alarmName": "Speeding", + "gateTime": "2024-04-12 07:30:00", + "lat": None, + "lng": None, + "speed": 0.0, +} + +# Alarm with Zero-Island (0, 0) coordinates — is_valid_fix must reject; +# alarm row still writes, live_positions cross-feed must NOT fire. +WEBHOOK_ALARM_ZERO_ISLAND = { + "deviceImei": "123456789012345", + "alarmType": "4", + "alarmName": "Speeding", + "gateTime": "2024-04-12 07:30:00", + "lat": 0, + "lng": 0, + "speed": 0.0, +} diff --git a/tests/integration/test_webhook_endpoints.py b/tests/integration/test_webhook_endpoints.py index dd87e25..3acc403 100644 --- a/tests/integration/test_webhook_endpoints.py +++ b/tests/integration/test_webhook_endpoints.py @@ -20,6 +20,8 @@ import webhook_receiver_rev from tests.fixtures.api_responses import ( WEBHOOK_ALARM_PAYLOAD, WEBHOOK_ALARM_NULL_TYPE, + WEBHOOK_ALARM_NO_POSITION, + WEBHOOK_ALARM_ZERO_ISLAND, WEBHOOK_TRIP_BCD_PAYLOAD, WEBHOOK_TRIP_ISO_PAYLOAD, WEBHOOK_OBD_PAYLOAD, @@ -96,6 +98,50 @@ class TestPushAlarm: assert response.status_code == 200 assert response.json()["code"] == 0 + def test_alarm_cross_feeds_live_position(self, client, mock_db): + """FIX-M21: a valid alarm must additionally upsert live_positions.""" + mock_conn, mock_cur = mock_db + data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD]) + response = client.post("/pushalarm", data={"token": "", "data_list": data_list}) + assert response.status_code == 200 + # Exactly one INSERT INTO live_positions should have fired. + lp_inserts = [ + c for c in mock_cur.execute.call_args_list + if "tracksolid.live_positions" in str(c) and "INSERT" in str(c) + ] + assert len(lp_inserts) == 1, "Cross-feed must upsert live_positions exactly once" + + def test_alarm_without_lat_lng_skips_cross_feed(self, client, mock_db): + """FIX-M21: an alarm without lat/lng must NOT touch live_positions + (but must still insert the alarm row).""" + mock_conn, mock_cur = mock_db + data_list = json.dumps([WEBHOOK_ALARM_NO_POSITION]) + response = client.post("/pushalarm", data={"token": "", "data_list": data_list}) + assert response.status_code == 200 + lp_inserts = [ + c for c in mock_cur.execute.call_args_list + if "tracksolid.live_positions" in str(c) and "INSERT" in str(c) + ] + assert len(lp_inserts) == 0, "No live_positions write without a valid fix" + alarm_inserts = [ + c for c in mock_cur.execute.call_args_list + if "tracksolid.alarms" in str(c) and "INSERT" in str(c) + ] + assert len(alarm_inserts) == 1, "Alarm row must still write" + + def test_alarm_with_zero_island_skips_cross_feed(self, client, mock_db): + """FIX-M21: a (0, 0) fix must NOT propagate to live_positions — + is_valid_fix in the shared helper guards Zero Island.""" + mock_conn, mock_cur = mock_db + data_list = json.dumps([WEBHOOK_ALARM_ZERO_ISLAND]) + response = client.post("/pushalarm", data={"token": "", "data_list": data_list}) + assert response.status_code == 200 + lp_inserts = [ + c for c in mock_cur.execute.call_args_list + if "tracksolid.live_positions" in str(c) and "INSERT" in str(c) + ] + assert len(lp_inserts) == 0, "Zero-Island fix must not reach live_positions" + class TestPushTripReport: def test_bcd_timestamp_parsed(self, client, mock_db): diff --git a/tests/unit/test_stale_imeis.py b/tests/unit/test_stale_imeis.py new file mode 100644 index 0000000..8d841df --- /dev/null +++ b/tests/unit/test_stale_imeis.py @@ -0,0 +1,112 @@ +"""Unit tests for the FIX-M21 stale-IMEI recovery helpers. + +Covers: + - ts_shared_rev.get_stale_imeis — the SQL selector + - ingest_movement_rev.poll_stale_locations — the scheduler wrapper +""" +import sys +import os +import pytest +from unittest.mock import MagicMock, patch +from contextlib import contextmanager + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + +os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key") +os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret") +os.environ.setdefault("TRACKSOLID_USER_ID", "test_user") +os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5") +os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test") + + +def _make_mock_conn(rows=None): + """Cursor/connection double that returns `rows` from fetchall().""" + mock_cur = MagicMock() + mock_cur.fetchall.return_value = rows or [] + mock_conn = MagicMock() + mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + return mock_conn, mock_cur + + +@contextmanager +def _conn_ctx(mock_conn): + yield mock_conn + + +class TestGetStaleImeis: + def test_returns_devices_with_old_gps_time(self): + from ts_shared_rev import get_stale_imeis + mock_conn, mock_cur = _make_mock_conn(rows=[("imei_a",), ("imei_b",)]) + with patch("ts_shared_rev.get_conn") as mock_get_conn: + mock_get_conn.return_value = _conn_ctx(mock_conn) + result = get_stale_imeis(stale_minutes=30) + + assert result == ["imei_a", "imei_b"] + sql = str(mock_cur.execute.call_args) + assert "enabled_flag = 1" in sql + assert "INTERVAL" in sql.upper() or "interval" in sql + assert "NULLS FIRST" in sql + + def test_returns_empty_when_no_stale(self): + from ts_shared_rev import get_stale_imeis + mock_conn, _ = _make_mock_conn(rows=[]) + with patch("ts_shared_rev.get_conn") as mock_get_conn: + mock_get_conn.return_value = _conn_ctx(mock_conn) + assert get_stale_imeis() == [] + + +class TestPollStaleLocations: + def test_noop_when_empty(self): + """If no IMEIs are stale, get_device_locations must NOT be called.""" + import ingest_movement_rev as mod + with patch.object(mod, "get_stale_imeis", return_value=[]), \ + patch.object(mod, "get_device_locations") as mock_refresh: + mod.poll_stale_locations() + mock_refresh.assert_not_called() + + def test_invokes_refresh_with_stale_list(self): + """When stale IMEIs are present, get_device_locations is called once + with the exact list returned by get_stale_imeis.""" + import ingest_movement_rev as mod + stale = ["imei_a", "imei_b", "imei_c"] + with patch.object(mod, "get_stale_imeis", return_value=stale), \ + patch.object(mod, "get_device_locations") as mock_refresh: + mod.poll_stale_locations() + mock_refresh.assert_called_once_with(stale) + + +class TestUpsertLivePositionGuards: + """Spot-checks on the time-guard contract — covers what the dashboard + relies on (no rewinding the marker on stale alarm arrivals).""" + + def test_skips_invalid_fix(self): + from ts_shared_rev import upsert_live_position + mock_cur = MagicMock() + # Zero-Island + assert upsert_live_position(mock_cur, "imei_x", 0, 0, "2026-05-21 10:00:00") == 0 + mock_cur.execute.assert_not_called() + + def test_skips_missing_gps_time(self): + from ts_shared_rev import upsert_live_position + mock_cur = MagicMock() + assert upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, None) == 0 + mock_cur.execute.assert_not_called() + + def test_skips_missing_imei(self): + from ts_shared_rev import upsert_live_position + mock_cur = MagicMock() + assert upsert_live_position(mock_cur, None, -1.29, 36.82, "2026-05-21 10:00:00") == 0 + mock_cur.execute.assert_not_called() + + def test_executes_upsert_for_valid_fix(self): + from ts_shared_rev import upsert_live_position + mock_cur = MagicMock() + mock_cur.rowcount = 1 + n = upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, "2026-05-21 10:00:00", + speed=42.5) + assert n == 1 + mock_cur.execute.assert_called_once() + sql = str(mock_cur.execute.call_args) + # The time-guard is the load-bearing detail — verify it's present. + assert "EXCLUDED.gps_time > tracksolid.live_positions.gps_time" in sql diff --git a/ts_shared_rev.py b/ts_shared_rev.py index 72a5504..6f9d42c 100644 --- a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -237,6 +237,138 @@ def get_active_imeis() -> list[str]: cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1") return [r[0] for r in cur.fetchall()] +def get_stale_imeis(stale_minutes: int = 30) -> list[str]: + """[FIX-M21] IMEIs whose live_positions fix is missing or older than N minutes. + + Used by poll_stale_locations() to feed get_device_locations() with the + set the 60s sweep silently dropped. Ordered oldest-first (NULLs first) + so worst-offenders get the first seats in each 50-IMEI batch. + """ + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT d.imei + FROM tracksolid.devices d + LEFT JOIN tracksolid.live_positions lp USING (imei) + WHERE d.enabled_flag = 1 + AND (lp.gps_time IS NULL + OR lp.gps_time < NOW() - (%s || ' minutes')::interval) + ORDER BY lp.gps_time ASC NULLS FIRST + """, (str(stale_minutes),)) + return [r[0] for r in cur.fetchall()] + +def ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None: + """[FIX-M21] Upsert a stub row into tracksolid.devices so FK-constrained + inserts don't fail when ingest paths see an IMEI before sync_devices does. + + Lifted out of webhook_receiver_rev.py to be shareable by every writer + of live_positions / alarms / position_history. Idempotent. + """ + cur.execute( + """ + INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at) + VALUES (%s, %s, 'unknown', NOW(), NOW()) + ON CONFLICT (imei) DO NOTHING + """, + (imei, device_name), + ) + +def upsert_live_position( + cur, + imei: str, + lat, + lng, + gps_time, + speed=None, + direction=None, + acc_status=None, + current_mileage=None, + extras: Optional[dict] = None, +) -> int: + """[FIX-M21] Time-guarded upsert into tracksolid.live_positions. + + Only overwrites the stored row when the incoming gps_time is strictly + newer than what's already there. NULL stored gps_time always loses + (any fix beats no fix). Returns 1 if a row was written/updated, else 0. + + `extras` carries the columns only the 60s sweep emits + (pos_type, confidence, hb_time, gps_signal, gps_num, elec_quantity, + power_value, battery_power_val, tracker_oil, temperature, + device_status, loc_desc). When omitted, those columns are left alone + on update via COALESCE so a sparse caller (e.g. alarm cross-feed) + doesn't blank them out. + """ + if not imei or not gps_time or not is_valid_fix(lat, lng): + return 0 + + extras = extras or {} + cur.execute(""" + INSERT INTO tracksolid.live_positions ( + imei, geom, lat, lng, gps_time, speed, direction, + acc_status, current_mileage, + pos_type, confidence, hb_time, gps_signal, gps_num, + elec_quantity, power_value, battery_power_val, + tracker_oil, temperature, device_status, loc_desc, + recorded_at + ) VALUES ( + %(imei)s, + ST_SetSRID(ST_MakePoint(%(lng)s, %(lat)s), 4326), + %(lat)s, %(lng)s, %(gps_time)s, %(speed)s, %(direction)s, + %(acc_status)s, %(current_mileage)s, + %(pos_type)s, %(confidence)s, %(hb_time)s, %(gps_signal)s, %(gps_num)s, + %(elec_quantity)s, %(power_value)s, %(battery_power_val)s, + %(tracker_oil)s, %(temperature)s, %(device_status)s, %(loc_desc)s, + NOW() + ) + ON CONFLICT (imei) DO UPDATE SET + geom = EXCLUDED.geom, + lat = EXCLUDED.lat, + lng = EXCLUDED.lng, + gps_time = EXCLUDED.gps_time, + speed = COALESCE(EXCLUDED.speed, tracksolid.live_positions.speed), + direction = COALESCE(EXCLUDED.direction, tracksolid.live_positions.direction), + acc_status = COALESCE(EXCLUDED.acc_status, tracksolid.live_positions.acc_status), + current_mileage = COALESCE(EXCLUDED.current_mileage, tracksolid.live_positions.current_mileage), + pos_type = COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type), + confidence = COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence), + hb_time = COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time), + gps_signal = COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal), + gps_num = COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num), + elec_quantity = COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity), + power_value = COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value), + battery_power_val = COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val), + tracker_oil = COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil), + temperature = COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature), + device_status = COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status), + loc_desc = COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc), + updated_at = NOW() + WHERE EXCLUDED.gps_time IS NOT NULL + AND (tracksolid.live_positions.gps_time IS NULL + OR EXCLUDED.gps_time > tracksolid.live_positions.gps_time) + """, { + "imei": imei, + "lat": lat, + "lng": lng, + "gps_time": gps_time, + "speed": speed, + "direction": direction, + "acc_status": acc_status, + "current_mileage": current_mileage, + "pos_type": extras.get("pos_type"), + "confidence": extras.get("confidence"), + "hb_time": extras.get("hb_time"), + "gps_signal": extras.get("gps_signal"), + "gps_num": extras.get("gps_num"), + "elec_quantity": extras.get("elec_quantity"), + "power_value": extras.get("power_value"), + "battery_power_val": extras.get("battery_power_val"), + "tracker_oil": extras.get("tracker_oil"), + "temperature": extras.get("temperature"), + "device_status": extras.get("device_status"), + "loc_desc": extras.get("loc_desc"), + }) + return cur.rowcount + def get_active_imeis_by_target() -> dict[str, list[str]]: """[FIX-M19] Group active IMEIs by their Tracksolid sub-account so endpoints that require an `account`/`target` param (e.g. parking) can diff --git a/webhook_receiver_rev.py b/webhook_receiver_rev.py index b0d7108..125a2d7 100644 --- a/webhook_receiver_rev.py +++ b/webhook_receiver_rev.py @@ -55,6 +55,8 @@ from ts_shared_rev import ( clean_ts, is_valid_fix, get_logger, + ensure_device, + upsert_live_position, ) log = get_logger("webhook") @@ -178,22 +180,11 @@ def _make_geom_params(lat, lng): return (lng, lat, lng, lat) -def _ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None: - """Upsert a stub row into tracksolid.devices so FK-constrained inserts don't fail. - - Jimi pushes alarms/GPS for devices that may not yet be in our devices table - (neither API-sync'd nor in the onboarding CSV). Rather than drop the event, - register the IMEI with whatever context the push carried; the nightly - `sync_devices()` and the CSV import fill in the remaining fields later. - """ - cur.execute( - """ - INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at) - VALUES (%s, %s, 'unknown', NOW(), NOW()) - ON CONFLICT (imei) DO NOTHING - """, - (imei, device_name), - ) +# Backwards-compat shim. The implementation was relocated to ts_shared_rev +# (as `ensure_device`) so ingest_movement_rev and any future writer can share +# the FK-guard without re-defining it. Existing call sites in this file +# continue to use the underscore-prefixed name. +_ensure_device = ensure_device # ── Health Check ────────────────────────────────────────────────────────────── @@ -392,6 +383,16 @@ async def push_alarm(request: Request): lat, lng, clean_num(item.get("speed")), )) + + # [FIX-M21] Cross-feed: every Jimi alarm carries lat/lng. + # Refresh live_positions so dashboard markers don't have to + # wait up to 60s for the next polled sweep. Time-guarded + # inside the helper — alarms older than the current fix lose. + upsert_live_position( + cur, imei, lat, lng, alarm_time, + speed=clean_num(item.get("speed")), + ) + cur.execute("RELEASE SAVEPOINT sp") inserted += 1 except Exception: