tracksolid_timescale_grafan.../ingest_movement_rev.py
David Kiania 417627675e
Some checks failed
Static Analysis / static (push) Has been cancelled
Tests / test (push) Has been cancelled
Static Analysis / static (pull_request) Has been cancelled
Tests / test (pull_request) Has been cancelled
fix: [FIX-M18] pull driverName/vehicleNumber/sim from detail endpoint
jimi.user.device.list returns null for vehicleName, vehicleNumber,
driverName, driverPhone, and sim even after those fields are set via
jimi.open.device.update — the values only surface through
jimi.track.device.detail. sync_devices() now reads from dtl first with
d as fallback, which unblocks backfill of the 144 CSV-driven updates
pushed on 2026-04-22.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 18:21:25 +03:00

538 lines
No EOL
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking.
REVISIONS (QA-Verified):
[FIX-M01] sync_devices: Atomic transaction for registry sync.
[FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug).
[FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance).
[FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT.
[FIX-M08] Atomic Logging: log_ingestion happens within the data transaction.
[FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
[FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API.
[FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking.
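[FIX-M11] BUG-02: distance_m was stored in millimetres due to an erroneous
* 1000 on an already-metric API value. Removed the multiplication;
column renamed to distance_km in migration 04. Both poll_trips
and push_trip_report (webhook) corrected. Note: poll_trips was
later adjusted again under [FIX-M16] — the API value is in metres,
so it is now divided by 1000 before being stored as distance_km.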
[FIX-M12] BUG-02: Renamed distance_m → distance_km in all SQL to match
migration 04 schema change.
[FIX-M13] POLL-02: Parking poll was returning 0 rows — added missing
acc_type=0 and account params; fixed response field durSecond
(was mapped as 'seconds').
[FIX-M14] POLL-01: New poll_track_list() — calls jimi.device.track.list
per device every 30 minutes to capture high-resolution GPS
waypoints between the 60-second fleet sweep snapshots. Writes to
position_history with source='track_list'. Fills gaps in route
reconstruction and enables accurate path drawing on maps.
[FIX-M15] POLL-03: New get_device_locations() utility — calls
jimi.device.location.get for up to 50 specific IMEIs on demand.
Used for precision refreshes (alarm enrichment, stale device
recovery) without waiting for the next full fleet sweep.
[FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName /
driverPhone / sim from jimi.track.device.detail first (list
endpoint returns null for these even when populated via
jimi.open.device.update).
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import time
from concurrent.futures import ThreadPoolExecutor
import schedule
from datetime import datetime, timezone, timedelta
from psycopg2.extras import execute_values
from ts_shared_rev import (
TARGET_ACCOUNT,
api_post,
get_active_imeis,
get_conn,
get_token,
is_valid_fix,
log_ingestion,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
safe_task,
setup_shutdown,
)
# Module-level logger; every job in this file logs through it.
log = get_logger("movement")
# Runs at import time. Per the revision notes ([FIX-M07], [FIX-11]) this
# installs the shared SIGTERM/SIGINT shutdown handling from ts_shared_rev.
setup_shutdown(log)
# ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices():
    """Sync the Tracksolid device registry into ``tracksolid.devices``.

    Steps:
      1. ``jimi.user.device.list`` returns the account's devices.
      2. ``jimi.track.device.detail`` is called once per IMEI on an
         8-worker thread pool.
      3. Every device is upserted and the ingestion-log row is written on
         the same cursor before a single commit ([FIX-M01], [FIX-M08]).

    [FIX-M18] vehicleName / vehicleNumber / driverName / driverPhone / sim
    are taken from the detail record first, with the list record as a
    fallback, because the list endpoint returns null for them.

    When the detail call for a device fails (non-zero code or empty
    result), an existing row keeps its detail-sourced columns and only the
    list-sourced columns are refreshed. Previously such a failure
    overwrote vin, driver_name, sim, ... with NULL and status with
    'active'. A device that is not in the table yet is still inserted with
    whatever data is available.

    Returns None. Logs a warning and returns early when no token is
    available or the list call does not return code 0.
    """
    log.info("Syncing device registry...")
    t0, token = time.time(), get_token()
    if not token:
        log.warning("Registry sync skipped: no API token available.")
        return
    resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
    if resp.get("code") != 0:
        log.warning("Registry sync aborted: jimi.user.device.list returned code %s.",
                    resp.get("code"))
        return
    devices = resp.get("result") or []
    upserted = 0

    # Fetch per-device detail in parallel — previously an N+1 blocker where
    # 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s.
    # Gated at 8 to stay under API rate-limit (1006) headroom.
    def _fetch_detail(imei: str):
        """Return the detail dict for *imei*, or None when it is unavailable."""
        detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
        if detail_resp.get("code") != 0:
            return None
        return detail_resp.get("result") or None

    imeis = [d.get("imei") for d in devices if d.get("imei")]
    with ThreadPoolExecutor(max_workers=8) as pool:
        details = dict(zip(imeis, pool.map(_fetch_detail, imeis)))

    upsert_head = """
        INSERT INTO tracksolid.devices (
            imei, device_name, mc_type, mc_type_use_scope,
            vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
            vin, engine_number, vehicle_brand, fuel_100km,
            driver_name, driver_phone, sim, iccid, imsi,
            account, customer_name, device_group_id, device_group,
            activation_time, expiration, enabled_flag, status,
            current_mileage_km, last_synced_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
        )
        ON CONFLICT (imei) DO UPDATE SET
    """
    # Columns whose values come from the list record only.
    set_from_list = """
            device_name = EXCLUDED.device_name,
            mc_type = EXCLUDED.mc_type,
            mc_type_use_scope = EXCLUDED.mc_type_use_scope,
            vehicle_models = EXCLUDED.vehicle_models,
            vehicle_icon = EXCLUDED.vehicle_icon,
            device_group_id = EXCLUDED.device_group_id,
            device_group = EXCLUDED.device_group,
            activation_time = EXCLUDED.activation_time,
            expiration = EXCLUDED.expiration,
            enabled_flag = EXCLUDED.enabled_flag,
    """
    # Columns whose values come (primarily) from the detail record.
    set_from_detail = """
            vehicle_name = EXCLUDED.vehicle_name,
            vehicle_number = EXCLUDED.vehicle_number,
            vin = EXCLUDED.vin,
            engine_number = EXCLUDED.engine_number,
            vehicle_brand = EXCLUDED.vehicle_brand,
            fuel_100km = EXCLUDED.fuel_100km,
            driver_name = EXCLUDED.driver_name,
            driver_phone = EXCLUDED.driver_phone,
            sim = EXCLUDED.sim,
            iccid = EXCLUDED.iccid,
            imsi = EXCLUDED.imsi,
            account = EXCLUDED.account,
            customer_name = EXCLUDED.customer_name,
            status = EXCLUDED.status,
            current_mileage_km = EXCLUDED.current_mileage_km,
    """
    set_tail = """
            last_synced_at = NOW(),
            updated_at = NOW()
    """
    upsert_full = upsert_head + set_from_list + set_from_detail + set_tail
    upsert_list_only = upsert_head + set_from_list + set_tail

    missing_detail = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for d in devices:
                imei = d.get("imei")
                if not imei:
                    continue
                fetched = details.get(imei)
                dtl = fetched or {}
                if not fetched:
                    missing_detail += 1
                cur.execute(upsert_full if fetched else upsert_list_only, (
                    # [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim
                    # only surface via jimi.track.device.detail — list returns null.
                    imei,
                    clean(d.get("deviceName")),
                    clean(d.get("mcType")),
                    clean(d.get("mcTypeUseScope")),
                    clean(dtl.get("vehicleName") or d.get("vehicleName")),
                    clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")),
                    clean(d.get("vehicleModels")),
                    clean(d.get("vehicleIcon")),
                    clean(dtl.get("vin")),
                    clean(dtl.get("engineNumber")),
                    clean(dtl.get("vehicleBrand")),
                    clean_num(dtl.get("fuel_100km")),
                    clean(dtl.get("driverName") or d.get("driverName")),
                    clean(dtl.get("driverPhone") or d.get("driverPhone")),
                    clean(dtl.get("sim") or d.get("sim")),
                    clean(dtl.get("iccid")),
                    clean(dtl.get("imsi")),
                    clean(dtl.get("account")),
                    clean(dtl.get("customerName")),
                    clean(d.get("deviceGroupId")),
                    clean(d.get("deviceGroup")),
                    clean_ts(d.get("activationTime")),
                    clean_ts(d.get("expiration")),
                    clean_int(d.get("enabledFlag", 1)),
                    clean(dtl.get("status", "active")),
                    clean_num(dtl.get("currentMileage")),
                ))
                upserted += 1
            log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0,
                          int((time.time() - t0) * 1000), True)
        conn.commit()
    if missing_detail:
        log.warning("Registry sync: detail unavailable for %d device(s); "
                    "their detail-sourced columns were left unchanged.", missing_detail)
    log.info("Registry sync: %d devices updated.", upserted)
# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
def poll_live_positions():
    """Fleet sweep: upsert the latest fix per device and append to history.

    Calls ``jimi.user.device.location.list`` for the target account. For
    every position with an IMEI and a fix accepted by ``is_valid_fix``
    ([FIX-M03]):
      * ``tracksolid.live_positions`` is upserted (one row per IMEI);
      * when a GPS timestamp is present, a row is appended to
        ``tracksolid.position_history`` (duplicates on (imei, gps_time)
        are ignored).

    Each position runs inside its own SAVEPOINT, so a failing row is rolled
    back and logged without discarding the rest of the sweep. The
    ingestion-log row is written on the same cursor ([FIX-M08]).

    Changes versus the previous revision:
      * On conflict, the telemetry columns that were only written on first
        insert (hb_time, gps_signal, gps_num, battery/power values,
        temperature, device_status, loc_desc, ...) are now refreshed too.
        COALESCE keeps the stored value when the API sends null.
      * Counters are only incremented after the savepoint is released, so
        rows that end up rolled back are no longer counted.
      * A non-zero API code is logged instead of returning silently.

    Returns None.
    """
    t0, token = time.time(), get_token()
    if not token:
        return
    resp = api_post("jimi.user.device.location.list",
                    {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token)
    if resp.get("code") != 0:
        log.warning("Live positions: jimi.user.device.location.list returned code %s.",
                    resp.get("code"))
        return
    positions = resp.get("result") or []

    live_sql = """
        INSERT INTO tracksolid.live_positions (
            imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
            speed, direction, acc_status, gps_signal, gps_num,
            elec_quantity, power_value, battery_power_val, tracker_oil,
            temperature, current_mileage, device_status, loc_desc, recorded_at
        ) VALUES (
            %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
        )
        ON CONFLICT (imei) DO UPDATE SET
            geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
            gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
            acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
            pos_type=COALESCE(EXCLUDED.pos_type, tracksolid.live_positions.pos_type),
            confidence=COALESCE(EXCLUDED.confidence, tracksolid.live_positions.confidence),
            hb_time=COALESCE(EXCLUDED.hb_time, tracksolid.live_positions.hb_time),
            gps_signal=COALESCE(EXCLUDED.gps_signal, tracksolid.live_positions.gps_signal),
            gps_num=COALESCE(EXCLUDED.gps_num, tracksolid.live_positions.gps_num),
            elec_quantity=COALESCE(EXCLUDED.elec_quantity, tracksolid.live_positions.elec_quantity),
            power_value=COALESCE(EXCLUDED.power_value, tracksolid.live_positions.power_value),
            battery_power_val=COALESCE(EXCLUDED.battery_power_val, tracksolid.live_positions.battery_power_val),
            tracker_oil=COALESCE(EXCLUDED.tracker_oil, tracksolid.live_positions.tracker_oil),
            temperature=COALESCE(EXCLUDED.temperature, tracksolid.live_positions.temperature),
            device_status=COALESCE(EXCLUDED.device_status, tracksolid.live_positions.device_status),
            loc_desc=COALESCE(EXCLUDED.loc_desc, tracksolid.live_positions.loc_desc),
            updated_at=NOW()
    """
    history_sql = """
        INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
        VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (imei, gps_time) DO NOTHING
    """

    upserted, inserted = 0, 0
    # NOTE(review): unlike sync_devices there is no explicit conn.commit()
    # here — presumably get_conn() commits on clean exit; confirm.
    with get_conn() as conn:
        with conn.cursor() as cur:
            for p in positions:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    gps_time = clean_ts(p.get("gpsTime"))
                    speed = clean_num(p.get("speed"))
                    direction = clean_num(p.get("direction"))
                    acc_status = clean(p.get("accStatus"))
                    gps_num = clean_int(p.get("gpsNum"))
                    current_mileage = clean_num(p.get("currentMileage"))

                    # ST_MakePoint takes (x, y) = (lng, lat); the plain
                    # lat/lng columns follow in (lat, lng) order.
                    cur.execute(live_sql, (
                        imei, lng, lat, lat, lng,
                        clean(p.get("posType")), clean_int(p.get("confidence")),
                        gps_time, clean_ts(p.get("hbTime")), speed,
                        direction, acc_status, clean_int(p.get("gpsSignal")),
                        gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
                        clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")),
                        clean_num(p.get("temperature")),
                        current_mileage, clean(p.get("status")), clean(p.get("locDesc")),
                    ))
                    live_rows = cur.rowcount

                    # History (Hypertable Source)
                    history_rows = 0
                    if gps_time:
                        cur.execute(history_sql, (
                            imei, gps_time, lng, lat, lat, lng,
                            speed, direction, acc_status, gps_num, current_mileage,
                        ))
                        history_rows = cur.rowcount

                    cur.execute("RELEASE SAVEPOINT sp")
                    # Count only once the row's work is known to have survived.
                    upserted += live_rows
                    inserted += history_rows
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted,
                          int((time.time() - t0) * 1000), True)
# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
def poll_trips():
    """Poll trip summaries for the last hour and upsert them into ``tracksolid.trips``.

    Calls ``jimi.device.track.mileage`` in batches of 50 IMEIs ([FIX-M05])
    for the window [now - 1 h, now] and upserts one row per trip keyed on
    (imei, start_time). Each trip runs inside its own SAVEPOINT so a bad
    row is rolled back and logged without aborting the batch. The
    ingestion-log row is written on the same cursor ([FIX-M08]).

    Changes versus the previous revision:
      * The affected-row count is read right after the INSERT. It used to
        be read after ``RELEASE SAVEPOINT``, where ``cur.rowcount`` no
        longer describes the INSERT (psycopg2 reports -1 for statements
        without a row count), so the counter went negative.
      * Trips without an IMEI or a parseable start time are skipped
        up front, as poll_parking already does, instead of failing in the
        database and logging a traceback.
      * avg_speed_kmh is refreshed on conflict alongside distance_km and
        end_time, so an updated trip row stays self-consistent.

    Returns None.
    """
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)
    begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
    end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")

    trip_sql = """
        INSERT INTO tracksolid.trips (
            imei, start_time, end_time, distance_km,
            avg_speed_kmh, max_speed_kmh, driving_time_s, source
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
        ON CONFLICT (imei, start_time) DO UPDATE SET
            end_time = EXCLUDED.end_time,
            distance_km = EXCLUDED.distance_km,
            avg_speed_kmh = COALESCE(EXCLUDED.avg_speed_kmh, tracksolid.trips.avg_speed_kmh),
            max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
            driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
    """

    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                resp = api_post("jimi.device.track.mileage", {
                    "imeis": ",".join(batch),
                    "begin_time": begin_str,
                    "end_time": end_str,
                }, token)
                trips = resp.get("result") or []
                for t in trips:
                    try:
                        cur.execute("SAVEPOINT sp")
                        imei = t.get("imei")
                        start_time = clean_ts(t.get("startTime"))
                        if not imei or not start_time:
                            cur.execute("RELEASE SAVEPOINT sp")
                            continue
                        # [FIX-M16] API returns distance in METRES despite documentation saying km.
                        # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000.
                        # startMileage/endMileage are cumulative odometer in metres (same unit).
                        # Divide by 1000 to store as distance_km.
                        raw_dist = clean_num(t.get("distance"))
                        dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
                        cur.execute(trip_sql, (
                            imei, start_time, clean_ts(t.get("endTime")),
                            dist_km, clean_num(t.get("avgSpeed")),
                            clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")),
                        ))
                        # Must be captured before the next execute() replaces it.
                        affected = cur.rowcount
                        cur.execute("RELEASE SAVEPOINT sp")
                        inserted += affected
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Trips: %d records processed.", inserted)
# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
def poll_parking():
    """Poll parking events for the last hour into ``tracksolid.parking_events``.

    Calls ``jimi.open.platform.report.parking`` in batches of 50 IMEIs
    ([FIX-M05]) for the window [now - 1 h, now] and inserts one row per
    event; rows that already exist for (imei, start_time, event_type) are
    left untouched. Each event runs inside its own SAVEPOINT so a bad row
    is rolled back and logged without aborting the batch. The
    ingestion-log row is written on the same cursor ([FIX-M08]).

    Changes versus the previous revision:
      * The affected-row count is read right after the INSERT. It used to
        be read after ``RELEASE SAVEPOINT``, where ``cur.rowcount`` no
        longer describes the INSERT (psycopg2 reports -1 for statements
        without a row count), so the counter went negative.
      * Coordinates rejected by ``is_valid_fix`` ([FIX-M03], e.g. 0.0/0.0)
        are stored as a NULL geometry instead of a point; the event itself
        is still recorded.

    Returns None.
    """
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)
    begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
    end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")

    parking_sql = """
        INSERT INTO tracksolid.parking_events (
            imei, event_type, start_time, end_time,
            duration_seconds, geom, address
        ) VALUES (
            %s, 'parking', %s, %s, %s,
            CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                 THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                 ELSE NULL END,
            %s
        ) ON CONFLICT (imei, start_time, event_type) DO NOTHING
    """

    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                # [FIX-M13] Added account + acc_type=0 (all stop types). Without these
                # the API returns empty results even when parking events exist.
                resp = api_post("jimi.open.platform.report.parking", {
                    "account": TARGET_ACCOUNT,
                    "imeis": ",".join(batch),
                    "begin_time": begin_str,
                    "end_time": end_str,
                    "acc_type": 0,
                }, token)
                events = resp.get("result") or []
                for p in events:
                    try:
                        cur.execute("SAVEPOINT sp")
                        imei = p.get("imei")
                        start_time = clean_ts(p.get("startTime"))
                        if not imei or not start_time:
                            cur.execute("RELEASE SAVEPOINT sp")
                            continue
                        lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
                        # Keep the event but drop an unusable location so the
                        # CASE expression below yields a NULL geometry.
                        if lat is None or lng is None or not is_valid_fix(lat, lng):
                            lat = lng = None
                        cur.execute(parking_sql, (
                            imei, start_time, clean_ts(p.get("endTime")),
                            clean_int(p.get("durSecond")),  # [FIX-M13] API returns durSecond, not seconds
                            lng, lat, lng, lat,
                            clean(p.get("address")),
                        ))
                        # Must be captured before the next execute() replaces it.
                        affected = cur.rowcount
                        cur.execute("RELEASE SAVEPOINT sp")
                        inserted += affected
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Parking: %d events processed.", inserted)
# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────
def poll_track_list():
    """[FIX-M14] Fetch per-device GPS waypoint trails via jimi.device.track.list.

    The 60-second fleet sweep (poll_live_positions) captures only the most
    recent fix per vehicle — all motion between sweeps is invisible. This
    function retrieves every waypoint the device logged in the last 35 minutes
    (5-min overlap ensures no gaps at scheduling boundaries) and inserts them
    into position_history with source='track_list'.

    Impact on reporting:
      - position_history row density increases from ~1/min to ~14/min per device
      - Route traces in Grafana become accurate continuous paths
      - Speed profile queries gain meaningful resolution (avg over 10s intervals
        vs 60s intervals) — enables hard-braking / harsh-acceleration detection
      - v_mileage_daily_cagg continuous aggregate gains finer odometer deltas

    Counting fix: rows are now written one page at a time and the
    affected-row count is summed per page. Previously a single
    ``execute_values(..., page_size=500)`` call was followed by one read of
    ``cur.rowcount``, which only reflects the last statement executed, so
    any sweep with more than 500 waypoints under-reported its inserts.

    Returns None. The ingestion-log row is written even when no waypoint
    was fetched.
    """
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(minutes=35)  # 5-min overlap avoids boundary gaps
    begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
    end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")

    # Phase 1: fetch waypoints from API without holding a DB connection.
    # jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed
    # up the 30 min sweep without tripping the 1006 rate limit.
    def _fetch(imei: str):
        resp = api_post("jimi.device.track.list", {
            "imei": imei,
            "begin_time": begin_str,
            "end_time": end_str,
            "map_type": "GOOGLE",
        }, token)
        return imei, resp.get("result") or []

    with ThreadPoolExecutor(max_workers=4) as pool:
        fetched = list(pool.map(_fetch, imeis))

    # Phase 2: flatten the valid waypoints into parameter rows.
    devices_with_data = 0
    rows = []
    for imei, waypoints in fetched:
        device_rows = 0
        for wp in waypoints:
            lat = clean_num(wp.get("lat"))
            lng = clean_num(wp.get("lng"))
            gps_time = clean_ts(wp.get("gpsTime"))
            if not is_valid_fix(lat, lng) or not gps_time:
                continue
            rows.append((
                imei, gps_time,
                lng, lat,  # ST_MakePoint(lng, lat)
                lat, lng,  # lat, lng columns
                clean_num(wp.get("gpsSpeed")),
                clean_num(wp.get("direction")),
                clean(wp.get("accStatus")),
            ))
            device_rows += 1
        if device_rows:
            devices_with_data += 1

    # Phase 3: write all pages and the ingestion-log row on one connection.
    insert_sql = """
        INSERT INTO tracksolid.position_history (
            imei, gps_time, geom, lat, lng,
            speed, direction, acc_status, source
        ) VALUES %s
        ON CONFLICT (imei, gps_time) DO NOTHING
    """
    row_template = ("(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
                    " %s, %s, %s, %s, %s, 'track_list')")
    page_size = 500
    total_inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for offset in range(0, len(rows), page_size):
                page = rows[offset:offset + page_size]
                # page_size=len(page) keeps this call to exactly one
                # statement, so cur.rowcount covers the whole page.
                execute_values(cur, insert_sql, page,
                               template=row_template, page_size=len(page))
                total_inserted += cur.rowcount
            log_ingestion(cur, "jimi.device.track.list", len(imeis),
                          0, total_inserted, int((time.time() - t0) * 1000), True)
    log.info("Track list: %d waypoints inserted across %d/%d devices.",
             total_inserted, devices_with_data, len(imeis))
# ── 6. On-Demand Device Location Refresh — POLL-03 ───────────────────────────
def get_device_locations(imeis: list) -> int:
    """[FIX-M15] Precision position refresh for a specific list of IMEIs.

    Calls jimi.device.location.get (up to 50 IMEIs per call) and upserts
    results into live_positions. Use this for:
      - Alarm enrichment: get exact position immediately after an alarm fires
      - Stale device recovery: force-refresh a vehicle that has been offline
      - Dashboard on-demand refresh without waiting for the 60s fleet sweep

    Positions without an IMEI, or with a fix rejected by ``is_valid_fix``,
    are skipped. Each upsert runs inside its own SAVEPOINT, matching the
    poll_* jobs in this module: a row the database rejects is rolled back
    and logged, and the remaining positions are still refreshed.
    Previously one failing row raised out of the function and no count was
    returned.

    Args:
        imeis: IMEI strings to refresh; an empty list is a no-op.

    Returns:
        The number of positions successfully upserted (0 when no token is
        available or ``imeis`` is empty).
    """
    token = get_token()
    if not token or not imeis:
        return 0

    upsert_sql = """
        INSERT INTO tracksolid.live_positions (
            imei, geom, lat, lng, speed, direction,
            gps_time, acc_status, current_mileage, recorded_at
        ) VALUES (
            %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
            %s, %s, %s, %s, %s, %s, %s, NOW()
        )
        ON CONFLICT (imei) DO UPDATE SET
            geom = EXCLUDED.geom,
            lat = EXCLUDED.lat,
            lng = EXCLUDED.lng,
            speed = EXCLUDED.speed,
            direction = EXCLUDED.direction,
            gps_time = EXCLUDED.gps_time,
            acc_status = EXCLUDED.acc_status,
            current_mileage = EXCLUDED.current_mileage,
            updated_at = NOW()
    """

    upserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                resp = api_post("jimi.device.location.get", {
                    "imeis": ",".join(batch),
                    "map_type": "GOOGLE",
                }, token)
                positions = resp.get("result") or []
                for p in positions:
                    imei = p.get("imei")
                    lat = clean_num(p.get("lat"))
                    lng = clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        continue
                    try:
                        cur.execute("SAVEPOINT sp")
                        # ST_MakePoint takes (x, y) = (lng, lat); the plain
                        # lat/lng columns follow in (lat, lng) order.
                        cur.execute(upsert_sql, (
                            imei, lng, lat, lat, lng,
                            clean_num(p.get("speed")),
                            clean_num(p.get("direction")),
                            clean_ts(p.get("gpsTime")),
                            clean(p.get("accStatus")),
                            clean_num(p.get("currentMileage")),
                        ))
                        cur.execute("RELEASE SAVEPOINT sp")
                        upserted += 1
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to refresh position for %s", imei, exc_info=True)
        conn.commit()
    log.info("get_device_locations: %d positions refreshed.", upserted)
    return upserted
# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
    """Run one catch-up pass of every job, then serve the schedule forever.

    Every job is wrapped with ``safe_task(job, log)`` before it is invoked
    or handed to ``schedule``. The final loop never returns; it asks
    ``schedule`` for pending jobs once per second.
    """
    log.info("Starting MOVEMENT PIPELINE (v2.2)...")

    # Startup catch-up: each job runs once, in this order, before scheduling.
    startup_jobs = (
        sync_devices,
        poll_live_positions,
        poll_trips,
        poll_parking,
        poll_track_list,
    )
    for job in startup_jobs:
        guarded = safe_task(job, log)
        guarded()

    # Recurring schedule (registration order matches the cadence list below).
    schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
    minute_cadence = (
        (15, poll_trips),
        (15, poll_parking),
        (30, poll_track_list),  # [FIX-M14]
    )
    for minutes, job in minute_cadence:
        schedule.every(minutes).minutes.do(safe_task(job, log))
    schedule.every().day.at("02:00").do(safe_task(sync_devices, log))

    # Scheduler pump.
    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()