feat: S3 via boto3 + Dockerfile for Coolify deploy

- Replace the aws-CLI subprocess calls with boto3 (list_objects_v2 paginator,
  get_object, copy_object+delete_object) using path-style addressing + RUSTFS_*
  env. Removes the external aws-CLI dependency so it runs in a slim container.
- Add boto3 to pyproject dependencies.
- Add Dockerfile (python:3.12-slim, deps, TZ=Africa/Nairobi, keep-alive CMD) and
  .dockerignore for Coolify; document Coolify Scheduled Task setup in README.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
david kiania 2026-06-15 20:08:05 +03:00
parent 4532643247
commit 68f2b99cd3
5 changed files with 86 additions and 48 deletions

8
.dockerignore Normal file
View file

@ -0,0 +1,8 @@
.git
.venv
__pycache__/
*.pyc
*.csv
.env
.DS_Store
uv.lock

25
Dockerfile Normal file
View file

@ -0,0 +1,25 @@
# fleettickets — INC ingestion image (Coolify-deployable).
# A small batch/cron worker: it has no web server. Coolify keeps the container
# running (CMD below) and fires the ingest via a Scheduled Task:
# python import_tickets.py --from-bucket --apply (cron: 15 7-19 * * *)
# The cron expression above is in Africa/Nairobi time; if Coolify evaluates
# scheduled tasks in UTC, the equivalent is 15 4-16 * * * (EAT is UTC+3).
# Env (set in Coolify): DATABASE_URL, RUSTFS_*, GEOCODER_*. S3 is via boto3 — no
# aws CLI needed. psycopg2-binary ships its own libpq, so no build toolchain.
FROM python:3.12-slim
# PYTHONUNBUFFERED=1 — stdout/stderr are not buffered, so log lines appear at once.
# PIP_NO_CACHE_DIR=1 — pip keeps no download cache, which keeps the image smaller.
# TZ                 — container-local timezone; the ingest schedule and the export
#                      filenames are expressed in Africa/Nairobi.
ENV PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
TZ=Africa/Nairobi
# Install the timezone database that the TZ setting above refers to; the apt
# package lists are removed in the same layer so they do not end up in the image.
RUN apt-get update \
&& apt-get install -y --no-install-recommends tzdata \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Dependencies (mirror pyproject.toml) — separate layer for build caching.
# Keep these version specifiers in sync with pyproject.toml by hand: this image
# installs from the list below, not from the project metadata.
RUN pip install "psycopg2-binary>=2.9.9" "requests>=2.32.3" "boto3>=1.34"
# Copy the build context into /app. .dockerignore keeps .git, .venv, .env, *.csv
# and other local artefacts out of the image.
COPY . .
# Keep the container alive so Coolify Scheduled Tasks can exec into it.
# NOTE(review): as PID 1, tail may not exit on SIGTERM, so stopping the container
# could wait for the kill timeout — confirm whether Coolify runs it with an init.
CMD ["tail", "-f", "/dev/null"]

View file

@ -61,21 +61,27 @@ python import_tickets.py --geocode-locations --apply # precise, actionable INC
python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply python import_tickets.py --inc-csv 2026-06-15T17-00-00.csv --apply
``` ```
Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` shells out to Dry-run is the default (omit `--apply`). `import_tickets.py --from-bucket` talks to S3
the `aws` CLI using the `RUSTFS_*` env (no boto3 dependency). via **boto3** using the `RUSTFS_*` env (path-style addressing; no aws-CLI dependency).
## Schedule (cron) ## Deploy (Coolify)
On the instance, ingest at **:15 past every hour, 07:00–19:00 EAT** via The repo ships a [`Dockerfile`](Dockerfile) — a small batch worker with no web server.
[`run_ingest.sh`](run_ingest.sh) (loads `.env`, runs `--from-bucket --apply`): Coolify builds it and keeps the container alive (`CMD tail -f /dev/null`); the ingest
runs as a **Scheduled Task**, not a system crontab:
```cron - **Command:** `python import_tickets.py --from-bucket --apply`
CRON_TZ=Africa/Nairobi - **Frequency:** `15 7-19 * * *` (`:15` past each hour, 07:00–19:00). If Coolify runs
15 7-19 * * * /opt/fleettickets/run_ingest.sh >> /var/log/fleettickets-inc.log 2>&1 scheduled tasks in **UTC**, use `15 4-16 * * *` (EAT is UTC+3); if it exposes a
``` per-task timezone, set `Africa/Nairobi` and keep `15 7-19 * * *`.
- **Env vars** (Coolify → Environment Variables): `DATABASE_URL` (internal DB host),
`RUSTFS_*`, `GEOCODER_*`.
`CRON_TZ` matters — the export filenames and this schedule are in `Africa/Nairobi`. Skip-if-unchanged makes a run on an already-ingested snapshot a cheap no-op.
Skip-if-unchanged means a run on an already-ingested snapshot is a cheap no-op.
For a plain host/VM instead of Coolify, [`run_ingest.sh`](run_ingest.sh) loads `.env`
and runs the ingest; schedule it with a crontab line
(`CRON_TZ=Africa/Nairobi` / `15 7-19 * * *`).
## Notes ## Notes

View file

@ -51,16 +51,16 @@ from __future__ import annotations
import argparse import argparse
import csv import csv
import io import io
import json
import math import math
import os import os
import re import re
import subprocess
import time import time
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
import boto3
import requests import requests
import psycopg2.extras import psycopg2.extras
from botocore.config import Config as BotoConfig
from shared import clean, get_conn, get_logger from shared import clean, get_conn, get_logger
@ -104,21 +104,18 @@ _last_geocode_at = 0.0
# automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas). # automations/inc/<EAT-timestamp>.csv (no latest pointer, no envelope, no deltas).
# We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag # We ingest the NEWEST file; if its S3 ETag matches the last processed file's ETag
# we skip the DB write (the export re-emits byte-identical content most hours). # we skip the DB write (the export re-emits byte-identical content most hours).
def _s3_env() -> dict: # S3 access is via boto3 (no aws-CLI dependency → runs cleanly in a slim container).
return { def _s3_client():
**os.environ, """boto3 S3 client for the rustfs endpoint (force path-style addressing)."""
"AWS_ACCESS_KEY_ID": os.environ["RUSTFS_ACCESS_KEY"], return boto3.client(
"AWS_SECRET_ACCESS_KEY": os.environ["RUSTFS_SECRET_KEY"], "s3",
"AWS_DEFAULT_REGION": os.getenv("RUSTFS_REGION", "us-east-1"), endpoint_url=os.environ["RUSTFS_ENDPOINT"],
"AWS_S3_ADDRESSING_STYLE": "path", # force path-style to match the rustfs endpoint aws_access_key_id=os.environ["RUSTFS_ACCESS_KEY"],
} aws_secret_access_key=os.environ["RUSTFS_SECRET_KEY"],
region_name=os.getenv("RUSTFS_REGION", "us-east-1"),
config=BotoConfig(s3={"addressing_style": "path"}, signature_version="s3v4",
def _aws(args: list[str], env: dict) -> bytes: retries={"max_attempts": 3, "mode": "standard"}),
return subprocess.run( )
["aws", "--endpoint-url", os.environ["RUSTFS_ENDPOINT"], *args],
env=env, capture_output=True, timeout=180, check=True,
).stdout
def _ts_from_key(key: str) -> datetime | None: def _ts_from_key(key: str) -> datetime | None:
@ -129,18 +126,19 @@ def _ts_from_key(key: str) -> datetime | None:
return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT) return datetime.strptime(m.group(1), "%Y-%m-%dT%H-%M-%S").replace(tzinfo=_EAT)
def _list_inc_csvs(env: dict) -> list[tuple[str, str]]: def _list_inc_csvs(s3) -> list[tuple[str, str]]:
"""[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs).""" """[(key, etag)] for every automations/inc/<ts>.csv (excludes processed/ + dirs)."""
out = _aws( out: list[tuple[str, str]] = []
["s3api", "list-objects-v2", "--bucket", _BUCKET, "--prefix", _INC_PREFIX, for page in s3.get_paginator("list_objects_v2").paginate(Bucket=_BUCKET, Prefix=_INC_PREFIX):
"--query", "Contents[].{Key:Key,ETag:ETag}", "--output", "json"], for it in page.get("Contents", []):
env, if _CSV_KEY_RE.match(it["Key"]):
).decode("utf-8").strip() out.append((it["Key"], (it.get("ETag") or "").strip('"')))
items = json.loads(out) if out and out != "None" else [] return out
return [
(it["Key"], (it.get("ETag") or "").strip('"'))
for it in (items or []) if _CSV_KEY_RE.match(it.get("Key", "")) def _get_text(s3, key: str) -> str:
] """Download an object's body as UTF-8 text."""
return s3.get_object(Bucket=_BUCKET, Key=key)["Body"].read().decode("utf-8")
def _last_processed_etag() -> str | None: def _last_processed_etag() -> str | None:
@ -164,11 +162,12 @@ def _load_csv_local(path: str) -> list[dict]:
return list(csv.DictReader(f)) return list(csv.DictReader(f))
def _move_processed(keys: list[str], env: dict) -> None: def _move_processed(s3, keys: list[str]) -> None:
"""Archive listed INC csv objects to automations/inc/processed/ (S3 mv = copy+delete).""" """Archive listed INC csv objects to automations/inc/processed/ (copy + delete)."""
for key in keys: for key in keys:
dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1] dst = _PROCESSED_PREFIX + key.rsplit("/", 1)[-1]
_aws(["s3", "mv", f"s3://{_BUCKET}/{key}", f"s3://{_BUCKET}/{dst}"], env) s3.copy_object(Bucket=_BUCKET, CopySource={"Bucket": _BUCKET, "Key": key}, Key=dst)
s3.delete_object(Bucket=_BUCKET, Key=key)
log.info("archived %s -> %s", key, dst) log.info("archived %s -> %s", key, dst)
@ -251,8 +250,8 @@ def ingest(args) -> None:
return return
# --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive. # --from-bucket: newest INC csv → skip-if-unchanged → ingest → archive.
env = _s3_env() s3 = _s3_client()
listing = _list_inc_csvs(env) listing = _list_inc_csvs(s3)
if not listing: if not listing:
log.info("no INC csv files under %s — nothing to do", _INC_PREFIX) log.info("no INC csv files under %s — nothing to do", _INC_PREFIX)
return return
@ -266,13 +265,12 @@ def ingest(args) -> None:
if newest_etag and newest_etag == last_etag: if newest_etag and newest_etag == last_etag:
log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag) log.info("etag unchanged from last processed (%s) — skipping DB write", last_etag)
if args.apply: if args.apply:
_move_processed(all_keys, env) _move_processed(s3, all_keys)
else: else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)
return return
text = _aws(["s3", "cp", f"s3://{_BUCKET}/{newest_key}", "-"], env).decode("utf-8") rows = _parse_csv(_get_text(s3, newest_key))
rows = _parse_csv(text)
ts = _ts_from_key(newest_key) ts = _ts_from_key(newest_key)
meta = {"export_type": "full", "source_s3_key": newest_key, meta = {"export_type": "full", "source_s3_key": newest_key,
"source_etag": newest_etag, "row_count": len(rows)} "source_etag": newest_etag, "row_count": len(rows)}
@ -280,7 +278,7 @@ def ingest(args) -> None:
meta["exported_at"] = ts.isoformat() meta["exported_at"] = ts.isoformat()
upsert(rows, args.apply, meta=meta) upsert(rows, args.apply, meta=meta)
if args.apply: if args.apply:
_move_processed(all_keys, env) _move_processed(s3, all_keys)
else: else:
log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX) log.info("DRY-RUN — would archive %d file(s) to %s", len(all_keys), _PROCESSED_PREFIX)

View file

@ -6,6 +6,7 @@ requires-python = ">=3.12"
dependencies = [ dependencies = [
"psycopg2-binary>=2.9.9", # DB driver "psycopg2-binary>=2.9.9", # DB driver
"requests>=2.32.3", # geocoder HTTP "requests>=2.32.3", # geocoder HTTP
"boto3>=1.34", # S3 (rustfs) access — no aws-CLI dependency
] ]
[project.optional-dependencies] [project.optional-dependencies]