Compare commits

..

No commits in common. "a36542dbc9c09467abeb86c47e7258911f9961b5" and "0355047fdde9c4ce4b59d4425e9e256eb32b8ec8" have entirely different histories.

7 changed files with 24 additions and 1375 deletions

View file

@ -1,5 +1,3 @@
# NOTE: uv.lock is intentionally NOT ignored — the Dockerfile copies it for
# reproducible `uv sync --frozen` builds.
.git .git
.venv .venv
__pycache__ __pycache__

3
.gitignore vendored
View file

@ -7,5 +7,4 @@ __pycache__/
.DS_Store .DS_Store
.ruff_cache/ .ruff_cache/
.mypy_cache/ .mypy_cache/
# uv.lock IS committed on purpose — the Docker build uses `uv sync --frozen` for uv.lock
# reproducible installs, and the Coolify build clones the repo (it must include it).

View file

@ -11,26 +11,15 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app WORKDIR /app
# Install ONLY dependencies (flat module — the project itself is not a package). # Install ONLY dependencies (flat module — the project itself is not a package).
# Copy the lockfile and build --frozen so rebuilds are reproducible: without it, COPY pyproject.toml ./
# `uv sync` re-resolves the >= ranges in pyproject.toml and a redeploy could pull a RUN uv sync --no-dev --no-install-project
# newer, behaviour-changed mcp/starlette and break the running server.
COPY pyproject.toml uv.lock ./
RUN uv sync --no-dev --no-install-project --frozen
ENV PATH="/app/.venv/bin:$PATH" ENV PATH="/app/.venv/bin:$PATH"
COPY analytics_mcp.py ./ COPY analytics_mcp.py ./
# Run as a non-root user (least privilege; nothing here needs root).
RUN useradd -m -u 10001 app && chown -R app:app /app
USER app
EXPOSE 8892 EXPOSE 8892
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \ HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1 CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
# Single worker: this is a low-traffic read-only proxy for a handful of analysts, and CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "2"]
# the DB connection budget = workers × MCP_POOL_MAX. One worker (× default pool 8) caps
# the MCP at 8 backends instead of 16, which matters on a shared 100-connection DB.
# Scale out by raising MCP_POOL_MAX, not workers, so the budget stays obvious.
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "1"]

View file

@ -26,11 +26,9 @@ Env:
""" """
from __future__ import annotations from __future__ import annotations
import hmac
import logging import logging
import os import os
import re import re
import threading
import time import time
from contextlib import contextmanager from contextlib import contextmanager
@ -59,7 +57,6 @@ def _get_logger(name: str) -> logging.Logger:
) )
root.addHandler(handler) root.addHandler(handler)
root.setLevel(logging.INFO) root.setLevel(logging.INFO)
root.propagate = False # don't double-emit through uvicorn's root handler
return root.getChild(name) return root.getChild(name)
@ -81,89 +78,26 @@ READABLE_SCHEMAS = tuple(
# Force read-only + a statement timeout at the connection level (belt + braces; # Force read-only + a statement timeout at the connection level (belt + braces;
# the analytics_ro role already sets these, but a self-contained server is safer # the analytics_ro role already sets these, but a self-contained server is safer
# in case it is ever pointed at a less-restricted DSN). # in case it is ever pointed at a less-restricted DSN).
POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8")) _pool = psycopg2.pool.ThreadedConnectionPool(
_POOL_OPTS = "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8" 1,
int(os.getenv("MCP_POOL_MAX", "8")),
DATABASE_URL,
def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool: options="-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8",
"""Create the pool, retrying so a brief DB outage at deploy time doesn't crash
the worker into a boot loop. minconn=0 means no eager connect at import (connections
are opened lazily on first use)."""
last: Exception | None = None
for attempt in range(1, retries + 1):
try:
return psycopg2.pool.ThreadedConnectionPool(
0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
) )
except psycopg2.OperationalError as exc:
last = exc
log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
if attempt < retries:
time.sleep(delay)
assert last is not None
raise last
_pool = _init_pool()
# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
_pool_slots = threading.BoundedSemaphore(POOL_MAX)
# SQLSTATE classes that mean the CONNECTION is gone (vs. a query that merely failed,
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
# class 08 = connection exception; 57P01/02/03 = admin/crash shutdown.
_DISCONNECT_SQLSTATES = frozenset(
{"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
)
def _is_disconnect(exc: Exception) -> bool:
"""True only for a genuinely lost connection — so we discard/retry on a dropped
socket but NOT on a statement_timeout or other in-session query error."""
if isinstance(exc, psycopg2.InterfaceError):
return True
if isinstance(exc, psycopg2.OperationalError):
code = getattr(exc, "pgcode", None)
return code is None or code in _DISCONNECT_SQLSTATES # None = socket-level failure
return False
@contextmanager @contextmanager
def _ro_conn(): def _ro_conn():
"""Read-only connection; the transaction is ALWAYS rolled back (never commits). """Read-only connection; the transaction is ALWAYS rolled back (never commits)."""
Dead connections (DB restart, network blip, crash) are DISCARDED rather than
recycled — otherwise a single broken socket poisons the pool and every later
query handed that connection fails until the container is recreated. A query that
merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
rolled back and returned to the pool as normal."""
_pool_slots.acquire()
try:
conn = _pool.getconn() conn = _pool.getconn()
except Exception:
_pool_slots.release()
raise
broken = False
try: try:
conn.set_session(readonly=True, autocommit=False) conn.set_session(readonly=True, autocommit=False)
yield conn yield conn
except Exception as exc:
broken = _is_disconnect(exc)
raise
finally: finally:
try:
if broken or conn.closed:
_pool.putconn(conn, close=True)
else:
try: try:
conn.rollback() conn.rollback()
_pool.putconn(conn)
except (psycopg2.OperationalError, psycopg2.InterfaceError):
_pool.putconn(conn, close=True)
finally: finally:
_pool_slots.release() _pool.putconn(conn)
def _rows(cur) -> list[dict]: def _rows(cur) -> list[dict]:
@ -201,13 +135,6 @@ def _strip_comments(sql: str) -> str:
return sql.strip() return sql.strip()
def _strip_literals(sql: str) -> str:
"""Blank out the contents of single-quoted string literals so the keyword guard
does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
are handled so we don't mis-terminate a literal mid-string."""
return re.sub(r"'(?:[^']|'')*'", "''", sql)
def _guard(sql: str) -> str: def _guard(sql: str) -> str:
"""Validate a single read-only statement; return the cleaned statement.""" """Validate a single read-only statement; return the cleaned statement."""
stripped = _strip_comments(sql) stripped = _strip_comments(sql)
@ -219,11 +146,7 @@ def _guard(sql: str) -> str:
stmt = parts[0].strip() stmt = parts[0].strip()
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE): if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
raise ValueError("Only SELECT / WITH queries are allowed.") raise ValueError("Only SELECT / WITH queries are allowed.")
# Scan with string literals blanked: the blocklist's real job is to reject if _FORBIDDEN.search(stmt):
# data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
# keyword that merely appears inside a filter value. (Writes are impossible anyway
# via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
if _FORBIDDEN.search(_strip_literals(stmt)):
raise ValueError("Query contains a forbidden (write/DDL) keyword.") raise ValueError("Query contains a forbidden (write/DDL) keyword.")
return stmt return stmt
@ -256,31 +179,18 @@ mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_tra
def query(sql: str, max_rows: int = 1000) -> dict: def query(sql: str, max_rows: int = 1000) -> dict:
"""Run a read-only SELECT/WITH query against the fleet database. """Run a read-only SELECT/WITH query against the fleet database.
Readable schemas are the analytics_ro grant surface (reporting, tracksolid, Only the reporting.* and tracksolid.* schemas are readable. Single statement
tickets, fuel by default). Single statement only; write/DDL is rejected. Returns only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000, hard
up to `max_rows` rows (default 1000, hard cap 10000). A LIMIT is auto-applied when cap 10000). A LIMIT is auto-applied when absent. Result: {row_count, truncated, rows}.
absent. Result: {row_count, truncated, rows}.
""" """
stmt = _guard(sql) stmt = _guard(sql)
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL)) cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
if not re.search(r"\blimit\b", stmt, re.IGNORECASE): if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
t0 = time.monotonic() t0 = time.monotonic()
# Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
# second attempt gets a fresh one — making a recycled-but-stale pool connection
# invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
# retried; it surfaces immediately.
rows = []
for attempt in (1, 2):
try:
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
cur.execute(stmt) cur.execute(stmt)
rows = _rows(cur) rows = _rows(cur)
break
except Exception as exc:
if attempt == 2 or not _is_disconnect(exc):
raise
log.warning("stale DB connection on attempt %d — retrying once", attempt)
truncated = len(rows) > cap truncated = len(rows) > cap
rows = rows[:cap] rows = rows[:cap]
dur_ms = int((time.monotonic() - t0) * 1000) dur_ms = int((time.monotonic() - t0) * 1000)
@ -290,7 +200,7 @@ def query(sql: str, max_rows: int = 1000) -> dict:
@mcp.tool() @mcp.tool()
def list_schemas() -> list[dict]: def list_schemas() -> list[dict]:
"""List the readable schemas (reporting, tracksolid, tickets, fuel) with object counts.""" """List the readable schemas (reporting, tracksolid) with their object counts."""
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
cur.execute( cur.execute(
"SELECT table_schema AS schema, count(*) AS objects " "SELECT table_schema AS schema, count(*) AS objects "
@ -303,7 +213,7 @@ def list_schemas() -> list[dict]:
@mcp.tool() @mcp.tool()
def list_tables(schema: str) -> list[dict]: def list_tables(schema: str) -> list[dict]:
"""List tables + views in a schema (must be one of the readable schemas).""" """List tables + views in a schema (must be reporting or tracksolid)."""
if schema not in READABLE_SCHEMAS: if schema not in READABLE_SCHEMAS:
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}") raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
with _ro_conn() as conn, conn.cursor() as cur: with _ro_conn() as conn, conn.cursor() as cur:
@ -370,26 +280,13 @@ _TOKENS = {
} }
def _lookup_token(token: str) -> str | None:
"""Constant-time token match: compare against every known token so the response
time does not reveal how far a guessed prefix matched. Cheap for a handful of
per-analyst tokens."""
if not token:
return None
match = None
for known, name in _TOKENS.items():
if hmac.compare_digest(token, known):
match = name
return match
class BearerAuth(BaseHTTPMiddleware): class BearerAuth(BaseHTTPMiddleware):
async def dispatch(self, request, call_next): async def dispatch(self, request, call_next):
if request.url.path == "/healthz": if request.url.path == "/healthz":
return await call_next(request) return await call_next(request)
auth = request.headers.get("authorization", "") auth = request.headers.get("authorization", "")
token = auth[7:] if auth.lower().startswith("bearer ") else "" token = auth[7:] if auth.lower().startswith("bearer ") else ""
caller = _lookup_token(token) caller = _TOKENS.get(token)
if caller is None: if caller is None:
return JSONResponse({"error": "unauthorized"}, status_code=401) return JSONResponse({"error": "unauthorized"}, status_code=401)
request.state.caller = caller request.state.caller = caller
@ -397,7 +294,7 @@ class BearerAuth(BaseHTTPMiddleware):
async def healthz(_request): async def healthz(_request):
return JSONResponse({"ok": True}) return JSONResponse({"ok": True, "tokens": len(_TOKENS)})
app = mcp.streamable_http_app() app = mcp.streamable_http_app()

View file

@ -65,18 +65,14 @@ docker rm -f "$NAME" 2>/dev/null || true
docker run -d --name "$NAME" --restart unless-stopped \ docker run -d --name "$NAME" --restart unless-stopped \
--network "$APPNET" \ --network "$APPNET" \
--env-file "$ENV_FILE" \ --env-file "$ENV_FILE" \
--log-opt max-size=10m --log-opt max-file=5 \
--label 'traefik.enable=true' \ --label 'traefik.enable=true' \
--label 'traefik.docker.network=coolify' \ --label 'traefik.docker.network=coolify' \
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \ --label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.average=30' \
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.burst=60' \
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \ --label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \ --label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \ --label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \ --label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \ --label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
--label "traefik.http.routers.https-0-fleetmcp.middlewares=fleetmcp-ratelimit" \
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \ --label "traefik.http.routers.https-0-fleetmcp.tls=true" \
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \ --label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \ --label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \

View file

@ -62,9 +62,3 @@ ALTER DEFAULT PRIVILEGES FOR ROLE postgres IN SCHEMA fuel GRANT EX
ALTER ROLE analytics_ro SET default_transaction_read_only = on; ALTER ROLE analytics_ro SET default_transaction_read_only = on;
ALTER ROLE analytics_ro SET statement_timeout = '30s'; ALTER ROLE analytics_ro SET statement_timeout = '30s';
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s'; ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
-- Cap idle POOLED connections too (these sit idle but NOT in a transaction, so the
-- idle_in_transaction timeout never reaps them). On a shared 100-connection DB this
-- returns slots the MCP isn't using. Safe with the server's dead-connection handling:
-- a reaped pooled connection is discarded + transparently retried, not surfaced as an
-- error. (Requires PostgreSQL 14+.)
ALTER ROLE analytics_ro SET idle_session_timeout = '5min';

1224
uv.lock

File diff suppressed because it is too large Load diff