"""Simulate serve.fn_vehicle_trips against the REPORTS/*.json legacy dumps. The legacy DB exports per-fix rows with: gps_time_utc, lat, lng, speed, stationary (bool), trip_id (legacy's own assignment). It does NOT expose acc_state — so for this sim we map stationary=True → acc_state=0, stationary=False → acc_state=1. That's a fair proxy: legacy's `stationary` is speed-derived, and ACC_OFF/stationary is what triggers a work-stop in the new algorithm. Output: legacy trip count vs new-algorithm trip count, plus the new-algorithm trip list (start, end, duration, distance, end_reason). Eyeball-comparable with the legacy trip listing. Run: python3 scripts/simulate_trips_from_legacy.py REPORTS/query_2-2026-638J.json """ from __future__ import annotations import json import math import sys from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any STOP_THRESH = timedelta(minutes=5) GAP_THRESH = timedelta(minutes=30) NOFIX_STOP_THRESH = timedelta(minutes=5) # NEW: a 5-min reporting silence ends a trip STAT_KMH = 5.0 EARTH_R = 6371000.0 def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float: phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lng2 - lng1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 return 2 * EARTH_R * math.asin(math.sqrt(a)) def parse_ts(s: str) -> datetime: # legacy "2026-05-27 04:11:38+00" s = s.replace("+00", "+00:00") return datetime.fromisoformat(s) @dataclass class Trip: trip_id: int started_at: datetime ended_at: datetime | None = None distance_m: float = 0.0 idling_sec: float = 0.0 end_reason: str = "open" n_fixes: int = 0 def simulate(rows: list[dict[str, Any]]) -> dict[str, Any]: rows.sort(key=lambda r: r["gps_time_utc"]) trips: list[Trip] = [] in_trip = False cur: Trip | None = None prev_at: datetime | None = None prev_lat: float | None = None prev_lng: float | None = None prev_state: str | None = None off_run_start: datetime | None = None idle_run_start: datetime | None = None reporting_time: datetime | None = None total_distance_m = 0.0 driving_sec = idling_sec = stopped_sec = unknown_sec = 0.0 for r in rows: at = parse_ts(r["gps_time_utc"]) lat, lng = float(r["lat"]), float(r["lng"]) speed = float(r["speed"]) stationary = r["stationary"] # proxy acc from stationary acc = 0 if stationary else 1 # If we're mid-trip and there's been a NOFIX_STOP_THRESH silence, # close the trip at the prior fix (work stop) BEFORE processing this row. if ( in_trip and prev_at is not None and at - prev_at >= NOFIX_STOP_THRESH ): cur.ended_at = prev_at cur.end_reason = "nofix_stop" trips.append(cur) in_trip = False cur = None off_run_start = None idle_run_start = None # classify state if prev_at is not None and at - prev_at > GAP_THRESH: pos_state = "unknown" elif speed >= STAT_KMH: pos_state = "moving" elif acc == 1: pos_state = "idling" else: pos_state = "stopped" if reporting_time is None and acc == 1: reporting_time = at if prev_at is not None: step_sec = (at - prev_at).total_seconds() if prev_state == "moving": driving_sec += step_sec step_m = haversine_m(prev_lat, prev_lng, lat, lng) total_distance_m += step_m if in_trip: cur.distance_m += step_m elif prev_state == "idling": idling_sec += step_sec elif prev_state == "stopped": stopped_sec += step_sec else: unknown_sec += step_sec # state machine if pos_state == "unknown": if in_trip: cur.ended_at = prev_at cur.end_reason = "long_gap" trips.append(cur) in_trip = False cur = None off_run_start = None idle_run_start = None elif not in_trip: if pos_state in ("moving", "idling") or acc == 1: cur = Trip(trip_id=len(trips) + 1, started_at=at, n_fixes=1) in_trip = True off_run_start = None idle_run_start = at if pos_state == "idling" else None else: cur.n_fixes += 1 if pos_state == "stopped": if off_run_start is None: off_run_start = at idle_run_start = None if at - off_run_start >= STOP_THRESH: cur.ended_at = off_run_start cur.end_reason = "work_stop" trips.append(cur) in_trip = False cur = None off_run_start = None elif pos_state == "idling": off_run_start = None if idle_run_start is None: idle_run_start = at else: # moving off_run_start = None if idle_run_start is not None: idle_dur = at - idle_run_start if idle_dur >= STOP_THRESH: cur.idling_sec += idle_dur.total_seconds() idle_run_start = None prev_at = at prev_lat = lat prev_lng = lng prev_state = pos_state if in_trip and cur is not None: cur.ended_at = prev_at cur.end_reason = "day_end" trips.append(cur) return { "reporting_time": reporting_time, "trips": trips, "total_distance_km": round(total_distance_m / 1000, 2), "driving_min": round(driving_sec / 60, 1), "idling_min": round(idling_sec / 60, 1), "stopped_min": round(stopped_sec / 60, 1), "unknown_min": round(unknown_sec / 60, 1), } def legacy_summary(rows: list[dict[str, Any]]) -> dict[str, Any]: rows = sorted(rows, key=lambda r: r["gps_time_utc"]) by_trip: dict[str, list[dict[str, Any]]] = defaultdict(list) for r in rows: by_trip[r["trip_id"]].append(r) trips = [] for tid, rs in by_trip.items(): if tid == "0": continue rs.sort(key=lambda r: r["gps_time_utc"]) t0 = parse_ts(rs[0]["gps_time_utc"]) t1 = parse_ts(rs[-1]["gps_time_utc"]) trips.append((tid, t0, t1, len(rs))) trips.sort(key=lambda t: t[1]) return {"trips": trips, "n_stationary_rows": sum(1 for r in rows if r["trip_id"] == "0")} def main(path: str) -> None: with open(path) as f: rows = json.load(f) print(f"\n=== {path} ===") print(f"raw rows: {len(rows)}") leg = legacy_summary(rows) print("\n-- LEGACY --") print(f"trips: {len(leg['trips'])} (plus {leg['n_stationary_rows']} stationary-bucket rows)") for tid, t0, t1, n in leg["trips"]: dur = (t1 - t0).total_seconds() / 60 print(f" trip {tid}: {t0:%H:%M:%S} → {t1:%H:%M:%S} ({dur:.0f} min, {n} fixes)") sim = simulate(rows) print("\n-- NEW ALGO (5min stop, 30min gap, 5 km/h stationary) --") rep = sim["reporting_time"] print(f"reporting_time: {rep.isoformat() if rep else 'none'}") print(f"day totals: distance={sim['total_distance_km']} km, " f"driving={sim['driving_min']} min, idling={sim['idling_min']} min, " f"stopped={sim['stopped_min']} min, unknown={sim['unknown_min']} min") print(f"trips: {len(sim['trips'])}") for t in sim["trips"]: dur = (t.ended_at - t.started_at).total_seconds() / 60 print(f" trip {t.trip_id}: {t.started_at:%H:%M:%S} → " f"{t.ended_at:%H:%M:%S} " f"({dur:.0f} min, {t.distance_m/1000:.2f} km, " f"{t.idling_sec/60:.1f} idle, end={t.end_reason}, {t.n_fixes} fixes)") if __name__ == "__main__": for p in sys.argv[1:] or ["REPORTS/query_2-2026-638J.json", "REPORTS/query_2-2026-728K.json"]: main(p)