Compare commits
No commits in common. "a36542dbc9c09467abeb86c47e7258911f9961b5" and "0355047fdde9c4ce4b59d4425e9e256eb32b8ec8" have entirely different histories.
a36542dbc9
...
0355047fdd
7 changed files with 24 additions and 1375 deletions
|
|
@ -1,5 +1,3 @@
|
||||||
# NOTE: uv.lock is intentionally NOT ignored — the Dockerfile copies it for
|
|
||||||
# reproducible `uv sync --frozen` builds.
|
|
||||||
.git
|
.git
|
||||||
.venv
|
.venv
|
||||||
__pycache__
|
__pycache__
|
||||||
|
|
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -7,5 +7,4 @@ __pycache__/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
.ruff_cache/
|
.ruff_cache/
|
||||||
.mypy_cache/
|
.mypy_cache/
|
||||||
# uv.lock IS committed on purpose — the Docker build uses `uv sync --frozen` for
|
uv.lock
|
||||||
# reproducible installs, and the Coolify build clones the repo (it must include it).
|
|
||||||
|
|
|
||||||
17
Dockerfile
17
Dockerfile
|
|
@ -11,26 +11,15 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
# Install ONLY dependencies (flat module — the project itself is not a package).
|
# Install ONLY dependencies (flat module — the project itself is not a package).
|
||||||
# Copy the lockfile and build --frozen so rebuilds are reproducible: without it,
|
COPY pyproject.toml ./
|
||||||
# `uv sync` re-resolves the >= ranges in pyproject.toml and a redeploy could pull a
|
RUN uv sync --no-dev --no-install-project
|
||||||
# newer, behaviour-changed mcp/starlette and break the running server.
|
|
||||||
COPY pyproject.toml uv.lock ./
|
|
||||||
RUN uv sync --no-dev --no-install-project --frozen
|
|
||||||
ENV PATH="/app/.venv/bin:$PATH"
|
ENV PATH="/app/.venv/bin:$PATH"
|
||||||
|
|
||||||
COPY analytics_mcp.py ./
|
COPY analytics_mcp.py ./
|
||||||
|
|
||||||
# Run as a non-root user (least privilege; nothing here needs root).
|
|
||||||
RUN useradd -m -u 10001 app && chown -R app:app /app
|
|
||||||
USER app
|
|
||||||
|
|
||||||
EXPOSE 8892
|
EXPOSE 8892
|
||||||
|
|
||||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
|
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
|
||||||
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
|
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
|
||||||
|
|
||||||
# Single worker: this is a low-traffic read-only proxy for a handful of analysts, and
|
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "2"]
|
||||||
# the DB connection budget = workers × MCP_POOL_MAX. One worker (× default pool 8) caps
|
|
||||||
# the MCP at 8 backends instead of 16, which matters on a shared 100-connection DB.
|
|
||||||
# Scale out by raising MCP_POOL_MAX, not workers, so the budget stays obvious.
|
|
||||||
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "1"]
|
|
||||||
|
|
|
||||||
133
analytics_mcp.py
133
analytics_mcp.py
|
|
@ -26,11 +26,9 @@ Env:
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import hmac
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
|
@ -59,7 +57,6 @@ def _get_logger(name: str) -> logging.Logger:
|
||||||
)
|
)
|
||||||
root.addHandler(handler)
|
root.addHandler(handler)
|
||||||
root.setLevel(logging.INFO)
|
root.setLevel(logging.INFO)
|
||||||
root.propagate = False # don't double-emit through uvicorn's root handler
|
|
||||||
return root.getChild(name)
|
return root.getChild(name)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -81,89 +78,26 @@ READABLE_SCHEMAS = tuple(
|
||||||
# Force read-only + a statement timeout at the connection level (belt + braces;
|
# Force read-only + a statement timeout at the connection level (belt + braces;
|
||||||
# the analytics_ro role already sets these, but a self-contained server is safer
|
# the analytics_ro role already sets these, but a self-contained server is safer
|
||||||
# in case it is ever pointed at a less-restricted DSN).
|
# in case it is ever pointed at a less-restricted DSN).
|
||||||
POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))
|
_pool = psycopg2.pool.ThreadedConnectionPool(
|
||||||
_POOL_OPTS = "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8"
|
1,
|
||||||
|
int(os.getenv("MCP_POOL_MAX", "8")),
|
||||||
|
DATABASE_URL,
|
||||||
def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool:
|
options="-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8",
|
||||||
"""Create the pool, retrying so a brief DB outage at deploy time doesn't crash
|
|
||||||
the worker into a boot loop. minconn=0 → no eager connect at import (connections
|
|
||||||
are opened lazily on first use)."""
|
|
||||||
last: Exception | None = None
|
|
||||||
for attempt in range(1, retries + 1):
|
|
||||||
try:
|
|
||||||
return psycopg2.pool.ThreadedConnectionPool(
|
|
||||||
0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
|
|
||||||
)
|
)
|
||||||
except psycopg2.OperationalError as exc:
|
|
||||||
last = exc
|
|
||||||
log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
|
|
||||||
if attempt < retries:
|
|
||||||
time.sleep(delay)
|
|
||||||
assert last is not None
|
|
||||||
raise last
|
|
||||||
|
|
||||||
|
|
||||||
_pool = _init_pool()
|
|
||||||
# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
|
|
||||||
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
|
|
||||||
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
|
|
||||||
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
|
|
||||||
_pool_slots = threading.BoundedSemaphore(POOL_MAX)
|
|
||||||
|
|
||||||
# SQLSTATE classes that mean the CONNECTION is gone (vs. a query that merely failed,
|
|
||||||
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
|
|
||||||
# class 08 = connection exception; 57P01/02/03 = admin/crash shutdown.
|
|
||||||
_DISCONNECT_SQLSTATES = frozenset(
|
|
||||||
{"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _is_disconnect(exc: Exception) -> bool:
|
|
||||||
"""True only for a genuinely lost connection — so we discard/retry on a dropped
|
|
||||||
socket but NOT on a statement_timeout or other in-session query error."""
|
|
||||||
if isinstance(exc, psycopg2.InterfaceError):
|
|
||||||
return True
|
|
||||||
if isinstance(exc, psycopg2.OperationalError):
|
|
||||||
code = getattr(exc, "pgcode", None)
|
|
||||||
return code is None or code in _DISCONNECT_SQLSTATES # None = socket-level failure
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def _ro_conn():
|
def _ro_conn():
|
||||||
"""Read-only connection; the transaction is ALWAYS rolled back (never commits).
|
"""Read-only connection; the transaction is ALWAYS rolled back (never commits)."""
|
||||||
|
|
||||||
Dead connections (DB restart, network blip, crash) are DISCARDED rather than
|
|
||||||
recycled — otherwise a single broken socket poisons the pool and every later
|
|
||||||
query handed that connection fails until the container is recreated. A query that
|
|
||||||
merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
|
|
||||||
rolled back and returned to the pool as normal."""
|
|
||||||
_pool_slots.acquire()
|
|
||||||
try:
|
|
||||||
conn = _pool.getconn()
|
conn = _pool.getconn()
|
||||||
except Exception:
|
|
||||||
_pool_slots.release()
|
|
||||||
raise
|
|
||||||
broken = False
|
|
||||||
try:
|
try:
|
||||||
conn.set_session(readonly=True, autocommit=False)
|
conn.set_session(readonly=True, autocommit=False)
|
||||||
yield conn
|
yield conn
|
||||||
except Exception as exc:
|
|
||||||
broken = _is_disconnect(exc)
|
|
||||||
raise
|
|
||||||
finally:
|
finally:
|
||||||
try:
|
|
||||||
if broken or conn.closed:
|
|
||||||
_pool.putconn(conn, close=True)
|
|
||||||
else:
|
|
||||||
try:
|
try:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
_pool.putconn(conn)
|
|
||||||
except (psycopg2.OperationalError, psycopg2.InterfaceError):
|
|
||||||
_pool.putconn(conn, close=True)
|
|
||||||
finally:
|
finally:
|
||||||
_pool_slots.release()
|
_pool.putconn(conn)
|
||||||
|
|
||||||
|
|
||||||
def _rows(cur) -> list[dict]:
|
def _rows(cur) -> list[dict]:
|
||||||
|
|
@ -201,13 +135,6 @@ def _strip_comments(sql: str) -> str:
|
||||||
return sql.strip()
|
return sql.strip()
|
||||||
|
|
||||||
|
|
||||||
def _strip_literals(sql: str) -> str:
|
|
||||||
"""Blank out the contents of single-quoted string literals so the keyword guard
|
|
||||||
does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
|
|
||||||
are handled so we don't mis-terminate a literal mid-string."""
|
|
||||||
return re.sub(r"'(?:[^']|'')*'", "''", sql)
|
|
||||||
|
|
||||||
|
|
||||||
def _guard(sql: str) -> str:
|
def _guard(sql: str) -> str:
|
||||||
"""Validate a single read-only statement; return the cleaned statement."""
|
"""Validate a single read-only statement; return the cleaned statement."""
|
||||||
stripped = _strip_comments(sql)
|
stripped = _strip_comments(sql)
|
||||||
|
|
@ -219,11 +146,7 @@ def _guard(sql: str) -> str:
|
||||||
stmt = parts[0].strip()
|
stmt = parts[0].strip()
|
||||||
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
|
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
|
||||||
raise ValueError("Only SELECT / WITH queries are allowed.")
|
raise ValueError("Only SELECT / WITH queries are allowed.")
|
||||||
# Scan with string literals blanked: the blocklist's real job is to reject
|
if _FORBIDDEN.search(stmt):
|
||||||
# data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
|
|
||||||
# keyword that merely appears inside a filter value. (Writes are impossible anyway
|
|
||||||
# via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
|
|
||||||
if _FORBIDDEN.search(_strip_literals(stmt)):
|
|
||||||
raise ValueError("Query contains a forbidden (write/DDL) keyword.")
|
raise ValueError("Query contains a forbidden (write/DDL) keyword.")
|
||||||
return stmt
|
return stmt
|
||||||
|
|
||||||
|
|
@ -256,31 +179,18 @@ mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_tra
|
||||||
def query(sql: str, max_rows: int = 1000) -> dict:
|
def query(sql: str, max_rows: int = 1000) -> dict:
|
||||||
"""Run a read-only SELECT/WITH query against the fleet database.
|
"""Run a read-only SELECT/WITH query against the fleet database.
|
||||||
|
|
||||||
Readable schemas are the analytics_ro grant surface (reporting, tracksolid,
|
Only the reporting.* and tracksolid.* schemas are readable. Single statement
|
||||||
tickets, fuel by default). Single statement only; write/DDL is rejected. Returns
|
only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000, hard
|
||||||
up to `max_rows` rows (default 1000, hard cap 10000). A LIMIT is auto-applied when
|
cap 10000). A LIMIT is auto-applied when absent. Result: {row_count, truncated, rows}.
|
||||||
absent. Result: {row_count, truncated, rows}.
|
|
||||||
"""
|
"""
|
||||||
stmt = _guard(sql)
|
stmt = _guard(sql)
|
||||||
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
|
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
|
||||||
if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
|
if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
|
||||||
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
|
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
# Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
|
|
||||||
# second attempt gets a fresh one — making a recycled-but-stale pool connection
|
|
||||||
# invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
|
|
||||||
# retried; it surfaces immediately.
|
|
||||||
rows = []
|
|
||||||
for attempt in (1, 2):
|
|
||||||
try:
|
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
cur.execute(stmt)
|
cur.execute(stmt)
|
||||||
rows = _rows(cur)
|
rows = _rows(cur)
|
||||||
break
|
|
||||||
except Exception as exc:
|
|
||||||
if attempt == 2 or not _is_disconnect(exc):
|
|
||||||
raise
|
|
||||||
log.warning("stale DB connection on attempt %d — retrying once", attempt)
|
|
||||||
truncated = len(rows) > cap
|
truncated = len(rows) > cap
|
||||||
rows = rows[:cap]
|
rows = rows[:cap]
|
||||||
dur_ms = int((time.monotonic() - t0) * 1000)
|
dur_ms = int((time.monotonic() - t0) * 1000)
|
||||||
|
|
@ -290,7 +200,7 @@ def query(sql: str, max_rows: int = 1000) -> dict:
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_schemas() -> list[dict]:
|
def list_schemas() -> list[dict]:
|
||||||
"""List the readable schemas (reporting, tracksolid, tickets, fuel) with object counts."""
|
"""List the readable schemas (reporting, tracksolid) with their object counts."""
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT table_schema AS schema, count(*) AS objects "
|
"SELECT table_schema AS schema, count(*) AS objects "
|
||||||
|
|
@ -303,7 +213,7 @@ def list_schemas() -> list[dict]:
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_tables(schema: str) -> list[dict]:
|
def list_tables(schema: str) -> list[dict]:
|
||||||
"""List tables + views in a schema (must be one of the readable schemas)."""
|
"""List tables + views in a schema (must be reporting or tracksolid)."""
|
||||||
if schema not in READABLE_SCHEMAS:
|
if schema not in READABLE_SCHEMAS:
|
||||||
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
|
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
|
|
@ -370,26 +280,13 @@ _TOKENS = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _lookup_token(token: str) -> str | None:
|
|
||||||
"""Constant-time token match: compare against every known token so the response
|
|
||||||
time does not reveal how far a guessed prefix matched. Cheap for a handful of
|
|
||||||
per-analyst tokens."""
|
|
||||||
if not token:
|
|
||||||
return None
|
|
||||||
match = None
|
|
||||||
for known, name in _TOKENS.items():
|
|
||||||
if hmac.compare_digest(token, known):
|
|
||||||
match = name
|
|
||||||
return match
|
|
||||||
|
|
||||||
|
|
||||||
class BearerAuth(BaseHTTPMiddleware):
|
class BearerAuth(BaseHTTPMiddleware):
|
||||||
async def dispatch(self, request, call_next):
|
async def dispatch(self, request, call_next):
|
||||||
if request.url.path == "/healthz":
|
if request.url.path == "/healthz":
|
||||||
return await call_next(request)
|
return await call_next(request)
|
||||||
auth = request.headers.get("authorization", "")
|
auth = request.headers.get("authorization", "")
|
||||||
token = auth[7:] if auth.lower().startswith("bearer ") else ""
|
token = auth[7:] if auth.lower().startswith("bearer ") else ""
|
||||||
caller = _lookup_token(token)
|
caller = _TOKENS.get(token)
|
||||||
if caller is None:
|
if caller is None:
|
||||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||||
request.state.caller = caller
|
request.state.caller = caller
|
||||||
|
|
@ -397,7 +294,7 @@ class BearerAuth(BaseHTTPMiddleware):
|
||||||
|
|
||||||
|
|
||||||
async def healthz(_request):
|
async def healthz(_request):
|
||||||
return JSONResponse({"ok": True})
|
return JSONResponse({"ok": True, "tokens": len(_TOKENS)})
|
||||||
|
|
||||||
|
|
||||||
app = mcp.streamable_http_app()
|
app = mcp.streamable_http_app()
|
||||||
|
|
|
||||||
|
|
@ -65,18 +65,14 @@ docker rm -f "$NAME" 2>/dev/null || true
|
||||||
docker run -d --name "$NAME" --restart unless-stopped \
|
docker run -d --name "$NAME" --restart unless-stopped \
|
||||||
--network "$APPNET" \
|
--network "$APPNET" \
|
||||||
--env-file "$ENV_FILE" \
|
--env-file "$ENV_FILE" \
|
||||||
--log-opt max-size=10m --log-opt max-file=5 \
|
|
||||||
--label 'traefik.enable=true' \
|
--label 'traefik.enable=true' \
|
||||||
--label 'traefik.docker.network=coolify' \
|
--label 'traefik.docker.network=coolify' \
|
||||||
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
|
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
|
||||||
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.average=30' \
|
|
||||||
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.burst=60' \
|
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
|
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
|
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
|
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.middlewares=fleetmcp-ratelimit" \
|
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \
|
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
|
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
|
||||||
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \
|
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \
|
||||||
|
|
|
||||||
|
|
@ -62,9 +62,3 @@ ALTER DEFAULT PRIVILEGES FOR ROLE postgres IN SCHEMA fuel GRANT EX
|
||||||
ALTER ROLE analytics_ro SET default_transaction_read_only = on;
|
ALTER ROLE analytics_ro SET default_transaction_read_only = on;
|
||||||
ALTER ROLE analytics_ro SET statement_timeout = '30s';
|
ALTER ROLE analytics_ro SET statement_timeout = '30s';
|
||||||
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
|
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
|
||||||
-- Cap idle POOLED connections too (these sit idle but NOT in a transaction, so the
|
|
||||||
-- idle_in_transaction timeout never reaps them). On a shared 100-connection DB this
|
|
||||||
-- returns slots the MCP isn't using. Safe with the server's dead-connection handling:
|
|
||||||
-- a reaped pooled connection is discarded + transparently retried, not surfaced as an
|
|
||||||
-- error. (Requires PostgreSQL 14+.)
|
|
||||||
ALTER ROLE analytics_ro SET idle_session_timeout = '5min';
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue