tracksolid_timescale_grafan.../ingest_movement_rev.py
david kiania d66c3bab42
fix: BUG-06..11 — pool lock, clean_int rounding, date-only tz, _infer_city, rowcount naming, double commit
BUG-06 (LOW-MED): _get_pool() had a TOCTOU race — two threads hitting the
None pool at cold start could each create one and leak the loser's
connections. Added a threading.Lock with double-checked locking.

BUG-07 (LOW): clean_int truncated via int(float(s)) so "3.9" → 3. All
current call sites are intrinsically-integer fields, so behaviour for
production traffic is unchanged, but rounding is the safer default for
any future field that arrives as a decimal. Unit test updated to match.
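
To illustrate the BUG-07 change, here is a hedged sketch of a rounding integer parser. `clean_int_sketch` is a hypothetical name; the actual clean_int in ts_shared_rev is not shown and may handle edge cases differently. Note that Python's built-in round() rounds exact halves to the nearest even integer, and the commit does not say which tie rule the real helper uses.

```python
def clean_int_sketch(value):
    """Parse an integer-like value, rounding decimals instead of truncating."""
    if value is None:
        return None
    s = str(value).strip()
    if not s:
        return None
    try:
        # round() before int() so "3.9" becomes 4 rather than 3.
        return int(round(float(s)))
    except (ValueError, OverflowError):
        # Non-numeric text, NaN, or infinity: treat as missing.
        return None


truncated = int(float("3.9"))        # old behaviour
rounded = clean_int_sketch("3.9")    # new behaviour
```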

BUG-08 (LOW): _infer_city mapped every Kenyan plate to NBO, silently
misclassifying Coast/Mombasa vehicles. Now returns None for K-series
plates and emits a log warning so operators can tag them explicitly.
Uganda (UMA / UAG) remains unambiguous → KLA. Analytics views already
COALESCE NULLs into the 'unassigned' bucket so no dashboards break.
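
A rough sketch of the BUG-08 behaviour, assuming plates are matched by prefix. `infer_city_sketch`, the logger name, and the prefix-matching rule are assumptions for illustration; the real _infer_city is not shown in this file.

```python
import logging

log = logging.getLogger("movement.sketch")

# Prefixes named in the commit message as unambiguous Uganda plates.
UGANDA_PREFIXES = ("UMA", "UAG")


def infer_city_sketch(plate):
    """Return a city code only when the plate makes it unambiguous."""
    if not plate:
        return None
    p = str(plate).strip().upper()
    if p.startswith(UGANDA_PREFIXES):
        return "KLA"
    if p.startswith("K"):
        # Kenyan K-series plates do not identify a city; leave it to operators.
        log.warning("Cannot infer city for Kenyan plate %s; tag it explicitly", p)
        return None
    return None
```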

BUG-09 (LOW): clean_ts accepted "2024-04-12" verbatim → Postgres stored
00:00 UTC = 03:00 EAT, three hours off the operator's intent. Date-only
strings are now anchored to Africa/Nairobi midnight (T00:00:00+03:00).
Strings with a time component pass through unchanged. Unit test added.
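
The BUG-09 anchoring can be sketched as follows. `anchor_date_only` is a hypothetical helper, not the real clean_ts, and the regular expression is an assumption about what counts as a date-only string.

```python
import re
from datetime import datetime, timezone

_DATE_ONLY = re.compile(r"\d{4}-\d{2}-\d{2}")
NAIROBI_MIDNIGHT = "T00:00:00+03:00"  # Africa/Nairobi is UTC+3 with no DST


def anchor_date_only(value):
    """Anchor bare dates to Nairobi midnight; pass everything else through."""
    if value is None:
        return None
    s = str(value).strip()
    if _DATE_ONLY.fullmatch(s):
        return s + NAIROBI_MIDNIGHT
    return s


anchored = anchor_date_only("2024-04-12")
# Midnight EAT is 21:00 UTC on the previous day.
as_utc = datetime.fromisoformat(anchored).astimezone(timezone.utc)
```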

BUG-10 (LOW): rowcount counters in poll_live_positions and poll_trips
were named "upserted"/"inserted" but they sum cur.rowcount from
ON CONFLICT DO UPDATE statements — which always returns 1 per touch
regardless of whether the row was an insert or an update. Renamed to
live_pos_affected / history_inserted / trips_affected, and routed
trips_affected to the rows_upserted slot of ingestion_log (it was
previously logged as rows_inserted, which was misleading).
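
The rowcount distinction behind BUG-10 can be reproduced without a Postgres server. The sketch below uses SQLite's upsert syntax through the standard sqlite3 module purely as an analogy; the table names and IMEI are made up, and the production code runs against Postgres via psycopg2.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE live (imei TEXT PRIMARY KEY, speed REAL)")
cur.execute(
    "CREATE TABLE history (imei TEXT, gps_time TEXT, PRIMARY KEY (imei, gps_time))"
)

upsert = (
    "INSERT INTO live (imei, speed) VALUES (?, ?) "
    "ON CONFLICT (imei) DO UPDATE SET speed = excluded.speed"
)
insert_once = (
    "INSERT INTO history (imei, gps_time) VALUES (?, ?) "
    "ON CONFLICT (imei, gps_time) DO NOTHING"
)

# DO UPDATE: one insert plus one update, each reporting one affected row.
affected = 0
for speed in (10.0, 20.0):
    cur.execute(upsert, ("860000000000001", speed))
    affected += cur.rowcount

# DO NOTHING: only the first statement actually inserts a row.
inserted = 0
for _ in range(2):
    cur.execute(insert_once, ("860000000000001", "2024-04-12 00:00:00"))
    inserted += cur.rowcount

conn.close()
```

Here `affected` counts touches (which is why it feeds rows_upserted), while `inserted` counts only new rows.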

BUG-11 (COSMETIC): removed the redundant conn.commit() inside the
with get_conn() block of _update_token_cache — the context manager
already auto-commits on __exit__.
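
For context on BUG-11, a context manager that commits on clean exit might look like the sketch below. `get_conn_sketch` and `FakeConn` are hypothetical; the real get_conn in ts_shared_rev is not shown and presumably also returns the connection to the pool.

```python
from contextlib import contextmanager


class FakeConn:
    """Stand-in connection that records calls, for illustration only."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


@contextmanager
def get_conn_sketch(conn):
    """Yield conn; commit on a clean exit, roll back if the block raises."""
    try:
        yield conn
    except Exception:
        conn.rollback()
        raise
    else:
        conn.commit()


ok = FakeConn()
with get_conn_sketch(ok):
    pass  # no explicit commit needed inside the block

failed = FakeConn()
try:
    with get_conn_sketch(failed):
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

With a wrapper like this, an extra conn.commit() inside the block is harmless but redundant, which matches the COSMETIC severity.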

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 15:49:54 +03:00


"""
ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking.
REVISIONS (QA-Verified):
  [FIX-M01] sync_devices: Atomic transaction for registry sync.
  [FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug).
  [FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance).
  [FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT.
  [FIX-M08] Atomic Logging: log_ingestion happens within the data transaction.
  [FIX-11]  Uses shared safe_task/setup_shutdown from ts_shared_rev (DRY).
  [FIX-M09] Trips: Captures runTimeSecond and maxSpeed from API.
  [FIX-M10] Parking: New poll_parking via jimi.open.platform.report.parking.
  [FIX-M11] BUG-02: distance_m was stored in millimetres due to erroneous
            * 1000 on an already-metric API value. Removed multiplication;
            column renamed to distance_km in migration 04. Both poll_trips
            and push_trip_report (webhook) corrected.
  [FIX-M12] BUG-02: Renamed distance_m → distance_km in all SQL to match
            migration 04 schema change.
  [FIX-M13] POLL-02: Parking poll was returning 0 rows — added missing
            acc_type=0 and account params; fixed response field durSecond
            (was mapped as 'seconds').
  [FIX-M14] POLL-01: New poll_track_list() — calls jimi.device.track.list
            per device every 30 minutes to capture high-resolution GPS
            waypoints between the 60-second fleet sweep snapshots. Writes to
            position_history with source='track_list'. Fills gaps in route
            reconstruction and enables accurate path drawing on maps.
  [FIX-M15] POLL-03: New get_device_locations() utility — calls
            jimi.device.location.get for up to 50 specific IMEIs on demand.
            Used for precision refreshes (alarm enrichment, stale device
            recovery) without waiting for the next full fleet sweep.
  [FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName /
            driverPhone / sim from jimi.track.device.detail first (list
            endpoint returns null for these even when populated via
            jimi.open.device.update).
  [FIX-M19] Multi-account support: the fleet is split across multiple
            Tracksolid sub-accounts. sync_devices, poll_live_positions
            and poll_parking now iterate every target in TRACKSOLID_TARGETS
            and dedupe/scope per-target before writing.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone, timedelta

import schedule
from psycopg2.extras import execute_values

from ts_shared_rev import (
    TARGET_ACCOUNT,
    TARGETS,
    api_post,
    get_active_imeis,
    get_active_imeis_by_target,
    get_conn,
    get_token,
    is_valid_fix,
    log_ingestion,
    clean,
    clean_num,
    clean_int,
    clean_ts,
    get_logger,
    safe_task,
    setup_shutdown,
)

log = get_logger("movement")
setup_shutdown(log)

# ── 1. Device Registry Sync (Daily) ──────────────────────────────────────────
def sync_devices():
    log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS)
    t0, token = time.time(), get_token()
    if not token:
        return

    # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the
    # device list from every configured target and dedupe by IMEI.
    devices_by_imei: dict[str, dict] = {}
    for target in TARGETS:
        resp = api_post("jimi.user.device.list", {"target": target}, token)
        if resp.get("code") != 0:
            log.warning("device.list failed for target=%s: code=%s msg=%s",
                        target, resp.get("code"), resp.get("message"))
            continue
        for d in (resp.get("result") or []):
            imei = d.get("imei")
            if imei:
                devices_by_imei[imei] = d
    devices = list(devices_by_imei.values())
    log.info("Aggregated %d unique devices across targets.", len(devices))
    upserted = 0

    # Fetch per-device detail in parallel — previously an N+1 blocker where
    # 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s.
    # Gated at 8 to stay under API rate-limit (1006) headroom.
    def _fetch_detail(imei: str) -> dict:
        detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
        return (detail_resp.get("result") or {}) if detail_resp.get("code") == 0 else {}

    imeis = [d.get("imei") for d in devices if d.get("imei")]
    with ThreadPoolExecutor(max_workers=8) as pool:
        details = dict(zip(imeis, pool.map(_fetch_detail, imeis)))

    with get_conn() as conn:
        with conn.cursor() as cur:
            for d in devices:
                imei = d.get("imei")
                if not imei:
                    continue
                dtl = details.get(imei, {})
                cur.execute("""
                    INSERT INTO tracksolid.devices (
                        imei, device_name, mc_type, mc_type_use_scope,
                        vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
                        vin, engine_number, vehicle_brand, fuel_100km,
                        driver_name, driver_phone, sim, iccid, imsi,
                        account, customer_name, device_group_id, device_group,
                        activation_time, expiration, enabled_flag, status,
                        current_mileage_km, last_synced_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
                    )
                    ON CONFLICT (imei) DO UPDATE SET
                        device_name = EXCLUDED.device_name,
                        mc_type = EXCLUDED.mc_type,
                        mc_type_use_scope = EXCLUDED.mc_type_use_scope,
                        vehicle_name = EXCLUDED.vehicle_name,
                        vehicle_number = EXCLUDED.vehicle_number,
                        vehicle_models = EXCLUDED.vehicle_models,
                        vehicle_icon = EXCLUDED.vehicle_icon,
                        vin = EXCLUDED.vin,
                        engine_number = EXCLUDED.engine_number,
                        vehicle_brand = EXCLUDED.vehicle_brand,
                        fuel_100km = EXCLUDED.fuel_100km,
                        driver_name = EXCLUDED.driver_name,
                        driver_phone = EXCLUDED.driver_phone,
                        sim = EXCLUDED.sim,
                        iccid = EXCLUDED.iccid,
                        imsi = EXCLUDED.imsi,
                        account = EXCLUDED.account,
                        customer_name = EXCLUDED.customer_name,
                        device_group_id = EXCLUDED.device_group_id,
                        device_group = EXCLUDED.device_group,
                        activation_time = EXCLUDED.activation_time,
                        expiration = EXCLUDED.expiration,
                        enabled_flag = EXCLUDED.enabled_flag,
                        status = EXCLUDED.status,
                        current_mileage_km = EXCLUDED.current_mileage_km,
                        last_synced_at = NOW(),
                        updated_at = NOW()
                """, (
                    # [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim
                    # only surface via jimi.track.device.detail — list returns null.
                    imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")),
                    clean(dtl.get("vehicleName") or d.get("vehicleName")),
                    clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")),
                    clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")),
                    clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
                    clean(dtl.get("driverName") or d.get("driverName")),
                    clean(dtl.get("driverPhone") or d.get("driverPhone")),
                    clean(dtl.get("sim") or d.get("sim")),
                    clean(dtl.get("iccid")), clean(dtl.get("imsi")),
                    clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
                    clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)),
                    clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage"))
                ))
                upserted += 1
            log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True)
        conn.commit()
    log.info("Registry sync: %d devices updated.", upserted)

# ── 2. Live Positions (Every 60s) ─────────────────────────────────────────────
def poll_live_positions():
    t0, token = time.time(), get_token()
    if not token:
        return

    # [FIX-M19] Iterate every target and dedupe by IMEI (keep last).
    positions_by_imei: dict[str, dict] = {}
    for target in TARGETS:
        resp = api_post("jimi.user.device.location.list",
                        {"target": target, "map_type": "GOOGLE"}, token)
        if resp.get("code") != 0:
            log.warning("location.list failed for target=%s: code=%s msg=%s",
                        target, resp.get("code"), resp.get("message"))
            continue
        for p in (resp.get("result") or []):
            imei = p.get("imei")
            if imei:
                positions_by_imei[imei] = p
    positions = list(positions_by_imei.values())

    # [BUG-10] Variable names match the ingestion_log column they feed:
    #   live_pos_affected → rows_upserted (DO UPDATE: rowcount=1 per touch)
    #   history_inserted  → rows_inserted (DO NOTHING: rowcount=1 only on real insert)
    live_pos_affected, history_inserted = 0, 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for p in positions:
                try:
                    cur.execute("SAVEPOINT sp")
                    imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        cur.execute("RELEASE SAVEPOINT sp")
                        continue
                    gps_time = clean_ts(p.get("gpsTime"))
                    speed = clean_num(p.get("speed"))
                    direction = clean_num(p.get("direction"))
                    acc_status = clean(p.get("accStatus"))
                    gps_num = clean_int(p.get("gpsNum"))
                    current_mileage = clean_num(p.get("currentMileage"))
                    cur.execute("""
                        INSERT INTO tracksolid.live_positions (
                            imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
                            speed, direction, acc_status, gps_signal, gps_num,
                            elec_quantity, power_value, battery_power_val, tracker_oil,
                            temperature, current_mileage, device_status, loc_desc, recorded_at
                        ) VALUES (
                            %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
                            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
                        )
                        ON CONFLICT (imei) DO UPDATE SET
                            geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
                            gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
                            acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
                            updated_at=NOW()
                    """, (
                        imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
                        gps_time, clean_ts(p.get("hbTime")), speed,
                        direction, acc_status, clean_int(p.get("gpsSignal")),
                        gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
                        clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
                        current_mileage, clean(p.get("status")), clean(p.get("locDesc"))
                    ))
                    live_pos_affected += cur.rowcount
                    # History (Hypertable Source)
                    if gps_time:
                        cur.execute("""
                            INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
                            VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT (imei, gps_time) DO NOTHING
                        """, (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage))
                        history_inserted += cur.rowcount
                    cur.execute("RELEASE SAVEPOINT sp")
                except Exception:
                    cur.execute("ROLLBACK TO SAVEPOINT sp")
                    log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.user.device.location.list", len(positions),
                          live_pos_affected, history_inserted,
                          int((time.time()-t0)*1000), True)

# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
def poll_trips():
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)

    # [BUG-10] cur.rowcount on the trips upsert counts both real inserts and
    # DO UPDATE touches, i.e. affected rows. Route it to rows_upserted in
    # ingestion_log, not rows_inserted.
    trips_affected = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i+50]
                resp = api_post("jimi.device.track.mileage", {
                    "imeis": ",".join(batch),
                    "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
                    "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
                }, token)
                trips = resp.get("result") or []
                for t in trips:
                    try:
                        cur.execute("SAVEPOINT sp")
                        # [FIX-M16] API returns distance in METRES despite documentation saying km.
                        # Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000.
                        # startMileage/endMileage are cumulative odometer in metres (same unit).
                        # Divide by 1000 to store as distance_km.
                        raw_dist = clean_num(t.get("distance"))
                        dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
                        cur.execute("""
                            INSERT INTO tracksolid.trips (
                                imei, start_time, end_time, distance_km,
                                avg_speed_kmh, max_speed_kmh, driving_time_s, source
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
                            ON CONFLICT (imei, start_time) DO UPDATE SET
                                end_time = EXCLUDED.end_time,
                                distance_km = EXCLUDED.distance_km,
                                max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
                                driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
                        """, (
                            t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")),
                            dist_km, clean_num(t.get("avgSpeed")),
                            clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond"))
                        ))
                        # Read rowcount before RELEASE SAVEPOINT: rowcount always
                        # reflects the most recent execute() on the cursor.
                        trips_affected += cur.rowcount
                        cur.execute("RELEASE SAVEPOINT sp")
                    except Exception:
                        cur.execute("ROLLBACK TO SAVEPOINT sp")
                        log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.device.track.mileage", len(imeis),
                          trips_affected, 0,
                          int((time.time() - t0) * 1000), True)
    log.info("Trips: %d records affected.", trips_affected)

# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
def poll_parking():
    t0 = time.time()
    # [FIX-M19] Parking requires an `account` param tied to the IMEI's
    # sub-account — call per target with that target's IMEIs only.
    token, imeis_by_target = get_token(), get_active_imeis_by_target()
    if not token or not imeis_by_target:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(hours=1)
    total_imei = sum(len(v) for v in imeis_by_target.values())
    inserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for target, target_imeis in imeis_by_target.items():
                for i in range(0, len(target_imeis), 50):
                    batch = target_imeis[i:i+50]
                    # [FIX-M13] account + acc_type=0 required for non-empty results.
                    resp = api_post("jimi.open.platform.report.parking", {
                        "account": target,
                        "imeis": ",".join(batch),
                        "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
                        "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
                        "acc_type": 0,
                    }, token)
                    events = resp.get("result") or []
                    for p in events:
                        try:
                            cur.execute("SAVEPOINT sp")
                            imei = p.get("imei")
                            start_time = clean_ts(p.get("startTime"))
                            if not imei or not start_time:
                                cur.execute("RELEASE SAVEPOINT sp")
                                continue
                            lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
                            cur.execute("""
                                INSERT INTO tracksolid.parking_events (
                                    imei, event_type, start_time, end_time,
                                    duration_seconds, geom, address
                                ) VALUES (
                                    %s, 'parking', %s, %s, %s,
                                    CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
                                         THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
                                         ELSE NULL END,
                                    %s
                                ) ON CONFLICT (imei, start_time, event_type) DO NOTHING
                            """, (
                                imei, start_time, clean_ts(p.get("endTime")),
                                clean_int(p.get("durSecond")),  # [FIX-M13] API returns durSecond, not seconds
                                lng, lat, lng, lat,
                                clean(p.get("address"))
                            ))
                            # Read rowcount before RELEASE SAVEPOINT: rowcount always
                            # reflects the most recent execute() on the cursor.
                            inserted += cur.rowcount
                            cur.execute("RELEASE SAVEPOINT sp")
                        except Exception:
                            cur.execute("ROLLBACK TO SAVEPOINT sp")
                            log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
            log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted,
                          int((time.time() - t0) * 1000), True)
    log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target))

# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────
def poll_track_list():
    """[FIX-M14] Fetch per-device GPS waypoint trails via jimi.device.track.list.

    The 60-second fleet sweep (poll_live_positions) captures only the most
    recent fix per vehicle — all motion between sweeps is invisible. This
    function retrieves every waypoint the device logged in the last 35 minutes
    (5-min overlap ensures no gaps at scheduling boundaries) and inserts them
    into position_history with source='track_list'.

    Impact on reporting:
      - position_history row density increases from ~1/min to ~14/min per device
      - Route traces in Grafana become accurate continuous paths
      - Speed profile queries gain meaningful resolution (avg over 10s intervals
        vs 60s intervals) — enables hard-braking / harsh-acceleration detection
      - v_mileage_daily_cagg continuous aggregate gains finer odometer deltas
    """
    t0 = time.time()
    token, imeis = get_token(), get_active_imeis()
    if not token or not imeis:
        return
    end_ts = datetime.now(timezone.utc)
    start_ts = end_ts - timedelta(minutes=35)  # 5-min overlap avoids boundary gaps
    begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
    end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")

    # Phase 1: fetch waypoints from API without holding a DB connection.
    # jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed
    # up the 30 min sweep without tripping the 1006 rate limit.
    def _fetch(imei: str):
        resp = api_post("jimi.device.track.list", {
            "imei": imei,
            "begin_time": begin_str,
            "end_time": end_str,
            "map_type": "GOOGLE",
        }, token)
        return imei, resp.get("result") or []

    with ThreadPoolExecutor(max_workers=4) as pool:
        fetched = list(pool.map(_fetch, imeis))

    # Phase 2: write rows in one DB transaction.
    total_inserted = 0
    devices_with_data = 0
    rows = []
    for imei, waypoints in fetched:
        device_rows = 0
        for wp in waypoints:
            lat = clean_num(wp.get("lat"))
            lng = clean_num(wp.get("lng"))
            gps_time = clean_ts(wp.get("gpsTime"))
            if not is_valid_fix(lat, lng) or not gps_time:
                continue
            rows.append((
                imei, gps_time,
                lng, lat,  # ST_MakePoint(lng, lat)
                lat, lng,  # lat, lng columns
                clean_num(wp.get("gpsSpeed")),
                clean_num(wp.get("direction")),
                clean(wp.get("accStatus")),
            ))
            device_rows += 1
        if device_rows:
            devices_with_data += 1

    if rows:
        with get_conn() as conn:
            with conn.cursor() as cur:
                # cur.rowcount reflects only the last statement execute_values
                # ran, so feed it one page at a time and sum the per-page counts.
                page_size = 500
                for start in range(0, len(rows), page_size):
                    execute_values(
                        cur,
                        """
                        INSERT INTO tracksolid.position_history (
                            imei, gps_time, geom, lat, lng,
                            speed, direction, acc_status, source
                        ) VALUES %s
                        ON CONFLICT (imei, gps_time) DO NOTHING
                        """,
                        rows[start:start + page_size],
                        template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
                                 " %s, %s, %s, %s, %s, 'track_list')",
                        page_size=page_size,
                    )
                    total_inserted += cur.rowcount
                log_ingestion(cur, "jimi.device.track.list", len(imeis),
                              0, total_inserted, int((time.time() - t0) * 1000), True)
    else:
        with get_conn() as conn:
            with conn.cursor() as cur:
                log_ingestion(cur, "jimi.device.track.list", len(imeis),
                              0, 0, int((time.time() - t0) * 1000), True)
    log.info("Track list: %d waypoints inserted across %d/%d devices.",
             total_inserted, devices_with_data, len(imeis))

# ── 6. On-Demand Device Location Refresh — POLL-03 ───────────────────────────
def get_device_locations(imeis: list) -> int:
    """[FIX-M15] Precision position refresh for a specific list of IMEIs.

    Calls jimi.device.location.get (up to 50 IMEIs per call) and upserts
    results into live_positions. Use this for:
      - Alarm enrichment: get exact position immediately after an alarm fires
      - Stale device recovery: force-refresh a vehicle that has been offline
      - Dashboard on-demand refresh without waiting for the 60s fleet sweep

    Returns the number of positions successfully upserted.
    """
    token = get_token()
    if not token or not imeis:
        return 0
    upserted = 0
    with get_conn() as conn:
        with conn.cursor() as cur:
            for i in range(0, len(imeis), 50):
                batch = imeis[i:i + 50]
                resp = api_post("jimi.device.location.get", {
                    "imeis": ",".join(batch),
                    "map_type": "GOOGLE",
                }, token)
                positions = resp.get("result") or []
                for p in positions:
                    imei = p.get("imei")
                    lat = clean_num(p.get("lat"))
                    lng = clean_num(p.get("lng"))
                    if not imei or not is_valid_fix(lat, lng):
                        continue
                    cur.execute("""
                        INSERT INTO tracksolid.live_positions (
                            imei, geom, lat, lng, speed, direction,
                            gps_time, acc_status, current_mileage, recorded_at
                        ) VALUES (
                            %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
                            %s, %s, %s, %s, %s, %s, %s, NOW()
                        )
                        ON CONFLICT (imei) DO UPDATE SET
                            geom = EXCLUDED.geom,
                            lat = EXCLUDED.lat,
                            lng = EXCLUDED.lng,
                            speed = EXCLUDED.speed,
                            direction = EXCLUDED.direction,
                            gps_time = EXCLUDED.gps_time,
                            acc_status = EXCLUDED.acc_status,
                            current_mileage = EXCLUDED.current_mileage,
                            updated_at = NOW()
                    """, (
                        imei, lng, lat, lat, lng,
                        clean_num(p.get("speed")),
                        clean_num(p.get("direction")),
                        clean_ts(p.get("gpsTime")),
                        clean(p.get("accStatus")),
                        clean_num(p.get("currentMileage")),
                    ))
                    upserted += 1
        conn.commit()
    log.info("get_device_locations: %d positions refreshed.", upserted)
    return upserted

# ── Main Loop ─────────────────────────────────────────────────────────────────
def main():
    log.info("Starting MOVEMENT PIPELINE (v2.2)...")

    # Startup catch-up
    safe_task(sync_devices, log)()
    safe_task(poll_live_positions, log)()
    safe_task(poll_trips, log)()
    safe_task(poll_parking, log)()
    safe_task(poll_track_list, log)()

    # Schedule
    schedule.every(60).seconds.do(safe_task(poll_live_positions, log))
    schedule.every(15).minutes.do(safe_task(poll_trips, log))
    schedule.every(15).minutes.do(safe_task(poll_parking, log))
    schedule.every(30).minutes.do(safe_task(poll_track_list, log))  # [FIX-M14]
    schedule.every().day.at("02:00").do(safe_task(sync_devices, log))

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    main()