"""Simulate serve.fn_vehicle_trips against the REPORTS/*.json legacy dumps.

The legacy DB exports per-fix rows with: gps_time_utc, lat, lng, speed,
stationary (bool), trip_id (legacy's own assignment). It does NOT expose
acc_state — so for this sim we map stationary=True → acc_state=0,
stationary=False → acc_state=1. That's a fair proxy: legacy's `stationary`
is speed-derived, and ACC_OFF/stationary is what triggers a work-stop in
the new algorithm.

Output: legacy trip count vs new-algorithm trip count, plus the new-algorithm
trip list (start, end, duration, distance, end_reason). Eyeball-comparable
with the legacy trip listing.

Run: python3 scripts/simulate_trips_from_legacy.py REPORTS/query_2-2026-638J.json
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import math
|
||
|
|
import sys
|
||
|
|
from collections import Counter, defaultdict
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# Length of a continuous "stopped" run that closes a trip as a work stop; the
# same threshold decides whether an idle run is credited to Trip.idling_sec.
STOP_THRESH = timedelta(minutes=5)
# A gap between consecutive fixes longer than this classifies the later fix
# as "unknown".
GAP_THRESH = timedelta(minutes=30)
NOFIX_STOP_THRESH = timedelta(minutes=5)  # NEW: a 5-min reporting silence ends a trip
# Speed at or above this value classifies a fix as "moving"
# (km/h, going by the name and the report header printed by main).
STAT_KMH = 5.0
# Sphere radius used by haversine_m; presumably the mean Earth radius in metres.
EARTH_R = 6371000.0
|
||
|
|
|
||
|
|
|
||
|
|
def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Return the great-circle distance between two lat/lng points.

    Coordinates are given in degrees; the result is scaled by ``EARTH_R``
    (so it is expressed in the same unit as that constant).
    """
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlng = math.radians(lng2 - lng1) / 2

    # Haversine formula: `a` is the squared half-chord between the points.
    lat_term = math.sin(half_dlat) ** 2
    lng_term = math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(half_dlng) ** 2
    a = lat_term + lng_term

    central_angle_half = math.asin(math.sqrt(a))
    return 2 * EARTH_R * central_angle_half
|
||
|
|
|
||
|
|
|
||
|
|
def parse_ts(s: str) -> datetime:
    """Parse a legacy timestamp such as ``"2026-05-27 04:11:38+00"``.

    The legacy dump writes the UTC offset as hours only (``+00``), which
    ``datetime.fromisoformat`` does not accept on every Python version, so a
    trailing hour-only offset is padded to ``±HH:00`` before parsing.

    Only the *end* of the string is inspected: an offset that already has
    minutes (``+00:30``, ``+00:00``) or a string without any offset is passed
    through untouched, instead of being rewritten by a blanket text replace.

    Args:
        s: ISO-like timestamp text.

    Returns:
        The parsed ``datetime`` (timezone-aware when ``s`` carries an offset).

    Raises:
        ValueError: if ``s`` is not a timestamp ``fromisoformat`` understands.
    """
    # The length guard keeps a bare date ("2026-05-27", whose tail also looks
    # like "-27") from being mistaken for an hour-only offset.
    if len(s) > 10 and s[-3] in "+-" and s[-2:].isdigit():
        s += ":00"
    return datetime.fromisoformat(s)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class Trip:
    """One trip produced by the simulated trip-detection state machine.

    A trip is created open (``ended_at`` is ``None``, ``end_reason`` is
    ``"open"``) and is filled in by ``simulate`` when it closes the trip.
    """

    # Sequence number assigned by simulate (count of closed trips + 1).
    trip_id: int
    # Timestamp of the fix that opened the trip.
    started_at: datetime
    # Timestamp at which the trip was closed; None while still open.
    ended_at: datetime | None = None
    # Sum of haversine_m steps accumulated while the trip was open.
    distance_m: float = 0.0
    # Seconds of idle runs credited to the trip by simulate.
    idling_sec: float = 0.0
    # Why the trip closed; simulate sets "nofix_stop", "long_gap", "work_stop"
    # or "day_end".
    end_reason: str = "open"
    # Number of fixes processed while the trip was open.
    n_fixes: int = 0
|
||
|
|
|
||
|
|
|
||
|
|
def simulate(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Replay per-fix rows through the new trip-detection state machine.

    Each row must provide ``gps_time_utc``, ``lat``, ``lng``, ``speed`` and
    ``stationary``.  ``stationary`` is used as a stand-in for the ignition
    state: a truthy value maps to acc=0, a falsy one to acc=1.

    Every fix is classified as ``"unknown"`` (it follows a gap longer than
    ``GAP_THRESH``), ``"moving"`` (speed >= ``STAT_KMH``), ``"idling"``
    (acc on, below the speed threshold) or ``"stopped"``.  The interval
    between two consecutive fixes is attributed to the state of the *earlier*
    fix when the day totals are accumulated.

    A trip closes for one of these reasons (stored in ``Trip.end_reason``):

    * ``"nofix_stop"`` – no fix for at least ``NOFIX_STOP_THRESH``;
    * ``"long_gap"``   – the current fix is ``"unknown"``;
    * ``"work_stop"``  – a stopped run lasted at least ``STOP_THRESH``;
    * ``"day_end"``    – the input ran out while the trip was still open.

    Args:
        rows: per-fix dicts as loaded from a legacy dump.  The list is not
            modified; a sorted copy is processed.

    Returns:
        A dict with ``reporting_time`` (timestamp of the first fix with acc
        on, or ``None``), ``trips`` (list of closed ``Trip`` objects),
        ``total_distance_km`` and the per-state totals ``driving_min``,
        ``idling_min``, ``stopped_min`` and ``unknown_min``.
    """
    # Sort a copy so the caller's list keeps its order, matching what
    # legacy_summary does with the same input.
    rows = sorted(rows, key=lambda r: r["gps_time_utc"])

    trips: list[Trip] = []
    in_trip = False
    cur: Trip | None = None
    prev_at: datetime | None = None
    prev_lat: float | None = None
    prev_lng: float | None = None
    prev_state: str | None = None
    off_run_start: datetime | None = None   # first fix of the current stopped run
    idle_run_start: datetime | None = None  # first fix of the current idle run
    reporting_time: datetime | None = None
    total_distance_m = 0.0
    driving_sec = idling_sec = stopped_sec = unknown_sec = 0.0

    for r in rows:
        at = parse_ts(r["gps_time_utc"])
        lat, lng = float(r["lat"]), float(r["lng"])
        speed = float(r["speed"])
        stationary = r["stationary"]
        # proxy acc from stationary
        acc = 0 if stationary else 1

        # Time since the previous fix (None on the very first fix).
        gap = None if prev_at is None else at - prev_at

        # If we're mid-trip and there's been a NOFIX_STOP_THRESH silence,
        # close the trip at the prior fix (work stop) BEFORE processing this row.
        if in_trip and gap is not None and gap >= NOFIX_STOP_THRESH:
            cur.ended_at = prev_at
            cur.end_reason = "nofix_stop"
            trips.append(cur)
            in_trip = False
            cur = None
            off_run_start = None
            idle_run_start = None

        # classify state
        if gap is not None and gap > GAP_THRESH:
            pos_state = "unknown"
        elif speed >= STAT_KMH:
            pos_state = "moving"
        elif acc == 1:
            pos_state = "idling"
        else:
            pos_state = "stopped"

        # First fix of the day with acc on.
        if reporting_time is None and acc == 1:
            reporting_time = at

        # Day totals: the elapsed interval belongs to the PREVIOUS fix's state.
        if gap is not None:
            step_sec = gap.total_seconds()
            if prev_state == "moving":
                driving_sec += step_sec
                # Distance is only counted for intervals that began moving.
                step_m = haversine_m(prev_lat, prev_lng, lat, lng)
                total_distance_m += step_m
                if in_trip:
                    cur.distance_m += step_m
            elif prev_state == "idling":
                idling_sec += step_sec
            elif prev_state == "stopped":
                stopped_sec += step_sec
            else:
                unknown_sec += step_sec

        # state machine
        if pos_state == "unknown":
            # With the current thresholds the silence rule above has already
            # closed any open trip (a gap > GAP_THRESH is also
            # >= NOFIX_STOP_THRESH); this branch matters only if the
            # thresholds are changed.
            if in_trip:
                cur.ended_at = prev_at
                cur.end_reason = "long_gap"
                trips.append(cur)
                in_trip = False
                cur = None
                off_run_start = None
                idle_run_start = None

        elif not in_trip:
            # A trip opens on the first moving/idling fix (or any fix with acc on).
            if pos_state in ("moving", "idling") or acc == 1:
                cur = Trip(trip_id=len(trips) + 1, started_at=at, n_fixes=1)
                in_trip = True
                off_run_start = None
                idle_run_start = at if pos_state == "idling" else None

        else:
            cur.n_fixes += 1
            if pos_state == "stopped":
                if off_run_start is None:
                    off_run_start = at
                # A stopped fix ends any idle run without crediting it.
                idle_run_start = None
                if at - off_run_start >= STOP_THRESH:
                    # The trip ends where the stopped run BEGAN, not at the
                    # fix that crossed the threshold.
                    cur.ended_at = off_run_start
                    cur.end_reason = "work_stop"
                    trips.append(cur)
                    in_trip = False
                    cur = None
                    off_run_start = None
            elif pos_state == "idling":
                off_run_start = None
                if idle_run_start is None:
                    idle_run_start = at
            else:  # moving
                off_run_start = None
                if idle_run_start is not None:
                    # An idle run is credited to the trip only when it lasted
                    # at least STOP_THRESH and was ended by a moving fix.
                    idle_dur = at - idle_run_start
                    if idle_dur >= STOP_THRESH:
                        cur.idling_sec += idle_dur.total_seconds()
                    idle_run_start = None

        prev_at = at
        prev_lat = lat
        prev_lng = lng
        prev_state = pos_state

    # Input exhausted with a trip still open: close it at the last fix.
    if in_trip and cur is not None:
        cur.ended_at = prev_at
        cur.end_reason = "day_end"
        trips.append(cur)

    return {
        "reporting_time": reporting_time,
        "trips": trips,
        "total_distance_km": round(total_distance_m / 1000, 2),
        "driving_min": round(driving_sec / 60, 1),
        "idling_min": round(idling_sec / 60, 1),
        "stopped_min": round(stopped_sec / 60, 1),
        "unknown_min": round(unknown_sec / 60, 1),
    }
|
||
|
|
|
||
|
|
|
||
|
|
def legacy_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise the trips that the legacy system itself assigned.

    Rows are grouped by their ``trip_id``; the ``"0"`` group is treated as the
    stationary bucket and is reported only as a row count.

    Returns:
        ``{"trips": [(trip_id, first_ts, last_ts, n_rows), ...],
        "n_stationary_rows": int}`` with the trips ordered by ``first_ts``.
    """
    ordered = sorted(rows, key=lambda row: row["gps_time_utc"])

    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in ordered:
        grouped[row["trip_id"]].append(row)

    # Each group inherits the global time order, so its first and last
    # members are the trip's earliest and latest fixes.
    summaries = [
        (
            trip_key,
            parse_ts(members[0]["gps_time_utc"]),
            parse_ts(members[-1]["gps_time_utc"]),
            len(members),
        )
        for trip_key, members in grouped.items()
        if trip_key != "0"
    ]
    summaries.sort(key=lambda item: item[1])

    stationary_count = sum(1 for row in ordered if row["trip_id"] == "0")
    return {"trips": summaries, "n_stationary_rows": stationary_count}
|
||
|
|
|
||
|
|
|
||
|
|
def main(path: str) -> None:
    """Print the legacy vs new-algorithm trip comparison for one JSON dump.

    Loads the list of per-fix rows from ``path``, prints the trips the legacy
    system assigned (via ``legacy_summary``), then the day totals and trip
    list produced by ``simulate``.

    Args:
        path: path of a JSON file holding a list of per-fix row dicts.
    """
    # A context manager closes the handle deterministically; the explicit
    # encoding keeps parsing independent of the platform's locale.
    with open(path, encoding="utf-8") as fh:
        rows = json.load(fh)
    print(f"\n=== {path} ===")
    print(f"raw rows: {len(rows)}")

    leg = legacy_summary(rows)
    print("\n-- LEGACY --")
    print(f"trips: {len(leg['trips'])} (plus {leg['n_stationary_rows']} stationary-bucket rows)")
    for tid, t0, t1, n in leg["trips"]:
        dur = (t1 - t0).total_seconds() / 60
        print(f" trip {tid}: {t0:%H:%M:%S} → {t1:%H:%M:%S} ({dur:.0f} min, {n} fixes)")

    sim = simulate(rows)
    print("\n-- NEW ALGO (5min stop, 30min gap, 5 km/h stationary) --")
    rep = sim["reporting_time"]
    print(f"reporting_time: {rep.isoformat() if rep else 'none'}")
    print(f"day totals: distance={sim['total_distance_km']} km, "
          f"driving={sim['driving_min']} min, idling={sim['idling_min']} min, "
          f"stopped={sim['stopped_min']} min, unknown={sim['unknown_min']} min")
    print(f"trips: {len(sim['trips'])}")
    for t in sim["trips"]:
        # simulate only returns closed trips, so ended_at is set here.
        dur = (t.ended_at - t.started_at).total_seconds() / 60
        print(f" trip {t.trip_id}: {t.started_at:%H:%M:%S} → "
              f"{t.ended_at:%H:%M:%S} "
              f"({dur:.0f} min, {t.distance_m/1000:.2f} km, "
              f"{t.idling_sec/60:.1f} idle, end={t.end_reason}, {t.n_fixes} fixes)")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Use the paths given on the command line; with none given, fall back to
    # the two sample dumps.
    dump_paths = sys.argv[1:]
    if not dump_paths:
        dump_paths = ["REPORTS/query_2-2026-638J.json", "REPORTS/query_2-2026-728K.json"]
    for dump_path in dump_paths:
        main(dump_path)
|