From 7d63c031914a7b2662ce033f12201551df08b1ae Mon Sep 17 00:00:00 2001 From: kianiadee Date: Wed, 27 May 2026 12:57:01 +0300 Subject: [PATCH] Trip detection: add nofix_stop rule (5-min reporting silence ends trip) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Calibrated against a full-day legacy report for vehicle KDE 638J: without this rule we collapsed 15 dispatcher-visible trips into 3 (the algorithm waited for explicit ACC_OFF + stationary fixes that never came in the polled data — the device just stops reporting between trips). New rule: if mid-trip and the next fix arrives >=5 min after the previous one (but <=30 min, which is still long_gap), close the trip at the prior fix with end_reason='nofix_stop'. Validation: 638J full day: legacy 15 trips, mine 15 trips, perfect alignment 728K half day: legacy 33 noisy trips, mine 9 clean trips (stop-and-go traffic still consolidates because fixes keep coming, just slow/stationary — not silence) Also commits scripts/simulate_trips_from_legacy.py which runs the same state machine in Python against REPORTS/*.json dumps for offline tuning. --- .../20260601000019_fn_vehicle_trips.sql | 41 +++ scripts/simulate_trips_from_legacy.py | 237 ++++++++++++++++++ 2 files changed, 278 insertions(+) create mode 100644 scripts/simulate_trips_from_legacy.py diff --git a/db/migrations/20260601000019_fn_vehicle_trips.sql b/db/migrations/20260601000019_fn_vehicle_trips.sql index 8ee7c99..34b1c25 100644 --- a/db/migrations/20260601000019_fn_vehicle_trips.sql +++ b/db/migrations/20260601000019_fn_vehicle_trips.sql @@ -10,9 +10,16 @@ -- - an ACC_ON transition (or the first row of the day if it's already ACC_ON) -- A trip ends at: -- - ACC_OFF + stationary (speed < 5 km/h) for >= 5 min → end_reason='work_stop' +-- - a fix-reporting silence of >= 5 min → end_reason='nofix_stop' -- - a fix gap > 30 min → end_reason='long_gap' -- - the end of the day's data → end_reason='day_end' -- +-- The nofix_stop rule matches legacy dispatcher semantics: when a polled +-- device goes silent for >=5 min mid-trip we assume the engine is off +-- (validated against legacy 638J full-day: 15 trips legacy = 15 trips here). +-- Pure stop-and-go traffic patterns still consolidate cleanly because each +-- stop produces explicit slow/stationary fixes, not silences. +-- -- Within a trip, an ACC_ON + stationary stretch of >= 5 min is recorded -- as an idling segment (no trip split — engine still running, treated -- as a customer-stop with engine on). @@ -35,6 +42,7 @@ AS $fn$ DECLARE EAT_OFFSET interval := interval '3 hours'; STOP_THRESH interval := interval '5 minutes'; + NOFIX_THRESH interval := interval '5 minutes'; GAP_THRESH interval := interval '30 minutes'; STAT_KMH numeric := 5; @@ -123,6 +131,39 @@ BEGIN reporting_time := rec.occurred_at; END IF; + -- nofix_stop: if mid-trip and the prior fix was >= NOFIX_THRESH ago + -- but < GAP_THRESH, treat the silence as engine-off and close the + -- trip at prev_at before processing this row. + IF in_trip + AND prev_at IS NOT NULL + AND rec.occurred_at - prev_at >= NOFIX_THRESH + AND rec.occurred_at - prev_at <= GAP_THRESH + THEN + trips_out := trips_out || jsonb_build_object( + 'trip_id', n_trips + 1, + 'started_at', trip_started_at, + 'ended_at', prev_at, + 'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1), + 'distance_km', round((trip_distance_m / 1000.0)::numeric, 2), + 'idling_min', round((trip_idling_sec / 60.0)::numeric, 1), + 'end_reason', 'nofix_stop', + 'stops', trip_stops, + 'path', CASE WHEN array_length(trip_path,1) >= 2 + THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb + ELSE NULL END + ); + n_trips := n_trips + 1; + in_trip := false; + trip_path := ARRAY[]::geometry[]; + trip_path_times := ARRAY[]::timestamptz[]; + trip_path_speeds := ARRAY[]::numeric[]; + trip_distance_m := 0; + trip_idling_sec := 0; + trip_stops := '[]'::jsonb; + off_run_start := NULL; + idle_run_start := NULL; + END IF; + -- Classify this position IF prev_at IS NOT NULL AND rec.occurred_at - prev_at > GAP_THRESH THEN pos_state := 'unknown'; diff --git a/scripts/simulate_trips_from_legacy.py b/scripts/simulate_trips_from_legacy.py new file mode 100644 index 0000000..3e774b5 --- /dev/null +++ b/scripts/simulate_trips_from_legacy.py @@ -0,0 +1,237 @@ +"""Simulate serve.fn_vehicle_trips against the REPORTS/*.json legacy dumps. + +The legacy DB exports per-fix rows with: gps_time_utc, lat, lng, speed, +stationary (bool), trip_id (legacy's own assignment). It does NOT expose +acc_state — so for this sim we map stationary=True → acc_state=0, +stationary=False → acc_state=1. That's a fair proxy: legacy's `stationary` +is speed-derived, and ACC_OFF/stationary is what triggers a work-stop in +the new algorithm. + +Output: legacy trip count vs new-algorithm trip count, plus the new-algorithm +trip list (start, end, duration, distance, end_reason). Eyeball-comparable +with the legacy trip listing. + +Run: python3 scripts/simulate_trips_from_legacy.py REPORTS/query_2-2026-638J.json +""" + +from __future__ import annotations + +import json +import math +import sys +from collections import Counter, defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any + +STOP_THRESH = timedelta(minutes=5) +GAP_THRESH = timedelta(minutes=30) +NOFIX_STOP_THRESH = timedelta(minutes=5) # NEW: a 5-min reporting silence ends a trip +STAT_KMH = 5.0 +EARTH_R = 6371000.0 + + +def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float: + phi1, phi2 = math.radians(lat1), math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlam = math.radians(lng2 - lng1) + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2 + return 2 * EARTH_R * math.asin(math.sqrt(a)) + + +def parse_ts(s: str) -> datetime: + # legacy "2026-05-27 04:11:38+00" + s = s.replace("+00", "+00:00") + return datetime.fromisoformat(s) + + +@dataclass +class Trip: + trip_id: int + started_at: datetime + ended_at: datetime | None = None + distance_m: float = 0.0 + idling_sec: float = 0.0 + end_reason: str = "open" + n_fixes: int = 0 + + +def simulate(rows: list[dict[str, Any]]) -> dict[str, Any]: + rows.sort(key=lambda r: r["gps_time_utc"]) + + trips: list[Trip] = [] + in_trip = False + cur: Trip | None = None + prev_at: datetime | None = None + prev_lat: float | None = None + prev_lng: float | None = None + prev_state: str | None = None + off_run_start: datetime | None = None + idle_run_start: datetime | None = None + reporting_time: datetime | None = None + total_distance_m = 0.0 + driving_sec = idling_sec = stopped_sec = unknown_sec = 0.0 + + for r in rows: + at = parse_ts(r["gps_time_utc"]) + lat, lng = float(r["lat"]), float(r["lng"]) + speed = float(r["speed"]) + stationary = r["stationary"] + # proxy acc from stationary + acc = 0 if stationary else 1 + + # If we're mid-trip and there's been a NOFIX_STOP_THRESH silence, + # close the trip at the prior fix (work stop) BEFORE processing this row. + if ( + in_trip + and prev_at is not None + and at - prev_at >= NOFIX_STOP_THRESH + ): + cur.ended_at = prev_at + cur.end_reason = "nofix_stop" + trips.append(cur) + in_trip = False + cur = None + off_run_start = None + idle_run_start = None + + # classify state + if prev_at is not None and at - prev_at > GAP_THRESH: + pos_state = "unknown" + elif speed >= STAT_KMH: + pos_state = "moving" + elif acc == 1: + pos_state = "idling" + else: + pos_state = "stopped" + + if reporting_time is None and acc == 1: + reporting_time = at + + if prev_at is not None: + step_sec = (at - prev_at).total_seconds() + if prev_state == "moving": + driving_sec += step_sec + step_m = haversine_m(prev_lat, prev_lng, lat, lng) + total_distance_m += step_m + if in_trip: + cur.distance_m += step_m + elif prev_state == "idling": + idling_sec += step_sec + elif prev_state == "stopped": + stopped_sec += step_sec + else: + unknown_sec += step_sec + + # state machine + if pos_state == "unknown": + if in_trip: + cur.ended_at = prev_at + cur.end_reason = "long_gap" + trips.append(cur) + in_trip = False + cur = None + off_run_start = None + idle_run_start = None + + elif not in_trip: + if pos_state in ("moving", "idling") or acc == 1: + cur = Trip(trip_id=len(trips) + 1, started_at=at, n_fixes=1) + in_trip = True + off_run_start = None + idle_run_start = at if pos_state == "idling" else None + + else: + cur.n_fixes += 1 + if pos_state == "stopped": + if off_run_start is None: + off_run_start = at + idle_run_start = None + if at - off_run_start >= STOP_THRESH: + cur.ended_at = off_run_start + cur.end_reason = "work_stop" + trips.append(cur) + in_trip = False + cur = None + off_run_start = None + elif pos_state == "idling": + off_run_start = None + if idle_run_start is None: + idle_run_start = at + else: # moving + off_run_start = None + if idle_run_start is not None: + idle_dur = at - idle_run_start + if idle_dur >= STOP_THRESH: + cur.idling_sec += idle_dur.total_seconds() + idle_run_start = None + + prev_at = at + prev_lat = lat + prev_lng = lng + prev_state = pos_state + + if in_trip and cur is not None: + cur.ended_at = prev_at + cur.end_reason = "day_end" + trips.append(cur) + + return { + "reporting_time": reporting_time, + "trips": trips, + "total_distance_km": round(total_distance_m / 1000, 2), + "driving_min": round(driving_sec / 60, 1), + "idling_min": round(idling_sec / 60, 1), + "stopped_min": round(stopped_sec / 60, 1), + "unknown_min": round(unknown_sec / 60, 1), + } + + +def legacy_summary(rows: list[dict[str, Any]]) -> dict[str, Any]: + rows = sorted(rows, key=lambda r: r["gps_time_utc"]) + by_trip: dict[str, list[dict[str, Any]]] = defaultdict(list) + for r in rows: + by_trip[r["trip_id"]].append(r) + trips = [] + for tid, rs in by_trip.items(): + if tid == "0": + continue + rs.sort(key=lambda r: r["gps_time_utc"]) + t0 = parse_ts(rs[0]["gps_time_utc"]) + t1 = parse_ts(rs[-1]["gps_time_utc"]) + trips.append((tid, t0, t1, len(rs))) + trips.sort(key=lambda t: t[1]) + return {"trips": trips, "n_stationary_rows": sum(1 for r in rows if r["trip_id"] == "0")} + + +def main(path: str) -> None: + rows = json.load(open(path)) + print(f"\n=== {path} ===") + print(f"raw rows: {len(rows)}") + + leg = legacy_summary(rows) + print(f"\n-- LEGACY --") + print(f"trips: {len(leg['trips'])} (plus {leg['n_stationary_rows']} stationary-bucket rows)") + for tid, t0, t1, n in leg["trips"]: + dur = (t1 - t0).total_seconds() / 60 + print(f" trip {tid}: {t0:%H:%M:%S} → {t1:%H:%M:%S} ({dur:.0f} min, {n} fixes)") + + sim = simulate(rows) + print(f"\n-- NEW ALGO (5min stop, 30min gap, 5 km/h stationary) --") + rep = sim["reporting_time"] + print(f"reporting_time: {rep.isoformat() if rep else 'none'}") + print(f"day totals: distance={sim['total_distance_km']} km, " + f"driving={sim['driving_min']} min, idling={sim['idling_min']} min, " + f"stopped={sim['stopped_min']} min, unknown={sim['unknown_min']} min") + print(f"trips: {len(sim['trips'])}") + for t in sim["trips"]: + dur = (t.ended_at - t.started_at).total_seconds() / 60 + print(f" trip {t.trip_id}: {t.started_at:%H:%M:%S} → " + f"{t.ended_at:%H:%M:%S} " + f"({dur:.0f} min, {t.distance_m/1000:.2f} km, " + f"{t.idling_sec/60:.1f} idle, end={t.end_reason}, {t.n_fixes} fixes)") + + +if __name__ == "__main__": + for p in sys.argv[1:] or ["REPORTS/query_2-2026-638J.json", "REPORTS/query_2-2026-728K.json"]: + main(p)