From 68f2b99cd3398d4397cff6ba4860c36429930e91 Mon Sep 17 00:00:00 2001 From: david kiania Date: Mon, 15 Jun 2026 20:08:05 +0300 Subject: [PATCH] feat: S3 via boto3 + Dockerfile for Coolify deploy - Replace the aws-CLI subprocess calls with boto3 (list_objects_v2 paginator, get_object, copy_object+delete_object) using path-style addressing + RUSTFS_* env. Removes the external aws-CLI dependency so it runs in a slim container. - Add boto3 to pyproject dependencies. - Add Dockerfile (python:3.12-slim, deps, TZ=Africa/Nairobi, keep-alive CMD) and .dockerignore for Coolify; document Coolify Scheduled Task setup in README. Co-Authored-By: Claude Opus 4.8 --- .dockerignore | 8 ++++++ Dockerfile | 25 ++++++++++++++++ README.md | 28 ++++++++++-------- import_tickets.py | 72 +++++++++++++++++++++++------------------------ pyproject.toml | 1 + 5 files changed, 86 insertions(+), 48 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..368cfac --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +.git +.venv +__pycache__/ +*.pyc +*.csv +.env +.DS_Store +uv.lock diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d922fc6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +# fleettickets — INC ingestion image (Coolify-deployable). +# A small batch/cron worker: it has no web server. Coolify keeps the container +# running (CMD below) and fires the ingest via a Scheduled Task: +# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *) +# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no +# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain. +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 \ + TZ=Africa/Nairobi + +RUN apt-get update \ + && apt-get install -y --no-install-recommends tzdata \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Dependencies (mirror pyproject.toml) — separate layer for build caching. +RUN pip install "psycopg2-binary>=2.9.9" "requests>=2.32.3" "boto3>=1.34" + +COPY . . + +# Keep the container alive so Coolify Scheduled Tasks can exec into it. +CMD ["tail", "-f", "/dev/null"] diff --git a/README.md b/README.md index 4086d59..9e0fcd3 100644 --- a/README.md +++ b/README.md @@ -61,21 +61,27 @@ python import_tickets.py --geocode-locations --apply # precise, actionable INC python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply ``` -Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to -the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency). +Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3 +via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency). -## Schedule (cron) +## Deploy (Coolify) -On the instance, ingest at **:15 past every hour, 07:00–19:00 EAT** via -[`run_ingest.sh`](run_ingest.sh) (loads `.env`, runs `--from-bucket --apply`): +The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server. +Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest +runs as a **Scheduled Task**, not a system crontab: -```cron -CRON_TZ=Africa/Nairobi -15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 -``` +- **Command:** `python import_tickets.py --from-bucket --apply` +- **Frequency:** `15 7-19 * * *` (`:15` past each hour, 07:00–19:00). If Coolify runs + scheduled tasks in **UTC**, use `15 4-16 * * *` (EAT is UTC+3); if it exposes a + per-task timezone, set `Africa/Nairobi` and keep `15 7-19 * * *`. +- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host), + `RUSTFS_*`, `GEOCODER_*`. -`CRON_TZ` matters — the export filenames and this schedule are in `Africa/Nairobi`. -Skip-if-unchanged means a run on an already-ingested snapshot is a cheap no-op. +Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op. + +For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env` +and runs the ingest; schedule it with a crontab line +(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`). ## Notes diff --git a/import_tickets.py b/import_tickets.py index 8dc1e63..dd34d82 100644 --- a/import_tickets.py +++ b/import_tickets.py @@ -51,16 +51,16 @@ from __future__ import annotations import argparse import csv import io -import json import math import os import re -import subprocess import time from datetime import datetime, timezone, timedelta +import boto3 import requests import psycopg2.extras +from botocore.config import Config as BotoConfig from shared import clean, get_conn, get_logger @@ -104,21 +104,18 @@ _last_geocode_at = 0.0 # automations/inc/.csv (no latest pointer, no envelope, no deltas). # We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag # we skip the DB write (the export re-emits byte-identical content most hours). -def _s3_env() -> dict: - return { - **os.environ, - "AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"], - "AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"], - "AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"), - "AWS_S3_ADDRESSING_STYLE": "path", # force path-style to match the rustfs endpoint - } - - -def _aws(args: list[str], env: dict) -> bytes: - return subprocess.run( - ["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], *args], - env=env, capture_output=True, timeout=180, check=True, - ).stdout +# S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container). +def _s3_client(): + """boto3 S3 client for the rustfs endpoint (force path-style addressing).""" + return boto3.client( + "s3", + endpoint_url=os.environ["RUSTFS_ENDPOINT"], + aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"], + aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"], + region_name=os.getenv("RUSTFS_REGION", "us-east-1"), + config=BotoConfig(s3={"addressing_style": "path"}, signature_version="s3v4", + retries={"max_attempts": 3, "mode": "standard"}), + ) def _ts_from_key(key: str) -> datetime | None: @@ -129,18 +126,19 @@ def _ts_from_key(key: str) -> datetime | None: return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT) -def _list_inc_csvs(env: dict) -> list[tuple[str, str]]: +def _list_inc_csvs(s3) -> list[tuple[str, str]]: """[(key, etag)] for every automations/inc/.csv (excludes processed/ + dirs).""" - out = _aws( - ["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX, - "--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"], - env, - ).decode("utf-8").strip() - items = json.loads(out) if out and out != "None" else [] - return [ - (it["Key"], (it.get("ETag") or "").strip('"')) - for it in (items or []) if _CSV_KEY_RE.match(it.get("Key", "")) - ] + out: list[tuple[str, str]] = [] + for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX): + for it in page.get("Contents", []): + if _CSV_KEY_RE.match(it["Key"]): + out.append((it["Key"], (it.get("ETag") or "").strip('"'))) + return out + + +def _get_text(s3, key: str) -> str: + """Download an object's body as UTF-8 text.""" + return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8") def _last_processed_etag() -> str | None: @@ -164,11 +162,12 @@ def _load_csv_local(path: str) -> list[dict]: return list(csv.DictReader(f)) -def _move_processed(keys: list[str], env: dict) -> None: - """Archive listed INC csv objects to automations/inc/processed/ (S3 mv = copy+delete).""" +def _move_processed(s3, keys: list[str]) -> None: + """Archive listed INC csv objects to automations/inc/processed/ (copy + delete).""" for key in keys: dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1] - _aws(["s3", "mv", f"s3://{_BUCKET}/{key}", f"s3://{_BUCKET}/{dst}"], env) + s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst) + s3.delete_object(Bucket=_BUCKET, Key=key) log.info("archived %s -> %s", key, dst) @@ -251,8 +250,8 @@ def ingest(args) -> None: return # --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive. - env = _s3_env() - listing = _list_inc_csvs(env) + s3 = _s3_client() + listing = _list_inc_csvs(s3) if not listing: log.info("no INC csv files under %s — nothing to do", _INC_PREFIX) return @@ -266,13 +265,12 @@ def ingest(args) -> None: if newest_etag and newest_etag == last_etag: log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) if args.apply: - _move_processed(all_keys, env) + _move_processed(s3, all_keys) else: log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) return - text = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env).decode("utf-8") - rows = _parse_csv(text) + rows = _parse_csv(_get_text(s3, newest_key)) ts = _ts_from_key(newest_key) meta = {"export_type": "full", "source_s3_key": newest_key, "source_etag": newest_etag, "row_count": len(rows)} @@ -280,7 +278,7 @@ def ingest(args) -> None: meta["exported_at"] = ts.isoformat() upsert(rows, args.apply, meta=meta) if args.apply: - _move_processed(all_keys, env) + _move_processed(s3, all_keys) else: log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) diff --git a/pyproject.toml b/pyproject.toml index e35980a..4050e51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ requires-python = ">=3.12" dependencies = [ "psycopg2-binary>=2.9.9", # DB driver "requests>=2.32.3", # geocoder HTTP + "boto3>=1.34", # S3 (rustfs) access — no aws-CLI dependency ] [project.optional-dependencies]