tracksolid_timescale_grafan.../ingest_movement_rev.py
david kiania 76f6915e61 feat(stack): consolidate 7→4 services (merge pollers, drop pgbouncer/grafana)
Collapse the backend from 7 Coolify services to 4 app services + the DB.

- Merge ingest_movement + ingest_events into a single ingest_worker:
  split each poller's main() into reusable startup_catchup()/register_jobs()
  and drive both from one schedule loop in new ingest_worker_rev.py
  (standalone entrypoints retained for local debug).
- docker-compose.yaml: replace the two poller services with ingest_worker;
  remove the pgbouncer service (dormant; transaction-mode pooling is unsafe
  for the advisory-lock'd v_trips refresher) and the grafana service +
  grafana-data volume (redundant with the FleetOps SPA).
- Add reporting.v_ingest_health (migration 19) + dashboard_api GET
  /health/ingest as the pipeline-freshness surface that replaces Grafana's
  health panels.

webhook_receiver stays isolated so a poller fault can't drop inbound pushes.
timescale_db and db_backup are unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 21:41:05 +03:00

706 lines
No EOL
36 KiB
Python

"""
ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking.
REVISIONS (QA-Verified):
  [FIX-M01] sync_devices: Atomic transaction for registry sync.
  [FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug).
  [FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance).
  [FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT.
  [FIX-M08] Atomic Logging: log_ingestion happens within the data transaction.
  [FIX-11]  Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
  [FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API.
  [FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking.
  [FIX-M11] BUG-02: distance_m was stored in millimetres due to erroneous
            * 1000 on an already-metric API value. Removed multiplication;
            column renamed to distance_km in migration 04. Both poll_trips
            and push_trip_report (webhook) corrected.
  [FIX-M12] BUG-02: Renamed distance_m → distance_km in all SQL to match
            migration 04 schema change.
  [FIX-M13] POLL-02: Parking poll was returning 0 rows; added missing
            acc_type=0 and account params; fixed response field durSecond
            (was mapped as 'seconds').
  [FIX-M14] POLL-01: New poll_track_list(): calls jimi.device.track.list
            per device every 30 minutes to capture high-resolution GPS
            waypoints between the 60-second fleet sweep snapshots. Writes to
            position_history with source='track_list'. Fills gaps in route
            reconstruction and enables accurate path drawing on maps.
  [FIX-M15] POLL-03: New get_device_locations() utility: calls
            jimi.device.location.get for up to 50 specific IMEIs on demand.
            Used for precision refreshes (alarm enrichment, stale device
            recovery) without waiting for the next full fleet sweep.
  [FIX-M16] poll_trips: jimi.device.track.mileage returns distance in metres
            despite the documentation saying km; the value is divided by
            1000 before being stored as distance_km (see the comment in
            poll_trips).
  [FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName /
            driverPhone / sim from jimi.track.device.detail first (list
            endpoint returns null for these even when populated via
            jimi.open.device.update).
  [FIX-M19] Multi-account support: the fleet is split across multiple
            Tracksolid sub-accounts. sync_devices, poll_live_positions
            and poll_parking now iterate every target in TRACKSOLID_TARGETS
            and dedupe/scope per-target before writing.
  [FIX-M20] Trip enrichment: poll_trips now backfills start_geom/end_geom/
            route_geom/waypoints_count from position_history at insert
            time, extracts idleSecond, reverse-geocodes start/end addresses
            (Nominatim), and caches vehicle_plate from devices. Closes the
            NULL-column gaps that were inherent to jimi.device.track.mileage
            (it does not return coordinates, idle, or trip sequence).
  [FIX-M21] Live-position freshness: (a) /pushalarm now cross-feeds its
            lat/lng into live_positions via the new shared
            upsert_live_position() helper, time-guarded so older
            timestamps can't rewind fresher fixes; (b) new
            poll_stale_locations() runs every 10 min and calls
            get_device_locations() for IMEIs whose live_positions.gps_time
            is missing or > 30 min stale, recovering devices the 60s
            sweep silently drops. Both the 60s sweep and
            get_device_locations() now share the same time-guarded
            upsert. _ensure_device → ensure_device relocated to
            ts_shared_rev for FK-guard reuse.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import time
from concurrent.futures import ThreadPoolExecutor
import schedule
from datetime import datetime, timezone, timedelta
from psycopg2.extras import execute_values
from ts_shared_rev import (
    TARGET_ACCOUNT,
    TARGETS,
    api_post,
    get_active_imeis,
    get_active_imeis_by_target,
    get_conn,
    get_stale_imeis,
    get_token,
    is_valid_fix,
    log_ingestion,
    clean,
    clean_num,
    clean_int,
    clean_ts,
    get_logger,
    ensure_device,
    reverse_geocode,
    safe_task,
    setup_shutdown,
    upsert_live_position,
)
log = get_logger("movement")
setup_shutdown(log)
# ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices():
    log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS)
    t0, token = time.time(), get_token()
    if not token: return
    # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the
    # device list from every configured target and dedupe by IMEI.
    devices_by_imei: dict[str, dict] = {}
    for target in TARGETS:
        resp = api_post("jimi.user.device.list", {"target": target}, token)
        if resp.get("code") != 0:
            log.warning("device.list failed for target=%s: code=%s msg=%s",
                        target, resp.get("code"), resp.get("message"))
            continue
        for d in (resp.get("result") or []):
            imei = d.get("imei")
            if imei:
                devices_by_imei[imei] = d
    devices = list(devices_by_imei.values())
    log.info("Aggregated %d unique devices across targets.", len(devices))
    upserted = 0
    # Fetch per-device detail in parallel. This was previously an N+1 blocker:
    # 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s.
    # Gated at 8 to stay under API rate-limit (1006) headroom.
    def _fetch_detail(imei: str) -> dict:
        detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
        # Parenthesised for readability; a non-zero code always yields {}.
        return (detail_resp.get("result") or {}) if detail_resp.get("code") == 0 else {}
    imeis = [d.get("imei") for d in devices if d.get("imei")]
    with ThreadPoolExecutor(max_workers=8) as pool:
        details = dict(zip(imeis, pool.map(_fetch_detail, imeis)))
    with get_conn() as conn:
        with conn.cursor() as cur:
            for d in devices:
                imei = d.get("imei")
                if not imei: continue
                dtl = details.get(imei, {})
                cur.execute("""
                    INSERT INTO tracksolid.devices (
                        imei, device_name, mc_type, mc_type_use_scope,
                        vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
                        vin, engine_number, vehicle_brand, fuel_100km,
                        driver_name, driver_phone, sim, iccid, imsi,
                        account, customer_name, device_group_id, device_group,
                        activation_time, expiration, enabled_flag, status,
                        current_mileage_km, last_synced_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
                    )
                    ON CONFLICT (imei) DO UPDATE SET
                        device_name = EXCLUDED.device_name,
                        mc_type = EXCLUDED.mc_type,
                        mc_type_use_scope = EXCLUDED.mc_type_use_scope,
                        vehicle_name = EXCLUDED.vehicle_name,
                        vehicle_number = EXCLUDED.vehicle_number,
                        vehicle_models = EXCLUDED.vehicle_models,
                        vehicle_icon = EXCLUDED.vehicle_icon,
                        vin = EXCLUDED.vin,
                        engine_number = EXCLUDED.engine_number,
                        vehicle_brand = EXCLUDED.vehicle_brand,
                        fuel_100km = EXCLUDED.fuel_100km,
                        driver_name = EXCLUDED.driver_name,
                        driver_phone = EXCLUDED.driver_phone,
                        sim = EXCLUDED.sim,
                        iccid = EXCLUDED.iccid,
                        imsi = EXCLUDED.imsi,
                        account = EXCLUDED.account,
                        customer_name = EXCLUDED.customer_name,
                        device_group_id = EXCLUDED.device_group_id,
                        device_group = EXCLUDED.device_group,
                        activation_time = EXCLUDED.activation_time,
                        expiration = EXCLUDED.expiration,
                        enabled_flag = EXCLUDED.enabled_flag,
                        status = EXCLUDED.status,
                        current_mileage_km = EXCLUDED.current_mileage_km,
                        last_synced_at = NOW(),
                        updated_at = NOW()
                """, (
                    # [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim
                    # only surface via jimi.track.device.detail; list returns null.
                    imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")),
                    clean(dtl.get("vehicleName") or d.get("vehicleName")),
                    clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")),
                    clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")),
                    clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
                    clean(dtl.get("driverName") or d.get("driverName")),
                    clean(dtl.get("driverPhone") or d.get("driverPhone")),
                    clean(dtl.get("sim") or d.get("sim")),
                    clean(dtl.get("iccid")), clean(dtl.get("imsi")),
                    clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
                    clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)),
                    clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage"))
                ))
                upserted += 1
            log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True)
            conn.commit()
    log.info("Registry sync: %d devices updated.", upserted)
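The aggregation step in sync_devices can be tried out without the API or the database. The sketch below is illustrative only: `fake_device_list`, `fake_detail`, the account names, and the canned payloads are all hypothetical stand-ins for `api_post` and real responses. Only the dedupe-by-IMEI loop and the `ThreadPoolExecutor.map` pairing mirror the function above.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical canned responses standing in for jimi.user.device.list.
FAKE_RESPONSES = {
    "acct_a": {"code": 0, "result": [{"imei": "111", "deviceName": "Van 1"},
                                      {"imei": "222", "deviceName": "Van 2"}]},
    "acct_b": {"code": 0, "result": [{"imei": "222", "deviceName": "Van 2 (moved)"},
                                      {"imei": None, "deviceName": "no imei"}]},
    "acct_c": {"code": 1006, "message": "rate limited"},
}


def fake_device_list(target: str) -> dict:
    """Hypothetical replacement for api_post("jimi.user.device.list", ...)."""
    return FAKE_RESPONSES[target]


def fake_detail(imei: str) -> dict:
    """Hypothetical replacement for the per-device detail call."""
    return {"vehicleNumber": f"PLATE-{imei}"}


def aggregate_devices(targets: list[str]) -> dict[str, dict]:
    """Merge device lists from every target, keeping the last record per IMEI."""
    devices_by_imei: dict[str, dict] = {}
    for target in targets:
        resp = fake_device_list(target)
        if resp.get("code") != 0:
            continue  # a failed target is skipped, not fatal
        for d in resp.get("result") or []:
            imei = d.get("imei")
            if imei:
                devices_by_imei[imei] = d
    return devices_by_imei


devices = aggregate_devices(["acct_a", "acct_b", "acct_c"])
imeis = list(devices)
with ThreadPoolExecutor(max_workers=8) as pool:
    # pool.map preserves input order, so zip pairs each IMEI with its own detail.
    details = dict(zip(imeis, pool.map(fake_detail, imeis)))
```

Because later targets overwrite earlier ones, the order of TARGETS decides which sub-account's record wins when an IMEI appears twice.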
# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
def poll_live_positions():
    t0, token = time.time(), get_token()
    if not token: return
    # [FIX-M19] Iterate every target and dedupe by IMEI (keep last).
    positions_by_imei: dict[str, dict] = {}
    for target in TARGETS:
        resp = api_post("jimi.user.device.location.list",
                        {"target": target, "map_type": "GOOGLE"}, token)
        if resp.get("code") != 0:
            log.warning("location.list failed for target=%s: code=%s msg=%s",
                        target, resp.get("code"), resp.get("message"))
            continue
        for p in (resp.get("result") or []):
            imei = p.get("imei")
            if imei:
                positions_by_imei[imei] = p
    positions = list(positions_by_imei.values())
    upserted, inserted = 0, 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for p in positions:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    gps_time = clean_ts(p.get("gpsTime"))
                    speed = clean_num(p.get("speed"))
                    direction = clean_num(p.get("direction"))
                    acc_status = clean(p.get("accStatus"))
                    gps_num = clean_int(p.get("gpsNum"))
                    current_mileage = clean_num(p.get("currentMileage"))
                    # [FIX-M21] Time-guarded upsert via shared helper so the
                    # 60s sweep, the alarm cross-feed, and get_device_locations
                    # all agree about freshness ordering. The sweep is normally
                    # the freshest source so the guard rarely rejects its writes.
                    upserted += upsert_live_position(
                        cur, imei, lat, lng, gps_time,
                        speed=speed, direction=direction,
                        acc_status=acc_status,
                        current_mileage=current_mileage,
                        extras={
                            "pos_type": clean(p.get("posType")),
                            "confidence": clean_int(p.get("confidence")),
                            "hb_time": clean_ts(p.get("hbTime")),
                            "gps_signal": clean_int(p.get("gpsSignal")),
                            "gps_num": gps_num,
                            "elec_quantity": clean_num(p.get("electQuantity")),
                            "power_value": clean_num(p.get("powerValue")),
                            "battery_power_val": clean_num(p.get("batteryPowerVal")),
                            "tracker_oil": clean(p.get("trackerOil")),
                            "temperature": clean_num(p.get("temperature")),
                            "device_status": clean(p.get("status")),
                            "loc_desc": clean(p.get("locDesc")),
                        },
                    )
                    # History (Hypertable Source)
                    if gps_time:
                        cur.execute("""
                            INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
                            VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT (imei, gps_time) DO NOTHING
                        """, (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage))
                        inserted += cur.rowcount
                    cur.execute("RELEASE SAVEPOINT sp")
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True)
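The two guards the sweep relies on, the coordinate filter and the time-guarded upsert, both live in ts_shared_rev and are not shown in this file. The following is a minimal in-memory sketch of the behaviour the comments describe, not the shared implementation: `is_valid_fix_sketch`, `guarded_upsert`, the dict-based store, and the sample coordinates are all assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone


def is_valid_fix_sketch(lat, lng) -> bool:
    """Assumed rule: reject missing, out-of-range, or (0.0, 0.0) coordinates."""
    if lat is None or lng is None:
        return False
    if lat == 0.0 and lng == 0.0:
        return False
    return -90.0 <= lat <= 90.0 and -180.0 <= lng <= 180.0


def guarded_upsert(store: dict, imei: str, lat: float, lng: float,
                   gps_time: datetime) -> int:
    """Write the fix only when it is newer than the stored one.

    Returns the number of rows written (1 or 0), so callers can sum it
    the same way poll_live_positions sums upsert_live_position().
    """
    current = store.get(imei)
    if current is not None and current["gps_time"] >= gps_time:
        return 0  # stale or duplicate: keep the fresher stored fix
    store[imei] = {"lat": lat, "lng": lng, "gps_time": gps_time}
    return 1


# Hypothetical sample: a fresh fix followed by an older one for the same IMEI.
live = {}
t_new = datetime(2026, 6, 10, 12, 0, tzinfo=timezone.utc)
t_old = t_new - timedelta(minutes=5)
written = [
    guarded_upsert(live, "111", -1.2921, 36.8219, t_new),
    guarded_upsert(live, "111", -1.3000, 36.8000, t_old),
]
```

In SQL the same guard would typically sit in the `ON CONFLICT ... DO UPDATE ... WHERE` clause, which is an assumption about the helper rather than something this file confirms.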
# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
# [FIX-M20] Migration 09 added route_geom, start/end_address, vehicle_plate,
# waypoints_count to tracksolid.trips. poll_trips now enriches every poll-
# ingested trip from position_history (start/end fix + LineString polyline)
# and reverse-geocodes the endpoints, since jimi.device.track.mileage does
# not return coordinates. ON CONFLICT preserves webhook-supplied data when
# /pushtripreport later delivers native coords.
# Per-trip enrichment from position_history. Eight readable scalar subqueries
# (start and end fix as geom/lat/lng, plus route line and waypoint count)
# rather than a tighter CTE. Each runs sub-ms given the (imei, gps_time) PK,
# and the readable form survives the edge case where start_time is just
# before the first available fix in the window (single CTE bounded by
# BETWEEN would return NULL there).
_ENRICH_QUERY = """
SELECT
    (SELECT geom FROM tracksolid.position_history
      WHERE imei = %s AND gps_time >= %s
      ORDER BY gps_time ASC LIMIT 1) AS start_geom,
    (SELECT ST_Y(geom) FROM tracksolid.position_history
      WHERE imei = %s AND gps_time >= %s
      ORDER BY gps_time ASC LIMIT 1) AS start_lat,
    (SELECT ST_X(geom) FROM tracksolid.position_history
      WHERE imei = %s AND gps_time >= %s
      ORDER BY gps_time ASC LIMIT 1) AS start_lng,
    (SELECT geom FROM tracksolid.position_history
      WHERE imei = %s AND gps_time <= %s
      ORDER BY gps_time DESC LIMIT 1) AS end_geom,
    (SELECT ST_Y(geom) FROM tracksolid.position_history
      WHERE imei = %s AND gps_time <= %s
      ORDER BY gps_time DESC LIMIT 1) AS end_lat,
    (SELECT ST_X(geom) FROM tracksolid.position_history
      WHERE imei = %s AND gps_time <= %s
      ORDER BY gps_time DESC LIMIT 1) AS end_lng,
    (SELECT ST_MakeLine(geom ORDER BY gps_time)
       FROM tracksolid.position_history
      WHERE imei = %s AND gps_time BETWEEN %s AND %s
        AND geom IS NOT NULL) AS route_geom,
    (SELECT COUNT(*) FROM tracksolid.position_history
      WHERE imei = %s AND gps_time BETWEEN %s AND %s) AS waypoints_count
"""
def _load_plates_cache(cur) -> dict[str, str]:
    """Build {imei: vehicle_number} for active devices once per poll cycle."""
    cur.execute("""
        SELECT imei, vehicle_number
        FROM tracksolid.devices
        WHERE enabled_flag = 1 AND vehicle_number IS NOT NULL
    """)
    return {imei: plate for imei, plate in cur.fetchall()}
def poll_trips():
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis: return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            plates = _load_plates_cache(cur)
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i+50]
                resp = api_post("jimi.device.track.mileage", {
                    "imeis": ",".join(batch),
                    "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
                }, token)
                trips = resp.get("result") or []
                for t in trips:
                    try:
                        cur.execute("SAVEPOINT sp")
                        # [FIX-M16] API returns distance in METRES despite documentation saying km.
                        # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000.
                        # startMileage/endMileage are cumulative odometer in metres (same unit).
                        # Divide by 1000 to store as distance_km.
                        raw_dist = clean_num(t.get("distance"))
                        dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
                        imei = t.get("imei")
                        trip_start = clean_ts(t.get("startTime"))
                        trip_end = clean_ts(t.get("endTime"))
                        idle_s = clean_int(t.get("idleSecond"))
                        # [FIX-M20] Enrich from position_history. trip_start/end
                        # may be None (rare malformed payload); skip enrichment
                        # in that case so we still capture the row.
                        start_geom = end_geom = route_geom = None
                        start_lat = start_lng = end_lat = end_lng = None
                        waypoints_count = 0
                        if trip_start and trip_end:
                            cur.execute(_ENRICH_QUERY, (
                                imei, trip_start,            # start_geom
                                imei, trip_start,            # start_lat
                                imei, trip_start,            # start_lng
                                imei, trip_end,              # end_geom
                                imei, trip_end,              # end_lat
                                imei, trip_end,              # end_lng
                                imei, trip_start, trip_end,  # route_geom
                                imei, trip_start, trip_end,  # waypoints_count
                            ))
                            (start_geom, start_lat, start_lng,
                             end_geom, end_lat, end_lng,
                             route_geom, waypoints_count) = cur.fetchone()
                        start_address = reverse_geocode(start_lat, start_lng)
                        end_address = reverse_geocode(end_lat, end_lng)
                        vehicle_plate = plates.get(imei)
                        cur.execute("""
                            INSERT INTO tracksolid.trips (
                                imei, start_time, end_time, distance_km,
                                avg_speed_kmh, max_speed_kmh, driving_time_s, idle_time_s,
                                start_geom, end_geom, route_geom, waypoints_count,
                                start_address, end_address, vehicle_plate, source
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s,
                                      %s, %s, %s, %s,
                                      %s, %s, %s, 'poll')
                            ON CONFLICT (imei, start_time) DO UPDATE SET
                                end_time = EXCLUDED.end_time,
                                distance_km = EXCLUDED.distance_km,
                                max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
                                driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s),
                                idle_time_s = COALESCE(EXCLUDED.idle_time_s, tracksolid.trips.idle_time_s),
                                start_geom = COALESCE(tracksolid.trips.start_geom, EXCLUDED.start_geom),
                                end_geom = COALESCE(EXCLUDED.end_geom, tracksolid.trips.end_geom),
                                route_geom = COALESCE(EXCLUDED.route_geom, tracksolid.trips.route_geom),
                                waypoints_count = EXCLUDED.waypoints_count,
                                start_address = COALESCE(tracksolid.trips.start_address, EXCLUDED.start_address),
                                end_address = COALESCE(EXCLUDED.end_address, tracksolid.trips.end_address),
                                vehicle_plate = COALESCE(EXCLUDED.vehicle_plate, tracksolid.trips.vehicle_plate)
                        """, (
                            imei, trip_start, trip_end, dist_km,
                            clean_num(t.get("avgSpeed")),
                            clean_num(t.get("maxSpeed")),
                            clean_int(t.get("runTimeSecond")),
                            idle_s,
                            start_geom, end_geom, route_geom, waypoints_count,
                            start_address, end_address, vehicle_plate,
                        ))
                        # Read rowcount before RELEASE SAVEPOINT: psycopg2 resets
                        # it to -1 after a statement that affects no rows count.
                        inserted += cur.rowcount
                        cur.execute("RELEASE SAVEPOINT sp")
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Trips: %d records processed.", inserted)
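The unit check quoted in the FIX-M16 comment (avgSpeed × runTimeSecond / 3600 should equal distance / 1000) is easy to reproduce by hand. The sketch below uses a hypothetical payload with invented numbers; the helper names are illustrative and only the arithmetic comes from the comment in poll_trips.

```python
def distance_km_from_api(raw_distance_m):
    """Convert the API's metre value to kilometres, as poll_trips does."""
    return None if raw_distance_m is None else round(raw_distance_m / 1000.0, 4)


def implied_distance_km(avg_speed_kmh: float, run_time_s: int) -> float:
    """Distance implied by average speed and driving time."""
    return avg_speed_kmh * run_time_s / 3600.0


# Hypothetical trip payload: 12,500 m driven in 1,000 s at 45 km/h average.
trip = {"distance": 12500, "avgSpeed": 45.0, "runTimeSecond": 1000}

stored_km = distance_km_from_api(trip["distance"])
implied_km = implied_distance_km(trip["avgSpeed"], trip["runTimeSecond"])

# If the API value were already in km, stored_km would be 1000x too small
# relative to implied_km; agreement supports the metres interpretation.
units_agree = abs(stored_km - implied_km) < 0.05
```

On real payloads the two figures will only agree approximately, since avgSpeed is rounded by the API, so a tolerance is needed rather than strict equality.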
# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
def poll_parking():
    t0 = time.time()
    # [FIX-M19] Parking requires an `account` param tied to the IMEI's
    # sub-account, so call per target with that target's IMEIs only.
    token, imeis_by_target = get_token(), get_active_imeis_by_target()
    if not token or not imeis_by_target: return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)
    total_imei = sum(len(v) for v in imeis_by_target.values())
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for target, target_imeis in imeis_by_target.items():
                for i in range(0, len(target_imeis), 50):
                    batch = target_imeis[i:i+50]
                    # [FIX-M13] account + acc_type=0 required for non-empty results.
                    resp = api_post("jimi.open.platform.report.parking", {
                        "account": target,
                        "imeis": ",".join(batch),
                        "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
                        "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
                        "acc_type": 0,
                    }, token)
                    events = resp.get("result") or []
                    for p in events:
                        try:
                            cur.execute("SAVEPOINT sp")
                            imei = p.get("imei")
                            start_time = clean_ts(p.get("startTime"))
                            if not imei or not start_time:
                                cur.execute("RELEASE SAVEPOINT sp")
                                continue
                            lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
                            cur.execute("""
                                INSERT INTO tracksolid.parking_events (
                                    imei, event_type, start_time, end_time,
                                    duration_seconds, geom, address
                                ) VALUES (
                                    %s, 'parking', %s, %s, %s,
                                    CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                         THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                         ELSE NULL END,
                                    %s
                                ) ON CONFLICT (imei, start_time, event_type) DO NOTHING
                            """, (
                                imei, start_time, clean_ts(p.get("endTime")),
                                clean_int(p.get("durSecond")),  # [FIX-M13] API returns durSecond, not seconds
                                lng, lat, lng, lat,
                                clean(p.get("address"))
                            ))
                            # Read rowcount before RELEASE SAVEPOINT, which resets it to -1.
                            inserted += cur.rowcount
                            cur.execute("RELEASE SAVEPOINT sp")
                        except Exception:
                            cur.execute("ROLLBACK TO SAVEPOINT sp")
                            log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target))
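The per-target batching used by poll_parking (at most 50 IMEIs per call, never mixing sub-accounts in one request) can be isolated as a small generator. This is a sketch with invented account names and IMEIs; `batches` is a hypothetical helper, not a function from ts_shared_rev.

```python
from typing import Iterator


def batches(items: list, size: int = 50) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


# Hypothetical fleet: 120 devices on one sub-account, 7 on another.
imeis_by_target = {
    "acct_a": [f"A{n:03d}" for n in range(120)],
    "acct_b": [f"B{n:03d}" for n in range(7)],
}

# One (account, comma-joined IMEIs) pair per API call the poller would make.
calls = [
    (target, ",".join(batch))
    for target, target_imeis in imeis_by_target.items()
    for batch in batches(target_imeis)
]
batch_sizes = [len(joined.split(",")) for _, joined in calls]
```

Batching inside the target loop costs an extra call whenever a sub-account has a partial final batch, which is the price of keeping each request scoped to one `account`.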
# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────
def poll_track_list():
    """[FIX-M14] Fetch per-device GPS waypoint trails via jimi.device.track.list.
    The 60-second fleet sweep (poll_live_positions) captures only the most
    recent fix per vehicle, so all motion between sweeps is invisible. This
    function retrieves every waypoint the device logged in the last 35 minutes
    (5-min overlap ensures no gaps at scheduling boundaries) and inserts them
    into position_history with source='track_list'.
    Impact on reporting:
    - position_history row density increases from ~1/min to ~14/min per device
    - Route traces in Grafana become accurate continuous paths
    - Speed profile queries gain meaningful resolution (avg over 10s intervals
      vs 60s intervals), which enables hard-braking / harsh-acceleration detection
    - v_mileage_daily_cagg continuous aggregate gains finer odometer deltas
    """
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(minutes=35)  # 5-min overlap avoids boundary gaps
    begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
    end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")
    # Phase 1: fetch waypoints from API without holding a DB connection.
    # jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed
    # up the 30 min sweep without tripping the 1006 rate limit.
    def _fetch(imei: str):
        resp = api_post("jimi.device.track.list", {
            "imei": imei,
            "begin_time": begin_str,
            "end_time": end_str,
            "map_type": "GOOGLE",
        }, token)
        return imei, resp.get("result") or []
    with ThreadPoolExecutor(max_workers=4) as pool:
        fetched = list(pool.map(_fetch, imeis))
    # Phase 2: validate waypoints, then write all rows in one DB transaction.
    total_inserted = 0
    devices_with_data = 0
    rows = []
    for imei, waypoints in fetched:
        device_rows = 0
        for wp in waypoints:
            lat = clean_num(wp.get("lat"))
            lng = clean_num(wp.get("lng"))
            gps_time = clean_ts(wp.get("gpsTime"))
            if not is_valid_fix(lat, lng) or not gps_time:
                continue
            rows.append((
                imei, gps_time,
                lng, lat,  # ST_MakePoint(lng, lat)
                lat, lng,  # lat, lng columns
                clean_num(wp.get("gpsSpeed")),
                clean_num(wp.get("direction")),
                clean(wp.get("accStatus")),
            ))
            device_rows += 1
        if device_rows:
            devices_with_data += 1
    with get_conn() as conn:
        with conn.cursor() as cur:
            if rows:
                # cur.rowcount after execute_values reflects only the last
                # page, so count RETURNING rows across all pages instead
                # (fetch=True requires psycopg2 >= 2.8).
                returned = execute_values(
                    cur,
                    """
                    INSERT INTO tracksolid.position_history (
                        imei, gps_time, geom, lat, lng,
                        speed, direction, acc_status, source
                    ) VALUES %s
                    ON CONFLICT (imei, gps_time) DO NOTHING
                    RETURNING 1
                    """,
                    rows,
                    template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
                             " %s, %s, %s, %s, %s, 'track_list')",
                    page_size=500,
                    fetch=True,
                )
                total_inserted = len(returned)
            # Log the run even when no rows were fetched.
            log_ingestion(cur, "jimi.device.track.list", len(imeis),
                          0, total_inserted, int((time.time() - t0) * 1000), True)
    log.info("Track list: %d waypoints inserted across %d/%d devices.",
             total_inserted, devices_with_data, len(imeis))
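The 35-minute lookback on a 30-minute schedule means consecutive runs deliberately re-fetch five minutes of waypoints, and the `(imei, gps_time)` conflict key absorbs the duplicates. The sketch below models both halves with plain Python; the in-memory `history` dict, the `insert_do_nothing` helper, and the sample timestamps are hypothetical stand-ins for the hypertable and its `ON CONFLICT DO NOTHING` clause.

```python
from datetime import datetime, timedelta, timezone


def window(run_at: datetime, lookback_min: int = 35) -> tuple:
    """Return the (begin, end) query window for a run at `run_at`."""
    return run_at - timedelta(minutes=lookback_min), run_at


# Two consecutive scheduled runs, 30 minutes apart.
run1 = datetime(2026, 6, 10, 12, 0, tzinfo=timezone.utc)
run2 = run1 + timedelta(minutes=30)
w1, w2 = window(run1), window(run2)
overlap = w1[1] - w2[0]  # end of first window minus start of second

# In-memory model of a table keyed on (imei, gps_time).
history: dict = {}


def insert_do_nothing(imei: str, gps_time: datetime, payload: dict) -> int:
    """Insert unless the key exists; return rows inserted (1 or 0)."""
    key = (imei, gps_time)
    if key in history:
        return 0
    history[key] = payload
    return 1


# A waypoint inside the overlap is seen by both runs but stored once.
shared_fix = run1 - timedelta(minutes=2)
first = insert_do_nothing("111", shared_fix, {"source": "track_list"})
second = insert_do_nothing("111", shared_fix, {"source": "track_list"})
```

The overlap only protects against scheduling jitter of up to five minutes; a run delayed longer than that would still leave a gap until the next catch-up.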
# ── 6. On-Demand Device Location Refresh — POLL-03 ───────────────────────────
def get_device_locations(imeis: list) -> int:
    """[FIX-M15] Precision position refresh for a specific list of IMEIs.
    Calls jimi.device.location.get (up to 50 IMEIs per call) and upserts
    results into live_positions. Use this for:
    - Alarm enrichment: get exact position immediately after an alarm fires
    - Stale device recovery: force-refresh a vehicle that has been offline
    - Dashboard on-demand refresh without waiting for the 60s fleet sweep
    Returns the number of positions successfully upserted.
    """
    token = get_token()
    if not token or not imeis:
        return 0
    upserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                resp = api_post("jimi.device.location.get", {
                    "imeis": ",".join(batch),
                    "map_type": "GOOGLE",
                }, token)
                positions = resp.get("result") or []
                for p in positions:
                    imei = p.get("imei")
                    lat = clean_num(p.get("lat"))
                    lng = clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        continue
                    # [FIX-M21] FK guard: this path can see IMEIs the daily
                    # sync_devices hasn't picked up yet (especially when used
                    # as the stale-IMEI rescue path).
                    ensure_device(cur, imei, clean(p.get("deviceName")))
                    upserted += upsert_live_position(
                        cur, imei, lat, lng,
                        clean_ts(p.get("gpsTime")),
                        speed=clean_num(p.get("speed")),
                        direction=clean_num(p.get("direction")),
                        acc_status=clean(p.get("accStatus")),
                        current_mileage=clean_num(p.get("currentMileage")),
                    )
            conn.commit()
    log.info("get_device_locations: %d positions refreshed.", upserted)
    return upserted
# ── 7. Stale-IMEI Recovery — POLL-04 ─────────────────────────────────────────
def poll_stale_locations():
    """[FIX-M21] Refresh live_positions for IMEIs whose stored gps_time is
    missing or older than 30 minutes.
    Complements poll_live_positions (the 60s sweep), which silently omits
    devices Jimi's location.list endpoint doesn't return. jimi.device.location.get
    returns *last-known* fix per IMEI, so this path can re-warm devices
    the sweep has dropped.
    """
    stale = get_stale_imeis(stale_minutes=30)
    if not stale:
        log.info("poll_stale_locations: no stale IMEIs.")
        return
    log.info("poll_stale_locations: refreshing %d stale IMEI(s).", len(stale))
    get_device_locations(stale)
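The selection rule behind `get_stale_imeis(stale_minutes=30)` is defined in ts_shared_rev and presumably runs as SQL against live_positions. As a rough illustration of the rule the docstring states (gps_time missing, or older than the threshold), here is a pure-Python sketch; `stale_imeis_sketch` and the sample data are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def stale_imeis_sketch(last_fix: dict, now: datetime, stale_minutes: int = 30) -> list:
    """Return IMEIs whose last gps_time is missing or older than the cutoff."""
    cutoff = now - timedelta(minutes=stale_minutes)
    return sorted(
        imei for imei, gps_time in last_fix.items()
        if gps_time is None or gps_time < cutoff
    )


# Hypothetical snapshot of live_positions: one fresh, one stale, one never seen.
now = datetime(2026, 6, 10, 12, 0, tzinfo=timezone.utc)
last_fix = {
    "111": now - timedelta(minutes=2),
    "222": now - timedelta(minutes=45),
    "333": None,
}
stale = stale_imeis_sketch(last_fix, now)
```

With a 10-minute schedule and a 30-minute threshold, a device that stays offline is re-queried on every run, so the list passed to get_device_locations can stay long during an outage.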
# ── Main Loop ─────────────────────────────────────────────────────────────────
def startup_catchup():
    """Run every movement task once on boot so the DB is warm immediately.
    Split out of main() so the merged ingest_worker can reuse it (DRY)."""
    safe_task(sync_devices, log)()
    safe_task(poll_live_positions, log)()
    safe_task(poll_trips, log)()
    safe_task(poll_parking, log)()
    safe_task(poll_track_list, log)()
    safe_task(poll_stale_locations, log)()
def register_jobs():
    """Register the movement jobs on the global `schedule` scheduler.
    Reused by both this module's main() and ingest_worker_rev.main()."""
    schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
    schedule.every(15).minutes.do(safe_task(poll_trips, log))
    schedule.every(15).minutes.do(safe_task(poll_parking, log))
    schedule.every(30).minutes.do(safe_task(poll_track_list, log))       # [FIX-M14]
    schedule.every(10).minutes.do(safe_task(poll_stale_locations, log))  # [FIX-M21]
    schedule.every().day.at("02:00").do(safe_task(sync_devices, log))
def main():
    log.info("Starting MOVEMENT PIPELINE (v2.2)...")
    startup_catchup()
    register_jobs()
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    main()
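Every job above is wrapped in `safe_task(fn, log)`, which is imported from ts_shared_rev and not shown here. Since the merged ingest_worker drives both pollers from one loop, a task that raises must not take the others down. The sketch below shows one plausible shape for such a wrapper, assuming it simply logs and swallows exceptions; `safe_task_sketch` and the two stub catch-up functions are hypothetical.

```python
import logging

sketch_log = logging.getLogger("worker_sketch")


def safe_task_sketch(fn, logger):
    """Wrap `fn` so an exception is logged instead of reaching the scheduler loop."""
    def wrapper():
        try:
            return fn()
        except Exception:
            logger.exception("Task %s failed", fn.__name__)
            return None
    wrapper.__name__ = fn.__name__
    return wrapper


calls = []


def events_catchup():
    """Hypothetical stand-in for the events poller, simulating a fault."""
    raise RuntimeError("simulated poller fault")


def movement_catchup():
    """Hypothetical stand-in for this module's startup_catchup()."""
    calls.append("movement")


# A combined worker would run each poller's catch-up in turn; the failure
# in the first one is logged and the second still runs.
for catchup in (safe_task_sketch(events_catchup, sketch_log),
                safe_task_sketch(movement_catchup, sketch_log)):
    catchup()
```

Swallowing exceptions keeps the loop alive but hides repeated failures from the process supervisor, which is one reason a freshness view such as the reporting.v_ingest_health mentioned in the commit message is useful alongside it.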