Compare commits

...

3 commits

Author SHA1 Message Date
a36542dbc9 Merge pull request 'fix: harden MCP server reliability, build reproducibility, and auth' (#1) from fix/reliability-pool-build-guard into main 2026-06-19 21:40:07 +00:00
kiania
c02c127798 fix(connections): shrink MCP DB-connection footprint on a shared 100-conn DB
The DB is at max_connections=100 and several stack services hold persistent
pools (several as the postgres superuser, idle for hours), so peaks hit
"too many connections". The MCP is a minor contributor but easy to bound:

- Dockerfile: uvicorn --workers 2 → 1. The MCP's connection budget is
  workers × MCP_POOL_MAX, so this caps it at 8 backends instead of 16. Scale
  via MCP_POOL_MAX, not workers, so the budget stays obvious. (Pairs with the
  minconn=0 lazy pool already on this branch: 0 connections held when idle.)
- analytics_ro_role.sql: add idle_session_timeout=5min so the DB reaps the
  MCP's idle POOLED connections (idle_in_transaction never reaps them — they're
  idle outside a txn) and returns the slots. Safe because the server now
  discards + transparently retries a reaped connection instead of erroring.

Note: the dominant fix is stack-wide (get the superuser app pools onto bounded,
timed roles; consider PgBouncer; or raise max_connections) — out of this repo's
scope but documented in the review.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:38:22 +03:00
kiania
5e3fc3910b fix: harden MCP server reliability, build reproducibility, and auth
Addresses intermittent query failures on the live instance (container itself
is healthy — failures are application/query-level) plus security hardening.

Reliability (analytics_mcp.py):
- Discard dead pooled connections instead of recycling them. A broken socket
  (DB restart, network blip, crash) previously poisoned the pool and every
  later query handed that connection failed until container recreation. New
  _is_disconnect() classifies real connection loss (class-08 / 57P0x SQLSTATE,
  or socket-level OperationalError with pgcode=None) vs. an in-session query
  error like statement_timeout (QueryCanceled / 57014), which is NOT a
  disconnect and leaves the connection usable.
- query() retries ONCE, only on a genuine disconnect, so a recycled-but-stale
  connection is invisible to the analyst (a real query error still surfaces).
- Bound concurrent checkouts with a semaphore (POOL_MAX) so >POOL_MAX
  concurrent queries QUEUE instead of overflowing the pool and raising
  PoolError (a 500 to the analyst).
- Lazy pool (minconn=0) + retry on init, so a brief DB outage at deploy time
  no longer crash-loops the worker.

Build reproducibility:
- Commit uv.lock (was gitignored) and build with `uv sync --frozen` so
  redeploys can't silently pull a newer, behaviour-changed mcp/starlette.

Security:
- Constant-time Bearer-token comparison (hmac.compare_digest).
- /healthz no longer leaks the analyst/token count.
- Dockerfile runs as a non-root user.
- deploy.sh: Docker log rotation (bound disk) + Traefik rate-limit middleware.

Also: relax the SQL guard so a forbidden keyword inside a string literal (e.g.
WHERE summary ILIKE '%please delete%') no longer rejects a valid read; the
blocklist still rejects data-modifying CTEs (and writes are impossible anyway
via the analytics_ro GRANTs + read-only rolled-back txn). Fix stale docstrings.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:28:58 +03:00
7 changed files with 1375 additions and 24 deletions

View file

@ -1,3 +1,5 @@
# NOTE: uv.lock is intentionally NOT ignored — the Dockerfile copies it for
# reproducible `uv sync --frozen` builds.
.git .git
.venv .venv
__pycache__ __pycache__

3
.gitignore vendored
View file

@ -7,4 +7,5 @@ __pycache__/
.DS_Store .DS_Store
.ruff_cache/ .ruff_cache/
.mypy_cache/ .mypy_cache/
uv.lock # uv.lock IS committed on purpose — the Docker build uses `uv sync --frozen` for
# reproducible installs, and the Coolify build clones the repo (it must include it).

View file

@ -11,15 +11,26 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app WORKDIR /app
# Install ONLY dependencies (flat module — the project itself is not a package). # Install ONLY dependencies (flat module — the project itself is not a package).
COPY pyproject.toml ./ # Copy the lockfile and build --frozen so rebuilds are reproducible: without it,
RUN uv sync --no-dev --no-install-project # `uv sync` re-resolves the >= ranges in pyproject.toml and a redeploy could pull a
# newer, behaviour-changed mcp/starlette and break the running server.
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --no-install-project --frozen
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
COPY analytics_mcp.py ./ COPY analytics_mcp.py ./
# Run as a non-root user (least privilege; nothing here needs root).
RUN useradd -m -u 10001 app && chown -R app:app /app
USER app
EXPOSE 8892 EXPOSE 8892
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \ HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1 CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "2"] # Single worker: this is a low-traffic read-only proxy for a handful of analysts, and
# the DB connection budget = workers × MCP_POOL_MAX. One worker (× default pool 8) caps
# the MCP at 8 backends instead of 16, which matters on a shared 100-connection DB.
# Scale out by raising MCP_POOL_MAX, not workers, so the budget stays obvious.
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "1"]

View file

@ -26,9 +26,11 @@ Env:
""" """
from __future__ import annotations from __future__ import annotations
import hmac
import logging import logging
import os import os
import re import re
import threading
import time import time
from contextlib import contextmanager from contextlib import contextmanager
@ -57,6 +59,7 @@ def _get_logger(name: str) -> logging.Logger:
) )
root.addHandler(handler) root.addHandler(handler)
root.setLevel(logging.INFO) root.setLevel(logging.INFO)
root.propagate = False # don't double-emit through uvicorn's root handler
return root.getChild(name) return root.getChild(name)
@ -78,26 +81,89 @@ READABLE_SCHEMAS = tuple(
# Force read-only + a statement timeout at the connection level (belt + braces; # Force read-only + a statement timeout at the connection level (belt + braces;
# the analytics_ro role already sets these, but a self-contained server is safer # the analytics_ro role already sets these, but a self-contained server is safer
# in case it is ever pointed at a less-restricted DSN). # in case it is ever pointed at a less-restricted DSN).
_pool = psycopg2.pool.ThreadedConnectionPool( POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))
1, _POOL_OPTS = "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8"
int(os.getenv("MCP_POOL_MAX", "8")),
DATABASE_URL,
options="-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8", def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool:
"""Create the pool, retrying so a brief DB outage at deploy time doesn't crash
the worker into a boot loop. minconn=0 no eager connect at import (connections
are opened lazily on first use)."""
last: Exception | None = None
for attempt in range(1, retries + 1):
try:
return psycopg2.pool.ThreadedConnectionPool(
0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
)
except psycopg2.OperationalError as exc:
last = exc
log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
if attempt < retries:
time.sleep(delay)
assert last is not None
raise last
_pool = _init_pool()
# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
_pool_slots = threading.BoundedSemaphore(POOL_MAX)
# SQLSTATE classes that mean the CONNECTION is gone (vs. a query that merely failed,
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
# class 08 = connection exception; 57P01/02/03 = admin/crash shutdown.
_DISCONNECT_SQLSTATES = frozenset(
{"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
) )
def _is_disconnect(exc: Exception) -> bool:
"""True only for a genuinely lost connection — so we discard/retry on a dropped
socket but NOT on a statement_timeout or other in-session query error."""
if isinstance(exc, psycopg2.InterfaceError):
return True
if isinstance(exc, psycopg2.OperationalError):
code = getattr(exc, "pgcode", None)
return code is None or code in _DISCONNECT_SQLSTATES # None = socket-level failure
return False
@contextmanager @contextmanager
def _ro_conn(): def _ro_conn():
"""Read-only connection; the transaction is ALWAYS rolled back (never commits).""" """Read-only connection; the transaction is ALWAYS rolled back (never commits).
Dead connections (DB restart, network blip, crash) are DISCARDED rather than
recycled otherwise a single broken socket poisons the pool and every later
query handed that connection fails until the container is recreated. A query that
merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
rolled back and returned to the pool as normal."""
_pool_slots.acquire()
try:
conn = _pool.getconn() conn = _pool.getconn()
except Exception:
_pool_slots.release()
raise
broken = False
try: try:
conn.set_session(readonly=True, autocommit=False) conn.set_session(readonly=True, autocommit=False)
yield conn yield conn
except Exception as exc:
broken = _is_disconnect(exc)
raise
finally: finally:
try:
if broken or conn.closed:
_pool.putconn(conn, close=True)
else:
try: try:
conn.rollback() conn.rollback()
finally:
_pool.putconn(conn) _pool.putconn(conn)
except (psycopg2.OperationalError, psycopg2.InterfaceError):
_pool.putconn(conn, close=True)
finally:
_pool_slots.release()
def _rows(cur) -> list[dict]: def _rows(cur) -> list[dict]:
@ -135,6 +201,13 @@ def _strip_comments(sql: str) -> str:
return sql.strip() return sql.strip()
def _strip_literals(sql: str) -> str:
"""Blank out the contents of single-quoted string literals so the keyword guard
does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
are handled so we don't mis-terminate a literal mid-string."""
return re.sub(r"'(?:[^']|'')*'", "''", sql)
def _guard(sql: str) -> str: def _guard(sql: str) -> str:
"""Validate a single read-only statement; return the cleaned statement.""" """Validate a single read-only statement; return the cleaned statement."""
stripped = _strip_comments(sql) stripped = _strip_comments(sql)
@ -146,7 +219,11 @@ def _guard(sql: str) -> str:
stmt = parts[0].strip() stmt = parts[0].strip()
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE): if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
raise ValueError("Only SELECT / WITH queries are allowed.") raise ValueError("Only SELECT / WITH queries are allowed.")
if _FORBIDDEN.search(stmt): # Scan with string literals blanked: the blocklist's real job is to reject
# data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
# keyword that merely appears inside a filter value. (Writes are impossible anyway
# via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
if _FORBIDDEN.search(_strip_literals(stmt)):
raise ValueError("Query contains a forbidden (write/DDL) keyword.") raise ValueError("Query contains a forbidden (write/DDL) keyword.")
return stmt return stmt
@ -179,18 +256,31 @@ mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_tra
def query(sql: str, max_rows: int = 1000) -> dict: def query(sql: str, max_rows: int = 1000) -> dict:
"""Run a read-only SELECT/WITH query against the fleet database. """Run a read-only SELECT/WITH query against the fleet database.
Only the reporting.* and tracksolid.* schemas are readable. Single statement Readable schemas are the analytics_ro grant surface (reporting, tracksolid,
only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000, hard tickets, fuel by default). Single statement only; write/DDL is rejected. Returns
cap 10000). A LIMIT is auto-applied when absent. Result: {row_count, truncated, rows}. up to `max_rows` rows (default 1000, hard cap 10000). A LIMIT is auto-applied when
absent. Result: {row_count, truncated, rows}.
""" """
stmt = _guard(sql) stmt = _guard(sql)
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL)) cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
if not re.search(r"\blimit\b", stmt, re.IGNORECASE): if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
t0 = time.monotonic() t0 = time.monotonic()
# Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
# second attempt gets a fresh one — making a recycled-but-stale pool connection
# invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
# retried; it surfaces immediately.
rows = []
for attempt in (1, 2):
try:
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
cur.execute(stmt) cur.execute(stmt)
rows = _rows(cur) rows = _rows(cur)
break
except Exception as exc:
if attempt == 2 or not _is_disconnect(exc):
raise
log.warning("stale DB connection on attempt %d — retrying once", attempt)
truncated = len(rows) > cap truncated = len(rows) > cap
rows = rows[:cap] rows = rows[:cap]
dur_ms = int((time.monotonic() - t0) * 1000) dur_ms = int((time.monotonic() - t0) * 1000)
@ -200,7 +290,7 @@ def query(sql: str, max_rows: int = 1000) -> dict:
@mcp.tool() @mcp.tool()
def list_schemas() -> list[dict]: def list_schemas() -> list[dict]:
"""List the readable schemas (reporting, tracksolid) with their object counts.""" """List the readable schemas (reporting, tracksolid, tickets, fuel) with object counts."""
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
cur.execute( cur.execute(
"SELECT table_schema AS schema, count(*) AS objects " "SELECT table_schema AS schema, count(*) AS objects "
@ -213,7 +303,7 @@ def list_schemas() -> list[dict]:
@mcp.tool() @mcp.tool()
def list_tables(schema: str) -> list[dict]: def list_tables(schema: str) -> list[dict]:
"""List tables + views in a schema (must be reporting or tracksolid).""" """List tables + views in a schema (must be one of the readable schemas)."""
if schema not in READABLE_SCHEMAS: if schema not in READABLE_SCHEMAS:
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}") raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
@ -280,13 +370,26 @@ _TOKENS = {
} }
def _lookup_token(token: str) -> str | None:
"""Constant-time token match: compare against every known token so the response
time does not reveal how far a guessed prefix matched. Cheap for a handful of
per-analyst tokens."""
if not token:
return None
match = None
for known, name in _TOKENS.items():
if hmac.compare_digest(token, known):
match = name
return match
class BearerAuth(BaseHTTPMiddleware): class BearerAuth(BaseHTTPMiddleware):
async def dispatch(self, request, call_next): async def dispatch(self, request, call_next):
if request.url.path == "/healthz": if request.url.path == "/healthz":
return await call_next(request) return await call_next(request)
auth = request.headers.get("authorization", "") auth = request.headers.get("authorization", "")
token = auth[7:] if auth.lower().startswith("bearer ") else "" token = auth[7:] if auth.lower().startswith("bearer ") else ""
caller = _TOKENS.get(token) caller = _lookup_token(token)
if caller is None: if caller is None:
return JSONResponse({"error": "unauthorized"}, status_code=401) return JSONResponse({"error": "unauthorized"}, status_code=401)
request.state.caller = caller request.state.caller = caller
@ -294,7 +397,7 @@ class BearerAuth(BaseHTTPMiddleware):
async def healthz(_request): async def healthz(_request):
return JSONResponse({"ok": True, "tokens": len(_TOKENS)}) return JSONResponse({"ok": True})
app = mcp.streamable_http_app() app = mcp.streamable_http_app()

View file

@ -65,14 +65,18 @@ docker rm -f "$NAME" 2>/dev/null || true
docker run -d --name "$NAME" --restart unless-stopped \ docker run -d --name "$NAME" --restart unless-stopped \
--network "$APPNET" \ --network "$APPNET" \
--env-file "$ENV_FILE" \ --env-file "$ENV_FILE" \
--log-opt max-size=10m --log-opt max-file=5 \
--label 'traefik.enable=true' \ --label 'traefik.enable=true' \
--label 'traefik.docker.network=coolify' \ --label 'traefik.docker.network=coolify' \
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \ --label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.average=30' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.burst=60' \
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \ --label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \ --label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \ --label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \ --label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \ --label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.https-0-fleetmcp.middlewares=fleetmcp-ratelimit" \
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \ --label "traefik.http.routers.https-0-fleetmcp.tls=true" \
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \ --label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \ --label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \

View file

@ -62,3 +62,9 @@ ALTER DEFAULT PRIVILEGES FOR ROLE postgres IN SCHEMA fuel GRANT EX
ALTER ROLE analytics_ro SET default_transaction_read_only = on; ALTER ROLE analytics_ro SET default_transaction_read_only = on;
ALTER ROLE analytics_ro SET statement_timeout = '30s'; ALTER ROLE analytics_ro SET statement_timeout = '30s';
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s'; ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
-- Cap idle POOLED connections too (these sit idle but NOT in a transaction, so the
-- idle_in_transaction timeout never reaps them). On a shared 100-connection DB this
-- returns slots the MCP isn't using. Safe with the server's dead-connection handling:
-- a reaped pooled connection is discarded + transparently retried, not surfaced as an
-- error. (Requires PostgreSQL 14+.)
ALTER ROLE analytics_ro SET idle_session_timeout = '5min';

1224
uv.lock Normal file

File diff suppressed because it is too large Load diff