fleetanalytics_mcp/analytics_mcp.py
david kiania 1eda59fe06 feat: read-only Fleet Analytics MCP server
Standalone, hosted MCP server that lets the decision & analytics team query
the fleet database (reporting.* / tracksolid.*) from Claude — read-only, for
reporting and decisions, never edit/delete.

- analytics_mcp.py: FastMCP streamable-HTTP server. Tools: query (guarded
  single SELECT/WITH, auto-LIMIT, write/DDL blocked), list_schemas,
  list_tables, describe_table, list_functions, sample_table. Per-analyst
  Bearer auth; /healthz exempt. No ts_shared_rev import (carries no ingestion
  secrets).
- Read-only enforced at four layers: analytics_ro GRANTs,
  default_transaction_read_only=on, rolled-back txn, SQL keyword guard.
- scripts/: analytics_ro_role.sql + bootstrap_analytics_ro.sh (dedicated
  least-privilege role, password in host-only ~/.analytics_ro.pw).
- Dockerfile + pyproject (uv, package=false) for Coolify build; deploy.sh
  manual host fallback (standalone Traefik bridge on the tracksolid_db host).
- docs/ANALYTICS_MCP.{md,html} + README: architecture, deploy runbook,
  add-to-Claude, verification, security notes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 23:43:24 +03:00

279 lines
11 KiB
Python

"""
analytics_mcp.py — Fireside Communications · Read-only Analytics MCP Server
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Hosted MCP server for the decision & analytics team. Exposes the fleet reporting
data (reporting.* + tracksolid.*) to Claude as READ-ONLY query + introspection
tools — for reporting and decisions, never edit/delete.
It is a STANDALONE Traefik-labelled bridge (not Coolify-managed), the same shape
as the dashboard_api staging bridge: it reuses the webhook_receiver image, joins
the `coolify` network, and connects to the internal DB over psycopg2 as the
dedicated read-only `analytics_ro` role (deploy_analytics_mcp.sh sets DATABASE_URL
to that DSN). Served over streamable HTTP with Bearer-token auth.
READ-ONLY is enforced at FOUR layers:
1. the analytics_ro GRANTs (no INSERT/UPDATE/DELETE; not the matview owner)
2. role + connection default_transaction_read_only = on
3. every query runs in a transaction that is ROLLED BACK (never committed)
4. the `query` tool's single-statement / keyword guard (clean errors, not DB faults)
Env:
DATABASE_URL analytics_ro DSN (set by the deploy script)
MCP_AUTH_TOKENS "alice:tok1,bob:tok2" — per-analyst Bearer tokens (revocable + audited)
MCP_MAX_ROWS hard ceiling on rows returned (default 10000)
MCP_POOL_MAX max read-only pool connections (default 8)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
from __future__ import annotations
import logging
import os
import re
import time
from contextlib import contextmanager
import psycopg2
import psycopg2.extras
import psycopg2.pool
from mcp.server.fastmcp import FastMCP
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
def _get_logger(name: str) -> logging.Logger:
    """Return a child of the shared ``analytics_mcp`` logger.

    The parent logger is configured once (stream handler, INFO level) on the
    first call; later calls only derive another child from it.

    Intentionally NOT importing ts_shared_rev: that module eagerly requires the
    Tracksolid ingestion secrets (APP_KEY/SECRET/PWD), which this read-only
    analytics server has no business holding.

    Args:
        name: Suffix of the child logger, giving ``analytics_mcp.<name>``.

    Returns:
        The ``analytics_mcp.<name>`` logger.
    """
    root = logging.getLogger("analytics_mcp")
    if not root.handlers:  # configure the parent only once
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                # ": " keeps the logger name and the message from fusing into
                # one unreadable token (e.g. "analytics_mcp.serverquery ...").
                "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        root.addHandler(handler)
        root.setLevel(logging.INFO)
    return root.getChild(name)
# Module-wide logger: the "server" child of the shared analytics_mcp logger.
log = _get_logger("server")

# Required setting: a missing DATABASE_URL raises KeyError at import time.
DATABASE_URL = os.environ["DATABASE_URL"]  # analytics_ro DSN (set by deploy)

# Upper bound on rows a single query may return; MCP_MAX_ROWS overrides it.
MAX_ROWS_CEIL = int(os.environ.get("MCP_MAX_ROWS", "10000"))

# The only schemas the introspection tools accept.
READABLE_SCHEMAS = ("reporting", "tracksolid")
# ── Read-only connection pool ────────────────────────────────────────────────
# Every pooled session is opened with read-only-by-default transactions and a
# statement timeout (belt + braces; the analytics_ro role already sets these,
# but a self-contained server is safer in case it is ever pointed at a
# less-restricted DSN).
_RO_SESSION_OPTIONS = (
    "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8"
)
_POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))  # upper bound on pooled connections

_pool = psycopg2.pool.ThreadedConnectionPool(
    1,  # minconn
    _POOL_MAX,  # maxconn
    DATABASE_URL,
    options=_RO_SESSION_OPTIONS,
)
@contextmanager
def _ro_conn():
    """Yield a pooled read-only connection whose transaction is never committed.

    The connection is switched to read-only, non-autocommit mode before it is
    handed out. On exit the transaction is ALWAYS rolled back and the connection
    is returned to the pool.

    If the rollback itself fails (typically because the connection died), the
    failure is logged rather than raised, so it cannot replace the error that
    the caller's own code raised, and the connection is closed instead of being
    put back into circulation.

    Yields:
        A psycopg2 connection taken from the module-level pool.
    """
    conn = _pool.getconn()
    discard = False
    try:
        conn.set_session(readonly=True, autocommit=False)
        yield conn
    finally:
        try:
            conn.rollback()
        except psycopg2.Error:
            # Nothing is ever committed here, so a failed rollback loses no work;
            # the only sensible reaction is to drop the broken connection.
            discard = True
            log.warning("rollback failed; discarding pooled connection", exc_info=True)
        finally:
            _pool.putconn(conn, close=discard)
def _rows(cur) -> list[dict]:
    """Fetch everything left on the cursor as a list of JSON-safe dicts.

    Args:
        cur: An executed DB-API cursor.

    Returns:
        One dict per row, keyed by column name, with every value passed through
        ``_jsonable``. An empty list when the cursor has no result set
        (``description`` is None).
    """
    description = cur.description
    if description is None:
        return []
    names = [column[0] for column in description]
    return [
        {name: _jsonable(value) for name, value in zip(names, record)}
        for record in cur.fetchall()
    ]
def _jsonable(v):
    """Coerce a database value into something JSON can represent.

    JSON-native scalars pass through unchanged. Containers are converted
    recursively so that nested values (for example the lists/dicts a driver may
    return for array or JSON columns) stay structured instead of being flattened
    into a Python ``repr`` string. Binary values become a ``\\x``-prefixed hex
    string. Everything else (dates, Decimal, UUID, ...) falls back to ``str``.

    Args:
        v: Any value taken from a result row.

    Returns:
        ``None``, a bool/int/float/str, or a list/dict built from those.
    """
    if v is None or isinstance(v, (bool, int, float, str)):
        return v
    if isinstance(v, dict):
        # JSON object keys must be strings.
        return {str(key): _jsonable(item) for key, item in v.items()}
    if isinstance(v, (list, tuple)):
        return [_jsonable(item) for item in v]
    if isinstance(v, (bytes, bytearray, memoryview)):
        # str() of a memoryview is just "<memory at 0x...>", which is useless to a reader.
        return "\\x" + bytes(v).hex()
    return str(v)
# ── SQL guard for the general query tool ─────────────────────────────────────
# Writes are already impossible through the analytics_ro role and the read-only
# transaction. This guard exists so callers get CLEAN errors instead of database
# faults, and so multi-statements / SET (which could relax read-only) are
# refused up front.
_FORBIDDEN_KEYWORDS = (
    "insert", "update", "delete", "drop", "alter", "create", "grant", "revoke",
    "truncate", "copy", "call", "do", "merge", "vacuum", "reindex", "refresh",
    "comment", "lock", "set", "reset",
)
# Whole-word, case-insensitive match on any of the keywords above.
_FORBIDDEN = re.compile(
    r"\b(" + "|".join(_FORBIDDEN_KEYWORDS) + r")\b",
    flags=re.IGNORECASE,
)
# One left-to-right scan over quoted tokens and comments. Because the leftmost
# token wins, a comment marker that sits inside a quoted token is consumed as
# part of that token and is never mistaken for a comment.
_QUOTED_OR_COMMENT = re.compile(
    r"(?P<keep>'(?:[^']|'')*'"  # single-quoted string literal ('' escapes a quote)
    r'|"(?:[^"]|"")*")'         # double-quoted identifier ("" escapes a quote)
    r"|/\*.*?\*/"               # block comment
    r"|--[^\n]*",               # line comment
    re.DOTALL,
)


def _strip_comments(sql: str) -> str:
    """Remove SQL comments while leaving quoted literals and identifiers intact.

    Each block comment (``/* ... */``) and line comment (``-- ...``) is replaced
    by a single space. Text inside single-quoted strings and double-quoted
    identifiers is preserved verbatim, so a literal such as ``'a--b'`` or
    ``'x/*y*/z'`` is no longer truncated or rewritten. An unterminated quote is
    not treated as a quoted token; comment markers after it are still stripped.

    Args:
        sql: Raw SQL text.

    Returns:
        The text without comments, with leading/trailing whitespace removed.
    """
    without_comments = _QUOTED_OR_COMMENT.sub(
        lambda match: match.group("keep") or " ",  # keep quoted tokens, blank out comments
        sql,
    )
    return without_comments.strip()
def _guard(sql: str) -> str:
    """Check that `sql` is one read-only statement and return it cleaned.

    Comments are removed first. The remainder must contain exactly one non-empty
    ``;``-separated statement, must begin with SELECT or WITH, and must not
    contain any keyword matched by ``_FORBIDDEN``.

    Args:
        sql: The statement as submitted by the caller.

    Returns:
        The comment-free statement with surrounding whitespace and any trailing
        semicolon removed.

    Raises:
        ValueError: If the query is empty, holds more than one statement, does
            not start with SELECT/WITH, or contains a forbidden keyword.
    """
    cleaned = _strip_comments(sql)
    if not cleaned:
        raise ValueError("Empty query.")

    # Empty pieces are dropped, which is what lets a trailing ';' through.
    statements = [piece.strip() for piece in cleaned.split(";") if piece.strip()]
    if len(statements) != 1:
        raise ValueError("Only a single statement is allowed.")
    (stmt,) = statements

    if re.match(r"^(select|with)\b", stmt, re.IGNORECASE) is None:
        raise ValueError("Only SELECT / WITH queries are allowed.")
    if _FORBIDDEN.search(stmt) is not None:
        raise ValueError("Query contains a forbidden (write/DDL) keyword.")
    return stmt
# ── MCP server + tools ───────────────────────────────────────────────────────
# The server instance that the @mcp.tool() decorators below register against.
mcp = FastMCP(name="fireside-analytics", stateless_http=True)
@mcp.tool()
def query(sql: str, max_rows: int = 1000) -> dict:
    """Run a read-only SELECT/WITH query against the fleet database.

    Only the reporting.* and tracksolid.* schemas are readable. Single statement
    only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000,
    hard cap 10000 unless the server sets MCP_MAX_ROWS). The cap is always
    enforced by the database: a LIMIT is appended when the query has none, and a
    query that already limits its rows is wrapped in an outer LIMIT.
    Result: {row_count, truncated, rows}.

    Args:
        sql: A single SELECT or WITH statement.
        max_rows: Maximum rows to return; clamped to the range 1..MAX_ROWS_CEIL.

    Raises:
        ValueError: If the statement is rejected by the SQL guard.
    """
    stmt = _guard(sql)
    cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
    fetch_limit = cap + 1  # one extra row is how truncation is detected

    if re.search(r"\b(limit|fetch)\b", stmt, re.IGNORECASE):
        # The word may sit in a subquery, or carry a value far above the cap, so
        # it cannot be trusted to bound the result. Wrapping keeps the cap in
        # force; it also avoids stacking a second LIMIT onto FETCH FIRST.
        # The newlines keep the closing parenthesis out of any trailing line comment.
        # NOTE(review): the outer SELECT has no ORDER BY of its own; PostgreSQL
        # is expected to hand back the subquery's rows in the order it produced
        # them — confirm if strict ordering guarantees matter.
        bounded = f"SELECT * FROM (\n{stmt}\n) AS _mcp_capped\nLIMIT {fetch_limit}"
    else:
        bounded = f"{stmt}\nLIMIT {fetch_limit}"

    t0 = time.monotonic()
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(bounded)
        rows = _rows(cur)
    truncated = len(rows) > cap
    rows = rows[:cap]
    dur_ms = int((time.monotonic() - t0) * 1000)
    log.info("query rows=%d trunc=%s %dms :: %s", len(rows), truncated, dur_ms, sql[:200])
    return {"row_count": len(rows), "truncated": truncated, "rows": rows}
@mcp.tool()
def list_schemas() -> list[dict]:
    """List the readable schemas (reporting, tracksolid) with their object counts.

    Counts cover tables, views and materialized views the connected role can read.
    """
    schemas = list(READABLE_SCHEMAS)
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            # information_schema.tables does not list materialized views, so
            # they are counted separately from pg_class (relkind 'm').
            "SELECT o.schema_name AS schema, count(*) AS objects "
            "FROM ("
            "SELECT table_schema::text AS schema_name "
            "FROM information_schema.tables WHERE table_schema = ANY(%s) "
            "UNION ALL "
            "SELECT n.nspname::text "
            "FROM pg_catalog.pg_class c "
            "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
            "WHERE n.nspname = ANY(%s) AND c.relkind = 'm' "
            "AND has_table_privilege(c.oid, 'SELECT')"
            ") AS o "
            "GROUP BY 1 ORDER BY 1",
            (schemas, schemas),
        )
        return _rows(cur)
@mcp.tool()
def list_tables(schema: str) -> list[dict]:
    """List tables, views and materialized views in a schema (must be reporting or tracksolid).

    Each entry is {name, kind}; materialized views are reported with kind
    'MATERIALIZED VIEW'.

    Raises:
        ValueError: If `schema` is not one of the readable schemas.
    """
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT table_name::text AS name, table_type::text AS kind "
            "FROM information_schema.tables WHERE table_schema = %s "
            # information_schema.tables does not list materialized views; add
            # the readable ones from pg_class (relkind 'm').
            "UNION ALL "
            "SELECT c.relname::text, 'MATERIALIZED VIEW' "
            "FROM pg_catalog.pg_class c "
            "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
            "WHERE n.nspname = %s AND c.relkind = 'm' "
            "AND has_table_privilege(c.oid, 'SELECT') "
            "ORDER BY 1",
            (schema, schema),
        )
        return _rows(cur)
@mcp.tool()
def describe_table(schema: str, table: str) -> list[dict]:
    """Describe a table/view/materialized view: columns, types, nullability, defaults.

    Each entry is {column, type, nullable, default}. An unknown object yields an
    empty list.

    Raises:
        ValueError: If `schema` is not one of the readable schemas.
    """
    if schema not in READABLE_SCHEMAS:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    with _ro_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT column_name AS column, data_type AS type, "
            "is_nullable AS nullable, column_default AS default "
            "FROM information_schema.columns "
            "WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position",
            (schema, table),
        )
        columns = _rows(cur)
        if not columns:
            # information_schema.columns has no rows for materialized views, so
            # fall back to the catalog for those (relkind 'm') only.
            cur.execute(
                "SELECT a.attname AS column, "
                "format_type(a.atttypid, a.atttypmod) AS type, "
                "CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END AS nullable, "
                "NULL::text AS default "
                "FROM pg_catalog.pg_attribute a "
                "JOIN pg_catalog.pg_class c ON c.oid = a.attrelid "
                "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
                "WHERE n.nspname = %s AND c.relname = %s AND c.relkind = 'm' "
                "AND a.attnum > 0 AND NOT a.attisdropped "
                "AND has_table_privilege(c.oid, 'SELECT') "
                "ORDER BY a.attnum",
                (schema, table),
            )
            columns = _rows(cur)
    return columns
@mcp.tool()
def list_functions(schema: str = "reporting") -> list[dict]:
    """List callable functions (e.g. reporting.fn_*) with their argument signatures."""
    # Refuse anything outside the allow-list before touching the database.
    allowed = schema in READABLE_SCHEMAS
    if not allowed:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")

    # One row per pg_proc entry in the schema: its name and rendered argument list.
    statement = (
        "SELECT p.proname AS name, pg_get_function_arguments(p.oid) AS args "
        "FROM pg_proc p JOIN pg_namespace n ON n.oid = p.pronamespace "
        "WHERE n.nspname = %s ORDER BY 1"
    )
    with _ro_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(statement, (schema,))
            return _rows(cur)
# A plain SQL identifier: letter or underscore first, then letters, digits or
# underscores. Anchored with \Z rather than $, because $ also matches just
# before a trailing newline and would let "name\n" through.
_IDENT = re.compile(r"^[a-z_][a-z0-9_]*\Z", re.IGNORECASE)
@mcp.tool()
def sample_table(schema: str, table: str, n: int = 20) -> dict:
    """Return the first `n` rows of a table/view (convenience over query)."""
    # Both parts are validated before being interpolated into SQL: the schema
    # against the allow-list, the table against the identifier pattern.
    schema_allowed = schema in READABLE_SCHEMAS
    if not schema_allowed:
        raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
    if _IDENT.match(table) is None:
        raise ValueError("table must be a simple identifier")

    qualified = f'"{schema}"."{table}"'
    # Delegates to query(), so the same guard and row cap apply.
    return query(f"SELECT * FROM {qualified}", max_rows=n)
# ── Bearer-token auth ─────────────────────────────────────────────────────────
# MCP_AUTH_TOKENS = "alice:tok1,bob:tok2" → {token: name}. Per-analyst tokens make
# access revocable (edit the env + redeploy) and attributable to a caller name.
def _parse_tokens(raw: str) -> dict[str, str]:
    """Parse a ``name:token,name:token`` string into a ``{token: name}`` mapping.

    Whitespace around names and tokens is ignored. Entries without a colon are
    skipped, and so are entries whose token is empty: an empty token would
    otherwise match a request that sends no credential at all.

    Args:
        raw: The raw MCP_AUTH_TOKENS value (may be empty).

    Returns:
        Mapping from token to analyst name.
    """
    tokens: dict[str, str] = {}
    for entry in raw.split(","):
        name, sep, token = entry.partition(":")  # split on the first colon only
        token = token.strip()
        if not sep or not token:
            continue
        tokens[token] = name.strip()
    return tokens


_TOKENS = _parse_tokens(os.getenv("MCP_AUTH_TOKENS", ""))
class BearerAuth(BaseHTTPMiddleware):
    """Require a known Bearer token on every route except ``/healthz``.

    The token is looked up in ``_TOKENS``; on success the analyst name is stored
    on ``request.state.caller`` and logged, otherwise the request is answered
    with a 401 JSON body.
    """

    async def dispatch(self, request, call_next):
        """Authenticate `request`, then pass it on or reject it with 401."""
        if request.url.path == "/healthz":
            return await call_next(request)
        auth = request.headers.get("authorization", "")
        token = auth[7:] if auth.lower().startswith("bearer ") else ""
        # A missing or blank credential must never authenticate, even if the
        # token table happens to contain an empty key.
        caller = _TOKENS.get(token) if token.strip() else None
        if caller is None:
            return JSONResponse({"error": "unauthorized"}, status_code=401)
        request.state.caller = caller
        # Records who made the call; the token itself is never logged.
        log.info("authorized caller=%s %s %s", caller, request.method, request.url.path)
        return await call_next(request)
async def healthz(_request):
    """Health endpoint: report ``ok`` plus the number of loaded tokens."""
    payload = {"ok": True, "tokens": len(_TOKENS)}
    return JSONResponse(payload)
# Build the ASGI app from the MCP server and put the Bearer check in front of it.
app = mcp.streamable_http_app()
app.add_middleware(BearerAuth)
# Starlette exposes add_route (not a Flask-style @app.route decorator).
app.add_route("/healthz", healthz, methods=["GET"])

# Startup diagnostics: an empty token table means nobody can authenticate.
_token_count = len(_TOKENS)
if _token_count == 0:
    log.warning("MCP_AUTH_TOKENS is empty — every request will be rejected with 401.")
log.info(
    "Analytics MCP starting. Tokens loaded=%d. Readable schemas=%s.",
    _token_count,
    READABLE_SCHEMAS,
)