fleet-platform/scripts/simulate_trips_from_legacy.py

239 lines
8.2 KiB
Python
Raw Normal View History

"""Simulate serve.fn_vehicle_trips against the REPORTS/*.json legacy dumps.
The legacy DB exports per-fix rows with: gps_time_utc, lat, lng, speed,
stationary (bool), trip_id (legacy's own assignment). It does NOT expose
acc_state, so for this sim we map stationary=True -> acc_state=0,
stationary=False -> acc_state=1. That's a fair proxy: legacy's `stationary`
is speed-derived, and ACC_OFF/stationary is what triggers a work-stop in
the new algorithm.
Output: legacy trip count vs new-algorithm trip count, plus the new-algorithm
trip list (start, end, duration, distance, end_reason). Eyeball-comparable
with the legacy trip listing.
Run: python3 scripts/simulate_trips_from_legacy.py REPORTS/query_2-2026-638J.json
"""
from __future__ import annotations
import json
import math
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
# Thresholds read by simulate() and haversine_m().
STOP_THRESH = timedelta(minutes=5)  # a "stopped" run this long closes a trip; also the minimum idle run that is counted
GAP_THRESH = timedelta(minutes=30)  # a longer gap between fixes classifies the later fix as "unknown"
NOFIX_STOP_THRESH = timedelta(minutes=5) # NEW: a 5-min reporting silence ends a trip
STAT_KMH = 5.0  # speeds at or above this value are classified as "moving"
EARTH_R = 6371000.0  # sphere radius used by haversine_m (metres)
def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Return the great-circle distance between two lat/lng points.

    Coordinates are given in decimal degrees. The result is expressed in the
    same unit as ``EARTH_R`` (metres).
    """
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points; clamp it so asin() stays inside its domain instead of raising
    # ValueError.
    return 2 * EARTH_R * math.asin(math.sqrt(min(1.0, a)))
def parse_ts(s: str) -> datetime:
    """Parse a legacy timestamp string into a ``datetime``.

    The legacy dumps write the UTC offset with hours only, e.g.
    ``"2026-05-27 04:11:38+00"``. A trailing hour-only offset (``+HH`` or
    ``-HH``) is expanded to ``+HH:00`` before the string is handed to
    ``datetime.fromisoformat``. Strings that already carry a full offset, or
    no offset at all, are passed through untouched.

    Raises:
        ValueError: if the (normalized) string is not a valid ISO timestamp.
    """
    # Requiring a ":" keeps a date-only value such as "2026-05-27" from being
    # mistaken for "<something>-27" with an hour-only offset.
    if len(s) > 3 and ":" in s and s[-3] in "+-" and s[-2:].isdigit():
        s += ":00"
    return datetime.fromisoformat(s)
@dataclass
class Trip:
    """One trip produced by the segmentation loop in ``simulate``."""

    trip_id: int  # 1-based sequence number, assigned as len(trips) + 1 when the trip opens
    started_at: datetime  # timestamp of the fix that opened the trip
    ended_at: datetime | None = None  # filled in when the trip is closed; None while still open
    distance_m: float = 0.0  # haversine distance accumulated while the trip is open
    idling_sec: float = 0.0  # idle runs of at least STOP_THRESH that ended with a moving fix
    end_reason: str = "open"  # replaced on close by nofix_stop / long_gap / work_stop / day_end
    n_fixes: int = 0  # fixes attributed to the trip (the opening fix counts as 1)
def simulate(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Replay the new trip-segmentation rules over a list of legacy fixes.

    Args:
        rows: Legacy fix rows. Each row is read for ``gps_time_utc``, ``lat``,
            ``lng``, ``speed`` and ``stationary``. The list itself is not
            modified; a sorted copy is processed.

    Returns:
        A dict with:
        ``reporting_time`` - time of the first fix whose proxied acc is 1
        (None if there is none);
        ``trips`` - the list of closed ``Trip`` objects, in order;
        ``total_distance_km`` - distance summed over all "moving" steps;
        ``driving_min`` / ``idling_min`` / ``stopped_min`` / ``unknown_min`` -
        elapsed minutes, each step being attributed to the state of the
        earlier of its two fixes.
    """
    # Sort a copy so the caller's list keeps its original order.
    rows = sorted(rows, key=lambda r: r["gps_time_utc"])
    trips: list[Trip] = []
    in_trip = False
    cur: Trip | None = None
    prev_at: datetime | None = None
    prev_lat: float | None = None
    prev_lng: float | None = None
    prev_state: str | None = None
    off_run_start: datetime | None = None   # first fix of the current "stopped" run
    idle_run_start: datetime | None = None  # first fix of the current "idling" run
    reporting_time: datetime | None = None
    total_distance_m = 0.0
    driving_sec = idling_sec = stopped_sec = unknown_sec = 0.0
    for r in rows:
        at = parse_ts(r["gps_time_utc"])
        lat, lng = float(r["lat"]), float(r["lng"])
        speed = float(r["speed"])
        stationary = r["stationary"]
        # proxy acc from stationary
        acc = 0 if stationary else 1
        # If we're mid-trip and there's been a NOFIX_STOP_THRESH silence,
        # close the trip at the prior fix (work stop) BEFORE processing this row.
        if (
            in_trip
            and prev_at is not None
            and at - prev_at >= NOFIX_STOP_THRESH
        ):
            cur.ended_at = prev_at
            cur.end_reason = "nofix_stop"
            trips.append(cur)
            in_trip = False
            cur = None
            off_run_start = None
            idle_run_start = None
        # classify state
        if prev_at is not None and at - prev_at > GAP_THRESH:
            pos_state = "unknown"
        elif speed >= STAT_KMH:
            pos_state = "moving"
        elif acc == 1:
            pos_state = "idling"
        else:
            pos_state = "stopped"
        if reporting_time is None and acc == 1:
            reporting_time = at
        # Day totals: the step from the previous fix to this one is booked
        # under the previous fix's state.
        if prev_at is not None:
            step_sec = (at - prev_at).total_seconds()
            if prev_state == "moving":
                driving_sec += step_sec
                step_m = haversine_m(prev_lat, prev_lng, lat, lng)
                total_distance_m += step_m
                if in_trip:
                    cur.distance_m += step_m
            elif prev_state == "idling":
                idling_sec += step_sec
            elif prev_state == "stopped":
                stopped_sec += step_sec
            else:
                unknown_sec += step_sec
        # state machine
        if pos_state == "unknown":
            # NOTE: with NOFIX_STOP_THRESH <= GAP_THRESH the silence check
            # above has already closed any open trip, so this close is only
            # reached if the thresholds are changed.
            if in_trip:
                cur.ended_at = prev_at
                cur.end_reason = "long_gap"
                trips.append(cur)
                in_trip = False
                cur = None
                off_run_start = None
                idle_run_start = None
        elif not in_trip:
            if pos_state in ("moving", "idling") or acc == 1:
                cur = Trip(trip_id=len(trips) + 1, started_at=at, n_fixes=1)
                in_trip = True
                off_run_start = None
                idle_run_start = at if pos_state == "idling" else None
        else:
            cur.n_fixes += 1
            if pos_state == "stopped":
                if off_run_start is None:
                    off_run_start = at
                idle_run_start = None
                if at - off_run_start >= STOP_THRESH:
                    # The trip ends where the stopped run began, not at the
                    # fix that confirmed it.
                    cur.ended_at = off_run_start
                    cur.end_reason = "work_stop"
                    trips.append(cur)
                    in_trip = False
                    cur = None
                    off_run_start = None
            elif pos_state == "idling":
                off_run_start = None
                if idle_run_start is None:
                    idle_run_start = at
            else:  # moving
                off_run_start = None
                if idle_run_start is not None:
                    # Only idle runs of at least STOP_THRESH count toward the
                    # trip's idling time.
                    idle_dur = at - idle_run_start
                    if idle_dur >= STOP_THRESH:
                        cur.idling_sec += idle_dur.total_seconds()
                    idle_run_start = None
        prev_at = at
        prev_lat = lat
        prev_lng = lng
        prev_state = pos_state
    # A trip still open after the last fix is closed at that fix.
    if in_trip and cur is not None:
        cur.ended_at = prev_at
        cur.end_reason = "day_end"
        trips.append(cur)
    return {
        "reporting_time": reporting_time,
        "trips": trips,
        "total_distance_km": round(total_distance_m / 1000, 2),
        "driving_min": round(driving_sec / 60, 1),
        "idling_min": round(idling_sec / 60, 1),
        "stopped_min": round(stopped_sec / 60, 1),
        "unknown_min": round(unknown_sec / 60, 1),
    }
def legacy_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize the trips the legacy system assigned to the fixes.

    Rows are grouped by their ``trip_id``; the ``"0"`` bucket is not a trip
    and is only counted. Returns a dict with ``trips`` - a list of
    ``(trip_id, first_fix_time, last_fix_time, fix_count)`` tuples ordered by
    first fix time - and ``n_stationary_rows``, the size of the ``"0"`` bucket.
    The input list is left untouched.
    """
    ordered = sorted(rows, key=lambda row: row["gps_time_utc"])

    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in ordered:
        grouped[row["trip_id"]].append(row)

    # Each group inherits the chronological order of `ordered`, so its first
    # and last elements are the trip's earliest and latest fixes.
    summaries = [
        (
            trip_id,
            parse_ts(fixes[0]["gps_time_utc"]),
            parse_ts(fixes[-1]["gps_time_utc"]),
            len(fixes),
        )
        for trip_id, fixes in grouped.items()
        if trip_id != "0"
    ]
    summaries.sort(key=lambda item: item[1])

    stationary_count = sum(1 for row in ordered if row["trip_id"] == "0")
    return {"trips": summaries, "n_stationary_rows": stationary_count}
def main(path: str) -> None:
    """Print the legacy and the simulated trip listings for one JSON dump.

    Args:
        path: Path of a JSON file; its content is loaded with ``json.load``
            and passed as the row list to ``legacy_summary`` and ``simulate``.
    """
    # JSON dumps are read as UTF-8 regardless of the platform's locale default.
    with open(path, encoding="utf-8") as f:
        rows = json.load(f)
    print(f"\n=== {path} ===")
    print(f"raw rows: {len(rows)}")
    leg = legacy_summary(rows)
    print("\n-- LEGACY --")
    print(f"trips: {len(leg['trips'])} (plus {leg['n_stationary_rows']} stationary-bucket rows)")
    for tid, t0, t1, n in leg["trips"]:
        dur = (t1 - t0).total_seconds() / 60
        # Start and end are separated explicitly; printed back to back they
        # are unreadable ("04:11:3804:25:10").
        print(f" trip {tid}: {t0:%H:%M:%S} -> {t1:%H:%M:%S} ({dur:.0f} min, {n} fixes)")
    sim = simulate(rows)
    print("\n-- NEW ALGO (5min stop, 30min gap, 5 km/h stationary) --")
    rep = sim["reporting_time"]
    print(f"reporting_time: {rep.isoformat() if rep else 'none'}")
    print(f"day totals: distance={sim['total_distance_km']} km, "
          f"driving={sim['driving_min']} min, idling={sim['idling_min']} min, "
          f"stopped={sim['stopped_min']} min, unknown={sim['unknown_min']} min")
    print(f"trips: {len(sim['trips'])}")
    for t in sim["trips"]:
        dur = (t.ended_at - t.started_at).total_seconds() / 60
        print(f" trip {t.trip_id}: {t.started_at:%H:%M:%S} -> "
              f"{t.ended_at:%H:%M:%S} "
              f"({dur:.0f} min, {t.distance_m/1000:.2f} km, "
              f"{t.idling_sec/60:.1f} idle, end={t.end_reason}, {t.n_fixes} fixes)")
if __name__ == "__main__":
    # Paths come from the command line; with none given, fall back to the
    # two default dumps.
    cli_paths = sys.argv[1:]
    if not cli_paths:
        cli_paths = ["REPORTS/query_2-2026-638J.json", "REPORTS/query_2-2026-728K.json"]
    for p in cli_paths:
        main(p)