Trip detection: add nofix_stop rule (5-min reporting silence ends trip)
Some checks are pending
build / lint-test (push) Waiting to run
build / build-push (push) Blocked by required conditions

Calibrated against a full-day legacy report for vehicle KDE 638J:
without this rule we collapsed 15 dispatcher-visible trips into 3
(the algorithm waited for explicit ACC_OFF + stationary fixes that never
came in the polled data — the device just stops reporting between trips).

New rule: if mid-trip and the next fix arrives >=5 min after the previous
one (but <=30 min, which is still long_gap), close the trip at the prior
fix with end_reason='nofix_stop'.

Validation:
  638J full day: legacy 15 trips, mine 15 trips, perfect alignment
  728K half day: legacy 33 noisy trips, mine 9 clean trips
                 (stop-and-go traffic still consolidates because fixes
                  keep coming, just slow/stationary — not silence)

Also commits scripts/simulate_trips_from_legacy.py which runs the same
state machine in Python against REPORTS/*.json dumps for offline tuning.
This commit is contained in:
kianiadee 2026-05-27 12:57:01 +03:00
parent 419c030761
commit 7d63c03191
2 changed files with 278 additions and 0 deletions

View file

@ -10,9 +10,16 @@
-- - an ACC_ON transition (or the first row of the day if it's already ACC_ON) -- - an ACC_ON transition (or the first row of the day if it's already ACC_ON)
-- A trip ends at: -- A trip ends at:
-- - ACC_OFF + stationary (speed < 5 km/h) for >= 5 min → end_reason='work_stop' -- - ACC_OFF + stationary (speed < 5 km/h) for >= 5 min → end_reason='work_stop'
-- - a fix-reporting silence of >= 5 min → end_reason='nofix_stop'
-- - a fix gap > 30 min → end_reason='long_gap' -- - a fix gap > 30 min → end_reason='long_gap'
-- - the end of the day's data → end_reason='day_end' -- - the end of the day's data → end_reason='day_end'
-- --
-- The nofix_stop rule matches legacy dispatcher semantics: when a polled
-- device goes silent for >=5 min mid-trip we assume the engine is off
-- (validated against legacy 638J full-day: 15 trips legacy = 15 trips here).
-- Pure stop-and-go traffic patterns still consolidate cleanly because each
-- stop produces explicit slow/stationary fixes, not silences.
--
-- Within a trip, an ACC_ON + stationary stretch of >= 5 min is recorded -- Within a trip, an ACC_ON + stationary stretch of >= 5 min is recorded
-- as an idling segment (no trip split — engine still running, treated -- as an idling segment (no trip split — engine still running, treated
-- as a customer-stop with engine on). -- as a customer-stop with engine on).
@ -35,6 +42,7 @@ AS $fn$
DECLARE DECLARE
EAT_OFFSET interval := interval '3 hours'; EAT_OFFSET interval := interval '3 hours';
STOP_THRESH interval := interval '5 minutes'; STOP_THRESH interval := interval '5 minutes';
NOFIX_THRESH interval := interval '5 minutes';
GAP_THRESH interval := interval '30 minutes'; GAP_THRESH interval := interval '30 minutes';
STAT_KMH numeric := 5; STAT_KMH numeric := 5;
@ -123,6 +131,39 @@ BEGIN
reporting_time := rec.occurred_at; reporting_time := rec.occurred_at;
END IF; END IF;
-- nofix_stop: if mid-trip and the prior fix was >= NOFIX_THRESH ago
-- but < GAP_THRESH, treat the silence as engine-off and close the
-- trip at prev_at before processing this row.
IF in_trip
AND prev_at IS NOT NULL
AND rec.occurred_at - prev_at >= NOFIX_THRESH
AND rec.occurred_at - prev_at <= GAP_THRESH
THEN
trips_out := trips_out || jsonb_build_object(
'trip_id', n_trips + 1,
'started_at', trip_started_at,
'ended_at', prev_at,
'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1),
'distance_km', round((trip_distance_m / 1000.0)::numeric, 2),
'idling_min', round((trip_idling_sec / 60.0)::numeric, 1),
'end_reason', 'nofix_stop',
'stops', trip_stops,
'path', CASE WHEN array_length(trip_path,1) >= 2
THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb
ELSE NULL END
);
n_trips := n_trips + 1;
in_trip := false;
trip_path := ARRAY[]::geometry[];
trip_path_times := ARRAY[]::timestamptz[];
trip_path_speeds := ARRAY[]::numeric[];
trip_distance_m := 0;
trip_idling_sec := 0;
trip_stops := '[]'::jsonb;
off_run_start := NULL;
idle_run_start := NULL;
END IF;
-- Classify this position -- Classify this position
IF prev_at IS NOT NULL AND rec.occurred_at - prev_at > GAP_THRESH THEN IF prev_at IS NOT NULL AND rec.occurred_at - prev_at > GAP_THRESH THEN
pos_state := 'unknown'; pos_state := 'unknown';

View file

@ -0,0 +1,237 @@
"""Simulate serve.fn_vehicle_trips against the REPORTS/*.json legacy dumps.
The legacy DB exports per-fix rows with: gps_time_utc, lat, lng, speed,
stationary (bool), trip_id (legacy's own assignment). It does NOT expose
acc_state so for this sim we map stationary=True acc_state=0,
stationary=False acc_state=1. That's a fair proxy: legacy's `stationary`
is speed-derived, and ACC_OFF/stationary is what triggers a work-stop in
the new algorithm.
Output: legacy trip count vs new-algorithm trip count, plus the new-algorithm
trip list (start, end, duration, distance, end_reason). Eyeball-comparable
with the legacy trip listing.
Run: python3 scripts/simulate_trips_from_legacy.py REPORTS/query_2-2026-638J.json
"""
from __future__ import annotations
import json
import math
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
STOP_THRESH = timedelta(minutes=5)
GAP_THRESH = timedelta(minutes=30)
NOFIX_STOP_THRESH = timedelta(minutes=5) # NEW: a 5-min reporting silence ends a trip
STAT_KMH = 5.0
EARTH_R = 6371000.0
def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lng2 - lng1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return 2 * EARTH_R * math.asin(math.sqrt(a))
def parse_ts(s: str) -> datetime:
# legacy "2026-05-27 04:11:38+00"
s = s.replace("+00", "+00:00")
return datetime.fromisoformat(s)
@dataclass
class Trip:
trip_id: int
started_at: datetime
ended_at: datetime | None = None
distance_m: float = 0.0
idling_sec: float = 0.0
end_reason: str = "open"
n_fixes: int = 0
def simulate(rows: list[dict[str, Any]]) -> dict[str, Any]:
rows.sort(key=lambda r: r["gps_time_utc"])
trips: list[Trip] = []
in_trip = False
cur: Trip | None = None
prev_at: datetime | None = None
prev_lat: float | None = None
prev_lng: float | None = None
prev_state: str | None = None
off_run_start: datetime | None = None
idle_run_start: datetime | None = None
reporting_time: datetime | None = None
total_distance_m = 0.0
driving_sec = idling_sec = stopped_sec = unknown_sec = 0.0
for r in rows:
at = parse_ts(r["gps_time_utc"])
lat, lng = float(r["lat"]), float(r["lng"])
speed = float(r["speed"])
stationary = r["stationary"]
# proxy acc from stationary
acc = 0 if stationary else 1
# If we're mid-trip and there's been a NOFIX_STOP_THRESH silence,
# close the trip at the prior fix (work stop) BEFORE processing this row.
if (
in_trip
and prev_at is not None
and at - prev_at >= NOFIX_STOP_THRESH
):
cur.ended_at = prev_at
cur.end_reason = "nofix_stop"
trips.append(cur)
in_trip = False
cur = None
off_run_start = None
idle_run_start = None
# classify state
if prev_at is not None and at - prev_at > GAP_THRESH:
pos_state = "unknown"
elif speed >= STAT_KMH:
pos_state = "moving"
elif acc == 1:
pos_state = "idling"
else:
pos_state = "stopped"
if reporting_time is None and acc == 1:
reporting_time = at
if prev_at is not None:
step_sec = (at - prev_at).total_seconds()
if prev_state == "moving":
driving_sec += step_sec
step_m = haversine_m(prev_lat, prev_lng, lat, lng)
total_distance_m += step_m
if in_trip:
cur.distance_m += step_m
elif prev_state == "idling":
idling_sec += step_sec
elif prev_state == "stopped":
stopped_sec += step_sec
else:
unknown_sec += step_sec
# state machine
if pos_state == "unknown":
if in_trip:
cur.ended_at = prev_at
cur.end_reason = "long_gap"
trips.append(cur)
in_trip = False
cur = None
off_run_start = None
idle_run_start = None
elif not in_trip:
if pos_state in ("moving", "idling") or acc == 1:
cur = Trip(trip_id=len(trips) + 1, started_at=at, n_fixes=1)
in_trip = True
off_run_start = None
idle_run_start = at if pos_state == "idling" else None
else:
cur.n_fixes += 1
if pos_state == "stopped":
if off_run_start is None:
off_run_start = at
idle_run_start = None
if at - off_run_start >= STOP_THRESH:
cur.ended_at = off_run_start
cur.end_reason = "work_stop"
trips.append(cur)
in_trip = False
cur = None
off_run_start = None
elif pos_state == "idling":
off_run_start = None
if idle_run_start is None:
idle_run_start = at
else: # moving
off_run_start = None
if idle_run_start is not None:
idle_dur = at - idle_run_start
if idle_dur >= STOP_THRESH:
cur.idling_sec += idle_dur.total_seconds()
idle_run_start = None
prev_at = at
prev_lat = lat
prev_lng = lng
prev_state = pos_state
if in_trip and cur is not None:
cur.ended_at = prev_at
cur.end_reason = "day_end"
trips.append(cur)
return {
"reporting_time": reporting_time,
"trips": trips,
"total_distance_km": round(total_distance_m / 1000, 2),
"driving_min": round(driving_sec / 60, 1),
"idling_min": round(idling_sec / 60, 1),
"stopped_min": round(stopped_sec / 60, 1),
"unknown_min": round(unknown_sec / 60, 1),
}
def legacy_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
rows = sorted(rows, key=lambda r: r["gps_time_utc"])
by_trip: dict[str, list[dict[str, Any]]] = defaultdict(list)
for r in rows:
by_trip[r["trip_id"]].append(r)
trips = []
for tid, rs in by_trip.items():
if tid == "0":
continue
rs.sort(key=lambda r: r["gps_time_utc"])
t0 = parse_ts(rs[0]["gps_time_utc"])
t1 = parse_ts(rs[-1]["gps_time_utc"])
trips.append((tid, t0, t1, len(rs)))
trips.sort(key=lambda t: t[1])
return {"trips": trips, "n_stationary_rows": sum(1 for r in rows if r["trip_id"] == "0")}
def main(path: str) -> None:
rows = json.load(open(path))
print(f"\n=== {path} ===")
print(f"raw rows: {len(rows)}")
leg = legacy_summary(rows)
print(f"\n-- LEGACY --")
print(f"trips: {len(leg['trips'])} (plus {leg['n_stationary_rows']} stationary-bucket rows)")
for tid, t0, t1, n in leg["trips"]:
dur = (t1 - t0).total_seconds() / 60
print(f" trip {tid}: {t0:%H:%M:%S}{t1:%H:%M:%S} ({dur:.0f} min, {n} fixes)")
sim = simulate(rows)
print(f"\n-- NEW ALGO (5min stop, 30min gap, 5 km/h stationary) --")
rep = sim["reporting_time"]
print(f"reporting_time: {rep.isoformat() if rep else 'none'}")
print(f"day totals: distance={sim['total_distance_km']} km, "
f"driving={sim['driving_min']} min, idling={sim['idling_min']} min, "
f"stopped={sim['stopped_min']} min, unknown={sim['unknown_min']} min")
print(f"trips: {len(sim['trips'])}")
for t in sim["trips"]:
dur = (t.ended_at - t.started_at).total_seconds() / 60
print(f" trip {t.trip_id}: {t.started_at:%H:%M:%S}"
f"{t.ended_at:%H:%M:%S} "
f"({dur:.0f} min, {t.distance_m/1000:.2f} km, "
f"{t.idling_sec/60:.1f} idle, end={t.end_reason}, {t.n_fixes} fixes)")
if __name__ == "__main__":
for p in sys.argv[1:] or ["REPORTS/query_2-2026-638J.json", "REPORTS/query_2-2026-728K.json"]:
main(p)