fleetanalytics_mcp/analytics_mcp.py
kiania 5e3fc3910b fix: harden MCP server reliability, build reproducibility, and auth
Addresses intermittent query failures on the live instance (container itself
is healthy — failures are application/query-level) plus security hardening.

Reliability (analytics_mcp.py):
- Discard dead pooled connections instead of recycling them. A broken socket
  (DB restart, network blip, crash) previously poisoned the pool and every
  later query handed that connection failed until container recreation. New
  _is_disconnect() classifies real connection loss (class-08 / 57P0x SQLSTATE,
  or socket-level OperationalError with pgcode=None) vs. an in-session query
  error like statement_timeout (QueryCanceled / 57014), which is NOT a
  disconnect and leaves the connection usable.
- query() retries ONCE, only on a genuine disconnect, so a recycled-but-stale
  connection is invisible to the analyst (a real query error still surfaces).
- Bound concurrent checkouts with a semaphore (POOL_MAX) so >POOL_MAX
  concurrent queries QUEUE instead of overflowing the pool and raising
  PoolError (a 500 to the analyst).
- Lazy pool (minconn=0) + retry on init, so a brief DB outage at deploy time
  no longer crash-loops the worker.
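  The discard/retry and queueing behaviour above can be modelled without a database. This is a minimal stdlib-only sketch, not the server's code. `Disconnect`, `QueryError`, `FakeConn`, `FakePool`, `checkout()` and `run()` are hypothetical stand-ins for psycopg2's errors and ThreadedConnectionPool, and the check for "pg_sleep" only fakes a statement_timeout. It shows four things: a dead connection is dropped rather than recycled, the single retry lands on a fresh connection, an in-session error surfaces without a retry, and a BoundedSemaphore keeps checkouts at or below POOL_MAX.

```python
import threading
import time
from contextlib import contextmanager


class Disconnect(Exception):
    """Stand-in for a lost connection (e.g. SQLSTATE class 08, or pgcode None)."""


class QueryError(Exception):
    """Stand-in for an in-session failure such as statement_timeout (57014)."""


class FakeConn:
    def __init__(self, dead: bool = False):
        self.dead = dead

    def execute(self, sql: str) -> list:
        time.sleep(0.01)  # simulate a round trip so concurrent callers overlap
        if self.dead:
            raise Disconnect("server closed the connection unexpectedly")
        if "pg_sleep" in sql:
            raise QueryError("canceling statement due to statement timeout")
        return [sql]


class FakePool:
    """Hands out idle connections (or new ones) and records how many are checked out."""

    def __init__(self, conns):
        self.idle = list(conns)
        self.discarded = 0
        self.out = 0
        self.max_out = 0
        self._lock = threading.Lock()

    def getconn(self) -> FakeConn:
        with self._lock:
            self.out += 1
            self.max_out = max(self.max_out, self.out)
            return self.idle.pop(0) if self.idle else FakeConn()

    def putconn(self, conn: FakeConn, close: bool = False) -> None:
        with self._lock:
            self.out -= 1
            if close:
                self.discarded += 1  # dead socket: drop it instead of recycling
            else:
                self.idle.append(conn)


POOL_MAX = 2
slots = threading.BoundedSemaphore(POOL_MAX)


@contextmanager
def checkout(pool: FakePool):
    slots.acquire()  # callers beyond POOL_MAX wait here instead of overflowing the pool
    try:
        conn = pool.getconn()
    except Exception:
        slots.release()
        raise
    broken = False
    try:
        yield conn
    except Disconnect:
        broken = True  # only a real disconnect marks the connection as unusable
        raise
    finally:
        pool.putconn(conn, close=broken)
        slots.release()


def run(pool: FakePool, sql: str) -> list:
    """Retry exactly once, and only when the failure was a disconnect."""
    for attempt in (1, 2):
        try:
            with checkout(pool) as conn:
                return conn.execute(sql)
        except Disconnect:
            if attempt == 2:
                raise
    raise AssertionError("unreachable")
```

Acquiring the semaphore before getconn() is the design choice that matters: surplus callers block in their own thread instead of reaching the pool, which in psycopg2's case raises PoolError ("connection pool exhausted") once maxconn connections are checked out.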

Build reproducibility:
- Commit uv.lock (was gitignored) and build with `uv sync --frozen` so
  redeploys can't silently pull a newer, behaviour-changed mcp/starlette.

Security:
- Constant-time Bearer-token comparison (hmac.compare_digest).
- /healthz no longer leaks the analyst/token count.
- Dockerfile runs as a non-root user.
- deploy.sh: Docker log rotation (bound disk) + Traefik rate-limit middleware.
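  A minimal sketch of per-analyst token parsing and the constant-time lookup, assuming the comma-separated "name:token" format of MCP_AUTH_TOKENS. `parse_tokens()` and `lookup_token()` are illustrative names, not the server's functions. This version also ignores stray spaces around entries and compares UTF-8 bytes, because hmac.compare_digest() raises TypeError when given a str that contains non-ASCII characters.

```python
import hmac
from typing import Dict, Optional


def parse_tokens(spec: str) -> Dict[str, str]:
    """Turn "alice:tok1,bob:tok2" into {token: analyst}, ignoring stray spaces."""
    tokens: Dict[str, str] = {}
    for entry in spec.split(","):
        name, sep, token = entry.partition(":")
        if sep and token.strip():  # skip entries with no ":" or an empty token
            tokens[token.strip()] = name.strip()
    return tokens


def lookup_token(tokens: Dict[str, str], presented: str) -> Optional[str]:
    """Return the analyst name for `presented`, or None.

    Every known token is compared (no early exit), and both sides are UTF-8 bytes,
    so a non-ASCII header value yields None instead of a TypeError.
    """
    if not presented:
        return None
    given = presented.encode("utf-8")
    match = None
    for known, name in tokens.items():
        if hmac.compare_digest(given, known.encode("utf-8")):
            match = name
    return match
```

The loop deliberately has no early `break`: every known token is compared on every request, so the amount of work does not depend on which entry matched.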

Also: relax the SQL guard so a forbidden keyword inside a string literal (e.g.
WHERE summary ILIKE '%please delete%') no longer rejects a valid read; the
blocklist still rejects data-modifying CTEs (and writes are impossible anyway
via the analytics_ro GRANTs + read-only rolled-back txn). Fix stale docstrings.
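The literal-blanking step can be checked in isolation. This sketch copies the keyword regex and the literal-stripping pattern from the server (as `FORBIDDEN` and `strip_literals()`). `is_rejected()` is a hypothetical helper that applies only the keyword check, not the full single-statement guard.

```python
import re

# Copied from the server's guard: write/DDL keywords that must not appear in a query.
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|grant|revoke|truncate|copy|call|do|merge|"
    r"vacuum|reindex|refresh|comment|lock|set|reset)\b",
    re.IGNORECASE,
)


def strip_literals(sql: str) -> str:
    """Replace each single-quoted literal (honouring '' escapes) with an empty ''."""
    return re.sub(r"'(?:[^']|'')*'", "''", sql)


def is_rejected(sql: str) -> bool:
    """True if the keyword blocklist fires once string literals are blanked out."""
    return FORBIDDEN.search(strip_literals(sql)) is not None
```

Only quoted data is blanked, so keywords are still matched in identifiers: a column literally named `comment` or `lock` is rejected as well.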

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 23:28:58 +03:00

"""
analytics_mcp_rev.py — Fireside Communications · Read-only Analytics MCP Server
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Hosted MCP server for the decision & analytics team. Exposes the fleet reporting
data (reporting.* and tracksolid.*, plus tickets.* and fuel.* by default) to Claude
as READ-ONLY query + introspection tools, for reporting and decisions only; it
never edits or deletes.

It is a STANDALONE Traefik-labelled bridge (not Coolify-managed), with the same shape
as the dashboard_api staging bridge: it reuses the webhook_receiver image, joins
the `coolify` network, and connects to the internal DB over psycopg2 as the
dedicated read-only `analytics_ro` role (deploy_analytics_mcp.sh sets DATABASE_URL
to that DSN). Served over streamable HTTP with Bearer-token auth.

READ-ONLY is enforced at FOUR layers:
  1. the analytics_ro GRANTs (no INSERT/UPDATE/DELETE; not the matview owner)
  2. role + connection default_transaction_read_only = on
  3. every query runs in a transaction that is ROLLED BACK (never committed)
  4. the `query` tool's single-statement / keyword guard (clean errors, not DB faults)

Env:
  DATABASE_URL                  analytics_ro DSN (set by the deploy script)
  MCP_AUTH_TOKENS               "alice:tok1,bob:tok2": per-analyst Bearer tokens (revocable + audited)
  MCP_MAX_ROWS                  hard ceiling on rows returned (default 10000)
  MCP_POOL_MAX                  max read-only pool connections (default 8)
  MCP_READABLE_SCHEMAS          schemas the introspection tools expose
                                (default "reporting,tracksolid,tickets,fuel")
  MCP_DNS_REBINDING_PROTECTION  "1" re-enables the MCP SDK's Host/Origin check (default off)
  MCP_ALLOWED_HOSTS             Host allowlist applied when that protection is on
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""

from __future__ import annotations

import hmac
import logging
import os
import re
import threading
import time
from contextlib import contextmanager

import psycopg2
import psycopg2.extras
import psycopg2.pool
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


def _get_logger(name: str) -> logging.Logger:
    """Standalone logger mirroring ts_shared_rev's format. Intentionally NOT
    importing ts_shared_rev: that module eagerly requires the Tracksolid ingestion
    secrets (APP_KEY/SECRET/PWD), which this read-only analytics server has no
    business holding."""
    root = logging.getLogger("analytics_mcp")
    if not root.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        root.addHandler(handler)
        root.setLevel(logging.INFO)
        root.propagate = False  # don't double-emit through uvicorn's root handler
    return root.getChild(name)


log = _get_logger("server")

DATABASE_URL = os.environ["DATABASE_URL"]  # analytics_ro DSN (set by deploy)
MAX_ROWS_CEIL = int(os.getenv("MCP_MAX_ROWS", "10000"))

# Schemas the introspection tools (list_schemas, list_tables, describe_table,
# list_functions, sample_table) expose. Override with MCP_READABLE_SCHEMAS
# (comma-separated; default "reporting,tracksolid,tickets,fuel"). These must stay
# in sync with the GRANTs in scripts/analytics_ro_role.sql. The raw query() tool
# is bounded by the analytics_ro role's GRANTs, not by this list.
READABLE_SCHEMAS = tuple(
    s.strip()
    for s in os.getenv("MCP_READABLE_SCHEMAS", "reporting,tracksolid,tickets,fuel").split(",")
    if s.strip()
)


# ── Read-only connection pool ────────────────────────────────────────────────
# Force read-only + a statement timeout at the connection level (belt + braces;
# the analytics_ro role already sets these, but a self-contained server is safer
# in case it is ever pointed at a less-restricted DSN).
POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))
_POOL_OPTS = (
    "-c default_transaction_read_only=on "
    "-c statement_timeout=30000 "  # 30 s (the unit is milliseconds)
    "-c client_encoding=UTF8"
)


def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool:
    """Create the pool, retrying so a brief DB outage at deploy time doesn't crash
    the worker into a boot loop. minconn=0 → no eager connect at import (connections
    are opened lazily on first use).

    Note: with minconn=0 the pool constructor opens no connections, so an unreachable
    DB cannot make it raise here; the retry only takes effect if minconn is ever
    raised above 0."""
    last: Exception | None = None
    for attempt in range(1, retries + 1):
        try:
            return psycopg2.pool.ThreadedConnectionPool(
                0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
            )
        except psycopg2.OperationalError as exc:
            last = exc
            log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
            if attempt < retries:
                time.sleep(delay)
    assert last is not None
    raise last


_pool = _init_pool()

# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
_pool_slots = threading.BoundedSemaphore(POOL_MAX)

# SQLSTATE codes that mean the CONNECTION is gone (vs. a query that merely failed,
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
# Class 08 = connection exception; 57P01 = admin_shutdown, 57P02 = crash_shutdown,
# 57P03 = cannot_connect_now.
_DISCONNECT_SQLSTATES = frozenset(
    {"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
)


def _is_disconnect(exc: Exception) -> bool:
    """True only for a genuinely lost connection, so we discard/retry on a dropped
    socket but NOT on a statement_timeout or other in-session query error."""
    if isinstance(exc, psycopg2.InterfaceError):
        return True
    if isinstance(exc, psycopg2.OperationalError):
        code = getattr(exc, "pgcode", None)
        return code is None or code in _DISCONNECT_SQLSTATES  # None = socket-level failure
    return False


@contextmanager
def _ro_conn():
    """Read-only connection; the transaction is ALWAYS rolled back (never commits).

    Dead connections (DB restart, network blip, crash) are DISCARDED rather than
    recycled — otherwise a single broken socket poisons the pool and every later
    query handed that connection fails until the container is recreated. A query that
    merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
    rolled back and returned to the pool as normal."""
    _pool_slots.acquire()
    try:
        conn = _pool.getconn()
    except Exception:
        _pool_slots.release()
        raise
    broken = False
    try:
        conn.set_session(readonly=True, autocommit=False)
        yield conn
    except Exception as exc:
        broken = _is_disconnect(exc)
        raise
    finally:
        try:
            if broken or conn.closed:
                _pool.putconn(conn, close=True)
            else:
                try:
                    conn.rollback()
                    _pool.putconn(conn)
                except (psycopg2.OperationalError, psycopg2.InterfaceError):
                    _pool.putconn(conn, close=True)
        finally:
            _pool_slots.release()


def _rows(cur) -> list[dict]:
    """Materialise the cursor as a list of JSON-safe dicts. Duplicate column names
    collapse to a single key (the last one wins), so alias them in the SELECT."""
    if cur.description is None:
        return []
    cols = [d[0] for d in cur.description]
    out = []
    for row in cur.fetchall():
        out.append({c: _jsonable(v) for c, v in zip(cols, row)})
    return out


def _jsonable(v):
    """Coerce non-JSON-native values (dates, Decimal, etc.) to str."""
    if v is None or isinstance(v, (bool, int, float, str)):
        return v
    return str(v)


# ── SQL guard for the general query tool ─────────────────────────────────────
# The analytics_ro role + read-only txn already make writes impossible; this guard
# exists to return CLEAN errors (and block multi-statements / SET that could relax
# read-only) instead of letting the DB raise.
_FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|grant|revoke|truncate|copy|call|do|merge|"
    r"vacuum|reindex|refresh|comment|lock|set|reset)\b",
    re.IGNORECASE,
)


def _strip_comments(sql: str) -> str:
    """Remove /* block */ and -- line comments. Comment markers inside string
    literals are not special-cased, so a quoted value containing -- loses the rest
    of its line."""
    sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.DOTALL)  # block comments
    sql = re.sub(r"--[^\n]*", " ", sql)  # line comments
    return sql.strip()


def _strip_literals(sql: str) -> str:
    """Blank out the contents of single-quoted string literals so the keyword guard
    does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
    are handled so we don't mis-terminate a literal mid-string."""
    return re.sub(r"'(?:[^']|'')*'", "''", sql)


def _guard(sql: str) -> str:
    """Validate a single read-only statement; return the cleaned statement."""
    stripped = _strip_comments(sql)
    if not stripped:
        raise ValueError("Empty query.")
    parts = [p for p in stripped.split(";") if p.strip()]  # allow one trailing ;
    if len(parts) != 1:
        raise ValueError("Only a single statement is allowed.")
    stmt = parts[0].strip()
    if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
        raise ValueError("Only SELECT / WITH queries are allowed.")
    # Scan with string literals blanked: the blocklist's real job is to reject
    # data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
    # keyword that merely appears inside a filter value. (Writes are impossible anyway
    # via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
    if _FORBIDDEN.search(_strip_literals(stmt)):
        raise ValueError("Query contains a forbidden (write/DDL) keyword.")
    return stmt


# ── MCP server + tools ───────────────────────────────────────────────────────
# The MCP SDK ships DNS-rebinding protection that, by default, only accepts a
# localhost Host header and returns 421 for anything else — which breaks this
# service behind Traefik (Host = fleetmcp.*). That protection targets browser
# attacks on localhost-bound servers; it does not apply to a public, TLS-terminated,
# Bearer-authenticated service. So it is OFF by default here, and re-enableable via
# MCP_DNS_REBINDING_PROTECTION=1 with an explicit MCP_ALLOWED_HOSTS allowlist.
_DNS_PROT = os.getenv("MCP_DNS_REBINDING_PROTECTION", "0") == "1"
_ALLOWED_HOSTS = [
    h.strip()
    for h in os.getenv(
        "MCP_ALLOWED_HOSTS",
        "fleetmcp.fivetitude.com,fleetmcp.rahamafresh.com,localhost,127.0.0.1",
    ).split(",")
    if h.strip()
]
_transport_security = TransportSecuritySettings(
    enable_dns_rebinding_protection=_DNS_PROT,
    allowed_hosts=_ALLOWED_HOSTS,
    allowed_origins=[f"https://{h}" for h in _ALLOWED_HOSTS],
)
mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_transport_security)


@mcp.tool()
def query(sql: str, max_rows: int = 1000) -> dict:
    """Run a read-only SELECT/WITH query against the fleet database.

    Readable schemas are the analytics_ro grant surface (reporting, tracksolid,
    tickets, fuel by default). Single statement only; write/DDL is rejected. Returns
    up to `max_rows` rows (default 1000; server hard cap 10000 by default). A LIMIT is
    auto-applied only when the statement contains no LIMIT anywhere. Result:
    {row_count, truncated, rows}.
    """
    stmt = _guard(sql)
    cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
    if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
        stmt = f"{stmt}\nLIMIT {cap + 1}"  # +1 row to detect truncation
    t0 = time.monotonic()
    # Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
    # second attempt gets a fresh one, making a recycled-but-stale pool connection
    # invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
    # retried; it surfaces immediately.
    rows = []
    for attempt in (1, 2):
        try:
            with _ro_conn() as conn, conn.cursor() as cur:
                cur.execute(stmt)
                rows = _rows(cur)
            break
        except Exception as exc:
            if attempt == 2 or not _is_disconnect(exc):
                raise
            log.warning("stale DB connection on attempt %d — retrying once", attempt)
    truncated = len(rows) > cap
    rows = rows[:cap]
    dur_ms = int((time.monotonic() - t0) * 1000)
    log.info("query rows=%d trunc=%s %dms :: %s", len(rows), truncated, dur_ms, sql[:200])
    return {"row_count": len(rows), "truncated": truncated, "rows": rows}


@mcp.tool()
def list_schemas() -> list[dict]:
    """List the readable schemas (reporting, tracksolid, tickets, fuel by default) with
    the number of tables/views the analytics_ro role can see in each."""
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT table_schema AS schema, count(*) AS objects "
            "FROM information_schema.tables WHERE table_schema = ANY(%s) "
            "GROUP BY 1 ORDER BY 1",
            (list(READABLE_SCHEMAS),),
        )
        return _rows(cur)


@mcp.tool()
def list_tables(schema: str) -> list[dict]:
    """List tables + views in a schema (must be one of the readable schemas).

    Materialized views are not included (information_schema omits them); query
    pg_matviews to list those."""
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT table_name AS name, table_type AS kind "
            "FROM information_schema.tables WHERE table_schema = %s "
            "ORDER BY 1",
            (schema,),
        )
        return _rows(cur)


@mcp.tool()
def describe_table(schema: str, table: str) -> list[dict]:
    """Describe a table/view: columns, types, nullability, defaults.

    Materialized views return no rows here, because information_schema omits them;
    query pg_catalog (e.g. pg_attribute) for their columns."""
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT column_name AS column, data_type AS type, "
            "is_nullable AS nullable, column_default AS default "
            "FROM information_schema.columns "
            "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position",
            (schema, table),
        )
        return _rows(cur)


@mcp.tool()
def list_functions(schema: str = "reporting") -> list[dict]:
    """List callable functions (e.g. reporting.fn_*) with their argument signatures."""
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT p.proname AS name, pg_get_function_arguments(p.oid) AS args "
            "FROM pg_proc p JOIN pg_namespace n ON n.oid = p.pronamespace "
            "WHERE n.nspname = %s ORDER BY 1",
            (schema,),
        )
        return _rows(cur)


_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$", re.IGNORECASE)


@mcp.tool()
def sample_table(schema: str, table: str, n: int = 20) -> dict:
    """Return up to `n` rows of a table/view (convenience over query). There is no
    ORDER BY, so which rows come back is unspecified."""
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    if not _IDENT.match(table):
        raise ValueError("table must be a simple identifier")
    return query(f'SELECT * FROM "{schema}"."{table}"', max_rows=n)
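

# ── Bearer-token auth ─────────────────────────────────────────────────────────
# MCP_AUTH_TOKENS = "alice:tok1,bob:tok2" → {token: name}. Per-analyst tokens make
# access revocable (edit the env + redeploy) and attributable in the logs.
# Whitespace around names/tokens is ignored; entries without a token are skipped.
_TOKENS: dict[str, str] = {}
for _entry in os.getenv("MCP_AUTH_TOKENS", "").split(","):
    _name, _sep, _tok = _entry.partition(":")
    if _sep and _tok.strip():
        _TOKENS[_tok.strip()] = _name.strip()


def _lookup_token(token: str) -> str | None:
    """Constant-time token match: compare against every known token so the response
    time does not reveal how far a guessed prefix matched. Cheap for a handful of
    per-analyst tokens. Both sides are compared as UTF-8 bytes, because
    hmac.compare_digest() raises TypeError on a str containing non-ASCII characters,
    which a client could otherwise trigger (a 500) with a crafted header."""
    if not token:
        return None
    presented = token.encode("utf-8")
    match = None
    for known, name in _TOKENS.items():
        if hmac.compare_digest(presented, known.encode("utf-8")):
            match = name
    return match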


class BearerAuth(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        if request.url.path == "/healthz":
            return await call_next(request)
        auth = request.headers.get("authorization", "")
        token = auth[7:].strip() if auth.lower().startswith("bearer ") else ""
        caller = _lookup_token(token)
        if caller is None:
            return JSONResponse({"error": "unauthorized"}, status_code=401)
        request.state.caller = caller
        return await call_next(request)


async def healthz(_request):
    return JSONResponse({"ok": True})


app = mcp.streamable_http_app()
app.add_middleware(BearerAuth)
# Starlette exposes add_route (not a Flask-style @app.route decorator).
app.add_route("/healthz", healthz, methods=["GET"])

if not _TOKENS:
    log.warning("MCP_AUTH_TOKENS is empty — every request will be rejected with 401.")
log.info("Analytics MCP starting. Tokens loaded=%d. Readable schemas=%s.", len(_TOKENS), READABLE_SCHEMAS)