dict.get("result", []) returns None when key exists with null value.
Changed to resp.get("result") or [] which handles both cases.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
270 lines
No EOL
13 KiB
Python
270 lines
No EOL
13 KiB
Python
"""
|
|
ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking.
|
|
|
|
REVISIONS (QA-Verified):
|
|
[FIX-M01] sync_devices: Atomic transaction for registry sync.
|
|
[FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug).
|
|
[FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance).
|
|
[FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT.
|
|
[FIX-M08] Atomic Logging: log_ingestion happens within the data transaction.
|
|
[FIX-QA-01] Distance: Explicit km to meters conversion (* 1000).
|
|
[FIX-11] Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
|
|
[FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API.
|
|
[FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking.
|
|
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
|
"""
|
|
|
|
import time
|
|
import schedule
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
from ts_shared_rev import (
|
|
TARGET_ACCOUNT,
|
|
api_post,
|
|
get_active_imeis,
|
|
get_conn,
|
|
get_token,
|
|
is_valid_fix,
|
|
log_ingestion,
|
|
clean,
|
|
clean_num,
|
|
clean_int,
|
|
clean_ts,
|
|
get_logger,
|
|
safe_task,
|
|
setup_shutdown,
|
|
)
|
|
|
|
log = get_logger("movement")
|
|
setup_shutdown(log)
|
|
|
|
# ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
|
|
|
|
def sync_devices():
|
|
log.info("Syncing device registry...")
|
|
t0, token = time.time(), get_token()
|
|
if not token: return
|
|
|
|
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
|
|
if resp.get("code") != 0: return
|
|
|
|
devices = resp.get("result") or []
|
|
upserted = 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for d in devices:
|
|
imei = d.get("imei")
|
|
if not imei: continue
|
|
|
|
detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
|
|
dtl = detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {}
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.devices (
|
|
imei, device_name, mc_type, mc_type_use_scope,
|
|
vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
|
|
vin, engine_number, vehicle_brand, fuel_100km,
|
|
driver_name, driver_phone, sim, iccid, imsi,
|
|
account, customer_name, device_group_id, device_group,
|
|
activation_time, expiration, enabled_flag, status,
|
|
current_mileage_km, last_synced_at
|
|
) VALUES (
|
|
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
|
|
)
|
|
ON CONFLICT (imei) DO UPDATE SET
|
|
device_name = EXCLUDED.device_name,
|
|
vehicle_number = EXCLUDED.vehicle_number,
|
|
driver_name = EXCLUDED.driver_name,
|
|
enabled_flag = EXCLUDED.enabled_flag,
|
|
current_mileage_km = EXCLUDED.current_mileage_km,
|
|
last_synced_at = NOW(), updated_at = NOW()
|
|
""", (
|
|
imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")),
|
|
clean(d.get("vehicleName")), clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")),
|
|
clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
|
|
clean(d.get("driverName")), clean(d.get("driverPhone")), clean(d.get("sim")), clean(dtl.get("iccid")), clean(dtl.get("imsi")),
|
|
clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
|
|
clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)),
|
|
clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage"))
|
|
))
|
|
upserted += 1
|
|
|
|
log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True)
|
|
conn.commit()
|
|
log.info("Registry sync: %d devices updated.", upserted)
|
|
|
|
# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
|
|
|
|
def poll_live_positions():
|
|
t0, token = time.time(), get_token()
|
|
if not token: return
|
|
|
|
resp = api_post("jimi.user.device.location.list", {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token)
|
|
if resp.get("code") != 0: return
|
|
|
|
positions = resp.get("result") or []
|
|
upserted, inserted = 0, 0
|
|
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for p in positions:
|
|
imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
|
|
if not imei or not is_valid_fix(lat, lng): continue
|
|
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.live_positions (
|
|
imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
|
|
speed, direction, acc_status, gps_signal, gps_num,
|
|
elec_quantity, power_value, battery_power_val, tracker_oil,
|
|
temperature, current_mileage, device_status, loc_desc, recorded_at
|
|
) VALUES (
|
|
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
|
|
)
|
|
ON CONFLICT (imei) DO UPDATE SET
|
|
geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
|
|
gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
|
|
acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
|
|
updated_at=NOW()
|
|
""", (
|
|
imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
|
|
clean_ts(p.get("gpsTime")), clean_ts(p.get("hbTime")), clean_num(p.get("speed")),
|
|
clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsSignal")),
|
|
clean_int(p.get("gpsNum")), clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
|
|
clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
|
|
clean_num(p.get("currentMileage")), clean(p.get("status")), clean(p.get("locDesc"))
|
|
))
|
|
upserted += 1
|
|
|
|
# History (Hypertable Source)
|
|
if clean_ts(p.get("gpsTime")):
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
|
|
VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (imei, gps_time) DO NOTHING
|
|
""", (imei, clean_ts(p.get("gpsTime")), lng, lat, lat, lng, clean_num(p.get("speed")), clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsNum")), clean_num(p.get("currentMileage"))))
|
|
inserted += 1
|
|
|
|
log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True)
|
|
conn.commit()
|
|
|
|
# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
|
|
|
|
def poll_trips():
|
|
token, imeis = get_token(), get_active_imeis()
|
|
if not token or not imeis: return
|
|
|
|
end_ts = datetime.now(timezone.utc)
|
|
start_ts = end_ts - timedelta(hours=1)
|
|
inserted = 0
|
|
|
|
for i in range(0, len(imeis), 50):
|
|
batch = imeis[i:i+50]
|
|
resp = api_post("jimi.device.track.mileage", {
|
|
"imeis": ",".join(batch),
|
|
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
|
|
}, token)
|
|
|
|
trips = resp.get("result") or []
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for t in trips:
|
|
dist_km = clean_num(t.get("distance"))
|
|
dist_m = dist_km * 1000 if dist_km is not None else 0 # [QA-01] Conversion
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.trips (
|
|
imei, start_time, end_time, distance_m,
|
|
avg_speed_kmh, max_speed_kmh, driving_time_s, source
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
|
|
ON CONFLICT (imei, start_time) DO UPDATE SET
|
|
end_time = EXCLUDED.end_time,
|
|
distance_m = EXCLUDED.distance_m,
|
|
max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
|
|
driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
|
|
""", (
|
|
t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")),
|
|
dist_m, clean_num(t.get("avgSpeed")),
|
|
clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond"))
|
|
))
|
|
inserted += 1
|
|
conn.commit()
|
|
log.info("Trips: %d records processed.", inserted)
|
|
|
|
# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
|
|
|
|
def poll_parking():
|
|
t0 = time.time()
|
|
token, imeis = get_token(), get_active_imeis()
|
|
if not token or not imeis: return
|
|
|
|
end_ts = datetime.now(timezone.utc)
|
|
start_ts = end_ts - timedelta(hours=1)
|
|
inserted = 0
|
|
|
|
for i in range(0, len(imeis), 50):
|
|
batch = imeis[i:i+50]
|
|
resp = api_post("jimi.open.platform.report.parking", {
|
|
"imeis": ",".join(batch),
|
|
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
|
|
}, token)
|
|
|
|
events = resp.get("result") or []
|
|
with get_conn() as conn:
|
|
with conn.cursor() as cur:
|
|
for p in events:
|
|
imei = p.get("imei")
|
|
start_time = clean_ts(p.get("startTime"))
|
|
if not imei or not start_time:
|
|
continue
|
|
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
|
|
cur.execute("""
|
|
INSERT INTO tracksolid.parking_events (
|
|
imei, event_type, start_time, end_time,
|
|
duration_seconds, geom, address
|
|
) VALUES (
|
|
%s, 'parking', %s, %s, %s,
|
|
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
|
|
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
|
|
ELSE NULL END,
|
|
%s
|
|
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
|
|
""", (
|
|
imei, start_time, clean_ts(p.get("endTime")),
|
|
clean_int(p.get("seconds")),
|
|
lng, lat, lng, lat,
|
|
clean(p.get("address"))
|
|
))
|
|
inserted += 1
|
|
log_ingestion(cur, "jimi.open.platform.report.parking", len(batch), 0, inserted,
|
|
int((time.time() - t0) * 1000), True)
|
|
log.info("Parking: %d events processed.", inserted)
|
|
|
|
# ── Main Loop ─────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
log.info("Starting MOVEMENT PIPELINE (v2.1)...")
|
|
|
|
# Startup catch-up
|
|
safe_task(sync_devices, log)()
|
|
safe_task(poll_live_positions, log)()
|
|
safe_task(poll_trips, log)()
|
|
safe_task(poll_parking, log)()
|
|
|
|
# Schedule
|
|
schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
|
|
schedule.every(15).minutes.do(safe_task(poll_trips, log))
|
|
schedule.every(15).minutes.do(safe_task(poll_parking, log))
|
|
schedule.every().day.at("02:00").do(safe_task(sync_devices, log))
|
|
|
|
while True:
|
|
schedule.run_pending()
|
|
time.sleep(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |