diff --git a/app/routers/views.py b/app/routers/views.py index a97e78f..5d2836c 100644 --- a/app/routers/views.py +++ b/app/routers/views.py @@ -1,7 +1,11 @@ +import csv +import io import json +from datetime import UTC, date, datetime, timedelta from typing import Annotated, Any -from fastapi import APIRouter, Depends, HTTPException, Query, Request +from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request +from fastapi.responses import Response from psycopg.types.json import Jsonb from app.auth import AuthAccount, require_scope @@ -42,3 +46,91 @@ async def live_view( if row is None or row[0] is None: raise HTTPException(status_code=500, detail="serve.fn_live_view returned NULL") return LiveViewResponse.model_validate(row[0]) + + +def _resolve_date(date_q: str | None) -> date: + """Default to today in EAT (UTC+3) when no ?date= is provided.""" + if date_q: + try: + return date.fromisoformat(date_q) + except ValueError as exc: + raise HTTPException(status_code=400, detail=f"invalid date: {exc}") from exc + return (datetime.now(UTC) + timedelta(hours=3)).date() + + +async def _fetch_trips(vehicle_id: int, day: date) -> dict[str, Any]: + pool = await get_pool() + async with pool.connection() as conn, conn.cursor() as cur: + await cur.execute( + "SELECT serve.fn_vehicle_trips(%s, %s)", (vehicle_id, day) + ) + row = await cur.fetchone() + if row is None or row[0] is None: + raise HTTPException(status_code=500, detail="serve.fn_vehicle_trips returned NULL") + payload: dict[str, Any] = row[0] + if "error" in payload: + raise HTTPException(status_code=404, detail=payload["error"]) + return payload + + +@router.get("/vehicle/{vehicle_id}/trips") +@limiter.limit("60/minute") +async def vehicle_trips( + request: Request, + _account: Annotated[AuthAccount, Depends(require_scope("read:fleet"))], + vehicle_id: Annotated[int, Path(ge=1)], + date_q: Annotated[ + str | None, Query(alias="date", description="YYYY-MM-DD in EAT; defaults to today") + ] = None, +) -> dict[str, Any]: + _ = request + day = _resolve_date(date_q) + return await _fetch_trips(vehicle_id, day) + + +@router.get("/vehicle/{vehicle_id}/trips.csv") +@limiter.limit("30/minute") +async def vehicle_trips_csv( + request: Request, + _account: Annotated[AuthAccount, Depends(require_scope("read:fleet"))], + vehicle_id: Annotated[int, Path(ge=1)], + date_q: Annotated[ + str | None, Query(alias="date", description="YYYY-MM-DD in EAT; defaults to today") + ] = None, +) -> Response: + _ = request + day = _resolve_date(date_q) + payload = await _fetch_trips(vehicle_id, day) + + plate = payload.get("plate") or "" + reporting_time = payload.get("reporting_time") or "" + date_str = payload.get("date", day.isoformat()) + + buf = io.StringIO() + w = csv.writer(buf) + w.writerow([ + "date", "plate", "reporting_time", + "trip_id", "started_at", "ended_at", + "duration_min", "distance_km", "idling_min", "end_reason", + ]) + for trip in payload.get("trips", []): + w.writerow([ + date_str, + plate, + reporting_time, + trip.get("trip_id"), + trip.get("started_at"), + trip.get("ended_at"), + trip.get("duration_min"), + trip.get("distance_km"), + trip.get("idling_min"), + trip.get("end_reason"), + ]) + + safe_plate = (plate or f"v{vehicle_id}").replace(" ", "_").replace("/", "-") + filename = f"trips_{safe_plate}_{date_str}.csv" + return Response( + content=buf.getvalue(), + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + ) diff --git a/db/migrations/20260601000019_fn_vehicle_trips.sql b/db/migrations/20260601000019_fn_vehicle_trips.sql new file mode 100644 index 0000000..8ee7c99 --- /dev/null +++ b/db/migrations/20260601000019_fn_vehicle_trips.sql @@ -0,0 +1,347 @@ +-- migrate:up +-- +-- Trip detection on demand from state.position_history. +-- +-- Algorithm (state machine, single forward pass over the day's positions): +-- +-- reporting_time := first occurred_at where acc_state = 1 +-- +-- A trip starts at: +-- - an ACC_ON transition (or the first row of the day if it's already ACC_ON) +-- A trip ends at: +-- - ACC_OFF + stationary (speed < 5 km/h) for >= 5 min → end_reason='work_stop' +-- - a fix gap > 30 min → end_reason='long_gap' +-- - the end of the day's data → end_reason='day_end' +-- +-- Within a trip, an ACC_ON + stationary stretch of >= 5 min is recorded +-- as an idling segment (no trip split — engine still running, treated +-- as a customer-stop with engine on). +-- +-- Distance accumulates per moving step (curr.speed_kmh >= 5 km/h); +-- GPS jitter at standstill is excluded. +-- +-- Day totals (driving_min / idling_min / stopped_min / unknown_min) are +-- bucketed by the classification of the *previous* fix at each step. +-- +-- Falls back to movement-only segmentation (has_acc_data=false in the +-- response) when every position for the day has acc_state IS NULL. + +CREATE OR REPLACE FUNCTION serve.fn_vehicle_trips( + p_vehicle_id integer, + p_date_eat date +) RETURNS jsonb +LANGUAGE plpgsql STABLE +AS $fn$ +DECLARE + EAT_OFFSET interval := interval '3 hours'; + STOP_THRESH interval := interval '5 minutes'; + GAP_THRESH interval := interval '30 minutes'; + STAT_KMH numeric := 5; + + day_start_utc timestamptz := (p_date_eat::timestamp - EAT_OFFSET) AT TIME ZONE 'UTC'; + day_end_utc timestamptz := day_start_utc + interval '24 hours'; + + rec record; + v_plate text; + v_imei text; + + reporting_time timestamptz; + has_acc bool := false; + + -- trip-in-progress state + in_trip bool := false; + trip_started_at timestamptz; + trip_started_geom geometry; + trip_path geometry[] := ARRAY[]::geometry[]; + trip_path_times timestamptz[] := ARRAY[]::timestamptz[]; + trip_path_speeds numeric[] := ARRAY[]::numeric[]; + trip_distance_m numeric := 0; + trip_idling_sec numeric := 0; + trip_stops jsonb := '[]'::jsonb; + + -- mid-trip ACC_OFF run (might become a work stop) + off_run_start timestamptz; + off_run_geom geometry; + -- mid-trip idle (ACC_ON + stationary) run + idle_run_start timestamptz; + idle_run_geom geometry; + + prev_at timestamptz; + prev_geom geometry; + prev_state text; -- 'moving' | 'idling' | 'stopped' | 'unknown' + + trips_out jsonb := '[]'::jsonb; + n_trips int := 0; + + total_distance_m numeric := 0; + total_driving_sec numeric := 0; + total_idling_sec numeric := 0; + total_stopped_sec numeric := 0; + total_unknown_sec numeric := 0; + longest_gap_sec numeric := 0; + fix_count int := 0; + + pos_state text; + step_sec numeric; + step_m numeric; + + -- closure helper inline (PL/pgSQL has no nested funcs, so we inline) +BEGIN + SELECT v.plate INTO v_plate FROM domain.vehicles v WHERE v.vehicle_id = p_vehicle_id; + IF v_plate IS NULL THEN + RETURN jsonb_build_object( + 'error', 'vehicle not found', + 'vehicle_id', p_vehicle_id, + 'date', to_char(p_date_eat, 'YYYY-MM-DD') + ); + END IF; + + SELECT d.imei INTO v_imei + FROM domain.devices d + WHERE d.vehicle_id = p_vehicle_id + ORDER BY CASE d.device_type WHEN 'tracker' THEN 0 ELSE 1 END, d.imei + LIMIT 1; + + FOR rec IN + SELECT occurred_at, + geom, + COALESCE(speed_kmh, 0)::numeric AS speed_kmh, + acc_state + FROM state.position_history + WHERE vehicle_id = p_vehicle_id + AND occurred_at >= day_start_utc + AND occurred_at < day_end_utc + ORDER BY occurred_at + LOOP + fix_count := fix_count + 1; + + IF rec.acc_state IS NOT NULL THEN + has_acc := true; + END IF; + + IF rec.acc_state = 1 AND reporting_time IS NULL THEN + reporting_time := rec.occurred_at; + END IF; + + -- Classify this position + IF prev_at IS NOT NULL AND rec.occurred_at - prev_at > GAP_THRESH THEN + pos_state := 'unknown'; + ELSIF rec.speed_kmh >= STAT_KMH THEN + pos_state := 'moving'; + ELSIF rec.acc_state = 1 THEN + pos_state := 'idling'; + ELSIF rec.acc_state = 0 THEN + pos_state := 'stopped'; + ELSE + -- no acc data: fall back to speed-only + pos_state := CASE WHEN rec.speed_kmh >= STAT_KMH THEN 'moving' ELSE 'stopped' END; + END IF; + + -- Bucket the time *since* prev_at into totals + IF prev_at IS NOT NULL THEN + step_sec := EXTRACT(EPOCH FROM (rec.occurred_at - prev_at)); + longest_gap_sec := GREATEST(longest_gap_sec, step_sec); + CASE prev_state + WHEN 'moving' THEN total_driving_sec := total_driving_sec + step_sec; + WHEN 'idling' THEN total_idling_sec := total_idling_sec + step_sec; + WHEN 'stopped' THEN total_stopped_sec := total_stopped_sec + step_sec; + ELSE total_unknown_sec := total_unknown_sec + step_sec; + END CASE; + + IF in_trip AND prev_state = 'moving' AND pos_state IN ('moving','idling') THEN + step_m := ST_Distance(prev_geom::geography, rec.geom::geography); + trip_distance_m := trip_distance_m + step_m; + total_distance_m := total_distance_m + step_m; + ELSIF prev_state = 'moving' THEN + step_m := ST_Distance(prev_geom::geography, rec.geom::geography); + total_distance_m := total_distance_m + step_m; + END IF; + END IF; + + ---------------------------------------------------------------------- + -- State machine + ---------------------------------------------------------------------- + + IF pos_state = 'unknown' THEN + -- Gap longer than GAP_THRESH; close any open trip. + IF in_trip THEN + trips_out := trips_out || jsonb_build_object( + 'trip_id', n_trips + 1, + 'started_at', trip_started_at, + 'ended_at', prev_at, + 'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1), + 'distance_km', round((trip_distance_m / 1000.0)::numeric, 2), + 'idling_min', round((trip_idling_sec / 60.0)::numeric, 1), + 'end_reason', 'long_gap', + 'stops', trip_stops, + 'path', CASE WHEN array_length(trip_path,1) >= 2 + THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb + ELSE NULL END + ); + n_trips := n_trips + 1; + in_trip := false; + trip_path := ARRAY[]::geometry[]; + trip_path_times := ARRAY[]::timestamptz[]; + trip_path_speeds := ARRAY[]::numeric[]; + trip_distance_m := 0; + trip_idling_sec := 0; + trip_stops := '[]'::jsonb; + off_run_start := NULL; + idle_run_start := NULL; + END IF; + + ELSIF NOT in_trip THEN + -- Outside any trip. Start one when we see motion or ACC_ON. + IF pos_state IN ('moving','idling') OR rec.acc_state = 1 THEN + in_trip := true; + trip_started_at := rec.occurred_at; + trip_started_geom := rec.geom; + trip_path := ARRAY[rec.geom]; + trip_path_times := ARRAY[rec.occurred_at]; + trip_path_speeds := ARRAY[rec.speed_kmh]; + trip_distance_m := 0; + trip_idling_sec := 0; + trip_stops := '[]'::jsonb; + off_run_start := NULL; + idle_run_start := CASE WHEN pos_state = 'idling' THEN rec.occurred_at ELSE NULL END; + END IF; + + ELSE + -- In trip — append point, then handle stop/idle runs. + trip_path := trip_path || rec.geom; + trip_path_times := trip_path_times || rec.occurred_at; + trip_path_speeds := trip_path_speeds || rec.speed_kmh; + + IF pos_state = 'stopped' THEN + IF off_run_start IS NULL THEN + off_run_start := rec.occurred_at; + off_run_geom := rec.geom; + END IF; + IF idle_run_start IS NOT NULL THEN + idle_run_start := NULL; + END IF; + + IF rec.occurred_at - off_run_start >= STOP_THRESH THEN + -- Work stop confirmed: close the trip at off_run_start. + trips_out := trips_out || jsonb_build_object( + 'trip_id', n_trips + 1, + 'started_at', trip_started_at, + 'ended_at', off_run_start, + 'duration_min', round(EXTRACT(EPOCH FROM (off_run_start - trip_started_at))/60.0, 1), + 'distance_km', round((trip_distance_m / 1000.0)::numeric, 2), + 'idling_min', round((trip_idling_sec / 60.0)::numeric, 1), + 'end_reason', 'work_stop', + 'stops', trip_stops, + 'path', CASE WHEN array_length(trip_path,1) >= 2 + THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb + ELSE NULL END + ); + n_trips := n_trips + 1; + in_trip := false; + trip_path := ARRAY[]::geometry[]; + trip_path_times := ARRAY[]::timestamptz[]; + trip_path_speeds := ARRAY[]::numeric[]; + trip_distance_m := 0; + trip_idling_sec := 0; + trip_stops := '[]'::jsonb; + off_run_start := NULL; + END IF; + + ELSIF pos_state = 'idling' THEN + IF off_run_start IS NOT NULL THEN + -- ACC came back on before STOP_THRESH; abandon the off-run. + off_run_start := NULL; + END IF; + IF idle_run_start IS NULL THEN + idle_run_start := rec.occurred_at; + idle_run_geom := rec.geom; + ELSIF rec.occurred_at - idle_run_start >= STOP_THRESH THEN + -- Already recorded as idling within trip; nothing to flush + -- until run ends. + NULL; + END IF; + + ELSE -- moving + IF off_run_start IS NOT NULL THEN + off_run_start := NULL; + END IF; + IF idle_run_start IS NOT NULL THEN + -- Idle run ended; if it was long enough, record it. + IF rec.occurred_at - idle_run_start >= STOP_THRESH THEN + trip_idling_sec := trip_idling_sec + + EXTRACT(EPOCH FROM (rec.occurred_at - idle_run_start)); + trip_stops := trip_stops || jsonb_build_object( + 'at', idle_run_start, + 'duration_min', round(EXTRACT(EPOCH FROM (rec.occurred_at - idle_run_start))/60.0, 1), + 'kind', 'idling', + 'lng', ST_X(idle_run_geom), + 'lat', ST_Y(idle_run_geom) + ); + END IF; + idle_run_start := NULL; + END IF; + END IF; + END IF; + + prev_at := rec.occurred_at; + prev_geom := rec.geom; + prev_state := pos_state; + END LOOP; + + -- Close any trip still open at end of loop. + IF in_trip THEN + -- If an unflushed idle run is open, account for it. + IF idle_run_start IS NOT NULL AND prev_at - idle_run_start >= STOP_THRESH THEN + trip_idling_sec := trip_idling_sec + + EXTRACT(EPOCH FROM (prev_at - idle_run_start)); + trip_stops := trip_stops || jsonb_build_object( + 'at', idle_run_start, + 'duration_min', round(EXTRACT(EPOCH FROM (prev_at - idle_run_start))/60.0, 1), + 'kind', 'idling', + 'lng', ST_X(idle_run_geom), + 'lat', ST_Y(idle_run_geom) + ); + END IF; + trips_out := trips_out || jsonb_build_object( + 'trip_id', n_trips + 1, + 'started_at', trip_started_at, + 'ended_at', prev_at, + 'duration_min', round(EXTRACT(EPOCH FROM (prev_at - trip_started_at))/60.0, 1), + 'distance_km', round((trip_distance_m / 1000.0)::numeric, 2), + 'idling_min', round((trip_idling_sec / 60.0)::numeric, 1), + 'end_reason', 'day_end', + 'stops', trip_stops, + 'path', CASE WHEN array_length(trip_path,1) >= 2 + THEN ST_AsGeoJSON(ST_MakeLine(trip_path))::jsonb + ELSE NULL END + ); + n_trips := n_trips + 1; + END IF; + + RETURN jsonb_build_object( + 'vehicle_id', p_vehicle_id, + 'plate', v_plate, + 'imei', v_imei, + 'date', to_char(p_date_eat, 'YYYY-MM-DD'), + 'reporting_time', reporting_time, + 'totals', jsonb_build_object( + 'distance_km', round((total_distance_m / 1000.0)::numeric, 2), + 'driving_min', round((total_driving_sec / 60.0)::numeric, 1), + 'idling_min', round((total_idling_sec / 60.0)::numeric, 1), + 'stopped_min', round((total_stopped_sec / 60.0)::numeric, 1), + 'unknown_min', round((total_unknown_sec / 60.0)::numeric, 1), + 'trip_count', n_trips + ), + 'data_quality', jsonb_build_object( + 'fix_count', fix_count, + 'has_acc_data', has_acc, + 'longest_gap_sec', round(longest_gap_sec::numeric, 0) + ), + 'trips', trips_out + ); +END +$fn$; + +-- migrate:down + +DROP FUNCTION IF EXISTS serve.fn_vehicle_trips(integer, date);