Compare commits

...

9 commits

Author SHA1 Message Date
David Kiania
8867be9d3d perf+fix: SAVEPOINT-per-item pollers, batched GPS inserts, parallel detail fetch
Some checks are pending
Static Analysis / static (push) Waiting to run
Tests / test (push) Waiting to run
Audit fixes across the ingestion stack:

Observability
- Move log_ingestion out of batch loops in poll_alarms and poll_parking
  (was emitting N cumulative log rows per run instead of one).
- Add missing log_ingestion + t0 to poll_trips.
- Count inserted via cur.rowcount instead of naive +=1 so ON CONFLICT
  DO NOTHING no longer inflates the metric.

Resilience
- SAVEPOINT-per-item added to poll_alarms, poll_live_positions,
  poll_trips, poll_parking so one bad row no longer aborts the batch
  (webhook handlers already had this; pollers were inconsistent).

Performance
- /pushgps and poll_track_list now use psycopg2.extras.execute_values
  with ON CONFLICT DO NOTHING — 10-50x write throughput on larger
  batches.
- sync_devices and sync_driver_audit fetch jimi.track.device.detail
  concurrently via ThreadPoolExecutor(max_workers=8), cutting the
  daily registry sync from ~24s to ~3s for an 80-device fleet.
- poll_track_list split into two phases: parallel API fetch (4 workers,
  no DB connection held) then one batched write. Previously the DB
  connection was held across every per-IMEI HTTP call, risking pool
  starvation.

Security
- _validate_token uses hmac.compare_digest for constant-time token
  comparison (closes timing side-channel).
- _parse_data_list caps incoming items at WEBHOOK_MAX_ITEMS (default
  5000) so a pathological push cannot blow memory.

Tests
- Fix test_null_alarm_type_skipped: its INSERT-count assertion was
  catching the ingestion_log insert written by log_ingestion. Filter
  that out so the test checks only data-table inserts.
- Full suite: 66 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-18 00:33:55 +03:00
David Kiania
f7cc48cc6a chore: align .python-version to 3.12.0 (matches Docker image and pyproject.toml) 2026-04-12 21:41:43 +03:00
David Kiania
20d3ddb841 feat: add db_audit health checks, runner, and scheduled Forgejo workflow
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 21:40:29 +03:00
David Kiania
6ed4d3a1e2 test: add test suite - unit tests, webhook endpoint tests, and CI workflow
57 unit tests covering clean helpers, API signing, and field mapping fixes
(FIX-E06, FIX-M16, BUG-01, BUG-03); integration tests for webhook endpoints
with mocked DB; Forgejo CI workflow with TimescaleDB service container.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 21:38:20 +03:00
David Kiania
2ca3d2f021 ci: add ruff + mypy static analysis config and Forgejo workflow 2026-04-12 21:32:33 +03:00
David Kiania
75d3417a2b docs: add quality program design spec 2026-04-12 21:31:56 +03:00
David Kiania
f9834564ab Add CLAUDE.md and project docs for structured Claude project
CLAUDE.md: cached context file covering project identity, tech stack,
codebase map, schema quick-ref, API gotchas, fix history, working rules,
fleet state, and open items. Structured for maximum cache efficiency —
stable content first, dynamic state at the end.

docs/CONNECTIONS.md: connection parameter shapes (no secrets) for SSH,
DB, API, container resolution, Forgejo, Grafana, n8n.

docs/PROJECT_CONTEXT.md: client business context (telco field service,
3 cities, service types), data quality gaps, KPI framework by domain,
integration roadmap.

docs/KPI_FRAMEWORK.md: living KPI register with status tracking,
thresholds, client feedback log, and review checklist. To be co-developed
with client iteratively.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 20:59:15 +03:00
David Kiania
2131faf8c6 Add 260412 baseline report — first trip data, FIX-M16 confirmed
Post-deployment snapshot at ~00:15 EAT 2026-04-12. Key changes vs 260410:
- 3 trips recorded (FRED KMGW 538W HULETI, 6.94 km total) — pipeline validated
- FIX-M16 distance unit fix confirmed: implied speed matches API avgSpeed exactly
- 70 track_list fixes in 24h (was 13) — dense trail from active driving
- KDK 829A GP returned to primary depot from secondary Nairobi East cluster
- Uganda anomaly (X3-63282) persists — flagged for management
- Driver name root cause confirmed: not assigned in Tracksolid Pro UI

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 00:14:27 +03:00
David Kiania
6a0ceb78dd Fix trip distance unit (metres→km) and full device sync on upsert
[FIX-M16] jimi.device.track.mileage returns distance in metres despite
docs claiming km. Confirmed: avgSpeed × runTimeSecond / 3600 = distance/1000.
poll_trips() now divides raw value by 1000 before storing as distance_km.
3 existing bad rows corrected in prod DB (distance_km / 1000).

[FIX-M17] sync_devices() ON CONFLICT clause was only updating 5 of 26
fields, silently dropping driver_phone, sim, iccid, vehicle_name, status
etc. on subsequent syncs. Expanded to update all device fields so driver
assignments made in Tracksolid Pro UI propagate to DB on next daily sync.

Add sync_driver_audit.py: one-shot script to compare API vs DB device
registry, report driver/IMEI gaps, and force a full field upsert.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 00:06:57 +03:00
33 changed files with 2508 additions and 247 deletions

View file

@ -0,0 +1,23 @@
name: Static Analysis
on: [push, pull_request]
jobs:
static:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Install Python
run: pip install ruff mypy psycopg2-binary requests fastapi uvicorn python-multipart
- name: Lint with ruff
run: ruff check .
- name: Type check with mypy
run: >
mypy
ts_shared_rev.py
ingest_movement_rev.py
ingest_events_rev.py
webhook_receiver_rev.py

View file

@ -0,0 +1,40 @@
name: Tests
on: [push, pull_request]
jobs:
test:
runs-on: self-hosted
services:
timescaledb:
image: timescale/timescaledb-ha:pg16-ts2.15
env:
POSTGRES_PASSWORD: test
POSTGRES_DB: tracksolid_test
POSTGRES_USER: postgres
ports:
- 5433:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Install dependencies
run: |
pip install pytest pytest-asyncio httpx psycopg2-binary requests \
fastapi uvicorn python-multipart
- name: Run tests
run: pytest tests/ -v --tb=short
env:
TRACKSOLID_APP_KEY: test_key
TRACKSOLID_APP_SECRET: test_secret
TRACKSOLID_USER_ID: test_user
TRACKSOLID_PWD_MD5: test_md5
DATABASE_URL: postgresql://postgres:test@localhost:5433/tracksolid_test
TEST_DATABASE_URL: postgresql://postgres:test@localhost:5433/tracksolid_test
JIMI_WEBHOOK_TOKEN: ""

View file

@ -0,0 +1,20 @@
name: DB Audit
on:
schedule:
- cron: "0 3 * * *" # 03:00 UTC = 06:00 EAT daily
workflow_dispatch: # Also runnable manually from Forgejo UI
jobs:
audit:
runs-on: self-hosted
steps:
- uses: actions/checkout@v4
- name: Install dependencies
run: pip install psycopg2-binary
- name: Run DB audit
run: python db_audit/run_audit.py
env:
DATABASE_URL: ${{ secrets.DATABASE_URL }}

View file

@ -1 +1 @@
3.13
3.12.0

315
260412_baseline_report.md Normal file
View file

@ -0,0 +1,315 @@
# Fireside Communications — Fleet Baseline Report
**Date:** 2026-04-12 · **Time of queries:** ~00:15 EAT
**Database:** tracksolid_db on TimescaleDB
**Container:** timescale_db-bo3nov2ija7g8wn9b1g2paxs-192322642108
**Report scope:** All 63 registered devices · All tables · Post-migration 04 + 05
---
## 1. Migration Status
All four schema migrations applied and tracked:
| Migration File | Applied (EAT) | Status |
|---|---|---|
| `02_tracksolid_full_schema_rev.sql` | 2026-04-11 22:25:37 | ✓ Applied |
| `03_webhook_schema_migration.sql` | 2026-04-11 22:25:37 | ✓ Applied |
| `04_bug_fix_migration.sql` | 2026-04-11 22:25:37 | ✓ Applied — `distance_km` renamed & corrected |
| `05_enhancement_migration.sql` | 2026-04-11 22:25:37 | ✓ Applied — new tables + columns |
Schema is fully current. No pending migrations.
---
## 2. Table Row Counts (as of 00:15 EAT)
| Table | Rows | Δ vs 260410 | Notes |
|---|---|---|---|
| `tracksolid.devices` | **63** | — | Full fleet registry |
| `tracksolid.live_positions` | **19** | — | 19 devices with a known position (30% of fleet) |
| `tracksolid.position_history` | **101** | 36 | New container; accumulating since 22:25 EAT Apr 11 |
| `tracksolid.position_history` (`track_list`) | **70** | +57 | High-res trail density growing strongly |
| `tracksolid.alarms` | **3** | +1 | ACC_ON/ACC_OFF events from evening movement |
| `tracksolid.trips` | **3** | +3 | **First real trips recorded — FIX-M16 distance fix confirmed** |
| `tracksolid.parking_events` | **0** | — | Fix deployed; will populate with completed park cycles |
| `tracksolid.obd_readings` | **0** | — | Awaiting webhook registration |
| `tracksolid.device_events` | **0** | — | Awaiting `/pushevent` registration |
| `tracksolid.fuel_readings` | **0** | — | Awaiting `/pushoil` registration |
| `tracksolid.temperature_readings` | **0** | — | Awaiting `/pushtem` registration |
| `tracksolid.lbs_readings` | **0** | — | Awaiting `/pushlbs` registration |
| `tracksolid.geofences` | **0** | — | Not yet configured |
| `tracksolid.heartbeats` | **0** | — | Awaiting heartbeat webhook |
| `tracksolid.fault_codes` | **0** | — | Awaiting fault code data |
| `tracksolid.ingestion_log` | **43** | — | New container; fresh audit trail |
| `dwh_gold.fact_daily_fleet_metrics` | **0** | — | ETL not yet run |
| `dwh_gold.dim_vehicles` | **0** | — | Awaiting population |
---
## 3. Fleet Composition
**63 devices across 4 device models — unchanged:**
| Model | Count | Typical Use |
|---|---|---|
| AT4 | 23 | Asset / cargo hardwired tracker |
| JC400P | 23 | Camera-capable tracker (larger vehicles) |
| X3 | 10 | Compact vehicle tracker |
| GT06E | 7 | OBD-port tracker |
| **Total** | **63** | |
---
## 4. Full Device Registry
> All 63 devices. `driver_name` is blank for every device — confirmed root cause: no drivers assigned in Tracksolid Pro account (not a DB sync issue). `vehicle_number` also unpopulated.
| Device Name | Model | SIM | Odometer (km) | Expires | Status |
|---|---|---|---|---|---|
| AT4-51820 | AT4 | — | — | — | No position |
| AT4-53099 | AT4 | — | — | — | No position |
| AT4-54246 | AT4 | — | — | — | No position |
| AT4-55029 | AT4 | — | — | — | No position |
| AT4-55235 | AT4 | — | — | — | No position |
| AT4-57389 | AT4 | — | — | — | No position |
| AT4-61860 | AT4 | — | — | — | No position |
| AT4-64815 | AT4 | — | 0 | 2036-02-05 | Inactive (1,573h) |
| AT4-64823 | AT4 | — | — | — | No position |
| AT4-64880 | AT4 | — | — | — | No position |
| AT4-64989 | AT4 | — | — | — | No position |
| AT4-65010 | AT4 | — | — | — | No position |
| AT4-65135 | AT4 | — | — | — | No position |
| AT4-65341 | AT4 | — | — | — | No position |
| AT4-65598 | AT4 | — | — | — | No position |
| AT4-65648 | AT4 | — | — | — | No position |
| AT4-66158 | AT4 | — | — | — | No position |
| AT4-67271 | AT4 | — | — | — | No position |
| AT4-67693 | AT4 | — | — | — | No position |
| KCE 690F | AT4 | — | 0 | 2039-07-01 | Inactive (57,329h) |
| KCS 903Y JK SUB | AT4 | 0700024569 | 4 | 2039-06-09 | Inactive (15,229h) |
| KCU 865Q Vanguard Sub | AT4 | 0757270804 | 10 | 2039-12-20 | Inactive (15,445h) |
| KMEH 692C KAWASAKI | AT4 | 0110094467 | 3 | 2040-04-03 | Inactive (24,709h) |
| Belta KCU-647D | GT06E | 0110094465 | 235 | 2040-04-03 | Inactive (7,584h) — SERVICE FLAG |
| GT06E-85428 | GT06E | — | — | — | No position |
| GT06E-86319 | GT06E | — | — | — | No position |
| JK Subaru KCS 903Y | GT06E | 0746759925 | 73 | 2039-06-12 | Very stale (670h) |
| KCU 145Q Solo Xtrail | GT06E | 0757270810 | 53 | 2039-12-20 | Inactive (7,546h) |
| KCU 865Q Vanguard | GT06E | 0757270763 | 62 | 2039-12-20 | Stale (79h) |
| KDK 829A GP | GT06E | 0707923872 | 239 | 2042-10-29 | **Recent (2h) — DEPOT** — SERVICE FLAG |
| JC400P-07904 | JC400P | — | — | — | No position |
| JC400P-85041 | JC400P | — | — | — | No position |
| JC400P-85058 | JC400P | — | — | — | No position |
| JC400P-85751 | JC400P | — | 0 | 2036-03-11 | Inactive (746h) |
| JC400P-86270 | JC400P | — | — | — | No position |
| JC400P-86403 | JC400P | — | — | — | No position |
| JC400P-87625 | JC400P | — | — | — | No position |
| JC400P-87831 | JC400P | — | — | — | No position |
| JC400P-89431 | JC400P | — | — | — | No position |
| JC400P-89530 | JC400P | — | — | — | No position |
| JC400P-89563 | JC400P | — | — | — | No position |
| JC400P-89662 | JC400P | — | — | — | No position |
| JC400P-89977 | JC400P | — | — | — | No position |
| JC400P-90108 | JC400P | — | — | — | No position |
| JC400P-90199 | JC400P | — | — | — | No position |
| JC400P-90678 | JC400P | — | — | — | No position |
| JC400P-91619 | JC400P | — | — | — | No position |
| JC400P-92278 | JC400P | — | — | — | No position |
| JC400P-92716 | JC400P | — | — | — | No position |
| JC400P-92732 | JC400P | — | — | — | No position |
| JC400P-94233 | JC400P | — | — | — | No position |
| KDU 878T_CAM | JC400P | 0708351897 | 2 | 2035-08-18 | Inactive (3,081h) |
| KDW 632M HL Cam | JC400P | 300002396032 IoT | 0 | 2036-03-11 | Inactive (756h) |
| FRED KMGW 538W HULETI | X3 | 0119867174 | 2 | 2036-02-08 | **Active (0.1h) — MOVED** |
| KDU 878T_Track | X3 | 0708352823 | 5 | 2035-08-18 | Stale (79h) |
| KDW 632M HL Tracker | X3 | 300002396033 IoT | 0 | 2036-02-09 | Inactive (744h) |
| KMGR 409U HENRY JAZZ | X3 | 0768697302 | 7 | 2035-07-31 | Recent (7h) |
| X3-59405 | X3 | — | — | — | No position |
| X3-63282 | X3 | — | 4 | 2036-02-14 | **Active (0.2h) — UGANDA ANOMALY PERSISTS** |
| X3-64223 | X3 | — | — | — | No position |
| X3-68968 | X3 | — | 0 | 2036-03-11 | Inactive (744h) |
| X3-69172 | X3 | — | — | — | No position |
| X3-78553 | X3 | — | — | — | No position |
---
## 5. Live Position Coverage
**19 of 63 devices (30%)** have a position in `live_positions` — same count as 260410.
**44 devices (70%)** have no position at all — offline, SIM not installed, or never activated.
### Freshness Bands
| Band | Count | Devices |
|---|---|---|
| < 2 hours (active) | 2 | FRED KMGW 538W HULETI, X3-63282 |
| 224 hours (recent) | 2 | KDK 829A GP (2h), KMGR 409U HENRY JAZZ (7h) |
| 17 days (stale) | 2 | KCU 865Q Vanguard (79h), KDU 878T_Track (79h) |
| 112 months (very stale) | 3 | JK Subaru KCS 903Y (670h), KCU 145Q (7,546h), Belta KCU-647D (7,584h) |
| > 1 year (inactive) | 10 | KDU 878T_CAM, KCS 903Y JK SUB, KCU 865Q Vanguard Sub, KMEH 692C KAWASAKI, KCE 690F, etc. |
### Full Live Position Detail
| Device | Model | Lat | Lng | Speed (km/h) | ACC | GPS Signal | Satellites | Last Fix (EAT) |
|---|---|---|---|---|---|---|---|---|
| FRED KMGW 538W HULETI | X3 | -1.24444 | 36.72321 | 0 | Off | 4 | 13 | 2026-04-12 00:02:54 |
| X3-63282 | X3 | 0.19566 | 32.54004 | 0 | Off | 4 | 11 | 2026-04-11 23:58:38 |
| KDK 829A GP | GT06E | -1.23850 | 36.72677 | 0 | Off | 4 | 9 | 2026-04-11 22:09:59 |
| KMGR 409U HENRY JAZZ | X3 | -1.23743 | 36.72663 | 3 | Off | 3 | 3 | 2026-04-11 16:47:20 |
| KCU 865Q Vanguard | GT06E | -1.23748 | 36.72641 | 5 | Off | 0 | 5 | 2026-04-08 17:17:45 |
| KDU 878T_Track | X3 | -1.23528 | 36.72871 | 0 | Off | 4 | 10 | 2026-04-08 17:16:55 |
| JK Subaru KCS 903Y | GT06E | -1.23560 | 36.72868 | 0 | Off | 1 | 6 | 2026-03-15 01:52:33 |
| X3-68968 | X3 | -1.23799 | 36.72615 | 0 | Off | 4 | 15 | 2026-03-11 23:59:28 |
| KDW 632M HL Tracker | X3 | -1.24087 | 36.72839 | 0 | Off | 4 | 6 | 2026-03-11 23:53:44 |
| JC400P-85751 | JC400P | -1.23796 | 36.72611 | 0 | Off | 4 | 15 | 2026-03-11 22:15:44 |
| KDW 632M HL Cam | JC400P | -1.24115 | 36.72847 | 0 | Off | 4 | 0 | 2026-03-11 11:52:01 |
| AT4-64815 | AT4 | -1.24136 | 36.72872 | 0 | Off | 4 | 4 | 2026-02-05 11:19:55 |
| KDU 878T_CAM | JC400P | -1.06900 | 37.01436 | 12 | Off | 4 | 15 | 2025-12-04 15:27:42 |
| KCU 145Q Solo Xtrail | GT06E | -1.29728 | 36.88850 | 0 | Off | 4 | 7 | 2025-06-01 14:04:47 |
| Belta KCU-647D | GT06E | -1.15151 | 36.63857 | 0 | Off | 4 | 11 | 2025-05-30 23:53:22 |
| KCS 903Y JK SUB | AT4 | -1.23529 | 36.72875 | 0 | Off | 4 | 3 | 2024-07-16 10:41:42 |
| KCU 865Q Vanguard Sub | AT4 | -1.23522 | 36.73104 | 0 | Off | 4 | 5 | 2024-07-07 10:43:21 |
| KMEH 692C KAWASAKI | AT4 | -1.23849 | 36.72460 | 0 | Off | 4 | 11 | 2023-06-17 10:41:18 |
| KCE 690F | AT4 | -1.24008 | 36.74522 | 31 | Off | 4 | 6 | 2019-09-27 07:20:08 |
---
## 6. Geographic Clustering
| Cluster | Area | Coords | Active Devices | Δ vs 260410 |
|---|---|---|---|---|
| **Primary depot** | Nairobi West / Kikuyu Rd corridor | -1.235 to -1.244, 36.722 to 36.731 | 14 devices | KDK 829A GP moved here from secondary cluster |
| **Secondary** | Nairobi East / Thika Rd | -1.297, 36.888 | 1 device | KDK 829A GP departed — now only KCU 145Q Solo (stale) |
| **Outlier** | Thika / Ruiru | -1.069, 37.014 | 1 device (KDU 878T_CAM) | Unchanged |
| **CRITICAL** | **Uganda — Kampala region** | **0.196, 32.540** | **1 device (X3-63282)** | **Persists — no change** |
> **KDK 829A GP position change confirmed:** was at -1.328, 36.900 (Nairobi East) in the 260410 report; now at -1.238, 36.727 (primary depot). Vehicle drove from the secondary cluster to the main yard between the two report windows.
---
## 7. Position History
**Total fixes: 101** across two ingestion sources (new container; accumulating since 22:25 EAT Apr 11):
| Source | Fixes | Method | Frequency |
|---|---|---|---|
| `poll` | 31 | Fleet-wide 60s sweep | Every 60 seconds |
| `track_list` | 70 | Per-device high-res trail (POLL-01) | Every 30 minutes |
| **Total** | **101** | | |
### Per-Device Fixes — Last 24 Hours
| Device | Model | Source | Fixes | First Fix (EAT) | Last Fix (EAT) | Avg Speed | Max Speed |
|---|---|---|---|---|---|---|---|
| FRED KMGW 538W HULETI | X3 | track_list | 69 | 2026-04-11 21:52:44 | 2026-04-11 23:50:12 | 18.1 km/h | 53 km/h |
| FRED KMGW 538W HULETI | X3 | poll | 7 | 2026-04-11 22:25:12 | 2026-04-12 00:02:54 | 5.0 km/h | 35 km/h |
| X3-63282 | X3 | poll | 4 | 2026-04-11 22:13:38 | 2026-04-11 23:58:38 | 0.0 km/h | 0 km/h |
| X3-63282 | X3 | track_list | 1 | 2026-04-11 21:58:38 | 2026-04-11 21:58:38 | 0.0 km/h | 0 km/h |
| KDK 829A GP | GT06E | poll | 1 | 2026-04-11 22:09:59 | 2026-04-11 22:09:59 | 0.0 km/h | 0 km/h |
| KMGR 409U HENRY JAZZ | X3 | poll | 1 | 2026-04-11 16:47:20 | 2026-04-11 16:47:20 | 3.0 km/h | 3 km/h |
> **FRED KMGW 538W HULETI** generated 69 high-resolution track_list waypoints with avg 18.1 km/h and peak 53 km/h — confirming real road movement during the evening. This is the first active driving data since pipeline deployment.
---
## 8. Alarms
**Total alarms: 3** — all on FRED KMGW 538W HULETI, corresponding to evening trips.
| # | Device | Alarm Type | Alarm Name | Time (EAT) | Lat | Lng | Speed |
|---|---|---|---|---|---|---|---|
| 1 | FRED KMGW 538W HULETI | ACC_ON | ACC ON | 2026-04-11 22:07:27 | -1.23950 | 36.73979 | 0 |
| 2 | FRED KMGW 538W HULETI | ACC_OFF | ACC OFF | 2026-04-11 22:28:23 | -1.24441 | 36.72324 | 0 |
| 3 | FRED KMGW 538W HULETI | ACC_OFF | ACC OFF | 2026-04-11 23:35:13 | -1.24428 | 36.72300 | 0 |
**Key findings:**
- ACC_ON at 22:07 → vehicle started; ACC_OFF at 22:28 → parked briefly; ACC_OFF at 23:35 → final park. Consistent with the 3 trips recorded.
- ACC_ON event at -1.23950, 36.73979 — slightly east of primary depot, consistent with trip 3 start coordinates.
- No vibration alerts this window (2 vibration alerts overnight on 260410). Quieter night.
- No speeding, geofence, or power alarms.
---
## 9. Trips
**Trips recorded: 3** — all FRED KMGW 538W HULETI on the evening of 2026-04-11.
> **FIX-M16 confirmed working:** distances are physically consistent with duration and speed.
| # | Device | Start (EAT) | End (EAT) | Distance (km) | Drive Time (s) | Implied Speed | Avg Speed (API) |
|---|---|---|---|---|---|---|---|
| 1 | FRED KMGW 538W HULETI | 21:47:05 | 21:49:44 | 1.430 km | 159s | 32.4 km/h | 32.41 km/h ✓ |
| 2 | FRED KMGW 538W HULETI | 23:13:05 | 23:20:22 | 2.600 km | 437s | 21.4 km/h | 21.38 km/h ✓ |
| 3 | FRED KMGW 538W HULETI | 23:27:36 | 23:35:13 | 2.910 km | 457s | 22.9 km/h | 22.93 km/h ✓ |
| **Total** | | | | **6.940 km** | **1,053s (17.6 min)** | | |
**Notes:**
- Pre-fix, these trips were stored as 1,432 km / 2,596 km / 2,910 km — corrected in-place by DB update and code fix deployed.
- `max_speed_kmh` not yet populated for these trips — API field `maxSpeed` not returned by `jimi.device.track.mileage` for this device/window.
- Short urban trips (1.42.9 km) at 2132 km/h — consistent with Nairobi city driving near the Kikuyu Rd depot.
---
## 10. Parking Events
**Parking events: 0**
POLL-02 fix deployed (`acc_type=0`, corrected `durSecond` mapping). API responding cleanly (14 calls, 0 rows). Events will populate once a complete park-stop-drive cycle is observed by the poller window.
---
## 11. Ingestion Pipeline Health
Container uptime: ~1h 50min at time of queries (restarted 22:25 EAT Apr 11 for FIX-M16 deployment).
| Endpoint | Calls | Rows Upserted | Rows Inserted | Avg Duration | Failures | First Call (EAT) | Last Call (EAT) |
|---|---|---|---|---|---|---|---|
| `jimi.user.device.location.list` | 21 | 399 | 399 | 575ms | 0 | 2026-04-11 22:25:45 | 2026-04-12 00:10:58 |
| `jimi.open.platform.report.parking` | 14 | 0 | 0 | 10,863ms | 0 | 2026-04-11 22:25:55 | 2026-04-12 00:04:34 |
| `jimi.device.track.list` | 4 | 0 | 77 | 237,175ms | 0 | 2026-04-11 22:26:10 | 2026-04-12 00:04:41 |
| `jimi.device.alarm.list` | 3 | 0 | 4 | 344ms | 0 | 2026-04-11 22:30:39 | 2026-04-12 00:04:34 |
| `jimi.user.device.list+detail` | 2 | 126 | 0 | 5,768ms | 0 | 2026-04-11 22:25:39 | 2026-04-12 00:04:12 |
| **Total** | **44** | | | | **0** | | |
**Observations:**
- **Zero failures across all 44 API calls** — pipeline stable after restart.
- Location polling: 21 calls, 575ms avg — consistent with 260410 (493ms). Slightly slower, within normal variance.
- Track list: 4 calls, 77 waypoints at 237s avg — slower per call than 260410 (137s). Likely due to FRED KMGW 538W HULETI generating dense waypoints during active driving.
- Alarm poll: 344ms avg — fast and clean.
- Device sync: 2 runs since restart; all 63 device records updated with full field sync (FIX-M17 now active).
---
## 12. Changes Since 260410 Baseline
| Area | 260410 | 260412 | Assessment |
|---|---|---|---|
| Trips recorded | 0 | **3** | First real trip data — pipeline validated end-to-end |
| Trip distance accuracy | Broken (km stored as m) | **Fixed (FIX-M16)** | Implied speed matches API avgSpeed exactly |
| `sync_devices` ON CONFLICT | 5 fields only | **26 fields** (FIX-M17) | Driver/phone/SIM will now update on each daily sync |
| `track_list` fixes (24h) | 13 | **70** | FRED KMGW 538W HULETI drove → dense trail captured |
| FRED KMGW 538W HULETI | Parked at depot | **Active — 6.94 km driven** | First confirmed driving data |
| KDK 829A GP | Secondary cluster (Nairobi East) | **Primary depot** | Returned to main yard overnight |
| `sync_driver_audit.py` | Not present | **Added** | One-shot tool for API↔DB driver/IMEI gap reporting |
| Driver names in DB | 0 | **0** | Root cause confirmed: not assigned in Tracksolid Pro UI |
| Uganda anomaly (X3-63282) | Active at 0.196, 32.540 | **Persists — no change** | Requires investigation |
| Service flags | Belta KCU-647D (234,546 km), KDK 829A GP (239,264 km) | **Same — odometers approaching 240k** | Maintenance overdue |
---
## 13. Open Items
| Priority | Item | Owner |
|---|---|---|
| HIGH | Assign driver names + vehicle numbers in Tracksolid Pro UI | Operations |
| HIGH | Investigate X3-63282 in Uganda (Kampala region) — legitimate deployment or stolen? | Management |
| HIGH | Service KDK 829A GP (239,264 km) and Belta KCU-647D (235,000 km) | Fleet maintenance |
| MEDIUM | Register webhooks in Tracksolid Pro: `/pushobd`, `/pushoil`, `/pushtem`, `/pushlbs`, `/pushevent` | DevOps |
| MEDIUM | Set `fuel_100km` per vehicle type to activate fuel cost analytics | Operations |
| MEDIUM | Investigate 44 devices with no GPS fix — deployed? SIM installed? | Fleet ops |
| LOW | Define geofences — depot boundary, approved route corridors | Operations |
| LOW | Run nightly ETL: `SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1)` | DevOps |
---
*Report generated: 2026-04-12 ~00:15 EAT · Stack: TimescaleDB 2.15 + PostGIS + Tracksolid Pro Open Platform API*
*Pipeline: `ingest_movement_rev.py` v2.2 (FIX-M16, FIX-M17) · `ingest_events_rev.py` · `webhook_receiver_rev.py`*

161
CLAUDE.md Normal file
View file

@ -0,0 +1,161 @@
# CLAUDE.md — Fireside Communications · Tracksolid Fleet Intelligence
## 1. What This Project Is
Fleet telematics ingestion and analytics stack for a **telco first-line support client** operating in Nairobi, Mombasa, and Kampala. The client dispatches field technicians to install, repair, and maintain home and business broadband, handle LOS signal faults, service migrations, and maintain outside plant infrastructure. The fleet is ~80 vehicles across three cities, all tracked via Tracksolid Pro (Jimi IoT API).
This repository ingests the Tracksolid Pro API into a TimescaleDB/PostGIS database and visualises fleet and operational KPIs in Grafana. The pipeline is deployed on Coolify at `stage.rahamafresh.com`.
**Repository:** `https://repo.rahamafresh.com/kianiadee/tracksolid_timescale_grafana_prod.git`
---
## 2. Tech Stack
| Layer | Technology |
|---|---|
| Ingestion | Python 3.12 — `ingest_movement_rev.py`, `ingest_events_rev.py`, `webhook_receiver_rev.py` |
| Shared utils | `ts_shared_rev.py` — token cache, DB pool, API signing, clean helpers |
| Database | PostgreSQL 16 + TimescaleDB 2.15 + PostGIS 3 (`tracksolid_db`) |
| Orchestration | Docker Compose on Coolify |
| Visualisation | Grafana (provisioned via custom image) |
| Workflow automation | n8n |
| API source | Tracksolid Pro / Jimi IoT Open Platform (`eu-open.tracksolidpro.com/route/rest`) |
| Version control | Forgejo at `repo.rahamafresh.com` |
---
## 3. Instance & Connection Parameters
See `docs/CONNECTIONS.md` for the full shape. Summary:
- **SSH:** `ssh -i ~/.ssh/id_ed25519 kianiadee@stage.rahamafresh.com`
- **DB name:** `tracksolid_db` · **DB user:** `postgres` (internal) · `tracksolid_owner` (app) · `grafana_ro` (read-only)
- **DB schema:** `tracksolid` (operational) · `infrastructure` · `dwh_gold` (aggregates)
- **Container naming:** Coolify appends a random suffix. Always resolve with:
```bash
docker ps --filter name=<service_name> --format "{{.Names}}" | head -1
```
e.g. `docker ps --filter name=timescale_db --format "{{.Names}}" | head -1`
- **Env vars:** loaded from `.env` via `env_file` in `docker-compose.yaml`. See `docs/CONNECTIONS.md` for variable names. Never hardcode secrets.
---
## 4. Codebase Map
```
ts_shared_rev.py # Shared: config, signing, DB pool, token cache, clean helpers
ingest_movement_rev.py # GPS positions, trips, parking, track-list (high-res trail), device sync
ingest_events_rev.py # Alarm events polling (fallback for webhook push)
webhook_receiver_rev.py # FastAPI push receiver: /pushobd /pushevent /pushtripreport etc.
sync_driver_audit.py # One-shot: API↔DB driver/IMEI gap report + full upsert
run_migrations.py # Applies SQL migrations in order at container startup
docker-compose.yaml # Services: timescale_db, ingest_movement, ingest_events,
# webhook_receiver, grafana
grafana/ # Grafana provisioning (baked into image)
n8n-workflows/ # n8n workflow exports
docs/ # Reference docs (connections, API, KPIs, project context)
02_tracksolid_full_schema_rev.sql # Full schema bootstrap
03..05_*.sql # Incremental migrations
01_BusinessAnalytics.md # SQL analytics library (reference before writing queries)
tracksolidApiDocumentation.md # API endpoint reference
260412_baseline_report.md # Latest fleet state snapshot
```
---
## 5. Database Schema — Key Tables
```sql
tracksolid.devices -- Master device registry (63 devices, imei PK)
tracksolid.live_positions -- Current position per device (1 row per IMEI, upserted)
tracksolid.position_history -- All GPS fixes (hypertable, partitioned by gps_time)
-- source: 'poll' (60s sweep) | 'track_list' (30m high-res)
tracksolid.trips -- Trip summaries: distance_km, driving_time_s, avg/max speed
tracksolid.parking_events -- Stop events with duration and address
tracksolid.alarms -- Alarm events (alarm_type, alarm_name, alarm_time)
tracksolid.obd_readings -- OBD diagnostics (push only, awaiting webhook registration)
tracksolid.device_events -- Power on/off tamper events
tracksolid.ingestion_log -- API call audit trail per endpoint
dwh_gold.fact_daily_fleet_metrics -- Nightly ETL aggregates per vehicle per day
```
Full DDL: `02_tracksolid_full_schema_rev.sql` + migrations `03``05`.
---
## 6. API Critical Facts
**Always read `tracksolidApiDocumentation.md` before adding a new endpoint call.**
| Fact | Detail |
|---|---|
| Auth | OAuth2 — token cached in `tracksolid.api_token_cache`, refreshed via `jimi.oauth.token.refresh` |
| Signing | MD5: `secret + sorted(k+v pairs) + secret` — see `build_sign()` in `ts_shared_rev.py` |
| Batch limit | Max 50 IMEIs per call for most endpoints |
| `distance` field | **Returns METRES, not km** despite docs. Always divide by 1000. (FIX-M16) |
| `driverName`/`driverPhone` | From `jimi.user.device.list` — will be NULL if not set in Tracksolid Pro UI |
| `alarm_type` field | API polling returns `alertTypeId`/`alarmTypeName` — NOT `alarmType`/`alarmName` (FIX-E06) |
| `durSecond` | Parking endpoint returns `durSecond`, not `seconds` (FIX-M13) |
| `jimi.device.track.mileage` | `startMileage`/`endMileage` are cumulative odometer in **metres** |
| Rate limit | Code 1006 — back off and retry with re-sign (handled in `api_post()`) |
| OBD data | Push only via `/pushobd` webhook — no polling endpoint exists |
---
## 7. Fix History (do not regress)
| Fix ID | File | What it fixed |
|---|---|---|
| FIX-M11 | `ingest_movement_rev.py` | Removed erroneous ×1000 on distance (was storing km as mm) |
| FIX-M13 | `ingest_movement_rev.py` | Parking: added `acc_type=0`, `account`; mapped `durSecond` |
| FIX-M14 | `ingest_movement_rev.py` | `poll_track_list()` — high-res GPS trail every 30m |
| FIX-M15 | `ingest_movement_rev.py` | `get_device_locations()` — on-demand precision refresh |
| FIX-M16 | `ingest_movement_rev.py` | `distance` from API is metres → divide by 1000 before storing |
| FIX-M17 | `ingest_movement_rev.py` | `sync_devices()` ON CONFLICT now updates all 26 fields (was 5) |
| FIX-E06 | `ingest_events_rev.py` | Alarm field mapping: `alertTypeId`/`alarmTypeName`/`alertTime` |
| BUG-02 | Migration 04 | Historical `distance_m` rows ÷1,000,000 → renamed to `distance_km` |
---
## 8. Working Rules
1. **No prod push without explicit user confirmation.** Always state what you are about to push and wait.
2. **Never rewrite a migration that is already applied.** Check `tracksolid.schema_migrations` first. Add a new numbered migration file for any schema change.
3. **Read before writing.** Before suggesting any code change, read the relevant source file. Before writing a query, check `01_BusinessAnalytics.md` for an existing pattern.
4. **Reuse shared utilities.** All DB access via `get_conn()`, all API calls via `api_post()`, all cleaning via `clean()` / `clean_num()` / `clean_int()` / `clean_ts()` in `ts_shared_rev.py`. Do not reinvent these.
5. **Resolve container names dynamically.** Never hardcode the Coolify suffix. Use `docker ps --filter name=<service>`.
6. **SSH only when asked.** Default workflow is local code → commit → push. SSH into the instance only when explicitly asked to test or run something live.
7. **Secrets from env only.** Connection strings, API keys, and passwords live in `.env`. Reference variable names from `docs/CONNECTIONS.md`, never values.
8. **Two developers, one incoming.** Write code and docs that a second developer (mixed technical/operations background) can follow without prior context.
---
## 9. Fleet State (as of 2026-04-12 baseline)
| Metric | Value |
|---|---|
| Total registered devices | 63 (growing to 80) |
| Devices with GPS fix < 2h | 2 |
| Devices never reported | 44 |
| Driver names populated | 0 — must be set in Tracksolid Pro UI first |
| Cities active | Nairobi (primary), Mombasa (deploying), Kampala (1 device confirmed) |
| Uganda anomaly | X3-63282 at 0.196, 32.540 — under investigation |
| Service flags | KDK 829A GP (239,264 km), Belta KCU-647D (235,000 km) |
Latest full snapshot: `260412_baseline_report.md`
---
## 10. Open Items (update as resolved)
| Priority | Item |
|---|---|
| HIGH | Assign driver names + vehicle numbers in Tracksolid Pro UI |
| HIGH | Register webhooks: `/pushobd` `/pushoil` `/pushtem` `/pushlbs` `/pushevent` |
| HIGH | Investigate X3-63282 in Kampala — legitimate or unauthorised? |
| MEDIUM | Set `fuel_100km` per vehicle type to activate fuel cost calculations |
| MEDIUM | Investigate 44 silent devices — SIM installed? Activated? |
| MEDIUM | Co-develop client KPI framework (see `docs/KPI_FRAMEWORK.md`) |
| LOW | Populate geofences — depot boundaries, city zones |
| LOW | Run nightly ETL: `SELECT dwh_gold.refresh_daily_metrics(CURRENT_DATE - 1)` |

0
db_audit/__init__.py Normal file
View file

View file

@ -0,0 +1,19 @@
-- Data gaps: enabled devices with no position_history or trips in last 7 days
SELECT
d.imei,
d.device_name,
d.enabled_flag,
MAX(ph.gps_time) AS last_position,
MAX(t.start_time) AS last_trip
FROM tracksolid.devices d
LEFT JOIN tracksolid.position_history ph
ON ph.imei = d.imei
AND ph.gps_time > NOW() - INTERVAL '7 days'
LEFT JOIN tracksolid.trips t
ON t.imei = d.imei
AND t.start_time > NOW() - INTERVAL '7 days'
WHERE d.enabled_flag = 1
GROUP BY d.imei, d.device_name, d.enabled_flag
HAVING MAX(ph.gps_time) IS NULL
AND MAX(t.start_time) IS NULL
ORDER BY d.imei;

View file

@ -0,0 +1,14 @@
-- Distance outliers: trips with impossible or suspicious distance in last 7 days
SELECT
imei,
start_time,
end_time,
distance_km,
source
FROM tracksolid.trips
WHERE start_time > NOW() - INTERVAL '7 days'
AND (
distance_km < 0
OR distance_km > 500
)
ORDER BY distance_km DESC;

View file

@ -0,0 +1,11 @@
-- Duplicate (imei, gps_time) pairs in position_history
-- Should always return 0 rows if ON CONFLICT DO NOTHING is working correctly
SELECT
imei,
gps_time,
COUNT(*) AS duplicate_count
FROM tracksolid.position_history
WHERE gps_time > NOW() - INTERVAL '7 days'
GROUP BY imei, gps_time
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC;

View file

@ -0,0 +1,34 @@
-- Enum drift: unexpected values in source and other constrained columns
-- position_history.source should be: poll, push, track_list
SELECT
'position_history.source' AS check_column,
source AS unexpected_value,
COUNT(*) AS occurrences
FROM tracksolid.position_history
WHERE source NOT IN ('poll', 'push', 'track_list')
AND source IS NOT NULL
GROUP BY source
UNION ALL
-- trips.source should be: poll, push
SELECT
'trips.source',
source,
COUNT(*)
FROM tracksolid.trips
WHERE source NOT IN ('poll', 'push')
AND source IS NOT NULL
GROUP BY source
UNION ALL
-- alarms.source should be: poll, push
SELECT
'alarms.source',
source,
COUNT(*)
FROM tracksolid.alarms
WHERE source NOT IN ('poll', 'push')
AND source IS NOT NULL
GROUP BY source;

View file

@ -0,0 +1,30 @@
-- NULL integrity check across telemetry tables
SELECT
'position_history.imei_null' AS check_field,
COUNT(*) AS null_count
FROM tracksolid.position_history
WHERE imei IS NULL
UNION ALL
SELECT
'position_history.gps_time_null',
COUNT(*)
FROM tracksolid.position_history
WHERE gps_time IS NULL
UNION ALL
SELECT
'alarms.imei_null',
COUNT(*)
FROM tracksolid.alarms
WHERE imei IS NULL
UNION ALL
SELECT
'alarms.alarm_type_null',
COUNT(*)
FROM tracksolid.alarms
WHERE alarm_type IS NULL
UNION ALL
SELECT
'obd_readings.imei_null',
COUNT(*)
FROM tracksolid.obd_readings
WHERE imei IS NULL;

View file

@ -0,0 +1,14 @@
-- Stale devices: enabled devices with no GPS fix in last 2 hours
SELECT
d.imei,
d.device_name,
lp.gps_time AS last_gps_time,
EXTRACT(EPOCH FROM (NOW() - lp.gps_time)) / 3600 AS hours_since_fix
FROM tracksolid.devices d
LEFT JOIN tracksolid.live_positions lp ON lp.imei = d.imei
WHERE d.enabled_flag = 1
AND (
lp.gps_time IS NULL
OR lp.gps_time < NOW() - INTERVAL '2 hours'
)
ORDER BY hours_since_fix DESC NULLS FIRST;

161
db_audit/run_audit.py Normal file
View file

@ -0,0 +1,161 @@
"""
db_audit/run_audit.py Fireside Communications Fleet Telemetry DB Audit
Runs six health checks against the production TimescaleDB.
Writes results to tracksolid.health_checks for Grafana monitoring.
Exits with code 1 if any critical finding is detected.
Usage:
DATABASE_URL=postgresql://... python db_audit/run_audit.py
Checks:
stale_devices - Enabled devices with no GPS fix in >2h
null_integrity - NULL imei/gps_time in telemetry tables
distance_outliers - Trip distances <0 or >500 km in last 7 days
duplicate_positions - Duplicate (imei, gps_time) in position_history
data_gaps - Enabled devices with zero data in last 7 days
enum_drift - Unexpected values in source/severity columns
"""
from __future__ import annotations
import json
import os
import sys
import logging
from pathlib import Path
import psycopg2
import psycopg2.extras
# ── Config ────────────────────────────────────────────────────────────────────
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
print("ERROR: DATABASE_URL environment variable is required.", file=sys.stderr)
sys.exit(1)
CHECKS_DIR = Path(__file__).parent / "checks"
SCHEMA_FILE = Path(__file__).parent / "schema" / "health_checks_table.sql"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("db_audit")
# ── Status Logic ──────────────────────────────────────────────────────────────
# Checks that produce CRITICAL status if they return any rows
CRITICAL_CHECKS = {"null_integrity", "duplicate_positions"}
# Checks that produce WARNING status if they return any rows
WARNING_CHECKS = {"stale_devices", "distance_outliers", "data_gaps", "enum_drift"}
def _determine_status(check_name: str, rows: list[dict]) -> str:
if not rows:
return "ok"
# null_integrity returns counts — critical if any count > 0
if check_name == "null_integrity":
has_nulls = any(row.get("null_count", 0) > 0 for row in rows)
return "critical" if has_nulls else "ok"
if check_name in CRITICAL_CHECKS:
return "critical"
if check_name in WARNING_CHECKS:
return "warning"
return "ok"
# ── Core Runner ───────────────────────────────────────────────────────────────
def run_checks() -> bool:
"""Run all checks. Returns True if any critical finding found."""
conn = psycopg2.connect(DATABASE_URL, options="-c client_encoding=UTF8")
conn.autocommit = False
try:
with conn.cursor() as cur:
# Ensure health_checks table exists
cur.execute(SCHEMA_FILE.read_text())
conn.commit()
log.info("health_checks table verified.")
has_critical = False
results = []
for sql_file in sorted(CHECKS_DIR.glob("*.sql")):
check_name = sql_file.stem
sql = sql_file.read_text()
log.info("Running check: %s ...", check_name)
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql)
rows = [dict(r) for r in cur.fetchall()]
status = _determine_status(check_name, rows)
row_count = len(rows)
# Serialize rows (convert non-JSON-serializable types)
detail = _safe_json(rows[:50]) # Cap at 50 rows to keep detail manageable
with conn.cursor() as cur:
cur.execute("""
INSERT INTO tracksolid.health_checks
(check_name, status, detail, row_count)
VALUES (%s, %s, %s, %s)
""", (check_name, status, json.dumps(detail), row_count))
conn.commit()
icon = "" if status == "ok" else ("⚠️ " if status == "warning" else "🔴")
log.info(" %s %s: %s (%d rows)", icon, check_name, status.upper(), row_count)
results.append((check_name, status, row_count))
if status == "critical":
has_critical = True
# Summary
print("\n" + "="*60)
print("DB AUDIT SUMMARY")
print("="*60)
for name, status, count in results:
indicator = "OK" if status == "ok" else ("WARN" if status == "warning" else "CRIT")
print(f" [{indicator:4s}] {name:<30} ({count} rows)")
print("="*60)
if has_critical:
print("RESULT: CRITICAL findings detected. Exit code 1.")
else:
print("RESULT: No critical findings. Exit code 0.")
print()
return has_critical
finally:
conn.close()
def _safe_json(rows: list[dict]) -> list[dict]:
"""Convert any non-JSON-serializable values (Decimal, datetime) to strings."""
import decimal
from datetime import datetime, date
def convert(v):
if isinstance(v, (datetime, date)):
return v.isoformat()
if isinstance(v, decimal.Decimal):
return float(v)
return v
return [{k: convert(v) for k, v in row.items()} for row in rows]
# ── Entry Point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
log.info("Starting DB audit...")
has_critical = run_checks()
sys.exit(1 if has_critical else 0)

View file

@ -0,0 +1,13 @@
-- Idempotent: safe to run on every audit start
CREATE TABLE IF NOT EXISTS tracksolid.health_checks (
id BIGSERIAL PRIMARY KEY,
checked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
check_name TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('ok', 'warning', 'critical')),
detail JSONB,
row_count INT
);
-- Index for Grafana time-range queries
CREATE INDEX IF NOT EXISTS health_checks_checked_at_idx
ON tracksolid.health_checks (checked_at DESC);

111
docs/CONNECTIONS.md Normal file
View file

@ -0,0 +1,111 @@
# Connection Parameters Reference
**No secrets are stored here. All values come from `.env` at runtime.**
---
## SSH
```
Host: stage.rahamafresh.com
User: kianiadee
Key: ~/.ssh/id_ed25519
```
```bash
ssh -i ~/.ssh/id_ed25519 kianiadee@stage.rahamafresh.com
```
---
## Database
| Parameter | Value |
|---|---|
| Database | `tracksolid_db` |
| Host (internal) | `timescale_db` (Docker service name) |
| Port | `5432` |
| App user | `tracksolid_owner` |
| Read-only user | `grafana_ro` |
| Superuser | `postgres` |
### `.env` variable names
```
POSTGRES_DB=tracksolid_db
POSTGRES_USER=...
POSTGRES_PASSWORD=...
DATABASE_URL=postgresql://tracksolid_owner:<password>@timescale_db:5432/tracksolid_db
GRAFANA_DB_RO_PASSWORD=...
```
### Run a query from host
```bash
DB=$(docker ps --filter name=timescale_db --format "{{.Names}}" | head -1)
docker exec $DB psql -U postgres -d tracksolid_db -c "SELECT COUNT(*) FROM tracksolid.devices;"
```
### Run a query file
```bash
docker exec -i $DB psql -U postgres -d tracksolid_db < migration.sql
```
---
## Tracksolid Pro API
| Parameter | Env var |
|---|---|
| App key | `TRACKSOLID_APP_KEY` |
| App secret | `TRACKSOLID_APP_SECRET` |
| User ID | `TRACKSOLID_USER_ID` |
| Target account | `TRACKSOLID_TARGET_ACCOUNT` (defaults to USER_ID) |
| Password MD5 | `TRACKSOLID_PWD_MD5` |
| Base URL | `TRACKSOLID_API_URL` (default: `https://eu-open.tracksolidpro.com/route/rest`) |
---
## Container Name Resolution
Coolify appends a random suffix to all container names. Never hardcode. Always resolve:
```bash
# Pattern
docker ps --filter name=<service_name> --format "{{.Names}}" | head -1
# Examples
docker ps --filter name=timescale_db --format "{{.Names}}" | head -1
docker ps --filter name=ingest_movement --format "{{.Names}}" | head -1
docker ps --filter name=webhook_receiver --format "{{.Names}}" | head -1
docker ps --filter name=grafana --format "{{.Names}}" | head -1
```
Current suffix (may change on redeploy): `bo3nov2ija7g8wn9b1g2paxs-19xxxxxxxxxx`
---
## Forgejo
```
Host: https://repo.rahamafresh.com
Repo: kianiadee/tracksolid_timescale_grafana_prod
Remote: https://repo.rahamafresh.com/kianiadee/tracksolid_timescale_grafana_prod.git
```
---
## Grafana
- Deployed as Docker service `grafana`
- Provisioning baked into image (datasources + dashboards via `grafana/Dockerfile`)
- Admin password: `GF_SECURITY_ADMIN_PASSWORD` from `.env`
- Default dashboard: NOC Fleet Dashboard
---
## n8n
- Deployed as separate Coolify service (`n8n-usoksgg8o40044g0cw08s8wc`)
- Workflows exported to `n8n-workflows/`

101
docs/KPI_FRAMEWORK.md Normal file
View file

@ -0,0 +1,101 @@
# KPI Framework — Telco Field Service Fleet
## Fireside Communications · Co-developed with client
> **Status:** Draft — pending client review and validation.
> Update this file after each client feedback session. Move KPIs from Proposed → Active → Retired as the programme matures.
---
## How to Use This Document
1. **Proposed** — KPI defined, not yet validated with client
2. **Active** — Client confirmed this matters; query written; Grafana panel exists or is in progress
3. **Baseline set** — Enough historical data exists to set a meaningful target
4. **Retired** — No longer tracked (document reason)
Each active KPI should link to:
- The SQL query (or reference to `01_BusinessAnalytics.md`)
- The Grafana panel name/dashboard
- The refresh frequency
- The person who reviews it
---
## KPI Status Register
### Fleet Utilisation
| KPI | Status | SQL ref | Grafana panel | Reviewed by | Cadence |
|---|---|---|---|---|---|
| Utilisation rate (%) | Proposed | `01_BusinessAnalytics.md §2.1` | — | — | Daily |
| Idle time % of shift | Proposed | `01_BusinessAnalytics.md §2.2` | — | — | Daily |
| Vehicles not moved today | Proposed | `01_BusinessAnalytics.md §2.3` | — | — | Daily |
| Fleet km today | Proposed | `01_BusinessAnalytics.md §5.1` | — | — | Daily |
| Fleet km this week | Proposed | `01_BusinessAnalytics.md §5.2` | — | — | Weekly |
### Technician Productivity *(requires job system integration)*
| KPI | Status | SQL ref | Grafana panel | Reviewed by | Cadence |
|---|---|---|---|---|---|
| Jobs completed per tech per day | Proposed | TBD | — | — | Daily |
| First-time fix rate | Proposed | TBD | — | — | Weekly |
| Mean time to arrive (MTTA) | Proposed | TBD | — | — | Weekly |
| Mean time to repair (MTTR) | Proposed | TBD | — | — | Weekly |
| SLA compliance rate | Proposed | TBD | — | — | Weekly |
### Driver Behaviour
| KPI | Status | SQL ref | Grafana panel | Reviewed by | Cadence |
|---|---|---|---|---|---|
| Speeding events per 100 km | Proposed | `01_BusinessAnalytics.md §3.1` | — | — | Weekly |
| Harsh driving index | Proposed | `01_BusinessAnalytics.md §3.2` | — | — | Weekly |
| Late starts (count per driver) | Proposed | `01_BusinessAnalytics.md §3.3` | — | — | Monthly |
| Early knock-off | Proposed | `01_BusinessAnalytics.md §3.3` | — | — | Monthly |
| After-hours movement | Proposed | `01_BusinessAnalytics.md §3.4` | — | — | Daily |
### Asset Health & Cost
| KPI | Status | SQL ref | Grafana panel | Reviewed by | Cadence |
|---|---|---|---|---|---|
| Estimated idle fuel cost (KES) | Proposed | `01_BusinessAnalytics.md §2.2` | — | — | Monthly |
| Vehicles at service threshold | Proposed | TBD | — | — | Weekly |
| Alarm rate per vehicle/week | Proposed | `01_BusinessAnalytics.md §6` | — | — | Weekly |
| GPS offline rate | Proposed | — | — | — | Daily |
---
## Severity & Threshold Reference
Adjust with client after first month of live data:
| Metric | Green | Amber | Red |
|---|---|---|---|
| Fleet utilisation rate | > 60% | 4060% | < 40% |
| Idle time % of shift | < 15% | 1530% | > 30% |
| Speeding per 100 km | < 0.5 | 0.52.0 | > 2.0 |
| Harsh driving index | < 0.5 | 0.52.0 | > 2.0 |
| Late starts / month | 01 | 24 | ≥ 5 |
| Alarm rate / vehicle / week | 02 | 37 | > 7 |
| GPS offline rate | < 5% | 515% | > 15% |
| MTTA (minutes) | < 30 | 3060 | > 60 |
| First-time fix rate | > 85% | 7085% | < 70% |
| SLA compliance | > 95% | 8595% | < 85% |
---
## Client Feedback Log
| Date | Session | Feedback | Action |
|---|---|---|---|
| — | Initial framework | Draft created | Awaiting first client review |
---
## Next Review Checklist
- [ ] Confirm shift hours (start, end, lunch, working days)
- [ ] Confirm SLA tiers (home vs business customer)
- [ ] Confirm which KPIs the ops manager wants on a daily digest
- [ ] Confirm reporting format (Grafana link, PDF, WhatsApp summary)
- [ ] Identify job management system / ticketing tool for MTTA/MTTR
- [ ] Confirm vehicle categories (motorcycle, van, 4WD) for per-type benchmarks

131
docs/PROJECT_CONTEXT.md Normal file
View file

@ -0,0 +1,131 @@
# Project Context — Fireside Communications Fleet Intelligence
## The Client
A first-line technical support operation contracted by a large Kenyan/East African telco. The client manages field technicians who handle the full spectrum of last-mile broadband support:
| Service Type | Description |
|---|---|
| New installations | Fibre/broadband installs at home and business premises |
| Fault resolution | LOS (Loss of Signal) troubleshooting, slow service investigations |
| Outside plant maintenance | Physical cable, cabinet, and pole infrastructure maintenance |
| Migrations | Customer plan or technology upgrades requiring a site visit |
| Business customer support | Prioritised SLA-driven support for commercial accounts |
## Operational Geography
| City | Status | Notes |
|---|---|---|
| Nairobi | Primary — fully operational | Main depot at Kikuyu Rd corridor (~-1.237, 36.727) |
| Mombasa | Deploying | Fleet being onboarded |
| Kampala, Uganda | 1 device confirmed | X3-63282 at 0.196, 32.540 — status under investigation |
All three cities managed from a single Tracksolid Pro account and a single database instance. A `city` field or grouping by device group should be used for per-city analytics rather than separate schemas.
## The Fleet
- ~80 vehicles total (63 currently registered in Tracksolid Pro)
- Mix of motorcycles (courier/light inspection) and vans/4WDs (equipment and crew)
- Device models in use: AT4 (hardwired), JC400P (camera-capable), X3 (compact), GT06E (OBD)
- Vehicle identity (plate numbers, driver assignments) not yet populated in Tracksolid Pro — primary data quality gap
## Data Quality Gaps (as of April 2026)
| Gap | Impact | Resolution path |
|---|---|---|
| No driver names assigned | Reports show IMEIs instead of people | Assign in Tracksolid Pro UI → DB syncs nightly |
| No vehicle numbers populated | Cannot link vehicle to job/plate | Manual UPDATE or CSV import |
| 44 of 63 devices never reported GPS | Cannot track these vehicles | Verify SIM installation + activation |
| `fuel_100km` null for all devices | Fuel cost calculations inactive | Set by vehicle type via UPDATE |
| No geofences defined | Cannot alert on depot departures or route deviations | Define depot polygons + city zones |
| Webhooks not registered | OBD, fuel, temperature tables empty | Register in Tracksolid Pro account settings |
---
## KPI Framework
> This section is developed iteratively with the client. KPIs are grouped by operational domain. As client feedback arrives, move items from "Proposed" to "Active" and add the Grafana panel reference.
### Domain 1 — Fleet Utilisation
Measures whether vehicles are productively deployed during working hours.
| KPI | Definition | Target | Status |
|---|---|---|---|
| Utilisation rate | Drive time / shift hours × 100 | > 60% | Proposed |
| Idle time % | Engine-on-stationary / total shift | < 15% | Proposed |
| Vehicles not moved today | COUNT where no trip recorded | 0 | Proposed |
| Fleet km per day | SUM(distance_km) across all trips | Baseline TBD | Proposed |
### Domain 2 — Field Technician Productivity
Measures output per technician per day. **Requires job management system integration or manual job log.**
| KPI | Definition | Target | Status |
|---|---|---|---|
| Jobs completed per technician per day | Count of closed jobs | Baseline TBD | Proposed |
| First-time fix rate | Jobs resolved on first visit % | > 80% | Proposed |
| Mean time to arrive (MTTA) | Job assignment → vehicle on-site | < 45 min | Proposed |
| Mean time to repair (MTTR) | Job creation → job closed | < 2 hours | Proposed |
| SLA compliance rate | % jobs closed within SLA window | > 95% | Proposed |
> Note: MTTA and MTTR require job timestamps from the telco's ticketing system. Integration point TBD.
### Domain 3 — Driver Behaviour & Safety
Measures driving quality. Feeds into insurance, safety, and coaching programmes.
| KPI | Definition | Target | Status |
|---|---|---|---|
| Speeding events per 100 km | GPS fixes > 80 km/h / total km × 100 | < 0.5 | Proposed |
| Harsh driving index | Speed delta > 30 km/h in < 60s per 100 km | < 0.5 | Proposed |
| After-hours movement | Trips starting before 06:00 or after 20:00 EAT | 0 | Proposed |
| Late starts | First ignition after 07:45 EAT | < 2/month | Proposed |
| Early knock-off | Last trip ended before 17:00 EAT | < 2/month | Proposed |
### Domain 4 — Route & Dispatch Efficiency
Measures how well vehicles are matched to jobs geographically.
| KPI | Definition | Target | Status |
|---|---|---|---|
| Avg distance per job | Total km / jobs completed | Baseline TBD | Proposed |
| Nearest available vehicle ETA | PostGIS dispatch query | < 30 min | Proposed |
| Return-to-depot rate | % trips ending at primary depot | Baseline TBD | Proposed |
### Domain 5 — Asset Health & Cost
Measures maintenance burden and fuel efficiency.
| KPI | Definition | Target | Status |
|---|---|---|---|
| Estimated idle fuel cost (KES) | Idle hours × 0.8 L/h × KES 180/L | Minimise | Proposed |
| Vehicles approaching service interval | Odometer > threshold | 0 overdue | Proposed |
| Alarm rate per vehicle per week | COUNT(alarms) / 7 | < 2 | Proposed |
| GPS offline rate | Devices with fix age > 10 min / total | < 10% | Proposed |
### Shift Schedule Assumptions
Adjust these as confirmed with client:
| Parameter | Assumed Value |
|---|---|
| Shift start | 07:30 EAT |
| Late threshold | After 07:45 EAT |
| Shift end | 17:00 EAT |
| After-hours | Before 06:00 or after 20:00 EAT |
| Working days | MondaySaturday (confirm with client) |
| Shift length for utilisation | 10 hours |
---
## Integration Roadmap
| Integration | What it unlocks | Priority |
|---|---|---|
| Telco ticketing system (job timestamps) | MTTA, MTTR, first-time fix rate, jobs/day | HIGH |
| Tracksolid webhook registration | OBD, fuel, temperature, tamper events | HIGH |
| Driver assignment in Tracksolid Pro | All driver-attributed KPIs | HIGH |
| Geofence definition | Depot departure alerts, city zone coverage | MEDIUM |
| Fuel sensor webhook (`/pushoil`) | Actual fuel consumption vs estimated | MEDIUM |
| Temperature sensor (`/pushtem`) | Cold-chain compliance (if applicable) | LOW |

View file

@ -0,0 +1,148 @@
# Bug Reduction Quality Program — Design Spec
**Date:** 2026-04-12
**Project:** Fireside Communications Fleet Telemetry Ingestion Platform
**Repo:** `55_ts_coolify_gemini_prod`
**Status:** Approved — Implementation in Progress
## Problem
The platform has been running in production since late 2025 ingesting GPS and telemetry data from ~63 fleet vehicles. All bugs discovered to date (FIX-M11, FIX-M13, FIX-M16, FIX-E06, BUG-01 through BUG-05) were caught manually in production — via data inspection, Grafana anomalies, or customer reports. There are:
- Zero automated tests
- No linting or type-checking configuration
- No CI/CD pipeline
- No programmatic DB health monitoring
Any code change risks silent regressions. Any API field mapping change risks data going silently to NULL. Any schema change risks data corruption that may not be noticed for days.
## Goal
A layered quality program that:
1. **Finds existing bugs and data issues** without modifying source code
2. **Prevents future regressions** by locking in known-correct behaviour
3. **Monitors production DB health** on a daily schedule
## Constraints
- Existing source files MUST NOT be modified in Phase 1
- All additions are new files only (config, tests, CI workflows, audit scripts)
- Must run in CI (Forgejo Actions, self-hosted runner) and production (scheduled DB audit)
---
## Architecture: Three Parallel Workstreams
### Workstream 1 — Static Analysis
**Tools:** `ruff` (linting) + `mypy` (type checking)
**Trigger:** Every push / pull request via Forgejo Actions
**Risk:** Zero — read-only analysis of existing source
Surfaces:
- Undefined names, unused imports (ruff/F rules)
- Likely bugs: mutable defaults, string formatting issues (ruff/B rules)
- Type errors: untyped returns, Optional not handled (mypy)
- Modern Python upgrade opportunities (ruff/UP rules)
First run will be noisy — output becomes the bug backlog.
### Workstream 2 — Test Suite
**Framework:** pytest + pytest-asyncio
**Trigger:** Every push / pull request via Forgejo Actions
**Isolation:** Integration tests use a Docker TimescaleDB service container
**Unit tests** (pure Python, no DB):
- `test_clean_helpers.py``clean()`, `clean_num()`, `clean_ts()`, `is_valid_fix()` — these gate all data into the DB
- `test_api_signing.py``build_sign()` MD5 signature correctness
- `test_field_mapping.py` — locks in the three most bug-prone field mappings:
- FIX-E06: poll alarms use `alertTypeId`/`alarmTypeName`/`alertTime` (not `alarmType`)
- FIX-M16: trip distance arrives in metres, stored as km (÷ 1000)
- BUG-03: BCD timestamps `YYMMDDHHmmss` parsed correctly
**Integration tests** (real TimescaleDB):
- `test_movement_pipeline.py``poll_live_positions()` full round-trip, UPSERT idempotency
- `test_events_pipeline.py``poll_alarms()` field mapping, NULL alarm_type rejection
- `test_webhook_endpoints.py` — FastAPI endpoints with mock Jimi payloads, SAVEPOINT isolation
### Workstream 3 — DB Audit
**Runner:** `db_audit/run_audit.py` (Python)
**Trigger:** Daily at 06:00 EAT (03:00 UTC) via scheduled Forgejo workflow + `workflow_dispatch` for manual runs
**Output:** Rows written to `tracksolid.health_checks` table; queryable from Grafana
Six health checks:
| Check | File | Critical | Warning |
|---|---|---|---|
| Stale devices | `stale_devices.sql` | — | Any enabled device with no GPS fix >2h |
| NULL integrity | `null_integrity.sql` | Any NULL imei or gps_time in telemetry tables | — |
| Distance outliers | `distance_outliers.sql` | — | Any trip >500km or <0km in last 7 days |
| Duplicate positions | `duplicate_positions.sql` | Any (imei, gps_time) duplicate in position_history | — |
| Data gaps | `data_gaps.sql` | — | Any enabled device with no data in 7 days |
| Enum drift | `enum_drift.sql` | — | Unexpected value in source/severity columns |
Exit code: `1` on any `critical`, `0` on `ok`/`warning`.
---
## File Layout
```
55_ts_coolify_gemini_prod/
├── pyproject.toml ← ADD: ruff + mypy + pytest config + dev deps
├── .forgejo/
│ └── workflows/
│ ├── ci-static.yml
│ ├── ci-tests.yml
│ └── scheduled-audit.yml
├── tests/
│ ├── conftest.py
│ ├── fixtures/
│ │ ├── api_responses.py
│ │ └── schema.sql
│ ├── unit/
│ │ ├── test_clean_helpers.py
│ │ ├── test_api_signing.py
│ │ └── test_field_mapping.py
│ └── integration/
│ ├── test_movement_pipeline.py
│ ├── test_events_pipeline.py
│ └── test_webhook_endpoints.py
└── db_audit/
├── run_audit.py
├── checks/
│ ├── stale_devices.sql
│ ├── null_integrity.sql
│ ├── distance_outliers.sql
│ ├── duplicate_positions.sql
│ ├── data_gaps.sql
│ └── enum_drift.sql
└── schema/
└── health_checks_table.sql
```
---
## Forgejo Runner Setup
Before CI can run, a self-hosted runner must be registered on the Coolify server:
1. Forgejo → Settings → Actions → Runners → Register Runner → copy token
2. On Coolify server: `docker run -d --name forgejo-runner gitea/act_runner:latest register --instance https://repo.rahamafresh.com --token <TOKEN> --name coolify-runner --labels self-hosted`
3. Verify runner appears as active in Forgejo
Required Forgejo secrets:
- `DATABASE_URL` — production DB connection string (for scheduled audit)
- `TEST_DATABASE_URL` — set automatically by CI service container
---
## Verification
| Workstream | Pass Criteria |
|---|---|
| Static Analysis | Push triggers CI-static; ruff + mypy produce output report; job exits non-zero on violations |
| Test Suite | Push triggers CI-tests; all unit tests pass; integration tests pass against service container DB |
| DB Audit | Manual run populates `health_checks` table; findings match known issues (44 silent devices, etc.); scheduled run fires at 06:00 EAT |

View file

@ -52,48 +52,54 @@ def poll_alarms():
start_ts = end_ts - timedelta(minutes=30) # Look back 30m to ensure coverage
inserted = 0
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.alarm.list", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"page_size": 100
}, token)
with get_conn() as conn:
with conn.cursor() as cur:
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.alarm.list", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"page_size": 100
}, token)
alarms = resp.get("result") or []
if not alarms: continue
alarms = resp.get("result") or []
if not alarms: continue
with get_conn() as conn:
with conn.cursor() as cur:
for a in alarms:
lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng"))
# [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime,
# not alarmType/alarmName/alarmTime (those are webhook push field names).
alarm_type = clean(a.get("alertTypeId"))
alarm_name = clean(a.get("alarmTypeName"))
alarm_time = clean_ts(a.get("alertTime"))
try:
cur.execute("SAVEPOINT sp")
lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng"))
# [FIX-E06] Poll response uses alertTypeId/alarmTypeName/alertTime,
# not alarmType/alarmName/alarmTime (those are webhook push field names).
alarm_type = clean(a.get("alertTypeId"))
alarm_name = clean(a.get("alarmTypeName"))
alarm_time = clean_ts(a.get("alertTime"))
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_name, alarm_time, geom, lat, lng,
speed, acc_status, source, updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, %s, 'poll', NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
""", (
a.get("imei"), alarm_type, alarm_name, alarm_time,
lng, lat, lng, lat, lat, lng,
clean_num(a.get("speed")), clean(a.get("accStatus"))
))
inserted += 1
log_ingestion(cur, "jimi.device.alarm.list", len(batch), 0, inserted, int((time.time()-t0)*1000), True)
conn.commit()
cur.execute("""
INSERT INTO tracksolid.alarms (
imei, alarm_type, alarm_name, alarm_time, geom, lat, lng,
speed, acc_status, source, updated_at
) VALUES (
%s, %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s, %s, %s, %s, 'poll', NOW()
) ON CONFLICT (imei, alarm_type, alarm_time) DO NOTHING
""", (
a.get("imei"), alarm_type, alarm_name, alarm_time,
lng, lat, lng, lat, lat, lng,
clean_num(a.get("speed")), clean(a.get("accStatus"))
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process alarm for %s", a.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.device.alarm.list", len(imeis), 0, inserted,
int((time.time()-t0)*1000), True)
log.info("Alarms: %d new events inserted.", inserted)

View file

@ -34,8 +34,11 @@ REVISIONS (QA-Verified):
"""
import time
from concurrent.futures import ThreadPoolExecutor
import schedule
from datetime import datetime, timezone, timedelta
from psycopg2.extras import execute_values
from ts_shared_rev import (
TARGET_ACCOUNT,
@ -70,14 +73,24 @@ def sync_devices():
devices = resp.get("result") or []
upserted = 0
# Fetch per-device detail in parallel — previously an N+1 blocker where
# 80 devices × ~300 ms/call ≈ 24 s serial. 8 workers brings it to ~3 s.
# Gated at 8 to stay under API rate-limit (1006) headroom.
def _fetch_detail(imei: str) -> dict:
detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {}
imeis = [d.get("imei") for d in devices if d.get("imei")]
with ThreadPoolExecutor(max_workers=8) as pool:
details = dict(zip(imeis, pool.map(_fetch_detail, imeis)))
with get_conn() as conn:
with conn.cursor() as cur:
for d in devices:
imei = d.get("imei")
if not imei: continue
detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
dtl = detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {}
dtl = details.get(imei, {})
cur.execute("""
INSERT INTO tracksolid.devices (
@ -93,12 +106,33 @@ def sync_devices():
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
device_name = EXCLUDED.device_name,
vehicle_number = EXCLUDED.vehicle_number,
driver_name = EXCLUDED.driver_name,
enabled_flag = EXCLUDED.enabled_flag,
device_name = EXCLUDED.device_name,
mc_type = EXCLUDED.mc_type,
mc_type_use_scope = EXCLUDED.mc_type_use_scope,
vehicle_name = EXCLUDED.vehicle_name,
vehicle_number = EXCLUDED.vehicle_number,
vehicle_models = EXCLUDED.vehicle_models,
vehicle_icon = EXCLUDED.vehicle_icon,
vin = EXCLUDED.vin,
engine_number = EXCLUDED.engine_number,
vehicle_brand = EXCLUDED.vehicle_brand,
fuel_100km = EXCLUDED.fuel_100km,
driver_name = EXCLUDED.driver_name,
driver_phone = EXCLUDED.driver_phone,
sim = EXCLUDED.sim,
iccid = EXCLUDED.iccid,
imsi = EXCLUDED.imsi,
account = EXCLUDED.account,
customer_name = EXCLUDED.customer_name,
device_group_id = EXCLUDED.device_group_id,
device_group = EXCLUDED.device_group,
activation_time = EXCLUDED.activation_time,
expiration = EXCLUDED.expiration,
enabled_flag = EXCLUDED.enabled_flag,
status = EXCLUDED.status,
current_mileage_km = EXCLUDED.current_mileage_km,
last_synced_at = NOW(), updated_at = NOW()
last_synced_at = NOW(),
updated_at = NOW()
""", (
imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")),
clean(d.get("vehicleName")), clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")),
@ -129,49 +163,64 @@ def poll_live_positions():
with get_conn() as conn:
with conn.cursor() as cur:
for p in positions:
imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
if not imei or not is_valid_fix(lat, lng): continue
try:
cur.execute("SAVEPOINT sp")
imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng"))
if not imei or not is_valid_fix(lat, lng):
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
speed, direction, acc_status, gps_signal, gps_num,
elec_quantity, power_value, battery_power_val, tracker_oil,
temperature, current_mileage, device_status, loc_desc, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
updated_at=NOW()
""", (
imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
clean_ts(p.get("gpsTime")), clean_ts(p.get("hbTime")), clean_num(p.get("speed")),
clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsSignal")),
clean_int(p.get("gpsNum")), clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
clean_num(p.get("currentMileage")), clean(p.get("status")), clean(p.get("locDesc"))
))
upserted += 1
gps_time = clean_ts(p.get("gpsTime"))
speed = clean_num(p.get("speed"))
direction = clean_num(p.get("direction"))
acc_status = clean(p.get("accStatus"))
gps_num = clean_int(p.get("gpsNum"))
current_mileage = clean_num(p.get("currentMileage"))
# History (Hypertable Source)
if clean_ts(p.get("gpsTime")):
cur.execute("""
INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gps_time) DO NOTHING
""", (imei, clean_ts(p.get("gpsTime")), lng, lat, lat, lng, clean_num(p.get("speed")), clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsNum")), clean_num(p.get("currentMileage"))))
inserted += 1
INSERT INTO tracksolid.live_positions (
imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time,
speed, direction, acc_status, gps_signal, gps_num,
elec_quantity, power_value, battery_power_val, tracker_oil,
temperature, current_mileage, device_status, loc_desc, recorded_at
) VALUES (
%s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng,
gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction,
acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage,
updated_at=NOW()
""", (
imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")),
gps_time, clean_ts(p.get("hbTime")), speed,
direction, acc_status, clean_int(p.get("gpsSignal")),
gps_num, clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")),
clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")),
current_mileage, clean(p.get("status")), clean(p.get("locDesc"))
))
upserted += cur.rowcount
# History (Hypertable Source)
if gps_time:
cur.execute("""
INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage)
VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (imei, gps_time) DO NOTHING
""", (imei, gps_time, lng, lat, lat, lng, speed, direction, acc_status, gps_num, current_mileage))
inserted += cur.rowcount
cur.execute("RELEASE SAVEPOINT sp")
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process live position for %s", p.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True)
conn.commit()
# ── 3. Trip Reports (Every 15m) ───────────────────────────────────────────────
def poll_trips():
t0 = time.time()
token, imeis = get_token(), get_active_imeis()
if not token or not imeis: return
@ -179,38 +228,49 @@ def poll_trips():
start_ts = end_ts - timedelta(hours=1)
inserted = 0
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.track.mileage", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
}, token)
with get_conn() as conn:
with conn.cursor() as cur:
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
resp = api_post("jimi.device.track.mileage", {
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S")
}, token)
trips = resp.get("result") or []
with get_conn() as conn:
with conn.cursor() as cur:
trips = resp.get("result") or []
for t in trips:
# [FIX-M11] API returns distance in km. Store directly as distance_km.
# Previous code multiplied by 1000 (→ mm), which was wrong.
dist_km = clean_num(t.get("distance"))
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
avg_speed_kmh, max_speed_kmh, driving_time_s, source
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
ON CONFLICT (imei, start_time) DO UPDATE SET
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
""", (
t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")),
dist_km, clean_num(t.get("avgSpeed")),
clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond"))
))
inserted += 1
conn.commit()
try:
cur.execute("SAVEPOINT sp")
# [FIX-M16] API returns distance in METRES despite documentation saying km.
# Confirmed via: avgSpeed(km/h) × runTimeSecond / 3600 == distance/1000.
# startMileage/endMileage are cumulative odometer in metres (same unit).
# Divide by 1000 to store as distance_km.
raw_dist = clean_num(t.get("distance"))
dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
cur.execute("""
INSERT INTO tracksolid.trips (
imei, start_time, end_time, distance_km,
avg_speed_kmh, max_speed_kmh, driving_time_s, source
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'poll')
ON CONFLICT (imei, start_time) DO UPDATE SET
end_time = EXCLUDED.end_time,
distance_km = EXCLUDED.distance_km,
max_speed_kmh = COALESCE(EXCLUDED.max_speed_kmh, tracksolid.trips.max_speed_kmh),
driving_time_s = COALESCE(EXCLUDED.driving_time_s, tracksolid.trips.driving_time_s)
""", (
t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")),
dist_km, clean_num(t.get("avgSpeed")),
clean_num(t.get("maxSpeed")), clean_int(t.get("runTimeSecond"))
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process trip for %s", t.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.device.track.mileage", len(imeis), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("Trips: %d records processed.", inserted)
# ── 4. Parking Events (Every 15m) ─────────────────────────────────────────────
@ -224,47 +284,55 @@ def poll_parking():
start_ts = end_ts - timedelta(hours=1)
inserted = 0
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
# [FIX-M13] Added account + acc_type=0 (all stop types). Without these
# the API returns empty results even when parking events exist.
resp = api_post("jimi.open.platform.report.parking", {
"account": TARGET_ACCOUNT,
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"acc_type": 0,
}, token)
with get_conn() as conn:
with conn.cursor() as cur:
for i in range(0, len(imeis), 50):
batch = imeis[i:i+50]
# [FIX-M13] Added account + acc_type=0 (all stop types). Without these
# the API returns empty results even when parking events exist.
resp = api_post("jimi.open.platform.report.parking", {
"account": TARGET_ACCOUNT,
"imeis": ",".join(batch),
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"acc_type": 0,
}, token)
events = resp.get("result") or []
with get_conn() as conn:
with conn.cursor() as cur:
events = resp.get("result") or []
for p in events:
imei = p.get("imei")
start_time = clean_ts(p.get("startTime"))
if not imei or not start_time:
continue
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
cur.execute("""
INSERT INTO tracksolid.parking_events (
imei, event_type, start_time, end_time,
duration_seconds, geom, address
) VALUES (
%s, 'parking', %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
""", (
imei, start_time, clean_ts(p.get("endTime")),
clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds
lng, lat, lng, lat,
clean(p.get("address"))
))
inserted += 1
log_ingestion(cur, "jimi.open.platform.report.parking", len(batch), 0, inserted,
int((time.time() - t0) * 1000), True)
try:
cur.execute("SAVEPOINT sp")
imei = p.get("imei")
start_time = clean_ts(p.get("startTime"))
if not imei or not start_time:
cur.execute("RELEASE SAVEPOINT sp")
continue
lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng"))
cur.execute("""
INSERT INTO tracksolid.parking_events (
imei, event_type, start_time, end_time,
duration_seconds, geom, address
) VALUES (
%s, 'parking', %s, %s, %s,
CASE WHEN %s IS NOT NULL AND %s IS NOT NULL
THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326)
ELSE NULL END,
%s
) ON CONFLICT (imei, start_time, event_type) DO NOTHING
""", (
imei, start_time, clean_ts(p.get("endTime")),
clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds
lng, lat, lng, lat,
clean(p.get("address"))
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += cur.rowcount
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True)
log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("Parking: %d events processed.", inserted)
# ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ───────────────────────
@ -292,58 +360,73 @@ def poll_track_list():
end_ts = datetime.now(timezone.utc)
start_ts = end_ts - timedelta(minutes=35) # 5-min overlap avoids boundary gaps
begin_str = start_ts.strftime("%Y-%m-%d %H:%M:%S")
end_str = end_ts.strftime("%Y-%m-%d %H:%M:%S")
# Phase 1: fetch waypoints from API without holding a DB connection.
# jimi.device.track.list is per-IMEI; parallelise at 4 workers to speed
# up the 30 min sweep without tripping the 1006 rate limit.
def _fetch(imei: str):
resp = api_post("jimi.device.track.list", {
"imei": imei,
"begin_time": begin_str,
"end_time": end_str,
"map_type": "GOOGLE",
}, token)
return imei, resp.get("result") or []
with ThreadPoolExecutor(max_workers=4) as pool:
fetched = list(pool.map(_fetch, imeis))
# Phase 2: write rows in one DB transaction.
total_inserted = 0
devices_with_data = 0
rows = []
for imei, waypoints in fetched:
device_rows = 0
for wp in waypoints:
lat = clean_num(wp.get("lat"))
lng = clean_num(wp.get("lng"))
gps_time = clean_ts(wp.get("gpsTime"))
if not is_valid_fix(lat, lng) or not gps_time:
continue
rows.append((
imei, gps_time,
lng, lat, # ST_MakePoint(lng, lat)
lat, lng, # lat, lng columns
clean_num(wp.get("gpsSpeed")),
clean_num(wp.get("direction")),
clean(wp.get("accStatus")),
))
device_rows += 1
if device_rows:
devices_with_data += 1
with get_conn() as conn:
with conn.cursor() as cur:
for imei in imeis:
resp = api_post("jimi.device.track.list", {
"imei": imei,
"begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"),
"map_type": "GOOGLE",
}, token)
waypoints = resp.get("result") or []
if not waypoints:
continue
inserted = 0
for wp in waypoints:
lat = clean_num(wp.get("lat"))
lng = clean_num(wp.get("lng"))
gps_time = clean_ts(wp.get("gpsTime"))
if not is_valid_fix(lat, lng) or not gps_time:
continue
cur.execute("""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng,
speed, direction, acc_status, source
) VALUES (
%s, %s,
ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, 'track_list'
)
ON CONFLICT (imei, gps_time) DO NOTHING
""", (
imei, gps_time,
lng, lat, # ST_MakePoint(lng, lat)
lat, lng, # lat, lng columns
clean_num(wp.get("gpsSpeed")),
clean_num(wp.get("direction")),
clean(wp.get("accStatus")),
))
inserted += 1
if inserted:
total_inserted += inserted
devices_with_data += 1
log_ingestion(cur, "jimi.device.track.list", len(imeis),
0, total_inserted, int((time.time() - t0) * 1000), True)
conn.commit()
if rows:
with get_conn() as conn:
with conn.cursor() as cur:
execute_values(
cur,
"""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng,
speed, direction, acc_status, source
) VALUES %s
ON CONFLICT (imei, gps_time) DO NOTHING
""",
rows,
template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
" %s, %s, %s, %s, %s, 'track_list')",
page_size=500,
)
total_inserted = cur.rowcount
log_ingestion(cur, "jimi.device.track.list", len(imeis),
0, total_inserted, int((time.time() - t0) * 1000), True)
else:
with get_conn() as conn:
with conn.cursor() as cur:
log_ingestion(cur, "jimi.device.track.list", len(imeis),
0, 0, int((time.time() - t0) * 1000), True)
log.info("Track list: %d waypoints inserted across %d/%d devices.",
total_inserted, devices_with_data, len(imeis))

View file

@ -27,4 +27,24 @@ build-backend = "hatchling.build"
managed = true
[tool.uv.sources]
# Optional: If you ever have custom local modules or git-based private libs
# Optional: If you ever have custom local modules or git-based private libs
[project.optional-dependencies]
dev = [
"ruff>=0.4",
"mypy>=1.10",
"pytest>=8",
"pytest-asyncio>=0.23",
"httpx>=0.27",
]
[tool.ruff]
target-version = "py312"
line-length = 100
select = ["E", "W", "F", "B", "UP", "SIM"]
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_ignores = true
ignore_missing_imports = true

214
sync_driver_audit.py Normal file
View file

@ -0,0 +1,214 @@
"""
sync_driver_audit.py Fireside Communications · Driver & IMEI Audit Sync
One-shot script: fetches ALL devices from Tracksolid API, compares driver
and IMEI details against the DB, reports gaps, and populates missing data.
Run inside the container:
docker exec -it <ingest_movement_container> python sync_driver_audit.py
Or via Coolify terminal with env vars loaded.
"""
import time
from concurrent.futures import ThreadPoolExecutor
from ts_shared_rev import (
TARGET_ACCOUNT,
api_post,
get_conn,
get_token,
clean,
clean_num,
clean_int,
clean_ts,
get_logger,
)
log = get_logger("driver_audit")
def run_audit():
log.info("=== Driver & IMEI Audit Sync ===")
t0 = time.time()
token = get_token()
if not token:
log.error("Could not obtain API token. Check credentials.")
return
# 1. Fetch all devices from API
resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token)
if resp.get("code") != 0:
log.error("API error: %s", resp)
return
api_devices = resp.get("result") or []
log.info("API returned %d devices.", len(api_devices))
# 2. Fetch current DB state
with get_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT imei, device_name, driver_name, driver_phone, sim, status
FROM tracksolid.devices
ORDER BY imei
""")
db_rows = {row[0]: {
"device_name": row[1],
"driver_name": row[2],
"driver_phone": row[3],
"sim": row[4],
"status": row[5],
} for row in cur.fetchall()}
log.info("DB has %d devices registered.", len(db_rows))
# 3. Compare and report gaps
api_imeis = set()
missing_from_db = []
driver_gaps = []
driver_phone_gaps = []
for d in api_devices:
imei = d.get("imei")
if not imei:
continue
api_imeis.add(imei)
if imei not in db_rows:
missing_from_db.append(imei)
else:
db = db_rows[imei]
if not db["driver_name"] and clean(d.get("driverName")):
driver_gaps.append((imei, clean(d.get("driverName"))))
if not db["driver_phone"] and clean(d.get("driverPhone")):
driver_phone_gaps.append((imei, clean(d.get("driverPhone"))))
orphaned_in_db = set(db_rows.keys()) - api_imeis
# 4. Print gap report
print("\n" + "="*60)
print("AUDIT REPORT")
print("="*60)
print(f" API devices : {len(api_imeis)}")
print(f" DB devices : {len(db_rows)}")
print(f" New (API only): {len(missing_from_db)}")
print(f" Orphaned (DB) : {len(orphaned_in_db)}")
print(f" Missing driver_name (API has, DB null): {len(driver_gaps)}")
print(f" Missing driver_phone (API has, DB null): {len(driver_phone_gaps)}")
if missing_from_db:
print(f"\nIMEIs NOT in DB ({len(missing_from_db)}):")
for imei in missing_from_db:
print(f" {imei}")
if driver_gaps:
print(f"\nDevices missing driver_name in DB ({len(driver_gaps)}):")
for imei, name in driver_gaps:
print(f" {imei}'{name}'")
if driver_phone_gaps:
print(f"\nDevices missing driver_phone in DB ({len(driver_phone_gaps)}):")
for imei, phone in driver_phone_gaps:
print(f" {imei}'{phone}'")
if orphaned_in_db:
print(f"\nIMEIs in DB but NOT in API (orphaned/deactivated) ({len(orphaned_in_db)}):")
for imei in sorted(orphaned_in_db):
print(f" {imei}")
print("="*60)
# 5. Upsert ALL devices with full field sync (including driver info)
log.info("Starting full upsert of %d devices...", len(api_devices))
upserted = 0
# Parallelize the per-device detail lookups (see ingest_movement.sync_devices).
def _fetch_detail(imei: str) -> dict:
detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token)
return detail_resp.get("result") or {} if detail_resp.get("code") == 0 else {}
imeis_to_fetch = [d.get("imei") for d in api_devices if d.get("imei")]
with ThreadPoolExecutor(max_workers=8) as pool:
details = dict(zip(imeis_to_fetch, pool.map(_fetch_detail, imeis_to_fetch)))
with get_conn() as conn:
with conn.cursor() as cur:
for d in api_devices:
imei = d.get("imei")
if not imei:
continue
dtl = details.get(imei, {})
cur.execute("""
INSERT INTO tracksolid.devices (
imei, device_name, mc_type, mc_type_use_scope,
vehicle_name, vehicle_number, vehicle_models, vehicle_icon,
vin, engine_number, vehicle_brand, fuel_100km,
driver_name, driver_phone, sim, iccid, imsi,
account, customer_name, device_group_id, device_group,
activation_time, expiration, enabled_flag, status,
current_mileage_km, last_synced_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
)
ON CONFLICT (imei) DO UPDATE SET
device_name = EXCLUDED.device_name,
mc_type = EXCLUDED.mc_type,
mc_type_use_scope = EXCLUDED.mc_type_use_scope,
vehicle_name = EXCLUDED.vehicle_name,
vehicle_number = EXCLUDED.vehicle_number,
vehicle_models = EXCLUDED.vehicle_models,
vehicle_icon = EXCLUDED.vehicle_icon,
vin = EXCLUDED.vin,
engine_number = EXCLUDED.engine_number,
vehicle_brand = EXCLUDED.vehicle_brand,
fuel_100km = EXCLUDED.fuel_100km,
driver_name = EXCLUDED.driver_name,
driver_phone = EXCLUDED.driver_phone,
sim = EXCLUDED.sim,
iccid = EXCLUDED.iccid,
imsi = EXCLUDED.imsi,
account = EXCLUDED.account,
customer_name = EXCLUDED.customer_name,
device_group_id = EXCLUDED.device_group_id,
device_group = EXCLUDED.device_group,
activation_time = EXCLUDED.activation_time,
expiration = EXCLUDED.expiration,
enabled_flag = EXCLUDED.enabled_flag,
status = EXCLUDED.status,
current_mileage_km = EXCLUDED.current_mileage_km,
last_synced_at = NOW(),
updated_at = NOW()
""", (
imei,
clean(d.get("deviceName")), clean(d.get("mcType")),
clean(d.get("mcTypeUseScope")), clean(d.get("vehicleName")),
clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")),
clean(d.get("vehicleIcon")),
clean(dtl.get("vin")), clean(dtl.get("engineNumber")),
clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")),
clean(d.get("driverName")), clean(d.get("driverPhone")),
clean(d.get("sim")), clean(dtl.get("iccid")),
clean(dtl.get("imsi")),
clean(dtl.get("account")), clean(dtl.get("customerName")),
clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")),
clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")),
clean_int(d.get("enabledFlag", 1)),
clean(dtl.get("status", "active")),
clean_num(dtl.get("currentMileage")),
))
upserted += 1
conn.commit()
elapsed = int((time.time() - t0) * 1000)
log.info("Done. Upserted %d devices in %dms.", upserted, elapsed)
print(f"\nSync complete: {upserted} devices upserted in {elapsed}ms.")
if __name__ == "__main__":
run_audit()

0
tests/__init__.py Normal file
View file

0
tests/fixtures/__init__.py vendored Normal file
View file

109
tests/fixtures/api_responses.py vendored Normal file
View file

@ -0,0 +1,109 @@
"""Mock Tracksolid Pro API responses for testing."""
# jimi.user.device.location.list response
LIVE_POSITIONS_RESPONSE = {
"code": 0,
"result": [
{
"imei": "123456789012345",
"lat": -1.2921,
"lng": 36.8219,
"speed": 45.5,
"direction": 180,
"gpsTime": "2024-04-12 08:00:00",
"hbTime": "2024-04-12 08:00:05",
"accStatus": "1",
"gpsSignal": 4,
"gpsNum": 8,
"currentMileage": 1234.5,
"posType": "GPS",
"confidence": 95,
"status": "1",
"locDesc": "Nairobi CBD",
},
{
# Zero Island — should be filtered by is_valid_fix
"imei": "999999999999999",
"lat": 0.0,
"lng": 0.0,
"speed": 0,
"gpsTime": "2024-04-12 08:00:00",
},
]
}
# jimi.device.track.mileage response (distance in METRES — FIX-M16)
TRIPS_RESPONSE = {
"code": 0,
"result": [
{
"imei": "123456789012345",
"startTime": "2024-04-12 07:00:00",
"endTime": "2024-04-12 08:00:00",
"distance": 15000, # 15000 METRES = 15.0 km
"avgSpeed": 15.0,
"maxSpeed": 60.0,
"runTimeSecond": 3600,
}
]
}
# jimi.device.alarm.list response (FIX-E06: uses alertTypeId, not alarmType)
ALARMS_RESPONSE = {
"code": 0,
"result": [
{
"imei": "123456789012345",
"alertTypeId": "4", # poll field name
"alarmTypeName": "Speeding", # poll field name
"alertTime": "2024-04-12 07:30:00", # poll field name
"lat": -1.2921,
"lng": 36.8219,
"speed": 95.0,
"accStatus": "1",
}
]
}
# Webhook /pushalarm payload (uses alarmType, not alertTypeId)
WEBHOOK_ALARM_PAYLOAD = {
"deviceImei": "123456789012345",
"alarmType": "4",
"alarmName": "Speeding",
"gateTime": "2024-04-12 07:30:00",
"lat": -1.2921,
"lng": 36.8219,
"speed": 95.0,
}
# Webhook /pushtripreport payload (BCD timestamp — BUG-03)
WEBHOOK_TRIP_BCD_PAYLOAD = {
"deviceImei": "123456789012345",
"beginTime": "220415103000", # BCD YYMMDDHHmmss = 2022-04-15 10:30:00
"endTime": "220415113000", # BCD YYMMDDHHmmss = 2022-04-15 11:30:00
"miles": 12.5,
"beginLat": -1.2921,
"beginLng": 36.8219,
"endLat": -1.3000,
"endLng": 36.8300,
}
WEBHOOK_TRIP_ISO_PAYLOAD = {
"deviceImei": "123456789012345",
"beginTime": "2024-04-12 07:00:00",
"endTime": "2024-04-12 08:00:00",
"miles": 15.5,
}
# Webhook /pushobd payload
WEBHOOK_OBD_PAYLOAD = {
"deviceImei": "123456789012345",
"obdJson": '{"event_time": 1712908800, "AccState": 1, "statusFlags": 0, "lat": -1.2921, "lng": 36.8219}',
}
# Alarm with NULL alarm_type (BUG-02 guard)
WEBHOOK_ALARM_NULL_TYPE = {
"deviceImei": "123456789012345",
"alarmType": None,
"gateTime": "2024-04-12 07:30:00",
}

View file

View file

@ -0,0 +1,133 @@
"""Integration tests for FastAPI webhook endpoints."""
import sys
import os
import json
import pytest
from unittest.mock import MagicMock, patch, call
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
os.environ.setdefault("JIMI_WEBHOOK_TOKEN", "")
from fastapi.testclient import TestClient
import webhook_receiver_rev
from tests.fixtures.api_responses import (
WEBHOOK_ALARM_PAYLOAD,
WEBHOOK_ALARM_NULL_TYPE,
WEBHOOK_TRIP_BCD_PAYLOAD,
WEBHOOK_TRIP_ISO_PAYLOAD,
WEBHOOK_OBD_PAYLOAD,
)
def make_mock_conn():
"""Create a mock DB connection with cursor support."""
mock_cur = MagicMock()
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = lambda s: mock_cur
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
return mock_conn, mock_cur
@contextmanager
def mock_get_conn_ctx(mock_conn):
yield mock_conn
@pytest.fixture
def client():
return TestClient(webhook_receiver_rev.app, raise_server_exceptions=True)
@pytest.fixture
def mock_db():
mock_conn, mock_cur = make_mock_conn()
with patch("webhook_receiver_rev.get_conn") as mock_get_conn:
mock_get_conn.return_value = mock_get_conn_ctx(mock_conn)
yield mock_conn, mock_cur
class TestHealth:
def test_health_returns_ok(self, client):
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
class TestPushAlarm:
def test_valid_alarm_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_PAYLOAD])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
def test_null_alarm_type_skipped(self, client, mock_db):
"""BUG-02 guard: NULL alarm_type must be rejected, not inserted."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_ALARM_NULL_TYPE])
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
# Verify no data INSERT was executed. log_ingestion always writes one
# row to tracksolid.ingestion_log — exclude it from the assertion.
data_inserts = [
c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c) and "ingestion_log" not in str(c)
]
assert len(data_inserts) == 0, "NULL alarm_type must not be inserted"
def test_empty_data_list_ok(self, client):
response = client.post("/pushalarm", data={"token": "", "data_list": ""})
assert response.status_code == 200
def test_batch_with_bad_item_processes_rest(self, client, mock_db):
"""BUG-04: One bad item must not abort the entire batch."""
mock_conn, mock_cur = mock_db
# One valid, one missing alarm_type (will be skipped, not crash)
items = [WEBHOOK_ALARM_PAYLOAD, WEBHOOK_ALARM_NULL_TYPE]
data_list = json.dumps(items)
response = client.post("/pushalarm", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
class TestPushTripReport:
def test_bcd_timestamp_parsed(self, client, mock_db):
"""BUG-03: BCD timestamp 220415103000 must be parsed correctly."""
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_BCD_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0
# Verify an INSERT was attempted
insert_calls = [c for c in mock_cur.execute.call_args_list
if "INSERT" in str(c)]
assert len(insert_calls) > 0, "Trip with BCD timestamp must trigger INSERT"
def test_iso_timestamp_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_TRIP_ISO_PAYLOAD])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
def test_missing_imei_skipped(self, client, mock_db):
mock_conn, mock_cur = mock_db
bad_trip = {"beginTime": "2024-04-12 07:00:00", "miles": 10.0}
data_list = json.dumps([bad_trip])
response = client.post("/pushtripreport", data={"token": "", "data_list": data_list})
assert response.status_code == 200
class TestPushObd:
def test_valid_obd_accepted(self, client, mock_db):
mock_conn, mock_cur = mock_db
data_list = json.dumps([WEBHOOK_OBD_PAYLOAD])
response = client.post("/pushobd", data={"token": "", "data_list": data_list})
assert response.status_code == 200
assert response.json()["code"] == 0

0
tests/unit/__init__.py Normal file
View file

View file

@ -0,0 +1,60 @@
"""Unit tests for Tracksolid API MD5 signature generation."""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
from ts_shared_rev import build_sign
class TestBuildSign:
def test_basic_signature(self):
"""Known input + secret produces expected MD5."""
params = {"method": "jimi.test", "app_key": "mykey", "v": "1.0"}
secret = "mysecret"
result = build_sign(params, secret)
# Verify it's a 32-char uppercase hex string
assert len(result) == 32
assert result == result.upper()
assert all(c in "0123456789ABCDEF" for c in result)
def test_sign_key_excluded(self):
"""The 'sign' key itself must be excluded from signing."""
params_with = {"method": "test", "sign": "old_sign", "v": "1.0"}
params_without = {"method": "test", "v": "1.0"}
secret = "secret"
assert build_sign(params_with, secret) == build_sign(params_without, secret)
def test_none_values_excluded(self):
"""Keys with None values are excluded from signing."""
params_with_none = {"method": "test", "optional": None, "v": "1.0"}
params_without_none = {"method": "test", "v": "1.0"}
secret = "secret"
assert build_sign(params_with_none, secret) == build_sign(params_without_none, secret)
def test_alphabetical_key_ordering(self):
"""Keys are sorted alphabetically for consistent signing."""
params_abc = {"a": "1", "b": "2", "c": "3"}
params_cba = {"c": "3", "b": "2", "a": "1"}
secret = "secret"
assert build_sign(params_abc, secret) == build_sign(params_cba, secret)
def test_different_secrets_produce_different_signs(self):
params = {"method": "test"}
assert build_sign(params, "secret1") != build_sign(params, "secret2")
def test_known_hash(self):
"""Verify against a manually computed hash."""
import hashlib
params = {"app_key": "ABC", "method": "test", "v": "1.0"}
secret = "XYZ"
sorted_keys = sorted(params.keys())
raw = secret + "".join(f"{k}{params[k]}" for k in sorted_keys) + secret
expected = hashlib.md5(raw.encode("utf-8")).hexdigest().upper()
assert build_sign(params, secret) == expected

View file

@ -0,0 +1,125 @@
"""Unit tests for ts_shared_rev data cleaning helpers."""
import sys
import os
import pytest
# Add parent directory to path so we can import ts_shared_rev
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
# Set required env vars before import
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
from ts_shared_rev import clean, clean_num, clean_int, clean_ts, is_valid_fix
class TestClean:
def test_none_returns_none(self):
assert clean(None) is None
def test_empty_string_returns_none(self):
assert clean("") is None
def test_whitespace_only_returns_none(self):
assert clean(" ") is None
def test_normal_string_preserved(self):
assert clean("hello") == "hello"
def test_strips_whitespace(self):
assert clean(" hello ") == "hello"
def test_non_string_converted(self):
assert clean(123) == "123"
def test_zero_preserved(self):
assert clean(0) == "0"
class TestCleanNum:
def test_valid_float_string(self):
assert clean_num("3.14") == pytest.approx(3.14)
def test_valid_integer_string(self):
assert clean_num("42") == pytest.approx(42.0)
def test_non_numeric_returns_none(self):
assert clean_num("abc") is None
def test_none_returns_none(self):
assert clean_num(None) is None
def test_empty_string_returns_none(self):
assert clean_num("") is None
def test_numeric_value_passthrough(self):
assert clean_num(45.5) == pytest.approx(45.5)
def test_negative_value(self):
assert clean_num("-1.5") == pytest.approx(-1.5)
class TestCleanInt:
def test_integer_string(self):
assert clean_int("42") == 42
def test_float_string_truncates(self):
assert clean_int("3.9") == 3
def test_non_numeric_returns_none(self):
assert clean_int("abc") is None
def test_none_returns_none(self):
assert clean_int(None) is None
class TestCleanTs:
def test_valid_iso_timestamp(self):
result = clean_ts("2024-04-12 08:00:00")
assert result == "2024-04-12 08:00:00"
def test_valid_iso_with_timezone(self):
result = clean_ts("2024-04-12T08:00:00Z")
assert result is not None
def test_garbage_returns_none(self):
assert clean_ts("not-a-date") is None
def test_none_returns_none(self):
assert clean_ts(None) is None
def test_empty_string_returns_none(self):
assert clean_ts("") is None
def test_bcd_format_returns_none(self):
# BCD format YYMMDDHHmmss is NOT handled by clean_ts (only by _parse_trip_ts)
assert clean_ts("220415103000") is None
class TestIsValidFix:
def test_zero_island_filtered(self):
assert is_valid_fix(0.0, 0.0) is False
def test_valid_nairobi_coords(self):
assert is_valid_fix(-1.2921, 36.8219) is True
def test_none_lat_returns_false(self):
assert is_valid_fix(None, 36.8219) is False
def test_none_lng_returns_false(self):
assert is_valid_fix(-1.2921, None) is False
def test_out_of_range_lat(self):
assert is_valid_fix(91.0, 36.8219) is False
def test_out_of_range_lng(self):
assert is_valid_fix(-1.2921, 181.0) is False
def test_valid_extreme_coords(self):
assert is_valid_fix(90.0, 180.0) is True
def test_string_coords_accepted(self):
assert is_valid_fix("-1.2921", "36.8219") is True

View file

@ -0,0 +1,150 @@
"""Unit tests locking in known field mapping fixes (FIX-E06, FIX-M16, BUG-03)."""
import sys
import os
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
os.environ.setdefault("TRACKSOLID_APP_KEY", "test_key")
os.environ.setdefault("TRACKSOLID_APP_SECRET", "test_secret")
os.environ.setdefault("TRACKSOLID_USER_ID", "test_user")
os.environ.setdefault("TRACKSOLID_PWD_MD5", "test_md5")
os.environ.setdefault("DATABASE_URL", "postgresql://test:test@localhost:5432/test")
from ts_shared_rev import clean, clean_ts, clean_num
from webhook_receiver_rev import _parse_trip_ts, unix_to_ts
class TestFIXE06AlarmFieldMapping:
"""FIX-E06: Poll alarm endpoint uses alertTypeId/alarmTypeName/alertTime."""
def test_poll_uses_alert_type_id(self):
"""Alarm poll response must use alertTypeId, not alarmType."""
api_alarm = {
"imei": "123456789012345",
"alertTypeId": "4", # CORRECT poll field
"alarmType": "WRONG_FIELD", # webhook field - should NOT be used for polls
"alarmTypeName": "Speeding",
"alertTime": "2024-04-12 07:30:00",
}
# FIX-E06: extract using alertTypeId (poll field name)
alarm_type = clean(api_alarm.get("alertTypeId"))
assert alarm_type == "4", "Must use alertTypeId not alarmType for poll responses"
def test_poll_uses_alarm_type_name(self):
"""Alarm name must come from alarmTypeName, not alarmName."""
api_alarm = {
"alertTypeId": "4",
"alarmTypeName": "Speeding", # CORRECT poll field
"alarmName": "WRONG_FIELD", # webhook field
"alertTime": "2024-04-12 07:30:00",
}
alarm_name = clean(api_alarm.get("alarmTypeName"))
assert alarm_name == "Speeding"
def test_poll_uses_alert_time(self):
"""Alarm time must come from alertTime, not alarmTime."""
api_alarm = {
"alertTypeId": "4",
"alarmTypeName": "Speeding",
"alertTime": "2024-04-12 07:30:00", # CORRECT poll field
"alarmTime": "WRONG_FIELD", # webhook field
}
alarm_time = clean_ts(api_alarm.get("alertTime"))
assert alarm_time == "2024-04-12 07:30:00"
def test_wrong_field_names_return_none(self):
"""Using incorrect webhook field names on poll data returns None (the bug)."""
api_alarm = {"alertTypeId": "4", "alarmTypeName": "Speeding", "alertTime": "2024-04-12 07:30:00"}
# These are webhook fields — should NOT be present in poll responses
assert clean(api_alarm.get("alarmType")) is None
assert clean(api_alarm.get("alarmName")) is None
assert clean_ts(api_alarm.get("alarmTime")) is None
class TestFIXM16DistanceUnits:
"""FIX-M16: Trip distance arrives in METRES from API, must be stored as km."""
def test_metres_divided_by_1000(self):
"""15000 metres from API → 15.0 km stored."""
raw_dist_metres = 15000
dist_km = round(raw_dist_metres / 1000.0, 4)
assert dist_km == pytest.approx(15.0)
def test_small_distance(self):
"""500 metres → 0.5 km."""
assert round(500 / 1000.0, 4) == pytest.approx(0.5)
def test_none_distance(self):
"""None distance stays None (no division by zero)."""
raw_dist = clean_num(None)
dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
assert dist_km is None
def test_zero_distance(self):
"""0 metres → 0.0 km."""
raw_dist = clean_num(0)
dist_km = round(raw_dist / 1000.0, 4) if raw_dist is not None else None
assert dist_km == pytest.approx(0.0)
def test_non_divided_would_be_wrong(self):
"""Verify that NOT dividing produces obviously wrong km values."""
raw_dist_metres = 15000
# Without fix: storing raw value as km
wrong_km = raw_dist_metres
# With fix: correct km
correct_km = raw_dist_metres / 1000.0
assert wrong_km == 15000 # Would mean 15,000 km trip — clearly wrong
assert correct_km == 15.0
class TestBUG03TripTimestamps:
"""BUG-03: Trip timestamps may be BCD format YYMMDDHHmmss or ISO string."""
def test_bcd_12_char_format(self):
"""220415103000 → 2022-04-15 10:30:00."""
result = _parse_trip_ts("220415103000")
assert result == "2022-04-15 10:30:00"
def test_bcd_14_char_format(self):
"""20220415103000 → 2022-04-15 10:30:00."""
result = _parse_trip_ts("20220415103000")
assert result == "2022-04-15 10:30:00"
def test_iso_string_passthrough(self):
"""ISO string passes through unchanged."""
result = _parse_trip_ts("2024-04-12 08:00:00")
assert result == "2024-04-12 08:00:00"
def test_none_returns_none(self):
assert _parse_trip_ts(None) is None
def test_garbage_returns_none(self):
assert _parse_trip_ts("not-a-timestamp") is None
def test_bcd_year_20xx(self):
"""24 prefix → 2024-xx-xx."""
result = _parse_trip_ts("240412080000")
assert result is not None
assert result.startswith("2024-04-12")
class TestUnixToTs:
"""BUG-01: OBD event_time may be Unix epoch (seconds or milliseconds)."""
def test_unix_seconds(self):
result = unix_to_ts(1712908800)
assert result is not None
assert "2024" in result
def test_unix_milliseconds(self):
result = unix_to_ts(1712908800000) # ms — should be divided by 1000
assert result is not None
assert "2024" in result
def test_unix_seconds_matches_milliseconds(self):
"""Seconds and milliseconds of same moment produce same result."""
assert unix_to_ts(1712908800) == unix_to_ts(1712908800000)
def test_none_returns_none(self):
assert unix_to_ts(None) is None

View file

@ -29,6 +29,7 @@ REVISIONS (QA-Verified):
from __future__ import annotations
import hmac
import json
import os
import time
@ -36,8 +37,13 @@ from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Optional
# Cap on items per webhook POST. Prevents a malformed/malicious push from
# monopolising a worker or blowing the DB pool. Jimi normally sends ≤ 200.
MAX_ITEMS_PER_POST = int(os.getenv("WEBHOOK_MAX_ITEMS", "5000"))
from fastapi import FastAPI, Form, HTTPException
from fastapi.responses import JSONResponse
from psycopg2.extras import execute_values
from ts_shared_rev import (
close_pool,
@ -75,7 +81,7 @@ SUCCESS = {"code": 0, "msg": "success"}
def _validate_token(token: str) -> None:
"""Raise 403 if token is invalid. Skips validation if JIMI_WEBHOOK_TOKEN is empty."""
if WEBHOOK_TOKEN and token != WEBHOOK_TOKEN:
if WEBHOOK_TOKEN and not hmac.compare_digest(token, WEBHOOK_TOKEN):
raise HTTPException(status_code=403, detail="Invalid token")
@ -83,9 +89,12 @@ def _parse_data_list(raw: str) -> list[dict]:
"""Parse the JSON string from Jimi's data_list form field."""
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
return parsed
return [parsed]
items = parsed if isinstance(parsed, list) else [parsed]
if len(items) > MAX_ITEMS_PER_POST:
log.warning("data_list truncated: %d items exceeded cap of %d",
len(items), MAX_ITEMS_PER_POST)
items = items[:MAX_ITEMS_PER_POST]
return items
except (json.JSONDecodeError, TypeError):
log.warning("Failed to parse data_list: %.200s", raw)
return []
@ -341,52 +350,58 @@ def push_gps(token: str = Form(""), data_list: str = Form("")):
return JSONResponse(content=SUCCESS)
t0 = time.time()
# Validation phase — pre-clean and filter without touching the DB.
# Per-row INSERT with SAVEPOINT was ~1 ms/row overhead at this volume;
# one batched execute_values is 10-50× faster for the same rows.
rows = []
for item in items:
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
continue
rows.append((
imei, gps_time, lng, lat, lat, lng,
clean_num(item.get("gpsSpeed")),
clean_num(item.get("direction")),
str(item.get("acc")) if item.get("acc") is not None else None,
clean_int(item.get("satelliteNum")),
clean_num(item.get("distance")),
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
inserted = 0
if rows:
with get_conn() as conn:
with conn.cursor() as cur:
execute_values(
cur,
"""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng, speed, direction,
acc_status, satellite, current_mileage,
altitude, post_type, source
) VALUES %s
ON CONFLICT (imei, gps_time) DO NOTHING
""",
rows,
template="(%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),"
" %s, %s, %s, %s, %s, %s, %s, %s, %s, 'push')",
page_size=len(rows),
)
inserted = cur.rowcount
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
else:
# No valid rows, still record the call for observability.
with get_conn() as conn:
with conn.cursor() as cur:
log_ingestion(cur, "webhook/pushgps", len(items), 0, 0,
int((time.time() - t0) * 1000), True)
with get_conn() as conn:
with conn.cursor() as cur:
for item in items:
try:
cur.execute("SAVEPOINT sp")
imei = clean(item.get("deviceImei"))
gps_time = clean_ts(item.get("gpsTime"))
lat = clean_num(item.get("lat"))
lng = clean_num(item.get("lng"))
if not imei or not gps_time or not is_valid_fix(lat, lng):
cur.execute("RELEASE SAVEPOINT sp")
continue
cur.execute("""
INSERT INTO tracksolid.position_history (
imei, gps_time, geom, lat, lng, speed, direction,
acc_status, satellite, current_mileage,
altitude, post_type, source
) VALUES (
%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s, %s, %s, %s, 'push'
) ON CONFLICT (imei, gps_time) DO NOTHING
""", (
imei, gps_time, lng, lat,
lat, lng,
clean_num(item.get("gpsSpeed")),
clean_num(item.get("direction")),
str(item.get("acc")) if item.get("acc") is not None else None,
clean_int(item.get("satelliteNum")),
clean_num(item.get("distance")),
clean_num(item.get("altitude")),
clean_int(item.get("postType")),
))
cur.execute("RELEASE SAVEPOINT sp")
inserted += 1
except Exception:
cur.execute("ROLLBACK TO SAVEPOINT sp")
log.warning("Failed to process GPS for %s", item.get("deviceImei"), exc_info=True)
log_ingestion(cur, "webhook/pushgps", len(items), 0, inserted,
int((time.time() - t0) * 1000), True)
log.info("pushgps: %d/%d items processed.", inserted, len(items))
log.info("pushgps: %d/%d items inserted.", inserted, len(items))
return JSONResponse(content=SUCCESS)
# ── 5. Device Heartbeats (Priority 2) ────────────────────────────────────────