diff --git a/CLAUDE.md b/CLAUDE.md index 586282a..f7b27ac 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,6 +45,7 @@ This repository ingests the Tracksolid Pro API into a TimescaleDB/PostGIS databa | Visualisation | Grafana (provisioned via custom image) | | Workflow automation | n8n | | API source | Tracksolid Pro / Jimi IoT Open Platform (`eu-open.tracksolidpro.com/route/rest`) | +| Backup | pg_dump sidecar → rustfs S3 (`fleet-db` bucket), nightly | | Version control | Forgejo at `repo.rahamafresh.com` | --- @@ -84,6 +85,11 @@ docs/ # Reference docs (connections, API, KPIs, project co docs/superpowers/ # Pitch specs and implementation plans (not deployed code) 02_tracksolid_full_schema_rev.sql # Full schema bootstrap 03..06_*.sql # Incremental migrations (06 adds assigned_city, dispatch_log, ops.*) +07_analytics_views.sql # Analytics views migration (applied 2026-04-21) +Dockerfile # Custom image for ingest/webhook containers +pyproject.toml # Python project + uv dependency spec +OPERATIONS_MANUAL.md # Day-to-day ops runbook +backup/ # pg_dump sidecar scripts and config 01_BusinessAnalytics.md # SQL analytics library — read before writing queries 20260414_FS__Logistics - final_fixed.csv # 144-device driver/vehicle source data tracksolidApiDocumentation.md # API endpoint reference @@ -165,6 +171,8 @@ All views carry a `COMMENT ON VIEW` referencing their spec — `\d+ tracksolid.v | FIX-M15 | `ingest_movement_rev.py` | `get_device_locations()` — on-demand precision refresh | | FIX-M16 | `ingest_movement_rev.py` | `distance` from API is metres → divide by 1000 before storing | | FIX-M17 | `ingest_movement_rev.py` | `sync_devices()` ON CONFLICT now updates all 26 fields (was 5) | +| FIX-M18 | `ingest_movement_rev.py` | `sync_devices()` pulls `vehicleName`/`vehicleNumber`/`driverName`/`driverPhone`/`sim` from `jimi.track.device.detail` — list endpoint returns null for these even when set | +| FIX-M19 | `ts_shared_rev.py`, `ingest_movement_rev.py` | Multi-account support: fleet spans `fireside`, `Fireside@HQ`, `Fireside_MSA` (156 devices total). `sync_devices`, `poll_live_positions`, `poll_parking` iterate `TRACKSOLID_TARGETS` (comma-separated env var). New helper `get_active_imeis_by_target()` scopes parking calls to the right account | | FIX-E06 | `ingest_events_rev.py` | Alarm field mapping: `alertTypeId`/`alarmTypeName`/`alertTime` | | BUG-02 | Migration 04 | Historical `distance_m` rows ÷1,000,000 → renamed to `distance_km` | @@ -209,7 +217,7 @@ Latest full snapshot: `260412_baseline_report.md` | Priority | Item | |---|---| | HIGH | Run `import_drivers_csv.py --apply` — 144 X3/JC400P devices with names + plates waiting | -| HIGH | Register webhooks: `/pushobd` `/pushoil` `/pushtem` `/pushlbs` `/pushevent` | +| HIGH | Register webhooks: `/pushoil` `/pushtem` `/pushlbs` (auto-register on push now done — commit 257643c) | | HIGH | Investigate X3-63282 in Kampala — legitimate or unauthorised? | | MEDIUM | Set `fuel_100km` per vehicle type to activate fuel cost calculations | | MEDIUM | Investigate 44 silent devices (only 19 of 63 reporting) — SIM installed? Activated? | diff --git a/backup/Dockerfile b/backup/Dockerfile new file mode 100644 index 0000000..e9c1eae --- /dev/null +++ b/backup/Dockerfile @@ -0,0 +1,16 @@ +FROM alpine:3.20 + +RUN apk add --no-cache \ + postgresql16-client \ + aws-cli \ + gzip \ + tzdata \ + bash \ + coreutils + +WORKDIR /app +COPY backup_db.sh /app/backup_db.sh +COPY entrypoint.sh /app/entrypoint.sh +RUN chmod +x /app/backup_db.sh /app/entrypoint.sh + +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/backup/backup_db.sh b/backup/backup_db.sh new file mode 100755 index 0000000..7a724ce --- /dev/null +++ b/backup/backup_db.sh @@ -0,0 +1,58 @@ +#!/bin/sh +# Nightly pg_dump → rustfs (S3-compatible). +# Required env: POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, +# RUSTFS_ENDPOINT, RUSTFS_ACCESS_KEY, RUSTFS_SECRET_KEY, RUSTFS_BUCKET. +# Optional: BACKUP_KEEP_DAYS (default 30), PGHOST (default timescale_db). + +set -eu + +: "${POSTGRES_USER:?}" +: "${POSTGRES_PASSWORD:?}" +: "${POSTGRES_DB:?}" +: "${RUSTFS_ENDPOINT:?}" +: "${RUSTFS_ACCESS_KEY:?}" +: "${RUSTFS_SECRET_KEY:?}" +: "${RUSTFS_BUCKET:?}" + +PGHOST="${PGHOST:-timescale_db}" +PGPORT="${PGPORT:-5432}" +KEEP_DAYS="${BACKUP_KEEP_DAYS:-30}" + +TS="$(date -u +%Y%m%d_%H%M%SZ)" +FILE="${POSTGRES_DB}_${TS}.sql.gz" +TMP="/tmp/${FILE}" + +export AWS_ACCESS_KEY_ID="$RUSTFS_ACCESS_KEY" +export AWS_SECRET_ACCESS_KEY="$RUSTFS_SECRET_KEY" +export AWS_DEFAULT_REGION="${RUSTFS_REGION:-us-east-1}" +export PGPASSWORD="$POSTGRES_PASSWORD" + +echo "[$(date -u +%FT%TZ)] pg_dump ${POSTGRES_DB}@${PGHOST} -> ${FILE}" +pg_dump -h "$PGHOST" -p "$PGPORT" -U "$POSTGRES_USER" -d "$POSTGRES_DB" \ + --no-owner --no-privileges --format=plain \ + | gzip -9 > "$TMP" + +SIZE=$(wc -c < "$TMP") +echo "[$(date -u +%FT%TZ)] dump size: ${SIZE} bytes" + +KEY="daily/${FILE}" +echo "[$(date -u +%FT%TZ)] uploading s3://${RUSTFS_BUCKET}/${KEY}" +aws --endpoint-url "$RUSTFS_ENDPOINT" s3 cp "$TMP" "s3://${RUSTFS_BUCKET}/${KEY}" + +rm -f "$TMP" + +# Prune anything older than KEEP_DAYS. +CUTOFF="$(date -u -d "-${KEEP_DAYS} days" +%Y%m%d 2>/dev/null || date -u -v -"${KEEP_DAYS}"d +%Y%m%d)" +aws --endpoint-url "$RUSTFS_ENDPOINT" s3 ls "s3://${RUSTFS_BUCKET}/daily/" \ + | awk '{print $4}' \ + | while read -r OBJ; do + [ -z "$OBJ" ] && continue + OBJ_DATE=$(echo "$OBJ" | sed -n 's/.*_\([0-9]\{8\}\)_.*/\1/p') + [ -z "$OBJ_DATE" ] && continue + if [ "$OBJ_DATE" -lt "$CUTOFF" ]; then + echo "[$(date -u +%FT%TZ)] prune s3://${RUSTFS_BUCKET}/daily/${OBJ}" + aws --endpoint-url "$RUSTFS_ENDPOINT" s3 rm "s3://${RUSTFS_BUCKET}/daily/${OBJ}" + fi + done + +echo "[$(date -u +%FT%TZ)] backup complete" diff --git a/backup/entrypoint.sh b/backup/entrypoint.sh new file mode 100755 index 0000000..f74eaed --- /dev/null +++ b/backup/entrypoint.sh @@ -0,0 +1,62 @@ +#!/bin/sh +# Runs backup_db.sh at each time in BACKUP_TIMES_UTC (comma-separated HH:MM list). +# Defaults: 02:30, 08:30, 14:30, 20:30 UTC — four backups per day. +# +# Back-compat: if BACKUP_TIMES_UTC is unset but legacy BACKUP_HOUR/BACKUP_MINUTE are, +# those are honored as a single slot. + +set -eu + +if [ -n "${BACKUP_TIMES_UTC:-}" ]; then + TIMES="$BACKUP_TIMES_UTC" +elif [ -n "${BACKUP_HOUR:-}" ] || [ -n "${BACKUP_MINUTE:-}" ]; then + TIMES="$(printf '%02d:%02d' "${BACKUP_HOUR:-2}" "${BACKUP_MINUTE:-30}")" +else + TIMES="02:30,08:30,14:30,20:30" +fi + +echo "[$(date -u +%FT%TZ)] backup schedule (UTC): ${TIMES}" + +if [ "${BACKUP_RUN_ON_START:-0}" = "1" ]; then + echo "[$(date -u +%FT%TZ)] BACKUP_RUN_ON_START=1 — running backup immediately" + /app/backup_db.sh || echo "[$(date -u +%FT%TZ)] initial backup failed (continuing)" +fi + +# Compute epoch for "today HH:MM UTC" on both GNU and BSD date. +slot_to_epoch_today() { + HM="$1" + date -u -d "today ${HM}:00" +%s 2>/dev/null \ + || date -u -j -f "%H:%M:%S" "${HM}:00" +%s +} + +while true; do + NOW_EPOCH=$(date -u +%s) + NEXT="" + # Find the smallest TARGET > NOW across all configured slots (rolling to tomorrow if needed). + OLDIFS="$IFS" + IFS=',' + for HM in $TIMES; do + HM="$(echo "$HM" | tr -d ' ')" + [ -z "$HM" ] && continue + T=$(slot_to_epoch_today "$HM") + if [ "$T" -le "$NOW_EPOCH" ]; then + T=$((T + 86400)) + fi + if [ -z "$NEXT" ] || [ "$T" -lt "$NEXT" ]; then + NEXT="$T" + fi + done + IFS="$OLDIFS" + + if [ -z "$NEXT" ]; then + echo "[$(date -u +%FT%TZ)] no valid times in BACKUP_TIMES_UTC='${TIMES}'; sleeping 1h" + sleep 3600 + continue + fi + + SLEEP=$((NEXT - NOW_EPOCH)) + NEXT_ISO=$(date -u -d "@${NEXT}" +%FT%TZ 2>/dev/null || date -u -r "${NEXT}" +%FT%TZ) + echo "[$(date -u +%FT%TZ)] next backup in ${SLEEP}s (at ${NEXT_ISO})" + sleep "$SLEEP" + /app/backup_db.sh || echo "[$(date -u +%FT%TZ)] backup failed (will retry at next slot)" +done diff --git a/docker-compose.yaml b/docker-compose.yaml index df4b87d..26618b1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,10 +6,13 @@ services: - POSTGRES_DB=${POSTGRES_DB} - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + # HA image's PGDATA is /home/postgres/pgdata/data, not /var/lib/postgresql/data. + # Mount the named volume there so data survives container rebuilds. + - PGDATA=/home/postgres/pgdata/data ports: - "5433:5432" volumes: - - timescale-data:/var/lib/postgresql/data + - timescale-data:/home/postgres/pgdata healthcheck: test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] interval: 10s @@ -76,6 +79,26 @@ services: # You will set the actual URL in the Coolify UI, # but the service needs to expose port 3000 internally. + db_backup: + build: + context: ./backup + dockerfile: Dockerfile + restart: always + depends_on: + timescale_db: + condition: service_healthy + env_file: .env + environment: + # pg_dump → rustfs. Credentials from .env (RUSTFS_*). + # BACKUP_TIMES_UTC: comma-separated HH:MM list. Default: 4×/day. + - BACKUP_TIMES_UTC=${BACKUP_TIMES_UTC:-02:30,08:30,14:30,20:30} + - BACKUP_KEEP_DAYS=${BACKUP_KEEP_DAYS:-30} + - BACKUP_RUN_ON_START=${BACKUP_RUN_ON_START:-0} + - RUSTFS_ENDPOINT=${RUSTFS_ENDPOINT} + - RUSTFS_ACCESS_KEY=${RUSTFS_ACCESS_KEY} + - RUSTFS_SECRET_KEY=${RUSTFS_SECRET_KEY} + - RUSTFS_BUCKET=${RUSTFS_BUCKET:-fleet-db} + volumes: timescale-data: name: timescale-data diff --git a/ingest_movement_rev.py b/ingest_movement_rev.py index 67ad1b2..90651d5 100644 --- a/ingest_movement_rev.py +++ b/ingest_movement_rev.py @@ -30,6 +30,14 @@ REVISIONS (QA-Verified): jimi.device.location.get for up to 50 specific IMEIs on demand. Used for precision refreshes (alarm enrichment, stale device recovery) without waiting for the next full fleet sweep. + [FIX-M18] sync_devices: Pull vehicleName / vehicleNumber / driverName / + driverPhone / sim from jimi.track.device.detail first (list + endpoint returns null for these even when populated via + jimi.open.device.update). + [FIX-M19] Multi-account support: the fleet is split across multiple + Tracksolid sub-accounts. sync_devices, poll_live_positions + and poll_parking now iterate every target in TRACKSOLID_TARGETS + and dedupe/scope per-target before writing. ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ """ @@ -42,8 +50,10 @@ from psycopg2.extras import execute_values from ts_shared_rev import ( TARGET_ACCOUNT, + TARGETS, api_post, get_active_imeis, + get_active_imeis_by_target, get_conn, get_token, is_valid_fix, @@ -63,14 +73,26 @@ setup_shutdown(log) # ── 1. Device Registry Sync (Daily) ────────────────────────────────────────── def sync_devices(): - log.info("Syncing device registry...") + log.info("Syncing device registry across %d target(s): %s", len(TARGETS), TARGETS) t0, token = time.time(), get_token() if not token: return - resp = api_post("jimi.user.device.list", {"target": TARGET_ACCOUNT}, token) - if resp.get("code") != 0: return + # [FIX-M19] Fleet is split across multiple sub-accounts. Aggregate the + # device list from every configured target and dedupe by IMEI. + devices_by_imei: dict[str, dict] = {} + for target in TARGETS: + resp = api_post("jimi.user.device.list", {"target": target}, token) + if resp.get("code") != 0: + log.warning("device.list failed for target=%s: code=%s msg=%s", + target, resp.get("code"), resp.get("message")) + continue + for d in (resp.get("result") or []): + imei = d.get("imei") + if imei: + devices_by_imei[imei] = d - devices = resp.get("result") or [] + devices = list(devices_by_imei.values()) + log.info("Aggregated %d unique devices across targets.", len(devices)) upserted = 0 # Fetch per-device detail in parallel — previously an N+1 blocker where @@ -134,10 +156,17 @@ def sync_devices(): last_synced_at = NOW(), updated_at = NOW() """, ( + # [FIX-M18] vehicleName/vehicleNumber/driverName/driverPhone/sim + # only surface via jimi.track.device.detail — list returns null. imei, clean(d.get("deviceName")), clean(d.get("mcType")), clean(d.get("mcTypeUseScope")), - clean(d.get("vehicleName")), clean(d.get("vehicleNumber")), clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")), + clean(dtl.get("vehicleName") or d.get("vehicleName")), + clean(dtl.get("vehicleNumber") or d.get("vehicleNumber")), + clean(d.get("vehicleModels")), clean(d.get("vehicleIcon")), clean(dtl.get("vin")), clean(dtl.get("engineNumber")), clean(dtl.get("vehicleBrand")), clean_num(dtl.get("fuel_100km")), - clean(d.get("driverName")), clean(d.get("driverPhone")), clean(d.get("sim")), clean(dtl.get("iccid")), clean(dtl.get("imsi")), + clean(dtl.get("driverName") or d.get("driverName")), + clean(dtl.get("driverPhone") or d.get("driverPhone")), + clean(dtl.get("sim") or d.get("sim")), + clean(dtl.get("iccid")), clean(dtl.get("imsi")), clean(dtl.get("account")), clean(dtl.get("customerName")), clean(d.get("deviceGroupId")), clean(d.get("deviceGroup")), clean_ts(d.get("activationTime")), clean_ts(d.get("expiration")), clean_int(d.get("enabledFlag", 1)), clean(dtl.get("status", "active")), clean_num(dtl.get("currentMileage")) @@ -154,10 +183,21 @@ def poll_live_positions(): t0, token = time.time(), get_token() if not token: return - resp = api_post("jimi.user.device.location.list", {"target": TARGET_ACCOUNT, "map_type": "GOOGLE"}, token) - if resp.get("code") != 0: return + # [FIX-M19] Iterate every target and dedupe by IMEI (keep last). + positions_by_imei: dict[str, dict] = {} + for target in TARGETS: + resp = api_post("jimi.user.device.location.list", + {"target": target, "map_type": "GOOGLE"}, token) + if resp.get("code") != 0: + log.warning("location.list failed for target=%s: code=%s msg=%s", + target, resp.get("code"), resp.get("message")) + continue + for p in (resp.get("result") or []): + imei = p.get("imei") + if imei: + positions_by_imei[imei] = p - positions = resp.get("result") or [] + positions = list(positions_by_imei.values()) upserted, inserted = 0, 0 with get_conn() as conn: @@ -277,63 +317,66 @@ def poll_trips(): def poll_parking(): t0 = time.time() - token, imeis = get_token(), get_active_imeis() - if not token or not imeis: return + # [FIX-M19] Parking requires an `account` param tied to the IMEI's + # sub-account — call per target with that target's IMEIs only. + token, imeis_by_target = get_token(), get_active_imeis_by_target() + if not token or not imeis_by_target: return end_ts = datetime.now(timezone.utc) start_ts = end_ts - timedelta(hours=1) + total_imei = sum(len(v) for v in imeis_by_target.values()) inserted = 0 with get_conn() as conn: with conn.cursor() as cur: - for i in range(0, len(imeis), 50): - batch = imeis[i:i+50] - # [FIX-M13] Added account + acc_type=0 (all stop types). Without these - # the API returns empty results even when parking events exist. - resp = api_post("jimi.open.platform.report.parking", { - "account": TARGET_ACCOUNT, - "imeis": ",".join(batch), - "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), - "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), - "acc_type": 0, - }, token) + for target, target_imeis in imeis_by_target.items(): + for i in range(0, len(target_imeis), 50): + batch = target_imeis[i:i+50] + # [FIX-M13] account + acc_type=0 required for non-empty results. + resp = api_post("jimi.open.platform.report.parking", { + "account": target, + "imeis": ",".join(batch), + "begin_time": start_ts.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_ts.strftime("%Y-%m-%d %H:%M:%S"), + "acc_type": 0, + }, token) - events = resp.get("result") or [] - for p in events: - try: - cur.execute("SAVEPOINT sp") - imei = p.get("imei") - start_time = clean_ts(p.get("startTime")) - if not imei or not start_time: + events = resp.get("result") or [] + for p in events: + try: + cur.execute("SAVEPOINT sp") + imei = p.get("imei") + start_time = clean_ts(p.get("startTime")) + if not imei or not start_time: + cur.execute("RELEASE SAVEPOINT sp") + continue + lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) + cur.execute(""" + INSERT INTO tracksolid.parking_events ( + imei, event_type, start_time, end_time, + duration_seconds, geom, address + ) VALUES ( + %s, 'parking', %s, %s, %s, + CASE WHEN %s IS NOT NULL AND %s IS NOT NULL + THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) + ELSE NULL END, + %s + ) ON CONFLICT (imei, start_time, event_type) DO NOTHING + """, ( + imei, start_time, clean_ts(p.get("endTime")), + clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds + lng, lat, lng, lat, + clean(p.get("address")) + )) cur.execute("RELEASE SAVEPOINT sp") - continue - lat, lng = clean_num(p.get("lat")), clean_num(p.get("lng")) - cur.execute(""" - INSERT INTO tracksolid.parking_events ( - imei, event_type, start_time, end_time, - duration_seconds, geom, address - ) VALUES ( - %s, 'parking', %s, %s, %s, - CASE WHEN %s IS NOT NULL AND %s IS NOT NULL - THEN ST_SetSRID(ST_MakePoint(%s, %s), 4326) - ELSE NULL END, - %s - ) ON CONFLICT (imei, start_time, event_type) DO NOTHING - """, ( - imei, start_time, clean_ts(p.get("endTime")), - clean_int(p.get("durSecond")), # [FIX-M13] API returns durSecond, not seconds - lng, lat, lng, lat, - clean(p.get("address")) - )) - cur.execute("RELEASE SAVEPOINT sp") - inserted += cur.rowcount - except Exception: - cur.execute("ROLLBACK TO SAVEPOINT sp") - log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) + inserted += cur.rowcount + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT sp") + log.warning("Failed to process parking for %s", p.get("imei"), exc_info=True) - log_ingestion(cur, "jimi.open.platform.report.parking", len(imeis), 0, inserted, + log_ingestion(cur, "jimi.open.platform.report.parking", total_imei, 0, inserted, int((time.time() - t0) * 1000), True) - log.info("Parking: %d events processed.", inserted) + log.info("Parking: %d events processed across %d target(s).", inserted, len(imeis_by_target)) # ── 5. High-Resolution GPS Trail (Every 30m) — POLL-01 ─────────────────────── diff --git a/ts_shared_rev.py b/ts_shared_rev.py index 251575f..e782faf 100644 --- a/ts_shared_rev.py +++ b/ts_shared_rev.py @@ -48,6 +48,12 @@ APP_KEY = _require_env("TRACKSOLID_APP_KEY") APP_SECRET = _require_env("TRACKSOLID_APP_SECRET") USER_ID = _require_env("TRACKSOLID_USER_ID") TARGET_ACCOUNT = os.getenv("TRACKSOLID_TARGET_ACCOUNT", USER_ID) +# [FIX-M19] Multi-account support: the fleet is split across multiple +# Tracksolid sub-accounts (e.g. fireside, Fireside@HQ, Fireside_MSA). +# TRACKSOLID_TARGETS is a comma-separated list; falls back to TARGET_ACCOUNT. +TARGETS = [ + t.strip() for t in os.getenv("TRACKSOLID_TARGETS", "").split(",") if t.strip() +] or [TARGET_ACCOUNT] PWD_MD5 = _require_env("TRACKSOLID_PWD_MD5") DATABASE_URL = _require_env("DATABASE_URL") API_BASE_URL = os.getenv("TRACKSOLID_API_URL", "https://eu-open.tracksolidpro.com/route/rest") @@ -229,6 +235,23 @@ def get_active_imeis() -> list[str]: cur.execute("SELECT imei FROM tracksolid.devices WHERE enabled_flag = 1") return [r[0] for r in cur.fetchall()] +def get_active_imeis_by_target() -> dict[str, list[str]]: + """[FIX-M19] Group active IMEIs by their Tracksolid sub-account so + endpoints that require an `account`/`target` param (e.g. parking) can + scope per-target calls. IMEIs with a NULL account are bucketed under + the primary TARGET_ACCOUNT as a safe default.""" + with get_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT COALESCE(account, %s) AS target, imei + FROM tracksolid.devices + WHERE enabled_flag = 1 + """, (TARGET_ACCOUNT,)) + out: dict[str, list[str]] = {} + for target, imei in cur.fetchall(): + out.setdefault(target, []).append(imei) + return out + def log_ingestion(cur, endpoint: str, imei_count: int, upserted: int, inserted: int, duration_ms: int, success: bool, error_code: str = None, error_msg: str = None): cur.execute(""" INSERT INTO tracksolid.ingestion_log