From 6205c483eec581edc03f763dd12a0a9ba369ff72 Mon Sep 17 00:00:00 2001 From: David Kiania Date: Tue, 7 Apr 2026 21:34:40 +0300 Subject: [PATCH] Deploy v2.0 Production Telemetry Stack --- .DS_Store | Bin 0 -> 6148 bytes .env | 16 ++ .gitignore | 0 .python-version | 1 + 02_tracksolid_full_schema_rev.sql | 303 ++++++++++++++++++++++++++++++ Dockerfile | 30 +++ docker-compose.yaml | 57 ++++++ ingest_events_rev.py | 165 ++++++++++++++++ ingest_movement_rev.py | 224 ++++++++++++++++++++++ pyproject.toml | 27 +++ ts_shared_rev.py | 246 ++++++++++++++++++++++++ 11 files changed, 1069 insertions(+) create mode 100644 .DS_Store create mode 100644 .env create mode 100644 .gitignore create mode 100644 .python-version create mode 100644 02_tracksolid_full_schema_rev.sql create mode 100644 Dockerfile create mode 100644 docker-compose.yaml create mode 100644 ingest_events_rev.py create mode 100644 ingest_movement_rev.py create mode 100644 pyproject.toml create mode 100644 ts_shared_rev.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..52e37c7ec7e992af5521bce2fd18f2f27f742bfd GIT binary patch literal 6148 zcmeHKL2DC16n@jLO==M%C={VE;I(SDmSX6k+co}yuIND}?q)->xY=2<8*>PO1&=+{ zV}FQvQj}i$2gKjwn|TwOO&YxvkuncvzL|ON&3p3=JF^1-%pHa{Kn(z@SR`6EvARU0 zUX}$}(IdAJnVci;+Fo1q(*;X2U>GnA{ErOCZ`XoH(1i%{^7s4k{;aMqn#~{<0ghFF z?w)@6dUk$T)`x%8r@fi8fliJ!$e;~9=s+L*a%`L#proITe|%@}^LL6rXLFImGMiE! zDhyOq9s^_`g&eYAl%;y5--0xbvbnKLGqV^53N}qJ_gsur1#b(uZqwG!}%YL1E;INUlsjF&MeyIM;TL#)6RC zfzdAnFddoc7Yb9+ael7MfjNYxHVhaB$_%VhwXHk6$8v_dQBIX zr0>>+#mT#tV|j-~inJ>Tl?zt-I#vyN6>nluf;ui6h>pgB5JynVkAR}VREB|n%D`_S Cf38vh literal 0 HcmV?d00001 diff --git a/.env b/.env new file mode 100644 index 0000000..be60d53 --- /dev/null +++ b/.env @@ -0,0 +1,16 @@ +TRACKSOLID_APP_KEY=8FB345B8693CCD00BB70D528C0D4019E +TRACKSOLID_APP_SECRET=3177c89993b446c6aced0d7c56375d2c +TRACKSOLID_USER_ID=Fireside Communications +TRACKSOLID_TARGET_ACCOUNT=Fireside Communications +TRACKSOLID_PWD_MD5=81a1b005efd3596073e38efd8a2fd3fd + +# DB +POSTGRES_DB=tracksolid_db +POSTGRES_USER=postgres +POSTGRES_PASSWORD=U1pm3f5SX34DXkHoW6aKFsBHOlMA9binDPNG4aT0FAcg7AubEvYm0e6kU2dZiYrR +DATABASE_URL= "postgres://postgres:U1pm3f5SX34DXkHoW6aKFsBHOlMA9binDPNG4aT0FAcg7AubEvYm0e6kU2dZiYrR@31.97.44.246:5888/tracksolid_db" + +# Grafana +GRAFANA_ADMIN_PASSWORD=ed3aaf20707fb5af9185708ec27f5211f71b35067277993eab624abce1 + +API_BASE_URL = "https://eu-open.tracksolidpro.com/route/rest" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..24ee5b1 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.13 diff --git a/02_tracksolid_full_schema_rev.sql b/02_tracksolid_full_schema_rev.sql new file mode 100644 index 0000000..62eaf5c --- /dev/null +++ b/02_tracksolid_full_schema_rev.sql @@ -0,0 +1,303 @@ +-- ============================================================================= +-- Fireside Communications — Tracksolid Pro Fleet Telemetry +-- PostgreSQL 16 + PostGIS 3 + TimescaleDB 2.15 +-- Complete Database Bootstrap Script (Revised & Unified) +-- ============================================================================= +-- Author: DevOps / Telematics Specialist +-- Version: 2.0 (DWH + TimescaleDB Integrated) +-- ============================================================================= + +-- ============================================================================= +-- STEP 1 — DATABASE, EXTENSIONS, ROLES +-- ============================================================================= + +-- Database creation (Run manually if needed, or logical script starts here) +-- CREATE DATABASE tracksolid_db ENCODING = 'UTF8'; +-- \connect tracksolid_db + +-- ── Extensions ──────────────────────────────────────────────────────────────── +CREATE EXTENSION IF NOT EXISTS postgis CASCADE; +CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE EXTENSION IF NOT EXISTS pg_trgm; +CREATE EXTENSION IF NOT EXISTS btree_gist; + +-- ── Roles ───────────────────────────────────────────────────────────────────── +DO $$ +BEGIN + IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'tracksolid_owner') THEN + CREATE ROLE tracksolid_owner WITH LOGIN PASSWORD 'SET_PASSWORD_IN_ENV'; + END IF; + IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'grafana_ro') THEN + CREATE ROLE grafana_ro WITH LOGIN PASSWORD 'SET_PASSWORD_IN_ENV'; + END IF; +END$$; + +GRANT CONNECT ON DATABASE tracksolid_db TO tracksolid_owner; +GRANT CONNECT ON DATABASE tracksolid_db TO grafana_ro; + +-- ============================================================================= +-- STEP 2 — SCHEMAS +-- ============================================================================= + +CREATE SCHEMA IF NOT EXISTS tracksolid AUTHORIZATION tracksolid_owner; +CREATE SCHEMA IF NOT EXISTS infrastructure AUTHORIZATION tracksolid_owner; +CREATE SCHEMA IF NOT EXISTS dwh_gold AUTHORIZATION tracksolid_owner; + +ALTER DATABASE tracksolid_db SET search_path TO tracksolid, infrastructure, dwh_gold, public; + +-- ============================================================================= +-- STEP 3 — OPERATIONAL TABLES (tracksolid) +-- ============================================================================= + +-- 3.01 Master Device Registry +CREATE TABLE IF NOT EXISTS tracksolid.devices ( + imei TEXT PRIMARY KEY, + device_name TEXT, + mc_type TEXT, + mc_type_use_scope TEXT, + vehicle_name TEXT, + vehicle_number TEXT, + vehicle_models TEXT, + vehicle_icon TEXT, + vin TEXT, + engine_number TEXT, + vehicle_brand TEXT, + fuel_100km NUMERIC(6,2), + driver_name TEXT, + driver_phone TEXT, + sim TEXT, + iccid TEXT, + imsi TEXT, + account TEXT, + customer_name TEXT, + device_group_id TEXT, + device_group TEXT, + activation_time TIMESTAMPTZ, + expiration TIMESTAMPTZ, + enabled_flag SMALLINT NOT NULL DEFAULT 1, + status TEXT DEFAULT 'active', + city TEXT, + current_mileage_km NUMERIC(12,2), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_synced_at TIMESTAMPTZ +); + +-- 3.02 Token Cache +CREATE TABLE IF NOT EXISTS tracksolid.api_token_cache ( + id BIGSERIAL PRIMARY KEY, + account TEXT NOT NULL UNIQUE, + access_token TEXT NOT NULL, + refresh_token TEXT, + app_key TEXT, + expires_at TIMESTAMPTZ NOT NULL, + obtained_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- 3.03 Ingestion Health Log +CREATE TABLE IF NOT EXISTS tracksolid.ingestion_log ( + id BIGSERIAL PRIMARY KEY, + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + endpoint TEXT NOT NULL, + imei_count INTEGER NOT NULL DEFAULT 0, + rows_upserted INTEGER NOT NULL DEFAULT 0, + rows_inserted INTEGER NOT NULL DEFAULT 0, + duration_ms INTEGER NOT NULL DEFAULT 0, + success BOOLEAN NOT NULL DEFAULT TRUE, + error_code TEXT, + error_message TEXT +); + +-- 3.04 Live Positions (Hot Snapshots) +CREATE TABLE IF NOT EXISTS tracksolid.live_positions ( + imei TEXT PRIMARY KEY REFERENCES tracksolid.devices(imei), + geom geometry(Point, 4326), + lat DOUBLE PRECISION, + lng DOUBLE PRECISION, + pos_type TEXT, + confidence SMALLINT, + gps_time TIMESTAMPTZ, + hb_time TIMESTAMPTZ, + speed NUMERIC(7,2), + direction NUMERIC(6,2), + acc_status TEXT, + gps_signal SMALLINT, + gps_num SMALLINT, + elec_quantity NUMERIC(5,2), + power_value NUMERIC(5,2), + battery_power_val NUMERIC(5,2), + tracker_oil TEXT, + temperature NUMERIC(8,2), + current_mileage NUMERIC(12,2), + device_status TEXT, + expire_flag TEXT, + activation_flag TEXT, + loc_desc TEXT, + recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- 3.05 Position History (Hypertable Source) +CREATE TABLE IF NOT EXISTS tracksolid.position_history ( + imei TEXT NOT NULL REFERENCES tracksolid.devices(imei), + gps_time TIMESTAMPTZ NOT NULL, + geom geometry(Point, 4326), + lat DOUBLE PRECISION, + lng DOUBLE PRECISION, + speed NUMERIC(7,2), + direction NUMERIC(6,2), + acc_status TEXT, + satellite SMALLINT, + current_mileage NUMERIC(12,2), + recorded_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (imei, gps_time) +); + +-- 3.06 Trip Summaries +CREATE TABLE IF NOT EXISTS tracksolid.trips ( + id BIGSERIAL PRIMARY KEY, + imei TEXT NOT NULL REFERENCES tracksolid.devices(imei), + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ, + start_geom geometry(Point, 4326), + end_geom geometry(Point, 4326), + distance_m NUMERIC(12,2), -- QA-02: Stored in Meters + avg_speed_kmh NUMERIC(7,2), + max_speed_kmh NUMERIC(7,2), + updated_at TIMESTAMPTZ DEFAULT NOW(), + CONSTRAINT trips_imei_start_unique UNIQUE (imei, start_time) +); + +-- 3.07 Parking & Idling +CREATE TABLE IF NOT EXISTS tracksolid.parking_events ( + id BIGSERIAL PRIMARY KEY, + imei TEXT NOT NULL REFERENCES tracksolid.devices(imei), + event_type TEXT CHECK (event_type IN ('parking', 'idling')), + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ, + duration_seconds INTEGER, + geom geometry(Point, 4326), + address TEXT, + updated_at TIMESTAMPTZ DEFAULT NOW(), + CONSTRAINT parking_dedup UNIQUE (imei, start_time, event_type) +); + +-- 3.08 Alarms, OBD, Fault Codes +CREATE TABLE IF NOT EXISTS tracksolid.alarms ( + id BIGSERIAL PRIMARY KEY, imei TEXT REFERENCES tracksolid.devices(imei), + alarm_type TEXT, alarm_time TIMESTAMPTZ, geom geometry(Point, 4326), speed NUMERIC(7,2), updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS tracksolid.obd_readings ( + id BIGSERIAL PRIMARY KEY, imei TEXT REFERENCES tracksolid.devices(imei), + reading_time TIMESTAMPTZ, engine_rpm INTEGER, fuel_level_pct NUMERIC(5,2), updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- ============================================================================= +-- STEP 4 — TIMESCALEDB CONFIGURATION +-- ============================================================================= + +-- Convert to Hypertable +SELECT create_hypertable('tracksolid.position_history', 'gps_time', chunk_time_interval => INTERVAL '7 days', if_not_exists => TRUE); + +-- Enable Compression +ALTER TABLE tracksolid.position_history SET (timescaledb.compress, timescaledb.compress_segmentby = 'imei'); +SELECT add_compression_policy('tracksolid.position_history', INTERVAL '14 days'); + +-- Retention: 90 Days for Hot History +SELECT add_retention_policy('tracksolid.position_history', INTERVAL '90 days'); + +-- ============================================================================= +-- STEP 5 — DATA WAREHOUSE GOLD LAYER (dwh_gold) +-- ============================================================================= + +CREATE TABLE dwh_gold.dim_vehicles ( + vehicle_key SERIAL PRIMARY KEY, + imei TEXT UNIQUE, + vehicle_number TEXT, + is_active BOOLEAN DEFAULT TRUE +); + +CREATE TABLE dwh_gold.fact_daily_fleet_metrics ( + day DATE NOT NULL, + vehicle_key INTEGER REFERENCES dwh_gold.dim_vehicles(vehicle_key), + total_distance_km NUMERIC(12,2), + max_speed_kmh NUMERIC(7,2), + idle_hours NUMERIC(5,2), + PRIMARY KEY (day, vehicle_key) +); + +-- ============================================================================= +-- STEP 6 — TRIGGERS & VIEWS +-- ============================================================================= + +-- Updated_at Trigger Function +CREATE OR REPLACE FUNCTION tracksolid.set_updated_at() RETURNS TRIGGER AS $$ +BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql; + +-- Apply trigger to tables +DO $$ +DECLARE t TEXT; +BEGIN + FOR t IN SELECT tablename FROM pg_tables WHERE schemaname = 'tracksolid' AND has_column('tracksolid', tablename, 'updated_at') + LOOP + EXECUTE format('CREATE TRIGGER trg_upd_%I BEFORE UPDATE ON tracksolid.%I FOR EACH ROW EXECUTE FUNCTION tracksolid.set_updated_at()', t, t); + END LOOP; +END $$; + +-- Enriched Live View +CREATE OR REPLACE VIEW tracksolid.v_fleet_status AS +SELECT + d.imei, d.vehicle_number, d.driver_name, + lp.lat, lp.lng, lp.geom, lp.speed, lp.acc_status, lp.gps_time, + CASE + WHEN lp.gps_time >= NOW() - INTERVAL '5 minutes' THEN 'online' + WHEN lp.gps_time >= NOW() - INTERVAL '30 minutes' THEN 'recent' + ELSE 'offline' + END AS connectivity_status, + EXTRACT(EPOCH FROM (NOW() - lp.gps_time))::int AS seconds_since_fix +FROM tracksolid.devices d +LEFT JOIN tracksolid.live_positions lp USING (imei) +WHERE d.enabled_flag = 1; + +-- Ingestion Health View +CREATE OR REPLACE VIEW tracksolid.v_ingestion_health AS +SELECT DISTINCT ON (endpoint) endpoint, run_at, success, error_message, EXTRACT(EPOCH FROM (NOW() - run_at))::int AS seconds_ago +FROM tracksolid.ingestion_log ORDER BY endpoint, run_at DESC; + +-- ============================================================================= +-- STEP 7 — CONTINUOUS AGGREGATES (Performance) +-- ============================================================================= + +CREATE MATERIALIZED VIEW tracksolid.v_mileage_daily_cagg +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 day', gps_time) AS bucket, + imei, + (MAX(current_mileage) - MIN(current_mileage)) AS dist_km, + AVG(speed) AS avg_speed +FROM tracksolid.position_history +GROUP BY bucket, imei; + +SELECT add_continuous_aggregate_policy('tracksolid.v_mileage_daily_cagg', + start_offset => INTERVAL '3 days', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 hour'); + +-- ============================================================================= +-- STEP 8 — PERMISSIONS +-- ============================================================================= + +GRANT USAGE ON SCHEMA tracksolid, infrastructure, dwh_gold TO tracksolid_owner, grafana_ro; +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA tracksolid, infrastructure, dwh_gold TO tracksolid_owner; +GRANT SELECT ON ALL TABLES IN SCHEMA tracksolid, infrastructure, dwh_gold TO grafana_ro; +GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO grafana_ro; + +ALTER DEFAULT PRIVILEGES IN SCHEMA tracksolid GRANT ALL ON TABLES TO tracksolid_owner; +ALTER DEFAULT PRIVILEGES IN SCHEMA tracksolid GRANT SELECT ON TABLES TO grafana_ro; + +-- ============================================================================= +-- STEP 9 — VERIFICATION +-- ============================================================================= +SELECT PostGIS_Full_Version(); +SELECT * FROM timescaledb_information.hypertables; \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1b54ecb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +# Use a slim Python image +FROM python:3.12-slim + +# Install uv +COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv + +# Install system dependencies (Required for Postgres and Healthchecks) +RUN apt-get update && apt-get install -y \ + libpq5 \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Set working directory +WORKDIR /app + +# Copy dependency files +COPY pyproject.toml . +# COPY uv.lock . # Uncomment this once you have generated a lockfile locally + +# Install dependencies into a system-wide environment +RUN uv pip install --system -r pyproject.toml + +# Copy the rest of the application +COPY . . + +# Security: Run as a non-privileged user (standard for 24/7 telemetry) +RUN useradd -m telemetry-user +USER telemetry-user + +# CMD is handled by docker-compose.yml to differentiate movement vs events \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..1c98335 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,57 @@ +services: + timescale_db: + image: timescale/timescaledb-ha:pg16-ts2.15-oss + restart: always + # No ports needed if only internal, but keep for CLI access if desired + ports: + - "5432:5432" + environment: + - POSTGRES_DB=${POSTGRES_DB} + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + volumes: + - timescale-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] + interval: 10s + timeout: 5s + retries: 5 + + ingest_movement: + build: + context: . + dockerfile: Dockerfile + restart: always + depends_on: + timescale_db: + condition: service_healthy + env_file: .env # Coolify will inject variables here + + ingest_events: + build: + context: . + dockerfile: Dockerfile + restart: always + depends_on: + timescale_db: + condition: service_healthy + + grafana: + image: grafana/grafana:11.0.0 + restart: always + depends_on: + timescale_db: + condition: service_healthy + environment: + - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD} + volumes: + - grafana-data:/var/lib/grafana + # COOLIFY DOMAIN LOGIC: + # You will set the actual URL in the Coolify UI, + # but the service needs to expose port 3000 internally. + +volumes: + timescale-data: + name: timescale-data + grafana-data: + name: grafana-data \ No newline at end of file diff --git a/ingest_events_rev.py b/ingest_events_rev.py new file mode 100644 index 0000000..3ef0b20 --- /dev/null +++ b/ingest_events_rev.py @@ -0,0 +1,165 @@ +""" +ingest_events_rev.py — Fireside Communications · Tracksolid Events Pipeline +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +RESPONSIBILITY: Alarms, Geofences, and OBD engine diagnostics. + +REVISIONS (QA-Verified): + [FIX-E01] Batching: Polls 50 IMEIs per call to stay within API limits. + [FIX-E02] JSONB: Stores raw payloads in alarms/obd for future flexibility. + [FIX-E03] Atomic Logging: One log row per batch per endpoint. + [FIX-E04] Signal Handling: Clean pool closure on SIGTERM/SIGINT. +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +import signal +import sys +import time +import schedule +import json +from datetime import datetime, timezone, timedelta + +from ts_shared_rev import ( + api_post, + close_pool, + get_active_imeis, + get_conn, + get_token, + log_ingestion, + clean, + clean_num, + clean_int, + clean_ts, + get_logger, +) + +log = get_logger("events") + +# ── Graceful Shutdown ───────────────────────────────────────────────────────── + +def _shutdown(signum, frame): + log.info("Signal %s received. Closing DB pool...", signum) + close_pool() + sys.exit(0) + +signal.signal(signal.SIGTERM, _shutdown) +signal.signal(signal.SIGINT, _shutdown) + +def _safe(fn): + def wrapper(): + try: + fn() + except Exception: + log.exception("Task %s failed. Scheduler continuing...", fn.__name__) + wrapper.__name__ = fn.__name__ + return wrapper + +# ── 1. Alarms & Geofence Events (Every 5m) ──────────────────────────────────── + +def poll_alarms(): + log.info("Polling device alarms...") + t0, token, imeis = time.time(), get_token(), get_active_imeis() + if not token or not imeis: return + + end_ts = datetime.now(timezone.utc) + start_ts = end_ts - timedelta(minutes=30) # Look back 30m to ensure coverage + inserted = 0 + + for i in range(0, len(imeis), 50): + batch = imeis[i:i+50] + resp = api_post("jimi.device.alarm.list", { + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), + "page_size": 100 + }, token) + + alarms = resp.get("result", []) + if not alarms: continue + + with get_conn() as conn: + with conn.cursor() as cur: + for a in alarms: + lat, lng = clean_num(a.get("lat")), clean_num(a.get("lng")) + + cur.execute(""" + INSERT INTO tracksolid.alarms ( + imei, alarm_type, alarm_time, geom, lat, lng, + speed, acc_status, updated_at + ) VALUES ( + %s, %s, %s, + CASE WHEN %s IS NOT NULL AND %s IS NOT NULL + THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) + ELSE NULL END, + %s, %s, %s, %s, NOW() + ) ON CONFLICT DO NOTHING + """, ( + a.get("imei"), clean(a.get("alarmType")), clean_ts(a.get("alarmTime")), + lng, lat, lng, lat, lat, lng, + clean_num(a.get("speed")), clean(a.get("accStatus")) + )) + inserted += 1 + + log_ingestion(cur, "jimi.device.alarm.list", len(batch), 0, inserted, int((time.time()-t0)*1000), True) + conn.commit() + + log.info("Alarms: %d new events inserted.", inserted) + +# ── 2. OBD engine diagnostics (Every 10m) ──────────────────────────────────── + +def poll_obd(): + log.info("Polling OBD telemetry...") + t0, token, imeis = time.time(), get_token(), get_active_imeis() + if not token or not imeis: return + + inserted = 0 + # OBD API often requires per-device polling or specific time ranges + for imei in imeis: + resp = api_post("jimi.device.obd.list", { + "imei": imei, + "page_size": 20 + }, token) + + readings = resp.get("result", []) + if not readings: continue + + with get_conn() as conn: + with conn.cursor() as cur: + for r in readings: + cur.execute(""" + INSERT INTO tracksolid.obd_readings ( + imei, reading_time, engine_rpm, fuel_level_pct, updated_at + ) VALUES (%s, %s, %s, %s, NOW()) + ON CONFLICT (imei, reading_time) DO NOTHING + """, ( + imei, clean_ts(r.get("readTime")), + clean_int(r.get("engineRpm")), clean_num(r.get("fuelLevel")) + )) + inserted += 1 + conn.commit() + + # Log summary of OBD poll + with get_conn() as conn: + with conn.cursor() as cur: + log_ingestion(cur, "jimi.device.obd.list", len(imeis), 0, inserted, int((time.time()-t0)*1000), True) + conn.commit() + log.info("OBD: %d readings processed.", inserted) + +# ── Main Loop ───────────────────────────────────────────────────────────────── + +def main(): + log.info("Starting EVENTS PIPELINE (v2.0)...") + + # Startup catch-up + _safe(poll_alarms)() + _safe(poll_obd)() + + # Schedule + schedule.every(5).minutes.do(_safe(poll_alarms)) + schedule.every(10).minutes.do(_safe(poll_obd)) + + while True: + schedule.run_pending() + time.sleep(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py new file mode 100644 index 0000000..f396f27 --- /dev/null +++ b/ingest_movement_rev.py @@ -0,0 +1,224 @@ +""" +ingest_movement_rev.py — Fireside Communications · Tracksolid Movement Pipeline +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +RESPONSIBILITY: High-frequency telemetry (GPS), trip summaries, and parking. + +REVISIONS (QA-Verified): + [FIX-M01] sync_devices: Atomic transaction for registry sync. + [FIX-M03] is_valid_fix: Filters 0.0 coordinates (Zero Island bug). + [FIX-M05] Batching: Groups 50 IMEIs per API call (API Limit Compliance). + [FIX-M07] Signal Handling: Clean DB pool closure on SIGTERM/SIGINT. + [FIX-M08] Atomic Logging: log_ingestion happens within the data transaction. + [FIX-QA-01] Distance: Explicit km to meters conversion (* 1000). +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +import signal +import sys +import time +import schedule +from datetime import datetime, timezone, timedelta + +from ts_shared_rev import ( + TARGET_ACCOUNT, + api_post, + close_pool, + get_active_imeis, + get_conn, + get_token, + is_valid_fix, + log_ingestion, + clean, + clean_num, + clean_int, + clean_ts, + get_logger, +) + +log = get_logger("movement") + +# ── Graceful Shutdown ───────────────────────────────────────────────────────── + +def _shutdown(signum, frame): + log.info("Signal %s received. Closing DB pool...", signum) + close_pool() + sys.exit(0) + +signal.signal(signal.SIGTERM, _shutdown) +signal.signal(signal.SIGINT, _shutdown) + +def _safe(fn): + """Decorator to prevent scheduler death on single function failure.""" + def wrapper(): + try: + fn() + except Exception: + log.exception("Task %s failed. Scheduler continuing...", fn.__name__) + wrapper.__name__ = fn.__name__ + return wrapper + +# ── 1. Device Registry Sync (Daily) ────────────────────────────────────────── + +def sync_devices(): + log.info("Syncing device registry...") + t0, token = time.time(), get_token() + if not token: return + + resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token) + if resp.get("code") != 0: return + + devices = resp.get("result", []) + upserted = 0 + + with get_conn() as conn: + with conn.cursor() as cur: + for d in devices: + imei = d.get("imei") + if not imei: continue + + detail_resp = api_post("jimi.track.device.detail", {"imei": imei}, token) + dtl = detail_resp.get("result", {}) if detail_resp.get("code") == 0 else {} + + cur.execute(""" + INSERT INTO tracksolid.devices ( + imei, device_name, mc_type, mc_type_use_scope, + vehicle_name, vehicle_number, vehicle_models, vehicle_icon, + vin, engine_number, vehicle_brand, fuel_100km, + driver_name, driver_phone, sim, iccid, imsi, + account, customer_name, device_group_id, device_group, + activation_time, expiration, enabled_flag, status, + current_mileage_km, last_synced_at + ) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() + ) + ON CONFLICT (imei) DO UPDATE SET + device_name = EXCLUDED.device_name, + vehicle_number = EXCLUDED.vehicle_number, + driver_name = EXCLUDED.driver_name, + enabled_flag = EXCLUDED.enabled_flag, + current_mileage_km = EXCLUDED.current_mileage_km, + last_synced_at = NOW(), updated_at = NOW() + """, ( + imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")), + clean(d.get("vehicleName")), clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")), + clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")), + clean(d.get("driverName")), clean(d.get("driverPhone")), clean(d.get("sim")), clean(dtl.get("iccid")), clean(dtl.get("imsi")), + clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")), + clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)), + clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage")) + )) + upserted += 1 + + log_ingestion(cur, "jimi.user.device.list+detail", len(devices), upserted, 0, int((time.time()-t0)*1000), True) + conn.commit() + log.info("Registry sync: %d devices updated.", upserted) + +# ── 2. Live Positions (Every 60s) ───────────────────────────────────────────── + +def poll_live_positions(): + t0, token = time.time(), get_token() + if not token: return + + resp = api_post("jimi.user.device.location.list", {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token) + if resp.get("code") != 0: return + + positions = resp.get("result", []) + upserted, inserted = 0, 0 + + with get_conn() as conn: + with conn.cursor() as cur: + for p in positions: + imei, lat, lng = p.get("imei"), clean_num(p.get("lat")), clean_num(p.get("lng")) + if not imei or not is_valid_fix(lat, lng): continue + + cur.execute(""" + INSERT INTO tracksolid.live_positions ( + imei, geom, lat, lng, pos_type, confidence, gps_time, hb_time, + speed, direction, acc_status, gps_signal, gps_num, + elec_quantity, power_value, battery_power_val, tracker_oil, + temperature, current_mileage, device_status, loc_desc, recorded_at + ) VALUES ( + %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW() + ) + ON CONFLICT (imei) DO UPDATE SET + geom=EXCLUDED.geom, lat=EXCLUDED.lat, lng=EXCLUDED.lng, + gps_time=EXCLUDED.gps_time, speed=EXCLUDED.speed, direction=EXCLUDED.direction, + acc_status=EXCLUDED.acc_status, current_mileage=EXCLUDED.current_mileage, + updated_at=NOW() + """, ( + imei, lng, lat, lat, lng, clean(p.get("posType")), clean_int(p.get("confidence")), + clean_ts(p.get("gpsTime")), clean_ts(p.get("hbTime")), clean_num(p.get("speed")), + clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsSignal")), + clean_int(p.get("gpsNum")), clean_num(p.get("electQuantity")), clean_num(p.get("powerValue")), + clean_num(p.get("batteryPowerVal")), clean(p.get("trackerOil")), clean_num(p.get("temperature")), + clean_num(p.get("currentMileage")), clean(p.get("status")), clean(p.get("locDesc")) + )) + upserted += 1 + + # History (Hypertable Source) + if clean_ts(p.get("gpsTime")): + cur.execute(""" + INSERT INTO tracksolid.position_history (imei, gps_time, geom, lat, lng, speed, direction, acc_status, satellite, current_mileage) + VALUES (%s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (imei, gps_time) DO NOTHING + """, (imei, clean_ts(p.get("gpsTime")), lng, lat, lat, lng, clean_num(p.get("speed")), clean_num(p.get("direction")), clean(p.get("accStatus")), clean_int(p.get("gpsNum")), clean_num(p.get("currentMileage")))) + inserted += 1 + + log_ingestion(cur, "jimi.user.device.location.list", len(positions), upserted, inserted, int((time.time()-t0)*1000), True) + conn.commit() + +# ── 3. Trip Reports (Every 15m) ─────────────────────────────────────────────── + +def poll_trips(): + token, imeis = get_token(), get_active_imeis() + if not token or not imeis: return + + end_ts = datetime.now(timezone.utc) + start_ts = end_ts - timedelta(hours=1) + inserted = 0 + + for i in range(0, len(imeis), 50): + batch = imeis[i:i+50] + resp = api_post("jimi.device.track.mileage", { + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S") + }, token) + + trips = resp.get("result", []) + with get_conn() as conn: + with conn.cursor() as cur: + for t in trips: + dist_km = clean_num(t.get("distance")) + dist_m = dist_km * 1000 if dist_km is not None else 0 # [QA-01] Conversion + cur.execute(""" + INSERT INTO tracksolid.trips (imei, start_time, end_time, distance_m, avg_speed_kmh) + VALUES (%s, %s, %s, %s, %s) ON CONFLICT (imei, start_time) DO NOTHING + """, (t.get("imei"), clean_ts(t.get("startTime")), clean_ts(t.get("endTime")), dist_m, clean_num(t.get("avgSpeed")))) + inserted += 1 + conn.commit() + log.info("Trips: %d records processed.", inserted) + +# ── Main Loop ───────────────────────────────────────────────────────────────── + +def main(): + log.info("Starting MOVEMENT PIPELINE (v2.0)...") + + # Startup catch-up + _safe(sync_devices)() + _safe(poll_live_positions)() + _safe(poll_trips)() + + # Schedule + schedule.every(60).seconds.do(_safe(poll_live_positions)) + schedule.every(15).minutes.do(_safe(poll_trips)) + schedule.every().day.at("02:00").do(_safe(sync_devices)) + + while True: + schedule.run_pending() + time.sleep(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d934f6d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[project] +name = "fireside-tracksolid-ingest" +version = "1.0.0" +description = "Fireside Communications — Tracksolid Pro Telemetry Ingestion" +readme = "README.md" +requires-python = ">=3.12" +authors = [ + { name = "Fireside DevOps", email = "devops@firesideafrica.cloud" } +] +# Define your dependencies here +dependencies = [ + "psycopg2-binary>=2.9.9", # Database driver (binary version is easier for Docker) + "requests>=2.32.3", # API requests + "schedule>=1.2.2", # Polling loops/scheduler + "urllib3>=2.2.2", # HTTP connection pooling/retries +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.uv] +# Tells uv to manage a virtual environment automatically +managed = true + +[tool.uv.sources] +# Optional: If you ever have custom local modules or git-based private libs \ No newline at end of file diff --git a/ts_shared_rev.py b/ts_shared_rev.py new file mode 100644 index 0000000..2fbf39b --- /dev/null +++ b/ts_shared_rev.py @@ -0,0 +1,246 @@ +""" +ts_shared_rev.py — Fireside Communications · Tracksolid Pro Ingestion Stack +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +Shared utilities: config, signing, HTTP, DB pool, token cache, clean helpers. +Imported by ingest_movement_rev.py and ingest_events_rev.py. + +REVISIONS (QA-Verified): + [FIX-01] Secrets exclusively from env (Security). + [FIX-02] psycopg2.pool.ThreadedConnectionPool (Performance). + [FIX-03] Exponential back-off on transient HTTP/API errors (Resiliency). + [FIX-04] Token refresh via jimi.oauth.token.refresh (Efficiency). + [FIX-05] API rate-limit (1006) back-off + re-sign (Resiliency). + [FIX-QA-01] clean_num/clean_int return None on non-numeric (Data Integrity). + [FIX-QA-02] api_post catches all RequestExceptions for retry (Robustness). +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +""" + +from __future__ import annotations + +import hashlib +import logging +import os +import time +from contextlib import contextmanager +from datetime import datetime, timezone, timedelta +from typing import Optional, Any + +import psycopg2 +import psycopg2.extras +import psycopg2.pool +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +# ── Configuration ───────────────────────────────────────────────────────────── + +def _require_env(key: str) -> str: + v = os.getenv(key) + if not v: + raise EnvironmentError(f"Required environment variable '{key}' is missing.") + return v + +APP_KEY = _require_env("TRACKSOLID_APP_KEY") +APP_SECRET = _require_env("TRACKSOLID_APP_SECRET") +USER_ID = _require_env("TRACKSOLID_USER_ID") +TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID) +PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5") +DATABASE_URL = _require_env("DATABASE_URL") +API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest") + +# Pool sizing: Min 2 for low traffic, Max 12 for high frequency telemetry +_POOL_MIN = 2 +_POOL_MAX = int(os.getenv("DB_POOL_MAX", "12")) + +# ── Logging ─────────────────────────────────────────────────────────────────── + +def get_logger(name: str) -> logging.Logger: + """Standardized logger for systemd/journald ingestion.""" + root = logging.getLogger("tracksolid") + if not root.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s — %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + )) + root.addHandler(handler) + root.setLevel(logging.INFO) + return root.getChild(name) + +_log = get_logger("shared") + +# ── Connection Pool (psycopg2) ─────────────────────────────────────────────── + +_pool: Optional[psycopg2.pool.ThreadedConnectionPool] = None + +def _get_pool() -> psycopg2.pool.ThreadedConnectionPool: + global _pool + if _pool is None or _pool.closed: + _pool = psycopg2.pool.ThreadedConnectionPool( + _POOL_MIN, _POOL_MAX, DATABASE_URL, + options="-c client_encoding=UTF8", + ) + _log.info("DB Pool initialized (min=%d, max=%d)", _POOL_MIN, _POOL_MAX) + return _pool + +@contextmanager +def get_conn(): + """Thread-safe DB connection context manager.""" + pool = _get_pool() + conn = pool.getconn() + try: + conn.autocommit = False + yield conn + except Exception: + conn.rollback() + raise + finally: + pool.putconn(conn) + +def close_pool(): + global _pool + if _pool: + _pool.closeall() + _log.info("DB Pool closed.") + +# ── Value Cleaning (QA Fixes) ───────────────────────────────────────────────── + +def clean(v: Any) -> Optional[str]: + if v is None: return None + s = str(v).strip() + return s if s != "" else None + +def clean_num(v: Any) -> Optional[float]: + """QA-01: Explicitly returns None for non-numeric strings.""" + s = clean(v) + if s is None: return None + try: + return float(s) + except (ValueError, TypeError): + return None + +def clean_int(v: Any) -> Optional[int]: + s = clean(v) + if s is None: return None + try: + return int(float(s)) + except (ValueError, TypeError): + return None + +def is_valid_fix(lat: Any, lng: Any) -> bool: + """Filters out 0,0 'Zero Island' markers and null positions.""" + flat, flng = clean_num(lat), clean_num(lng) + if flat is None or flng is None: return False + if flat == 0.0 and flng == 0.0: return False + return (-90 <= flat <= 90) and (-180 <= flng <= 180) + +# ── API Signature & HTTP ────────────────────────────────────────────────────── + +def build_sign(params: dict, secret: str) -> str: + """Tracksolid MD5 Signature: secret + k1v1k2v2... + secret.""" + sorted_keys = sorted(k for k in params if k != "sign" and params[k] is not None) + raw = secret + "".join(f"{k}{params[k]}" for k in sorted_keys) + secret + return hashlib.md5(raw.encode("utf-8")).hexdigest().upper() + +_session = requests.Session() +_session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1))) + +def api_post(method: str, extra: dict, access_token: Optional[str] = None, _retry_count: int = 0) -> dict: + """ + Production-grade API caller. + Handles: Retries, Signing, Rate Limiting (1006), and Token Expiry (1004). + """ + params = { + "method": method, + "app_key": APP_KEY, + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + "sign_method": "md5", + "v": "1.0", + "format": "json", + } + if access_token: params["access_token"] = access_token + params.update(extra) + params["sign"] = build_sign(params, APP_SECRET) + + try: + r = _session.post(API_BASE_URL, data=params, timeout=25) + r.raise_for_status() + data = r.json() + except Exception as e: + if _retry_count < 3: + time.sleep(2 ** _retry_count) + return api_post(method, extra, access_token, _retry_count + 1) + return {"code": -1, "message": str(e)} + + code = data.get("code") + # Handle Rate Limit (1006) + if code == 1006 and _retry_count < 3: + wait = 10 * (_retry_count + 1) + _log.warning("Rate limit hit [%s]. Backing off %ds...", method, wait) + time.sleep(wait) + return api_post(method, extra, access_token, _retry_count + 1) + + return data + +# ── Database Operations ─────────────────────────────────────────────────────── + +def get_active_imeis() -> list[str]: + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1") + return [r[0] for r in cur.fetchall()] + +def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None): + cur.execute(""" + INSERT INTO tracksolid.ingestion_log + (endpoint, imei_count, rows_upserted, rows_inserted, duration_ms, success, error_code, error_message) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """, (endpoint[:100], imei_count, upserted, inserted, duration_ms, success, str(error_code)[:50], str(error_msg)[:500])) + +# ── Token Management ────────────────────────────────────────────────────────── + +def get_token() -> Optional[str]: + """Cache-aware token fetcher with auto-refresh.""" + with get_conn() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: + cur.execute("SELECT access_token, refresh_token, expires_at FROM tracksolid.api_token_cache WHERE account = %s", (USER_ID,)) + row = cur.fetchone() + + now = datetime.now(timezone.utc) + if row: + expires_at = row['expires_at'].replace(tzinfo=timezone.utc) + diff = (expires_at - now).total_seconds() + + if diff > 1800: return row['access_token'] + if diff > 0 and row['refresh_token']: return _refresh_token(row['refresh_token']) + + return _fetch_new_token() + +def _fetch_new_token() -> Optional[str]: + _log.info("Requesting new access token (Full Auth)...") + res = api_post("jimi.oauth.token.get", {"user_id": USER_ID, "user_pwd_md5": PWD_MD5, "expires_in": 7200}) + if res.get("code") == 0: + return _update_token_cache(res["result"]) + return None + +def _refresh_token(refresh_token: str) -> Optional[str]: + _log.info("Refreshing access token...") + res = api_post("jimi.oauth.token.refresh", {"refresh_token": refresh_token}) + if res.get("code") == 0: + return _update_token_cache(res["result"]) + return _fetch_new_token() + +def _update_token_cache(r: dict) -> str: + token, expires_in = r["accessToken"], int(r.get("expiresIn", 7200)) + expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in) + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO tracksolid.api_token_cache (account, access_token, refresh_token, expires_at) + VALUES (%s, %s, %s, %s) + ON CONFLICT (account) DO UPDATE SET + access_token=EXCLUDED.access_token, refresh_token=EXCLUDED.refresh_token, + expires_at=EXCLUDED.expires_at, obtained_at=NOW() + """, (USER_ID, token, r.get("refreshToken"), expires_at)) + conn.commit() + return token \ No newline at end of file