FIX-M21: alarm cross-feed + stale-IMEI recovery for live_positions
Some checks failed
Static Analysis / static (push) Has been cancelled
Tests / test (push) Has been cancelled

Cherry-pick of c8f5907 (originally FIX-M20 on main) onto
quality-program-2026-04-12 — renamed to FIX-M21 here to avoid clashing
with this branch's existing [FIX-M20] (trip enrichment, commit 144dede).
Behaviour and code are unchanged from the main-branch original; the
annotation tag is the only difference.

Background
----------
A field audit of liveposition.rahamafresh.com on 2026-05-21 surfaced two
freshness gaps that share a single root cause: tracksolid.live_positions
was being written by only one path (the 60s polled sweep), and that path
silently omits devices that don't have a "current" fix in Jimi's
location.list response. Effect on the dashboard:

  * 18 vehicles show OFFLINE for days-to-months — last fix is whatever
    the sweep wrote before Jimi dropped them.
  * 3 vehicles (KDK 780K, KCQ 618K, KCZ 476E) depend on dashcam fallback
    because their dedicated tracker has been silent; the camera's lat/lng
    arrives via /pushalarm webhooks (5,287/day, 100% lat/lng fill) but
    we discard it after writing to tracksolid.alarms.

Verified upstream subscription state: only /pushalarm is registered with
Jimi; the n8n forwarders for /pushgps, /pushtripreport, /pushobd are
inactive. This change uses only data that already arrives.

What's in this commit
---------------------
ts_shared_rev.py
  * upsert_live_position(cur, imei, lat, lng, gps_time, ..., extras=None)
    — single time-guarded upsert all three writers will share. Guards on
    is_valid_fix() (filters Zero-Island and out-of-range) and
    EXCLUDED.gps_time > stored.gps_time so late-arriving alarms or
    webhook retries can't rewind a fresher marker. COALESCE on optional
    columns so sparse callers don't blank dense ones' values.
  * get_stale_imeis(stale_minutes=30) — SELECT enabled_flag=1 devices
    whose live_positions.gps_time is NULL or older than the threshold,
    ordered NULLS FIRST so worst-offenders are in batch #1.
  * ensure_device(cur, imei, device_name=None) — relocated from
    webhook_receiver_rev so every live_positions writer can satisfy the
    FK without re-defining the helper. The original underscore-prefixed
    name in webhook_receiver_rev becomes a backwards-compat alias.

webhook_receiver_rev.py
  * /pushalarm — after the alarm row insert, call upsert_live_position
    with the alarm's lat/lng and alarmTime. Sits inside the existing
    per-item SAVEPOINT, so a cross-feed failure rolls back only that
    one alarm's cross-feed, not the alarm row.

ingest_movement_rev.py
  * poll_live_positions — inline INSERT replaced with upsert_live_position
    (extras dict carries the sweep-only columns). Same data, time-guarded.
  * get_device_locations — inline INSERT replaced; also gains an
    ensure_device call so it can be safely fed arbitrary IMEIs.
  * poll_stale_locations() — new wrapper. Pulls get_stale_imeis() and
    hands it to get_device_locations. Scheduled every 10 minutes plus a
    startup catch-up call. Uses jimi.device.location.get which returns
    *last-known* fix, so devices the 60s sweep drops can be re-warmed.

Expected post-deploy effect (estimates, see
06_live_location/260521_timescale_location_upgrade_major.md §4)
  * ~1,100-1,600 additional live_positions upserts/day from the alarm
    cross-feed, after the time-guard rejects ~70-80% of races vs the
    fresher 60s sweep.
  * The 3 camera-fallback plates flip to "seconds-after-alarm" cadence
    (JC400P emits ~107 alarms/day per device).
  * 8-14 of the 24 OFFLINE plates expected to recover via location.get's
    last-known-fix path within the first 30 minutes.
  * Dashboard's "Offline 24h+" KPI: 24 → 10-14 within the first hour.
  * No 06_live_location code changes required — reads through
    reporting.v_live_positions transparently.

Tests
-----
12 webhook integration tests pass (3 new: cross-feed fires on valid fix;
skips without lat/lng; skips Zero-Island). 8 new unit tests in
test_stale_imeis.py cover the stale selector, the poll wrapper, and the
time-guard contract on upsert_live_position. Full suite: 77 passed.

Deployment
----------
No schema migration. Both webhook_receiver and ingest_movement
containers must be rebuilt — source is image-baked, not bind-mounted.
Rollback is git revert + rebuild.

Plan & monitoring SQL: 06_live_location/260521_timescale_location_upgrade_major.md
Verification playbook:  06_live_location/260521_timescale_location_upgrade_verification.md

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-05-21 21:05:26 +03:00
parent 3b79d5a62e
commit 2309464ab8
6 changed files with 402 additions and 66 deletions

View file

@ -44,6 +44,17 @@ REVISIONS (QA-Verified):
(Nominatim), and caches vehicle_plate from devices. Closes the
NULL-column gaps that were inherent to jimi.device.track.mileage
(it does not return coordinates, idle, or trip sequence).
[FIX-M21] Live-position freshness: (a) /pushalarm now cross-feeds its
lat/lng into live_positions via the new shared
upsert_live_position() helper, time-guarded so older
timestamps can't rewind fresher fixes; (b) new
poll_stale_locations() runs every 10 min and calls
get_device_locations() for IMEIs whose live_positions.gps_time
is missing or > 30 min stale, recovering devices the 60s
sweep silently drops. Both the 60s sweep and
get_device_locations() now share the same time-guarded
upsert. _ensure_device ensure_device relocated to
ts_shared_rev for FK-guard reuse.
"""
@ -61,6 +72,7 @@ from ts_shared_rev import (
get_active_imeis,
get_active_imeis_by_target,
get_conn,
get_stale_imeis,
get_token,
is_valid_fix,
log_ingestion,
@ -69,9 +81,11 @@ from ts_shared_rev import (
clean_int,
clean_ts,
get_logger,
ensure_device,
reverse_geocode,
safe_task,
setup_shutdown,
upsert_live_position,
)
log = get_logger("movement")
@ -224,30 +238,30 @@ def poll_live_positions():
gps_num = clean_int(p.get("gpsNum"))
current_mileage = clean_num(p.get("currentMileage"))
cur.execute("""
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
speed, direction, acc_status, gps_signal, gps_num,
elec_quantity, power_value, battery_power_val, tracker_oil,
temperature, current_mileage, device_status, loc_desc, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
# [FIX-M21] Time-guarded upsert via shared helper so the
# 60s sweep, the alarm cross-feed, and get_device_locations
# all agree about freshness ordering. The sweep is normally
# the freshest source so the guard rarely rejects its writes.
upserted += upsert_live_position(
cur, imei, lat, lng, gps_time,
speed=speed, direction=direction,
acc_status=acc_status,
current_mileage=current_mileage,
extras={
"pos_type": clean(p.get("posType")),
"confidence": clean_int(p.get("confidence")),
"hb_time": clean_ts(p.get("hbTime")),
"gps_signal": clean_int(p.get("gpsSignal")),
"gps_num": gps_num,
"elec_quantity": clean_num(p.get("electQuantity")),
"power_value": clean_num(p.get("powerValue")),
"battery_power_val": clean_num(p.get("batteryPowerVal")),
"tracker_oil": clean(p.get("trackerOil")),
"temperature": clean_num(p.get("temperature")),
"device_status": clean(p.get("status")),
"loc_desc": clean(p.get("locDesc")),
},
)
ON CONFLICT (imei) DO UPDATE SET
geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
updated_at=NOW()
""", (
imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
gps_time, clean_ts(p.get("hbTime")), speed,
direction, acc_status, clean_int(p.get("gpsSignal")),
gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
current_mileage, clean(p.get("status")), clean(p.get("locDesc"))
))
upserted += cur.rowcount
# History (Hypertable Source)
if gps_time:
@ -617,39 +631,44 @@ def get_device_locations(imeis: list) -> int:
if not imei or not is_valid_fix(lat, lng):
continue
cur.execute("""
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, speed, direction,
gps_time, acc_status, current_mileage, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom = EXCLUDED.geom,
lat = EXCLUDED.lat,
lng = EXCLUDED.lng,
speed = EXCLUDED.speed,
direction = EXCLUDED.direction,
gps_time = EXCLUDED.gps_time,
acc_status = EXCLUDED.acc_status,
current_mileage = EXCLUDED.current_mileage,
updated_at = NOW()
""", (
imei, lng, lat, lat, lng,
clean_num(p.get("speed")),
clean_num(p.get("direction")),
# [FIX-M21] FK guard — this path can see IMEIs the daily
# sync_devices hasn't picked up yet (especially when used
# as the stale-IMEI rescue path).
ensure_device(cur, imei, clean(p.get("deviceName")))
upserted += upsert_live_position(
cur, imei, lat, lng,
clean_ts(p.get("gpsTime")),
clean(p.get("accStatus")),
clean_num(p.get("currentMileage")),
))
upserted += 1
speed=clean_num(p.get("speed")),
direction=clean_num(p.get("direction")),
acc_status=clean(p.get("accStatus")),
current_mileage=clean_num(p.get("currentMileage")),
)
conn.commit()
log.info("get_device_locations: %d positions refreshed.", upserted)
return upserted
# ── 7. Stale-IMEI Recovery — POLL-04 ─────────────────────────────────────────
def poll_stale_locations():
"""[FIX-M21] Refresh live_positions for IMEIs whose stored gps_time is
missing or older than 30 minutes.
Complements poll_live_positions (the 60s sweep), which silently omits
devices Jimi's location.list endpoint doesn't return. jimi.device.location.get
returns *last-known* fix per IMEI, so this path can re-warm devices
the sweep has dropped.
"""
stale = get_stale_imeis(stale_minutes=30)
if not stale:
log.info("poll_stale_locations: no stale IMEIs.")
return
log.info("poll_stale_locations: refreshing %d stale IMEI(s).", len(stale))
get_device_locations(stale)
# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
@ -661,12 +680,14 @@ def main():
safe_task(poll_trips, log)()
safe_task(poll_parking, log)()
safe_task(poll_track_list, log)()
safe_task(poll_stale_locations, log)()
# Schedule
schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
schedule.every(15).minutes.do(safe_task(poll_trips, log))
schedule.every(15).minutes.do(safe_task(poll_parking, log))
schedule.every(30).minutes.do(safe_task(poll_track_list, log)) # [FIX-M14]
schedule.every(10).minutes.do(safe_task(poll_stale_locations, log)) # [FIX-M21]
schedule.every().day.at("02:00").do(safe_task(sync_devices, log))
while True:

View file

@ -107,3 +107,27 @@ WEBHOOK_ALARM_NULL_TYPE = {
"alarmType": None,
"gateTime": "2024-04-12 07:30:00",
}
# Alarm with no lat/lng — cross-feed (FIX-M21) must skip live_positions
# but still write the alarm row.
WEBHOOK_ALARM_NO_POSITION = {
"deviceImei": "123456789012345",
"alarmType": "4",
"alarmName": "Speeding",
"gateTime": "2024-04-12 07:30:00",
"lat": None,
"lng": None,
"speed": 0.0,
}
# Alarm with Zero-Island (0, 0) coordinates — is_valid_fix must reject;
# alarm row still writes, live_positions cross-feed must NOT fire.
WEBHOOK_ALARM_ZERO_ISLAND = {
"deviceImei": "123456789012345",
"alarmType": "4",
"alarmName": "Speeding",
"gateTime": "2024-04-12 07:30:00",
"lat": 0,
"lng": 0,
"speed": 0.0,
}

View file

@ -20,6 +20,8 @@ import webhook_receiver_rev
from tests.fixtures.api_responses import (
WEBHOOK_ALARM_PAYLOAD,
WEBHOOK_ALARM_NULL_TYPE,
WEBHOOK_ALARM_NO_POSITION,
WEBHOOK_ALARM_ZERO_ISLAND,
WEBHOOK_TRIP_BCD_PAYLOAD,
WEBHOOK_TRIP_ISO_PAYLOAD,
WEBHOOK_OBD_PAYLOAD,
@ -96,6 +98,50 @@ class TestPushAlarm:
assert response.status_code == 200
assert response.json()["code"] == 0
def test_alarm_cross_feeds_live_position(self, client, mock_db):
"""FIX-M21: a valid alarm must additionally upsert live_positions."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
# Exactly one INSERT INTO live_positions should have fired.
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 1, "Cross-feed must upsert live_positions exactly once"
def test_alarm_without_lat_lng_skips_cross_feed(self, client, mock_db):
"""FIX-M21: an alarm without lat/lng must NOT touch live_positions
(but must still insert the alarm row)."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_NO_POSITION])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 0, "No live_positions write without a valid fix"
alarm_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.alarms" in str(c) and "INSERT" in str(c)
]
assert len(alarm_inserts) == 1, "Alarm row must still write"
def test_alarm_with_zero_island_skips_cross_feed(self, client, mock_db):
"""FIX-M21: a (0, 0) fix must NOT propagate to live_positions —
is_valid_fix in the shared helper guards Zero Island."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_ZERO_ISLAND])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
lp_inserts = [
c for c in mock_cur.execute.call_args_list
if "tracksolid.live_positions" in str(c) and "INSERT" in str(c)
]
assert len(lp_inserts) == 0, "Zero-Island fix must not reach live_positions"
class TestPushTripReport:
def test_bcd_timestamp_parsed(self, client, mock_db):

View file

@ -0,0 +1,112 @@
"""Unit tests for the FIX-M21 stale-IMEI recovery helpers.
Covers:
- ts_shared_rev.get_stale_imeis the SQL selector
- ingest_movement_rev.poll_stale_locations the scheduler wrapper
"""
import sys
import os
import pytest
from unittest.mock import MagicMock, patch
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
def _make_mock_conn(rows=None):
"""Cursor/connection double that returns `rows` from fetchall()."""
mock_cur = MagicMock()
mock_cur.fetchall.return_value = rows or []
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return mock_conn, mock_cur
@contextmanager
def _conn_ctx(mock_conn):
yield mock_conn
class TestGetStaleImeis:
def test_returns_devices_with_old_gps_time(self):
from ts_shared_rev import get_stale_imeis
mock_conn, mock_cur = _make_mock_conn(rows=[("imei_a",), ("imei_b",)])
with patch("ts_shared_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = _conn_ctx(mock_conn)
result = get_stale_imeis(stale_minutes=30)
assert result == ["imei_a", "imei_b"]
sql = str(mock_cur.execute.call_args)
assert "enabled_flag = 1" in sql
assert "INTERVAL" in sql.upper() or "interval" in sql
assert "NULLS FIRST" in sql
def test_returns_empty_when_no_stale(self):
from ts_shared_rev import get_stale_imeis
mock_conn, _ = _make_mock_conn(rows=[])
with patch("ts_shared_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = _conn_ctx(mock_conn)
assert get_stale_imeis() == []
class TestPollStaleLocations:
def test_noop_when_empty(self):
"""If no IMEIs are stale, get_device_locations must NOT be called."""
import ingest_movement_rev as mod
with patch.object(mod, "get_stale_imeis", return_value=[]), \
patch.object(mod, "get_device_locations") as mock_refresh:
mod.poll_stale_locations()
mock_refresh.assert_not_called()
def test_invokes_refresh_with_stale_list(self):
"""When stale IMEIs are present, get_device_locations is called once
with the exact list returned by get_stale_imeis."""
import ingest_movement_rev as mod
stale = ["imei_a", "imei_b", "imei_c"]
with patch.object(mod, "get_stale_imeis", return_value=stale), \
patch.object(mod, "get_device_locations") as mock_refresh:
mod.poll_stale_locations()
mock_refresh.assert_called_once_with(stale)
class TestUpsertLivePositionGuards:
"""Spot-checks on the time-guard contract — covers what the dashboard
relies on (no rewinding the marker on stale alarm arrivals)."""
def test_skips_invalid_fix(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
# Zero-Island
assert upsert_live_position(mock_cur, "imei_x", 0, 0, "2026-05-21 10:00:00") == 0
mock_cur.execute.assert_not_called()
def test_skips_missing_gps_time(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
assert upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, None) == 0
mock_cur.execute.assert_not_called()
def test_skips_missing_imei(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
assert upsert_live_position(mock_cur, None, -1.29, 36.82, "2026-05-21 10:00:00") == 0
mock_cur.execute.assert_not_called()
def test_executes_upsert_for_valid_fix(self):
from ts_shared_rev import upsert_live_position
mock_cur = MagicMock()
mock_cur.rowcount = 1
n = upsert_live_position(mock_cur, "imei_x", -1.29, 36.82, "2026-05-21 10:00:00",
speed=42.5)
assert n == 1
mock_cur.execute.assert_called_once()
sql = str(mock_cur.execute.call_args)
# The time-guard is the load-bearing detail — verify it's present.
assert "EXCLUDED.gps_time > tracksolid.live_positions.gps_time" in sql

View file

@ -237,6 +237,138 @@ def get_active_imeis() -> list[str]:
cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1")
return [r[0] for r in cur.fetchall()]
def get_stale_imeis(stale_minutes: int = 30) -> list[str]:
"""[FIX-M21] IMEIs whose live_positions fix is missing or older than N minutes.
Used by poll_stale_locations() to feed get_device_locations() with the
set the 60s sweep silently dropped. Ordered oldest-first (NULLs first)
so worst-offenders get the first seats in each 50-IMEI batch.
"""
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT d.imei
FROM tracksolid.devices d
LEFT JOIN tracksolid.live_positions lp USING (imei)
WHERE d.enabled_flag = 1
AND (lp.gps_time IS NULL
OR lp.gps_time < NOW() - (%s || ' minutes')::interval)
ORDER BY lp.gps_time ASC NULLS FIRST
""", (str(stale_minutes),))
return [r[0] for r in cur.fetchall()]
def ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
"""[FIX-M21] Upsert a stub row into tracksolid.devices so FK-constrained
inserts don't fail when ingest paths see an IMEI before sync_devices does.
Lifted out of webhook_receiver_rev.py to be shareable by every writer
of live_positions / alarms / position_history. Idempotent.
"""
cur.execute(
"""
INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
VALUES (%s, %s, 'unknown', NOW(), NOW())
ON CONFLICT (imei) DO NOTHING
""",
(imei, device_name),
)
def upsert_live_position(
cur,
imei: str,
lat,
lng,
gps_time,
speed=None,
direction=None,
acc_status=None,
current_mileage=None,
extras: Optional[dict] = None,
) -> int:
"""[FIX-M21] Time-guarded upsert into tracksolid.live_positions.
Only overwrites the stored row when the incoming gps_time is strictly
newer than what's already there. NULL stored gps_time always loses
(any fix beats no fix). Returns 1 if a row was written/updated, else 0.
`extras` carries the columns only the 60s sweep emits
(pos_type, confidence, hb_time, gps_signal, gps_num, elec_quantity,
power_value, battery_power_val, tracker_oil, temperature,
device_status, loc_desc). When omitted, those columns are left alone
on update via COALESCE so a sparse caller (e.g. alarm cross-feed)
doesn't blank them out.
"""
if not imei or not gps_time or not is_valid_fix(lat, lng):
return 0
extras = extras or {}
cur.execute("""
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, gps_time, speed, direction,
acc_status, current_mileage,
pos_type, confidence, hb_time, gps_signal, gps_num,
elec_quantity, power_value, battery_power_val,
tracker_oil, temperature, device_status, loc_desc,
recorded_at
) VALUES (
%(imei)s,
ST_SetSRID(ST_MakePoint(%(lng)s, %(lat)s), 4326),
%(lat)s, %(lng)s, %(gps_time)s, %(speed)s, %(direction)s,
%(acc_status)s, %(current_mileage)s,
%(pos_type)s, %(confidence)s, %(hb_time)s, %(gps_signal)s, %(gps_num)s,
%(elec_quantity)s, %(power_value)s, %(battery_power_val)s,
%(tracker_oil)s, %(temperature)s, %(device_status)s, %(loc_desc)s,
NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom = EXCLUDED.geom,
lat = EXCLUDED.lat,
lng = EXCLUDED.lng,
gps_time = EXCLUDED.gps_time,
speed = COALESCE(EXCLUDED.speed, tracksolid.live_positions.speed),
direction = COALESCE(EXCLUDED.direction, tracksolid.live_positions.direction),
acc_status = COALESCE(EXCLUDED.acc_status, tracksolid.live_positions.acc_status),
current_mileage = COALESCE(EXCLUDED.current_mileage, tracksolid.live_positions.current_mileage),
pos_type = COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type),
confidence = COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence),
hb_time = COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time),
gps_signal = COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal),
gps_num = COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num),
elec_quantity = COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity),
power_value = COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value),
battery_power_val = COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val),
tracker_oil = COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil),
temperature = COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature),
device_status = COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status),
loc_desc = COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc),
updated_at = NOW()
WHERE EXCLUDED.gps_time IS NOT NULL
AND (tracksolid.live_positions.gps_time IS NULL
OR EXCLUDED.gps_time > tracksolid.live_positions.gps_time)
""", {
"imei": imei,
"lat": lat,
"lng": lng,
"gps_time": gps_time,
"speed": speed,
"direction": direction,
"acc_status": acc_status,
"current_mileage": current_mileage,
"pos_type": extras.get("pos_type"),
"confidence": extras.get("confidence"),
"hb_time": extras.get("hb_time"),
"gps_signal": extras.get("gps_signal"),
"gps_num": extras.get("gps_num"),
"elec_quantity": extras.get("elec_quantity"),
"power_value": extras.get("power_value"),
"battery_power_val": extras.get("battery_power_val"),
"tracker_oil": extras.get("tracker_oil"),
"temperature": extras.get("temperature"),
"device_status": extras.get("device_status"),
"loc_desc": extras.get("loc_desc"),
})
return cur.rowcount
def get_active_imeis_by_target() -> dict[str, list[str]]:
"""[FIX-M19] Group active IMEIs by their Tracksolid sub-account so
endpoints that require an `account`/`target` param (e.g. parking) can

View file

@ -55,6 +55,8 @@ from ts_shared_rev import (
clean_ts,
is_valid_fix,
get_logger,
ensure_device,
upsert_live_position,
)
log = get_logger("webhook")
@ -178,22 +180,11 @@ def _make_geom_params(lat, lng):
return (lng, lat, lng, lat)
def _ensure_device(cur, imei: str, device_name: Optional[str] = None) -> None:
"""Upsert a stub row into tracksolid.devices so FK-constrained inserts don't fail.
Jimi pushes alarms/GPS for devices that may not yet be in our devices table
(neither API-sync'd nor in the onboarding CSV). Rather than drop the event,
register the IMEI with whatever context the push carried; the nightly
`sync_devices()` and the CSV import fill in the remaining fields later.
"""
cur.execute(
"""
INSERT INTO tracksolid.devices (imei, device_name, status, created_at, updated_at)
VALUES (%s, %s, 'unknown', NOW(), NOW())
ON CONFLICT (imei) DO NOTHING
""",
(imei, device_name),
)
# Backwards-compat shim. The implementation was relocated to ts_shared_rev
# (as `ensure_device`) so ingest_movement_rev and any future writer can share
# the FK-guard without re-defining it. Existing call sites in this file
# continue to use the underscore-prefixed name.
_ensure_device = ensure_device
# ── Health Check ──────────────────────────────────────────────────────────────
@ -392,6 +383,16 @@ async def push_alarm(request: Request):
lat, lng,
clean_num(item.get("speed")),
))
# [FIX-M21] Cross-feed: every Jimi alarm carries lat/lng.
# Refresh live_positions so dashboard markers don't have to
# wait up to 60s for the next polled sweep. Time-guarded
# inside the helper — alarms older than the current fix lose.
upsert_live_position(
cur, imei, lat, lng, alarm_time,
speed=clean_num(item.get("speed")),
)
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception: