diff --git a/import_drivers_csv.py b/import_drivers_csv.py index ab209bb..a6ad44c 100644 --- a/import_drivers_csv.py +++ b/import_drivers_csv.py @@ -48,12 +48,27 @@ _DRIVER_SKIP = {"identification", "ug"} def _infer_city(plate: str) -> str | None: - """Derive assigned_city from license plate prefix.""" + """Derive assigned_city from license plate prefix. + + [BUG-08] Kenyan plates (K-series) span both Nairobi and Mombasa, and + the prefix alone is not a reliable indicator — KC-series tends to be + Coast, KD-series tends to be Nairobi, but there are exceptions in both + directions. Rather than misclassify Coast vehicles as Nairobi (the + previous behaviour), return None for any Kenyan plate so they fall + through to `assigned_city IS NULL`. Analytics views already + COALESCE(...) those into the `unassigned` bucket; operators can tag + Mombasa/Nairobi explicitly via the DB or a future onboarding signal + (e.g. SIM MCC). + + Uganda (UMA / UAG) remains unambiguous → KLA. + """ p = (plate or "").strip().upper() if p.startswith("UMA") or p.startswith("UAG"): return "KLA" if p.startswith("K"): - return "NBO" + log.warning("Plate %s: Kenyan prefix is ambiguous (NBO vs MBA) — " + "leaving assigned_city NULL for manual tagging", p) + return None return None diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 90651d5..08b5791 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -198,7 +198,10 @@ def poll_live_positions(): positions_by_imei[imei] = p positions = list(positions_by_imei.values()) - upserted, inserted = 0, 0 + # [BUG-10] Variable names match the ingestion_log column they feed: + # live_pos_affected → rows_upserted (DO UPDATE: rowcount=1 per touch) + # history_inserted → rows_inserted (DO NOTHING: rowcount=1 only on real insert) + live_pos_affected, history_inserted = 0, 0 with get_conn() as conn: with conn.cursor() as cur: @@ -240,7 +243,7 @@ def poll_live_positions(): clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), current_mileage, clean(p.get("status")), clean(p.get("locDesc")) )) - upserted += cur.rowcount + live_pos_affected += cur.rowcount # History (Hypertable Source) if gps_time: @@ -249,13 +252,15 @@ def poll_live_positions(): VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (imei, gps_time) DO NOTHING """, (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage)) - inserted += cur.rowcount + history_inserted += cur.rowcount cur.execute("RELEASE SAVEPOINT sp") except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True) - log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True) + log_ingestion(cur, "jimi.user.device.location.list", len(positions), + live_pos_affected, history_inserted, + int((time.time()-t0)*1000), True) # ── 3. Trip Reports (Every 15m) ─────────────────────────────────────────────── @@ -266,7 +271,10 @@ def poll_trips(): end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(hours=1) - inserted = 0 + # [BUG-10] trips.cur.rowcount counts both real inserts and DO UPDATE + # touches — i.e. affected rows from an upsert. Route to rows_upserted + # in ingestion_log, not rows_inserted. + trips_affected = 0 with get_conn() as conn: with conn.cursor() as cur: @@ -304,14 +312,15 @@ def poll_trips(): clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond")) )) cur.execute("RELEASE SAVEPOINT sp") - inserted += cur.rowcount + trips_affected += cur.rowcount except Exception: cur.execute("ROLLBACK TO SAVEPOINT sp") log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True) - log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted, + log_ingestion(cur, "jimi.device.track.mileage", len(imeis), + trips_affected, 0, int((time.time() - t0) * 1000), True) - log.info("Trips: %d records processed.", inserted) + log.info("Trips: %d records affected.", trips_affected) # ── 4. Parking Events (Every 15m) ───────────────────────────────────────────── diff --git a/tests/unit/test_clean_helpers.py b/tests/unit/test_clean_helpers.py index 811f568..5e7abb9 100644 --- a/tests/unit/test_clean_helpers.py +++ b/tests/unit/test_clean_helpers.py @@ -66,8 +66,18 @@ class TestCleanInt: def test_integer_string(self): assert clean_int("42") == 42 - def test_float_string_truncates(self): - assert clean_int("3.9") == 3 + def test_float_string_rounds_up(self): + # [BUG-07] Was truncating ("3.9" → 3); now rounds to nearest. + assert clean_int("3.9") == 4 + + def test_float_string_rounds_down(self): + assert clean_int("3.1") == 3 + + def test_half_rounds_to_even(self): + # Python 3 uses banker's rounding for round(); harmless for ingestion + # data where half-values don't appear in practice. + assert clean_int("0.5") == 0 + assert clean_int("1.5") == 2 def test_non_numeric_returns_none(self): assert clean_int("abc") is None @@ -98,6 +108,12 @@ class TestCleanTs: # BCD format YYMMDDHHmmss is NOT handled by clean_ts (only by _parse_trip_ts) assert clean_ts("220415103000") is None + def test_date_only_anchored_to_nairobi_midnight(self): + # [BUG-09] Previously returned "2024-04-12" verbatim → Postgres + # interpreted as 00:00 UTC = 03:00 EAT. Now returns a tz-aware + # ISO string anchored to Africa/Nairobi midnight. + assert clean_ts("2024-04-12") == "2024-04-12T00:00:00+03:00" + class TestIsValidFix: def test_zero_island_filtered(self): diff --git a/ts_shared_rev.py b/ts_shared_rev.py index e782faf..6616287 100644 --- a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -22,8 +22,10 @@ from __future__ import annotations import hashlib import logging import os +import re import signal import sys +import threading import time from contextlib import contextmanager from datetime import datetime, timezone, timedelta @@ -82,16 +84,24 @@ _log = get_logger("shared") # ── Connection Pool (psycopg2) ─────────────────────────────────────────────── _pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None +# [BUG-06] Guard cold-start creation. Without the lock, two threads racing +# through _get_pool() while _pool is None could each instantiate a pool — the +# second assignment overwrites the first and leaks its checked-out connections. +_pool_lock = threading.Lock() def _get_pool() -> psycopg2.pool.ThreadedConnectionPool: global _pool - if _pool is None or _pool.closed: - _pool = psycopg2.pool.ThreadedConnectionPool( - _POOL_MIN, _POOL_MAX, DATABASE_URL, - options="-c client_encoding=UTF8", - ) - _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX) - return _pool + if _pool is not None and not _pool.closed: + return _pool + with _pool_lock: + # Double-check inside the lock — another thread may have just created it. + if _pool is None or _pool.closed: + _pool = psycopg2.pool.ThreadedConnectionPool( + _POOL_MIN, _POOL_MAX, DATABASE_URL, + options="-c client_encoding=UTF8", + ) + _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX) + return _pool @contextmanager def get_conn(): @@ -154,18 +164,37 @@ def clean_num(v: Any) -> Optional[float]: return None def clean_int(v: Any) -> Optional[int]: + """Coerce to int. [BUG-07] Rounds rather than truncates so '3.9' → 4. + + Truncation via int(float(s)) silently lost ~half a unit on any float- + valued integer field. All current call sites pass intrinsically-integer + fields (gpsNum, statusFlags, enabledFlag, etc.) so behaviour is + unchanged for them; the rounding semantics are a guard against future + fields that may arrive as decimals.""" s = clean(v) if s is None: return None try: - return int(float(s)) + return round(float(s)) except (ValueError, TypeError): return None +_DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") + def clean_ts(v: Any) -> Optional[str]: - """Clean timestamp string for PostgreSQL insertion.""" + """Clean timestamp string for PostgreSQL insertion. + + [BUG-09] datetime.fromisoformat("2024-04-12") is valid in Python 3.11+ + and produces a naive datetime at 00:00:00. Postgres then stored it as + midnight UTC, which is 03:00 Africa/Nairobi — three hours off the + operator's intent. The fleet runs in Nairobi (UTC+3), and date-only + fields in the onboarding CSV (activation_time, expiration) are meant + to land at local midnight. Append `T00:00:00+03:00` for those inputs. + Inputs that already carry a time component are returned unchanged.""" s = clean(v) if s is None: return None + if _DATE_ONLY_RE.match(s): + return f"{s}T00:00:00+03:00" try: datetime.fromisoformat(s.replace("Z", "+00:00")) return s @@ -299,14 +328,15 @@ def _refresh_token(refresh_token: str) -> Optional[str]: def _update_token_cache(r: dict) -> str: token, expires_in = r["accessToken"], int(r.get("expiresIn", 7200)) expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in) + # [BUG-11] get_conn() auto-commits on __exit__; the previous explicit + # conn.commit() was redundant. Removed. with get_conn() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at) VALUES (%s, %s, %s, %s) - ON CONFLICT (account) DO UPDATE SET - access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token, + ON CONFLICT (account) DO UPDATE SET + access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token, expires_at=EXCLUDED.expires_at, obtained_at=NOW() """, (USER_ID, token, r.get("refreshToken"), expires_at)) - conn.commit() return token \ No newline at end of file