Compare commits

...

6 commits

Author SHA1 Message Date
2f421d7439 Merge pull request 'infra(deploy): serve MCP on multiple domains + SKIP_BUILD' (#4) from infra/deploy-multidomain into main 2026-06-26 13:56:53 +00:00
kiania
af6fdbcd3f fix(logging): attribute each query to its analyst caller
The BearerAuth middleware matched a per-analyst token but only stashed it on
request.state, which the FastMCP tools never see — so the query log line logged
rows/sql with no caller, defeating the per-token attribution the auth design
promises. Bridge the caller name through a ContextVar (anyio propagates it into
the worker thread that runs each sync tool) and include it in the query() log.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-26 16:54:07 +03:00
kiania
fae40942a2 infra(deploy): serve MCP on multiple domains + SKIP_BUILD for label-only redeploys
The prod connector domain (fleetmcp.rahamafresh.com) had no Traefik router —
deploy.sh only ever set one HOST_DOMAIN (defaulting to fleetmcp.fivetitude.com),
so requests to the prod domain returned 503 "no available server" even with the
container healthy.

- HOST_DOMAINS: comma-separated list folded into one Traefik router rule
  (Host(`a`) || Host(`b`)). One LE cert covers all names (SANs), so connectors
  on either domain keep working. Defaults to HOST_DOMAIN (back-compatible).
- SKIP_BUILD=1: reuse the existing image for a labels/env-only redeploy, so a
  routing change can't accidentally bake in new/stale code.

Deployed to prod with HOST_DOMAINS="fleetmcp.rahamafresh.com,fleetmcp.fivetitude.com";
both domains verified (healthz 200, /mcp 401, valid SAN cert).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-23 21:35:31 +03:00
a36542dbc9 Merge pull request 'fix: harden MCP server reliability, build reproducibility, and auth' (#1) from fix/reliability-pool-build-guard into main 2026-06-19 21:40:07 +00:00
kiania
c02c127798 fix(connections): shrink MCP DB-connection footprint on a shared 100-conn DB
The DB is at max_connections=100 and several stack services hold persistent
pools (several as the postgres superuser, idle for hours), so peaks hit
"too many connections". The MCP is a minor contributor but easy to bound:

- Dockerfile: uvicorn --workers 2 → 1. The MCP's connection budget is
  workers × MCP_POOL_MAX, so this caps it at 8 backends instead of 16. Scale
  via MCP_POOL_MAX, not workers, so the budget stays obvious. (Pairs with the
  minconn=0 lazy pool already on this branch: 0 connections held when idle.)
- analytics_ro_role.sql: add idle_session_timeout=5min so the DB reaps the
  MCP's idle POOLED connections (idle_in_transaction never reaps them — they're
  idle outside a txn) and returns the slots. Safe because the server now
  discards + transparently retries a reaped connection instead of erroring.

Note: the dominant fix is stack-wide (get the superuser app pools onto bounded,
timed roles; consider PgBouncer; or raise max_connections) — out of this repo's
scope but documented in the review.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:38:22 +03:00
kiania
5e3fc3910b fix: harden MCP server reliability, build reproducibility, and auth
Addresses intermittent query failures on the live instance (container itself
is healthy — failures are application/query-level) plus security hardening.

Reliability (analytics_mcp.py):
- Discard dead pooled connections instead of recycling them. A broken socket
  (DB restart, network blip, crash) previously poisoned the pool and every
  later query handed that connection failed until container recreation. New
  _is_disconnect() classifies real connection loss (class-08 / 57P0x SQLSTATE,
  or socket-level OperationalError with pgcode=None) vs. an in-session query
  error like statement_timeout (QueryCanceled / 57014), which is NOT a
  disconnect and leaves the connection usable.
- query() retries ONCE, only on a genuine disconnect, so a recycled-but-stale
  connection is invisible to the analyst (a real query error still surfaces).
- Bound concurrent checkouts with a semaphore (POOL_MAX) so >POOL_MAX
  concurrent queries QUEUE instead of overflowing the pool and raising
  PoolError (a 500 to the analyst).
- Lazy pool (minconn=0) + retry on init, so a brief DB outage at deploy time
  no longer crash-loops the worker.

Build reproducibility:
- Commit uv.lock (was gitignored) and build with `uv sync --frozen` so
  redeploys can't silently pull a newer, behaviour-changed mcp/starlette.

Security:
- Constant-time Bearer-token comparison (hmac.compare_digest).
- /healthz no longer leaks the analyst/token count.
- Dockerfile runs as a non-root user.
- deploy.sh: Docker log rotation (bound disk) + Traefik rate-limit middleware.

Also: relax the SQL guard so a forbidden keyword inside a string literal (e.g.
WHERE summary ILIKE '%please delete%') no longer rejects a valid read; the
blocklist still rejects data-modifying CTEs (and writes are impossible anyway
via the analytics_ro GRANTs + read-only rolled-back txn). Fix stale docstrings.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:28:58 +03:00
7 changed files with 1412 additions and 30 deletions

View file

@ -1,3 +1,5 @@
# NOTE: uv.lock is intentionally NOT ignored — the Dockerfile copies it for
# reproducible `uv sync --frozen` builds.
.git
.venv
__pycache__

3
.gitignore vendored
View file

@ -7,4 +7,5 @@ __pycache__/
.DS_Store
.ruff_cache/
.mypy_cache/
uv.lock
# uv.lock IS committed on purpose — the Docker build uses `uv sync --frozen` for
# reproducible installs, and the Coolify build clones the repo (it must include it).

View file

@ -11,15 +11,26 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app
# Install ONLY dependencies (flat module — the project itself is not a package).
COPY pyproject.toml ./
RUN uv sync --no-dev --no-install-project
# Copy the lockfile and build --frozen so rebuilds are reproducible: without it,
# `uv sync` re-resolves the >= ranges in pyproject.toml and a redeploy could pull a
# newer, behaviour-changed mcp/starlette and break the running server.
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --no-install-project --frozen
ENV PATH="/app/.venv/bin:$PATH"
COPY analytics_mcp.py ./
# Run as a non-root user (least privilege; nothing here needs root).
RUN useradd -m -u 10001 app && chown -R app:app /app
USER app
EXPOSE 8892
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "2"]
# Single worker: this is a low-traffic read-only proxy for a handful of analysts, and
# the DB connection budget = workers × MCP_POOL_MAX. One worker (× default pool 8) caps
# the MCP at 8 backends instead of 16, which matters on a shared 100-connection DB.
# Scale out by raising MCP_POOL_MAX, not workers, so the budget stays obvious.
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "1"]

View file

@ -26,9 +26,12 @@ Env:
"""
from __future__ import annotations
import contextvars
import hmac
import logging
import os
import re
import threading
import time
from contextlib import contextmanager
@ -57,11 +60,18 @@ def _get_logger(name: str) -> logging.Logger:
)
root.addHandler(handler)
root.setLevel(logging.INFO)
root.propagate = False # don't double-emit through uvicorn's root handler
return root.getChild(name)
log = _get_logger("server")
# Per-request caller name, set by BearerAuth from the matched token so the tools can
# attribute each query to an analyst in the logs. A ContextVar (not a tool arg) because
# FastMCP tools never receive the HTTP request; anyio propagates the context into the
# worker thread that runs each sync tool. Defaults to "?" if auth ever didn't run.
_caller_var: contextvars.ContextVar[str] = contextvars.ContextVar("caller", default="?")
DATABASE_URL = os.environ["DATABASE_URL"] # analytics_ro DSN (set by deploy)
MAX_ROWS_CEIL = int(os.getenv("MCP_MAX_ROWS", "10000"))
# Schemas the introspection helpers (list_tables/describe_table/sample_table) expose.
@ -78,26 +88,89 @@ READABLE_SCHEMAS = tuple(
# Force read-only + a statement timeout at the connection level (belt + braces;
# the analytics_ro role already sets these, but a self-contained server is safer
# in case it is ever pointed at a less-restricted DSN).
_pool = psycopg2.pool.ThreadedConnectionPool(
1,
int(os.getenv("MCP_POOL_MAX", "8")),
DATABASE_URL,
options="-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8",
POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))
_POOL_OPTS = "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8"
def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool:
"""Create the pool, retrying so a brief DB outage at deploy time doesn't crash
the worker into a boot loop. minconn=0 no eager connect at import (connections
are opened lazily on first use)."""
last: Exception | None = None
for attempt in range(1, retries + 1):
try:
return psycopg2.pool.ThreadedConnectionPool(
0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
)
except psycopg2.OperationalError as exc:
last = exc
log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
if attempt < retries:
time.sleep(delay)
assert last is not None
raise last
_pool = _init_pool()
# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
_pool_slots = threading.BoundedSemaphore(POOL_MAX)
# SQLSTATE classes that mean the CONNECTION is gone (vs. a query that merely failed,
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
# class 08 = connection exception; 57P01/02/03 = admin/crash shutdown.
_DISCONNECT_SQLSTATES = frozenset(
{"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
)
def _is_disconnect(exc: Exception) -> bool:
"""True only for a genuinely lost connection — so we discard/retry on a dropped
socket but NOT on a statement_timeout or other in-session query error."""
if isinstance(exc, psycopg2.InterfaceError):
return True
if isinstance(exc, psycopg2.OperationalError):
code = getattr(exc, "pgcode", None)
return code is None or code in _DISCONNECT_SQLSTATES # None = socket-level failure
return False
@contextmanager
def _ro_conn():
"""Read-only connection; the transaction is ALWAYS rolled back (never commits)."""
conn = _pool.getconn()
"""Read-only connection; the transaction is ALWAYS rolled back (never commits).
Dead connections (DB restart, network blip, crash) are DISCARDED rather than
recycled otherwise a single broken socket poisons the pool and every later
query handed that connection fails until the container is recreated. A query that
merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
rolled back and returned to the pool as normal."""
_pool_slots.acquire()
try:
conn = _pool.getconn()
except Exception:
_pool_slots.release()
raise
broken = False
try:
conn.set_session(readonly=True, autocommit=False)
yield conn
except Exception as exc:
broken = _is_disconnect(exc)
raise
finally:
try:
conn.rollback()
if broken or conn.closed:
_pool.putconn(conn, close=True)
else:
try:
conn.rollback()
_pool.putconn(conn)
except (psycopg2.OperationalError, psycopg2.InterfaceError):
_pool.putconn(conn, close=True)
finally:
_pool.putconn(conn)
_pool_slots.release()
def _rows(cur) -> list[dict]:
@ -135,6 +208,13 @@ def _strip_comments(sql: str) -> str:
return sql.strip()
def _strip_literals(sql: str) -> str:
"""Blank out the contents of single-quoted string literals so the keyword guard
does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
are handled so we don't mis-terminate a literal mid-string."""
return re.sub(r"'(?:[^']|'')*'", "''", sql)
def _guard(sql: str) -> str:
"""Validate a single read-only statement; return the cleaned statement."""
stripped = _strip_comments(sql)
@ -146,7 +226,11 @@ def _guard(sql: str) -> str:
stmt = parts[0].strip()
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
raise ValueError("Only SELECT / WITH queries are allowed.")
if _FORBIDDEN.search(stmt):
# Scan with string literals blanked: the blocklist's real job is to reject
# data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
# keyword that merely appears inside a filter value. (Writes are impossible anyway
# via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
if _FORBIDDEN.search(_strip_literals(stmt)):
raise ValueError("Query contains a forbidden (write/DDL) keyword.")
return stmt
@ -179,28 +263,44 @@ mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_tra
def query(sql: str, max_rows: int = 1000) -> dict:
"""Run a read-only SELECT/WITH query against the fleet database.
Only the reporting.* and tracksolid.* schemas are readable. Single statement
only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000, hard
cap 10000). A LIMIT is auto-applied when absent. Result: {row_count, truncated, rows}.
Readable schemas are the analytics_ro grant surface (reporting, tracksolid,
tickets, fuel by default). Single statement only; write/DDL is rejected. Returns
up to `max_rows` rows (default 1000, hard cap 10000). A LIMIT is auto-applied when
absent. Result: {row_count, truncated, rows}.
"""
stmt = _guard(sql)
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
t0 = time.monotonic()
with _ro_conn() as conn, conn.cursor() as cur:
cur.execute(stmt)
rows = _rows(cur)
# Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
# second attempt gets a fresh one — making a recycled-but-stale pool connection
# invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
# retried; it surfaces immediately.
rows = []
for attempt in (1, 2):
try:
with _ro_conn() as conn, conn.cursor() as cur:
cur.execute(stmt)
rows = _rows(cur)
break
except Exception as exc:
if attempt == 2 or not _is_disconnect(exc):
raise
log.warning("stale DB connection on attempt %d — retrying once", attempt)
truncated = len(rows) > cap
rows = rows[:cap]
dur_ms = int((time.monotonic() - t0) * 1000)
log.info("query rows=%d trunc=%s %dms :: %s", len(rows), truncated, dur_ms, sql[:200])
log.info(
"query caller=%s rows=%d trunc=%s %dms :: %s",
_caller_var.get(), len(rows), truncated, dur_ms, sql[:200],
)
return {"row_count": len(rows), "truncated": truncated, "rows": rows}
@mcp.tool()
def list_schemas() -> list[dict]:
"""List the readable schemas (reporting, tracksolid) with their object counts."""
"""List the readable schemas (reporting, tracksolid, tickets, fuel) with object counts."""
with _ro_conn() as conn, conn.cursor() as cur:
cur.execute(
"SELECT table_schema AS schema, count(*) AS objects "
@ -213,7 +313,7 @@ def list_schemas() -> list[dict]:
@mcp.tool()
def list_tables(schema: str) -> list[dict]:
"""List tables + views in a schema (must be reporting or tracksolid)."""
"""List tables + views in a schema (must be one of the readable schemas)."""
if schema not in READABLE_SCHEMAS:
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
with _ro_conn() as conn, conn.cursor() as cur:
@ -280,21 +380,35 @@ _TOKENS = {
}
def _lookup_token(token: str) -> str | None:
"""Constant-time token match: compare against every known token so the response
time does not reveal how far a guessed prefix matched. Cheap for a handful of
per-analyst tokens."""
if not token:
return None
match = None
for known, name in _TOKENS.items():
if hmac.compare_digest(token, known):
match = name
return match
class BearerAuth(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
if request.url.path == "/healthz":
return await call_next(request)
auth = request.headers.get("authorization", "")
token = auth[7:] if auth.lower().startswith("bearer ") else ""
caller = _TOKENS.get(token)
caller = _lookup_token(token)
if caller is None:
return JSONResponse({"error": "unauthorized"}, status_code=401)
request.state.caller = caller
_caller_var.set(caller) # so the tools can attribute the query in the logs
return await call_next(request)
async def healthz(_request):
return JSONResponse({"ok": True, "tokens": len(_TOKENS)})
return JSONResponse({"ok": True})
app = mcp.streamable_http_app()

View file

@ -24,6 +24,20 @@ set -euo pipefail
NAME=analytics_mcp
PORT=8892
HOST_DOMAIN="${HOST_DOMAIN:-fleetmcp.fivetitude.com}" # prod: fleetmcp.rahamafresh.com
# Comma-separated list of every domain this service answers on (defaults to
# HOST_DOMAIN). All are folded into ONE Traefik router rule so a single cert
# covers them and connectors on either domain keep working.
HOST_DOMAINS="${HOST_DOMAINS:-$HOST_DOMAIN}"
BT='`'
RULE=""
IFS=',' read -ra _DOMS <<< "$HOST_DOMAINS"
for _d in "${_DOMS[@]}"; do
_d="${_d// /}"
if [ -n "$_d" ]; then
seg="Host(${BT}${_d}${BT})"
if [ -z "$RULE" ]; then RULE="$seg"; else RULE="$RULE || $seg"; fi
fi
done
IMAGE="fleetanalytics-mcp:latest"
ENV_FILE="$(pwd)/.deploy.env"
@ -53,9 +67,15 @@ RO_PW=$(cat "${ANALYTICS_RO_PW_FILE:-$HOME/.analytics_ro.pw}" 2>/dev/null || tru
HOSTPART="${SRC_DB_URL#*@}" # host:port/dbname[?params]
RO_DB_URL="postgresql://analytics_ro:${RO_PW}@${HOSTPART}"
# Build the image from this repo.
echo "Building $IMAGE ..."
docker build -t "$IMAGE" .
# Build the image from this repo (SKIP_BUILD=1 reuses the existing image for a
# labels/env-only change — no new code is pulled in).
if [ "${SKIP_BUILD:-0}" = "1" ]; then
echo "SKIP_BUILD=1 — reusing existing $IMAGE (no rebuild)."
docker image inspect "$IMAGE" >/dev/null 2>&1 || { echo "ERROR: $IMAGE not present"; exit 1; }
else
echo "Building $IMAGE ..."
docker build -t "$IMAGE" .
fi
# Minimal env (read-only DSN + auth only — no Tracksolid ingestion secrets).
{ echo "DATABASE_URL=${RO_DB_URL}"; echo "MCP_AUTH_TOKENS=${MCP_AUTH_TOKENS}"; } > "$ENV_FILE"
@ -65,14 +85,18 @@ docker rm -f "$NAME" 2>/dev/null || true
docker run -d --name "$NAME" --restart unless-stopped \
--network "$APPNET" \
--env-file "$ENV_FILE" \
--log-opt max-size=10m --log-opt max-file=5 \
--label 'traefik.enable=true' \
--label 'traefik.docker.network=coolify' \
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.average=30' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.burst=60' \
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.http-0-fleetmcp.rule=${RULE}" \
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.https-0-fleetmcp.rule=${RULE}" \
--label "traefik.http.routers.https-0-fleetmcp.middlewares=fleetmcp-ratelimit" \
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \

View file

@ -62,3 +62,9 @@ ALTER DEFAULT PRIVILEGES FOR ROLE postgres IN SCHEMA fuel GRANT EX
ALTER ROLE analytics_ro SET default_transaction_read_only = on;
ALTER ROLE analytics_ro SET statement_timeout = '30s';
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
-- Cap idle POOLED connections too (these sit idle but NOT in a transaction, so the
-- idle_in_transaction timeout never reaps them). On a shared 100-connection DB this
-- returns slots the MCP isn't using. Safe with the server's dead-connection handling:
-- a reaped pooled connection is discarded + transparently retried, not surfaced as an
-- error. (Requires PostgreSQL 14+.)
ALTER ROLE analytics_ro SET idle_session_timeout = '5min';

1224
uv.lock Normal file

File diff suppressed because it is too large Load diff