Compare commits
6 commits
infra/pgbo
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2f421d7439 | |||
|
|
af6fdbcd3f | ||
|
|
fae40942a2 | ||
| a36542dbc9 | |||
|
|
c02c127798 | ||
|
|
5e3fc3910b |
13 changed files with 1412 additions and 301 deletions
|
|
@ -1,3 +1,5 @@
|
||||||
|
# NOTE: uv.lock is intentionally NOT ignored — the Dockerfile copies it for
|
||||||
|
# reproducible `uv sync --frozen` builds.
|
||||||
.git
|
.git
|
||||||
.venv
|
.venv
|
||||||
__pycache__
|
__pycache__
|
||||||
|
|
|
||||||
3
.gitignore
vendored
3
.gitignore
vendored
|
|
@ -7,4 +7,5 @@ __pycache__/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
.ruff_cache/
|
.ruff_cache/
|
||||||
.mypy_cache/
|
.mypy_cache/
|
||||||
uv.lock
|
# uv.lock IS committed on purpose — the Docker build uses `uv sync --frozen` for
|
||||||
|
# reproducible installs, and the Coolify build clones the repo (it must include it).
|
||||||
|
|
|
||||||
17
Dockerfile
17
Dockerfile
|
|
@ -11,15 +11,26 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
# Install ONLY dependencies (flat module — the project itself is not a package).
|
# Install ONLY dependencies (flat module — the project itself is not a package).
|
||||||
COPY pyproject.toml ./
|
# Copy the lockfile and build --frozen so rebuilds are reproducible: without it,
|
||||||
RUN uv sync --no-dev --no-install-project
|
# `uv sync` re-resolves the >= ranges in pyproject.toml and a redeploy could pull a
|
||||||
|
# newer, behaviour-changed mcp/starlette and break the running server.
|
||||||
|
COPY pyproject.toml uv.lock ./
|
||||||
|
RUN uv sync --no-dev --no-install-project --frozen
|
||||||
ENV PATH="/app/.venv/bin:$PATH"
|
ENV PATH="/app/.venv/bin:$PATH"
|
||||||
|
|
||||||
COPY analytics_mcp.py ./
|
COPY analytics_mcp.py ./
|
||||||
|
|
||||||
|
# Run as a non-root user (least privilege; nothing here needs root).
|
||||||
|
RUN useradd -m -u 10001 app && chown -R app:app /app
|
||||||
|
USER app
|
||||||
|
|
||||||
EXPOSE 8892
|
EXPOSE 8892
|
||||||
|
|
||||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
|
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
|
||||||
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
|
CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8892/healthz').status==200 else 1)" || exit 1
|
||||||
|
|
||||||
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "2"]
|
# Single worker: this is a low-traffic read-only proxy for a handful of analysts, and
|
||||||
|
# the DB connection budget = workers × MCP_POOL_MAX. One worker (× default pool 8) caps
|
||||||
|
# the MCP at 8 backends instead of 16, which matters on a shared 100-connection DB.
|
||||||
|
# Scale out by raising MCP_POOL_MAX, not workers, so the budget stays obvious.
|
||||||
|
CMD ["uvicorn", "analytics_mcp:app", "--host", "0.0.0.0", "--port", "8892", "--workers", "1"]
|
||||||
|
|
|
||||||
146
analytics_mcp.py
146
analytics_mcp.py
|
|
@ -26,9 +26,12 @@ Env:
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import contextvars
|
||||||
|
import hmac
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
|
@ -57,11 +60,18 @@ def _get_logger(name: str) -> logging.Logger:
|
||||||
)
|
)
|
||||||
root.addHandler(handler)
|
root.addHandler(handler)
|
||||||
root.setLevel(logging.INFO)
|
root.setLevel(logging.INFO)
|
||||||
|
root.propagate = False # don't double-emit through uvicorn's root handler
|
||||||
return root.getChild(name)
|
return root.getChild(name)
|
||||||
|
|
||||||
|
|
||||||
log = _get_logger("server")
|
log = _get_logger("server")
|
||||||
|
|
||||||
|
# Per-request caller name, set by BearerAuth from the matched token so the tools can
|
||||||
|
# attribute each query to an analyst in the logs. A ContextVar (not a tool arg) because
|
||||||
|
# FastMCP tools never receive the HTTP request; anyio propagates the context into the
|
||||||
|
# worker thread that runs each sync tool. Defaults to "?" if auth ever didn't run.
|
||||||
|
_caller_var: contextvars.ContextVar[str] = contextvars.ContextVar("caller", default="?")
|
||||||
|
|
||||||
DATABASE_URL = os.environ["DATABASE_URL"] # analytics_ro DSN (set by deploy)
|
DATABASE_URL = os.environ["DATABASE_URL"] # analytics_ro DSN (set by deploy)
|
||||||
MAX_ROWS_CEIL = int(os.getenv("MCP_MAX_ROWS", "10000"))
|
MAX_ROWS_CEIL = int(os.getenv("MCP_MAX_ROWS", "10000"))
|
||||||
# Schemas the introspection helpers (list_tables/describe_table/sample_table) expose.
|
# Schemas the introspection helpers (list_tables/describe_table/sample_table) expose.
|
||||||
|
|
@ -78,26 +88,89 @@ READABLE_SCHEMAS = tuple(
|
||||||
# Force read-only + a statement timeout at the connection level (belt + braces;
|
# Force read-only + a statement timeout at the connection level (belt + braces;
|
||||||
# the analytics_ro role already sets these, but a self-contained server is safer
|
# the analytics_ro role already sets these, but a self-contained server is safer
|
||||||
# in case it is ever pointed at a less-restricted DSN).
|
# in case it is ever pointed at a less-restricted DSN).
|
||||||
_pool = psycopg2.pool.ThreadedConnectionPool(
|
POOL_MAX = int(os.getenv("MCP_POOL_MAX", "8"))
|
||||||
1,
|
_POOL_OPTS = "-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8"
|
||||||
int(os.getenv("MCP_POOL_MAX", "8")),
|
|
||||||
DATABASE_URL,
|
|
||||||
options="-c default_transaction_read_only=on -c statement_timeout=30000 -c client_encoding=UTF8",
|
def _init_pool(retries: int = 5, delay: float = 2.0) -> psycopg2.pool.ThreadedConnectionPool:
|
||||||
|
"""Create the pool, retrying so a brief DB outage at deploy time doesn't crash
|
||||||
|
the worker into a boot loop. minconn=0 → no eager connect at import (connections
|
||||||
|
are opened lazily on first use)."""
|
||||||
|
last: Exception | None = None
|
||||||
|
for attempt in range(1, retries + 1):
|
||||||
|
try:
|
||||||
|
return psycopg2.pool.ThreadedConnectionPool(
|
||||||
|
0, POOL_MAX, DATABASE_URL, options=_POOL_OPTS
|
||||||
|
)
|
||||||
|
except psycopg2.OperationalError as exc:
|
||||||
|
last = exc
|
||||||
|
log.warning("pool init attempt %d/%d failed: %s", attempt, retries, exc)
|
||||||
|
if attempt < retries:
|
||||||
|
time.sleep(delay)
|
||||||
|
assert last is not None
|
||||||
|
raise last
|
||||||
|
|
||||||
|
|
||||||
|
_pool = _init_pool()
|
||||||
|
# FastMCP runs each sync tool in an anyio worker thread (default ~40). Gate checkouts
|
||||||
|
# behind a bounded semaphore so that >POOL_MAX concurrent queries QUEUE (block in
|
||||||
|
# their worker thread) instead of overflowing the pool and raising PoolError — a 500
|
||||||
|
# to the analyst. The event loop is never blocked; only surplus worker threads wait.
|
||||||
|
_pool_slots = threading.BoundedSemaphore(POOL_MAX)
|
||||||
|
|
||||||
|
# SQLSTATE classes that mean the CONNECTION is gone (vs. a query that merely failed,
|
||||||
|
# e.g. a statement_timeout, which is QueryCanceled / 57014 and leaves the conn usable).
|
||||||
|
# class 08 = connection exception; 57P01/02/03 = admin/crash shutdown.
|
||||||
|
_DISCONNECT_SQLSTATES = frozenset(
|
||||||
|
{"08000", "08003", "08006", "08001", "08004", "08007", "57P01", "57P02", "57P03"}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_disconnect(exc: Exception) -> bool:
|
||||||
|
"""True only for a genuinely lost connection — so we discard/retry on a dropped
|
||||||
|
socket but NOT on a statement_timeout or other in-session query error."""
|
||||||
|
if isinstance(exc, psycopg2.InterfaceError):
|
||||||
|
return True
|
||||||
|
if isinstance(exc, psycopg2.OperationalError):
|
||||||
|
code = getattr(exc, "pgcode", None)
|
||||||
|
return code is None or code in _DISCONNECT_SQLSTATES # None = socket-level failure
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def _ro_conn():
|
def _ro_conn():
|
||||||
"""Read-only connection; the transaction is ALWAYS rolled back (never commits)."""
|
"""Read-only connection; the transaction is ALWAYS rolled back (never commits).
|
||||||
|
|
||||||
|
Dead connections (DB restart, network blip, crash) are DISCARDED rather than
|
||||||
|
recycled — otherwise a single broken socket poisons the pool and every later
|
||||||
|
query handed that connection fails until the container is recreated. A query that
|
||||||
|
merely errors (e.g. statement_timeout) leaves the connection healthy, so it is
|
||||||
|
rolled back and returned to the pool as normal."""
|
||||||
|
_pool_slots.acquire()
|
||||||
|
try:
|
||||||
conn = _pool.getconn()
|
conn = _pool.getconn()
|
||||||
|
except Exception:
|
||||||
|
_pool_slots.release()
|
||||||
|
raise
|
||||||
|
broken = False
|
||||||
try:
|
try:
|
||||||
conn.set_session(readonly=True, autocommit=False)
|
conn.set_session(readonly=True, autocommit=False)
|
||||||
yield conn
|
yield conn
|
||||||
|
except Exception as exc:
|
||||||
|
broken = _is_disconnect(exc)
|
||||||
|
raise
|
||||||
finally:
|
finally:
|
||||||
|
try:
|
||||||
|
if broken or conn.closed:
|
||||||
|
_pool.putconn(conn, close=True)
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
finally:
|
|
||||||
_pool.putconn(conn)
|
_pool.putconn(conn)
|
||||||
|
except (psycopg2.OperationalError, psycopg2.InterfaceError):
|
||||||
|
_pool.putconn(conn, close=True)
|
||||||
|
finally:
|
||||||
|
_pool_slots.release()
|
||||||
|
|
||||||
|
|
||||||
def _rows(cur) -> list[dict]:
|
def _rows(cur) -> list[dict]:
|
||||||
|
|
@ -135,6 +208,13 @@ def _strip_comments(sql: str) -> str:
|
||||||
return sql.strip()
|
return sql.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _strip_literals(sql: str) -> str:
|
||||||
|
"""Blank out the contents of single-quoted string literals so the keyword guard
|
||||||
|
does not fire on DATA (e.g. WHERE summary ILIKE '%please delete%'). '' escapes
|
||||||
|
are handled so we don't mis-terminate a literal mid-string."""
|
||||||
|
return re.sub(r"'(?:[^']|'')*'", "''", sql)
|
||||||
|
|
||||||
|
|
||||||
def _guard(sql: str) -> str:
|
def _guard(sql: str) -> str:
|
||||||
"""Validate a single read-only statement; return the cleaned statement."""
|
"""Validate a single read-only statement; return the cleaned statement."""
|
||||||
stripped = _strip_comments(sql)
|
stripped = _strip_comments(sql)
|
||||||
|
|
@ -146,7 +226,11 @@ def _guard(sql: str) -> str:
|
||||||
stmt = parts[0].strip()
|
stmt = parts[0].strip()
|
||||||
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
|
if not re.match(r"^(select|with)\b", stmt, re.IGNORECASE):
|
||||||
raise ValueError("Only SELECT / WITH queries are allowed.")
|
raise ValueError("Only SELECT / WITH queries are allowed.")
|
||||||
if _FORBIDDEN.search(stmt):
|
# Scan with string literals blanked: the blocklist's real job is to reject
|
||||||
|
# data-modifying CTEs (WITH x AS (DELETE ... RETURNING ...)), not to trip over a
|
||||||
|
# keyword that merely appears inside a filter value. (Writes are impossible anyway
|
||||||
|
# via the analytics_ro GRANTs + the read-only, rolled-back transaction.)
|
||||||
|
if _FORBIDDEN.search(_strip_literals(stmt)):
|
||||||
raise ValueError("Query contains a forbidden (write/DDL) keyword.")
|
raise ValueError("Query contains a forbidden (write/DDL) keyword.")
|
||||||
return stmt
|
return stmt
|
||||||
|
|
||||||
|
|
@ -179,28 +263,44 @@ mcp = FastMCP("fireside-analytics", stateless_http=True, transport_security=_tra
|
||||||
def query(sql: str, max_rows: int = 1000) -> dict:
|
def query(sql: str, max_rows: int = 1000) -> dict:
|
||||||
"""Run a read-only SELECT/WITH query against the fleet database.
|
"""Run a read-only SELECT/WITH query against the fleet database.
|
||||||
|
|
||||||
Only the reporting.* and tracksolid.* schemas are readable. Single statement
|
Readable schemas are the analytics_ro grant surface (reporting, tracksolid,
|
||||||
only; write/DDL is rejected. Returns up to `max_rows` rows (default 1000, hard
|
tickets, fuel by default). Single statement only; write/DDL is rejected. Returns
|
||||||
cap 10000). A LIMIT is auto-applied when absent. Result: {row_count, truncated, rows}.
|
up to `max_rows` rows (default 1000, hard cap 10000). A LIMIT is auto-applied when
|
||||||
|
absent. Result: {row_count, truncated, rows}.
|
||||||
"""
|
"""
|
||||||
stmt = _guard(sql)
|
stmt = _guard(sql)
|
||||||
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
|
cap = max(1, min(int(max_rows), MAX_ROWS_CEIL))
|
||||||
if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
|
if not re.search(r"\blimit\b", stmt, re.IGNORECASE):
|
||||||
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
|
stmt = f"{stmt}\nLIMIT {cap + 1}" # +1 row to detect truncation
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
|
# Retry once on a dead connection (only): _ro_conn() discards broken sockets, so a
|
||||||
|
# second attempt gets a fresh one — making a recycled-but-stale pool connection
|
||||||
|
# invisible to the analyst. A real query error (statement_timeout, bad SQL) is NOT
|
||||||
|
# retried; it surfaces immediately.
|
||||||
|
rows = []
|
||||||
|
for attempt in (1, 2):
|
||||||
|
try:
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
cur.execute(stmt)
|
cur.execute(stmt)
|
||||||
rows = _rows(cur)
|
rows = _rows(cur)
|
||||||
|
break
|
||||||
|
except Exception as exc:
|
||||||
|
if attempt == 2 or not _is_disconnect(exc):
|
||||||
|
raise
|
||||||
|
log.warning("stale DB connection on attempt %d — retrying once", attempt)
|
||||||
truncated = len(rows) > cap
|
truncated = len(rows) > cap
|
||||||
rows = rows[:cap]
|
rows = rows[:cap]
|
||||||
dur_ms = int((time.monotonic() - t0) * 1000)
|
dur_ms = int((time.monotonic() - t0) * 1000)
|
||||||
log.info("query rows=%d trunc=%s %dms :: %s", len(rows), truncated, dur_ms, sql[:200])
|
log.info(
|
||||||
|
"query caller=%s rows=%d trunc=%s %dms :: %s",
|
||||||
|
_caller_var.get(), len(rows), truncated, dur_ms, sql[:200],
|
||||||
|
)
|
||||||
return {"row_count": len(rows), "truncated": truncated, "rows": rows}
|
return {"row_count": len(rows), "truncated": truncated, "rows": rows}
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_schemas() -> list[dict]:
|
def list_schemas() -> list[dict]:
|
||||||
"""List the readable schemas (reporting, tracksolid) with their object counts."""
|
"""List the readable schemas (reporting, tracksolid, tickets, fuel) with object counts."""
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT table_schema AS schema, count(*) AS objects "
|
"SELECT table_schema AS schema, count(*) AS objects "
|
||||||
|
|
@ -213,7 +313,7 @@ def list_schemas() -> list[dict]:
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_tables(schema: str) -> list[dict]:
|
def list_tables(schema: str) -> list[dict]:
|
||||||
"""List tables + views in a schema (must be reporting or tracksolid)."""
|
"""List tables + views in a schema (must be one of the readable schemas)."""
|
||||||
if schema not in READABLE_SCHEMAS:
|
if schema not in READABLE_SCHEMAS:
|
||||||
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
|
raise ValueError(f"schema must be one of {READABLE_SCHEMAS}")
|
||||||
with _ro_conn() as conn, conn.cursor() as cur:
|
with _ro_conn() as conn, conn.cursor() as cur:
|
||||||
|
|
@ -280,21 +380,35 @@ _TOKENS = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _lookup_token(token: str) -> str | None:
|
||||||
|
"""Constant-time token match: compare against every known token so the response
|
||||||
|
time does not reveal how far a guessed prefix matched. Cheap for a handful of
|
||||||
|
per-analyst tokens."""
|
||||||
|
if not token:
|
||||||
|
return None
|
||||||
|
match = None
|
||||||
|
for known, name in _TOKENS.items():
|
||||||
|
if hmac.compare_digest(token, known):
|
||||||
|
match = name
|
||||||
|
return match
|
||||||
|
|
||||||
|
|
||||||
class BearerAuth(BaseHTTPMiddleware):
|
class BearerAuth(BaseHTTPMiddleware):
|
||||||
async def dispatch(self, request, call_next):
|
async def dispatch(self, request, call_next):
|
||||||
if request.url.path == "/healthz":
|
if request.url.path == "/healthz":
|
||||||
return await call_next(request)
|
return await call_next(request)
|
||||||
auth = request.headers.get("authorization", "")
|
auth = request.headers.get("authorization", "")
|
||||||
token = auth[7:] if auth.lower().startswith("bearer ") else ""
|
token = auth[7:] if auth.lower().startswith("bearer ") else ""
|
||||||
caller = _TOKENS.get(token)
|
caller = _lookup_token(token)
|
||||||
if caller is None:
|
if caller is None:
|
||||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||||
request.state.caller = caller
|
request.state.caller = caller
|
||||||
|
_caller_var.set(caller) # so the tools can attribute the query in the logs
|
||||||
return await call_next(request)
|
return await call_next(request)
|
||||||
|
|
||||||
|
|
||||||
async def healthz(_request):
|
async def healthz(_request):
|
||||||
return JSONResponse({"ok": True, "tokens": len(_TOKENS)})
|
return JSONResponse({"ok": True})
|
||||||
|
|
||||||
|
|
||||||
app = mcp.streamable_http_app()
|
app = mcp.streamable_http_app()
|
||||||
|
|
|
||||||
34
deploy.sh
34
deploy.sh
|
|
@ -24,6 +24,20 @@ set -euo pipefail
|
||||||
NAME=analytics_mcp
|
NAME=analytics_mcp
|
||||||
PORT=8892
|
PORT=8892
|
||||||
HOST_DOMAIN="${HOST_DOMAIN:-fleetmcp.fivetitude.com}" # prod: fleetmcp.rahamafresh.com
|
HOST_DOMAIN="${HOST_DOMAIN:-fleetmcp.fivetitude.com}" # prod: fleetmcp.rahamafresh.com
|
||||||
|
# Comma-separated list of every domain this service answers on (defaults to
|
||||||
|
# HOST_DOMAIN). All are folded into ONE Traefik router rule so a single cert
|
||||||
|
# covers them and connectors on either domain keep working.
|
||||||
|
HOST_DOMAINS="${HOST_DOMAINS:-$HOST_DOMAIN}"
|
||||||
|
BT='`'
|
||||||
|
RULE=""
|
||||||
|
IFS=',' read -ra _DOMS <<< "$HOST_DOMAINS"
|
||||||
|
for _d in "${_DOMS[@]}"; do
|
||||||
|
_d="${_d// /}"
|
||||||
|
if [ -n "$_d" ]; then
|
||||||
|
seg="Host(${BT}${_d}${BT})"
|
||||||
|
if [ -z "$RULE" ]; then RULE="$seg"; else RULE="$RULE || $seg"; fi
|
||||||
|
fi
|
||||||
|
done
|
||||||
IMAGE="fleetanalytics-mcp:latest"
|
IMAGE="fleetanalytics-mcp:latest"
|
||||||
ENV_FILE="$(pwd)/.deploy.env"
|
ENV_FILE="$(pwd)/.deploy.env"
|
||||||
|
|
||||||
|
|
@ -53,9 +67,15 @@ RO_PW=$(cat "${ANALYTICS_RO_PW_FILE:-$HOME/.analytics_ro.pw}" 2>/dev/null || tru
|
||||||
HOSTPART="${SRC_DB_URL#*@}" # host:port/dbname[?params]
|
HOSTPART="${SRC_DB_URL#*@}" # host:port/dbname[?params]
|
||||||
RO_DB_URL="postgresql://analytics_ro:${RO_PW}@${HOSTPART}"
|
RO_DB_URL="postgresql://analytics_ro:${RO_PW}@${HOSTPART}"
|
||||||
|
|
||||||
# Build the image from this repo.
|
# Build the image from this repo (SKIP_BUILD=1 reuses the existing image for a
|
||||||
echo "Building $IMAGE ..."
|
# labels/env-only change — no new code is pulled in).
|
||||||
docker build -t "$IMAGE" .
|
if [ "${SKIP_BUILD:-0}" = "1" ]; then
|
||||||
|
echo "SKIP_BUILD=1 — reusing existing $IMAGE (no rebuild)."
|
||||||
|
docker image inspect "$IMAGE" >/dev/null 2>&1 || { echo "ERROR: $IMAGE not present"; exit 1; }
|
||||||
|
else
|
||||||
|
echo "Building $IMAGE ..."
|
||||||
|
docker build -t "$IMAGE" .
|
||||||
|
fi
|
||||||
|
|
||||||
# Minimal env (read-only DSN + auth only — no Tracksolid ingestion secrets).
|
# Minimal env (read-only DSN + auth only — no Tracksolid ingestion secrets).
|
||||||
{ echo "DATABASE_URL=${RO_DB_URL}"; echo "MCP_AUTH_TOKENS=${MCP_AUTH_TOKENS}"; } > "$ENV_FILE"
|
{ echo "DATABASE_URL=${RO_DB_URL}"; echo "MCP_AUTH_TOKENS=${MCP_AUTH_TOKENS}"; } > "$ENV_FILE"
|
||||||
|
|
@ -65,14 +85,18 @@ docker rm -f "$NAME" 2>/dev/null || true
|
||||||
docker run -d --name "$NAME" --restart unless-stopped \
|
docker run -d --name "$NAME" --restart unless-stopped \
|
||||||
--network "$APPNET" \
|
--network "$APPNET" \
|
||||||
--env-file "$ENV_FILE" \
|
--env-file "$ENV_FILE" \
|
||||||
|
--log-opt max-size=10m --log-opt max-file=5 \
|
||||||
--label 'traefik.enable=true' \
|
--label 'traefik.enable=true' \
|
||||||
--label 'traefik.docker.network=coolify' \
|
--label 'traefik.docker.network=coolify' \
|
||||||
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
|
--label 'traefik.http.middlewares.redirect-to-https.redirectscheme.scheme=https' \
|
||||||
|
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.average=30' \
|
||||||
|
--label 'traefik.http.middlewares.fleetmcp-ratelimit.ratelimit.burst=60' \
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
|
--label "traefik.http.routers.http-0-fleetmcp.entryPoints=http" \
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
|
--label "traefik.http.routers.http-0-fleetmcp.middlewares=redirect-to-https" \
|
||||||
--label "traefik.http.routers.http-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
--label "traefik.http.routers.http-0-fleetmcp.rule=${RULE}" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
|
--label "traefik.http.routers.https-0-fleetmcp.entryPoints=https" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.rule=Host(\`${HOST_DOMAIN}\`)" \
|
--label "traefik.http.routers.https-0-fleetmcp.rule=${RULE}" \
|
||||||
|
--label "traefik.http.routers.https-0-fleetmcp.middlewares=fleetmcp-ratelimit" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \
|
--label "traefik.http.routers.https-0-fleetmcp.tls=true" \
|
||||||
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
|
--label "traefik.http.routers.https-0-fleetmcp.tls.certresolver=letsencrypt" \
|
||||||
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \
|
--label "traefik.http.services.fleetmcp.loadbalancer.server.port=${PORT}" \
|
||||||
|
|
|
||||||
2
pgbouncer/.gitignore
vendored
2
pgbouncer/.gitignore
vendored
|
|
@ -1,2 +0,0 @@
|
||||||
# The real userlist holds the pgbouncer_auth verifier — never commit it.
|
|
||||||
userlist.txt
|
|
||||||
|
|
@ -1,111 +0,0 @@
|
||||||
# PgBouncer for `timescale_db` (connection pooling)
|
|
||||||
|
|
||||||
> **Scope note:** this is **stack-wide infrastructure**, shared by every service that
|
|
||||||
> talks to `timescale_db` — it is only *parked* in the analytics-MCP repo because that
|
|
||||||
> is where the "too many connections" investigation happened. It arguably belongs in
|
|
||||||
> the backend/ingestion repo (`tracksolid_timescale_grafana_prod`). Move it there when
|
|
||||||
> convenient.
|
|
||||||
|
|
||||||
## Why
|
|
||||||
|
|
||||||
The DB runs at `max_connections = 100`. About nine services each keep a persistent
|
|
||||||
pool open — and several connect as the **`postgres` superuser**, holding connections
|
|
||||||
**idle for hours**. When those pools fill under load simultaneously, the sum crosses
|
|
||||||
~97 and new connections fail with `FATAL: sorry, too many clients already`.
|
|
||||||
|
|
||||||
PgBouncer fixes this structurally: clients connect to PgBouncer (cheap, thousands
|
|
||||||
allowed), and it multiplexes them onto a **small, fixed set** of real backend
|
|
||||||
connections. The DB's connection count then depends on the pool size you choose, not
|
|
||||||
on how many app pools exist.
|
|
||||||
|
|
||||||
```
|
|
||||||
9 app pools ──▶ PgBouncer :6432 ──▶ ≤25 real backends ──▶ timescale_db :5432
|
|
||||||
(hundreds) (transaction mode)
|
|
||||||
```
|
|
||||||
|
|
||||||
## Files
|
|
||||||
|
|
||||||
| File | Purpose |
|
|
||||||
|---|---|
|
|
||||||
| `pgbouncer.ini` | pooling + auth config (transaction mode, `auth_query`) |
|
|
||||||
| `auth_setup.sql` | creates `pgbouncer_auth` + `pgbouncer.user_lookup()` on the DB |
|
|
||||||
| `userlist.txt.example` | how to generate the real (gitignored) `userlist.txt` |
|
|
||||||
| `docker-compose.yml` | the PgBouncer service (join the DB network) |
|
|
||||||
|
|
||||||
## Deploy (once)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# 0) on the host, generate a password for the auth role
|
|
||||||
( umask 077; openssl rand -hex 24 > ~/.pgbouncer_auth.pw )
|
|
||||||
|
|
||||||
# 1) create the auth role + lookup function (as postgres superuser)
|
|
||||||
DB=$(docker ps --filter name=timescale_db --format '{{.Names}}' | head -1)
|
|
||||||
docker exec -i "$DB" psql -U postgres -d tracksolid_db -v ON_ERROR_STOP=1 \
|
|
||||||
-v pgb_pw="$(cat ~/.pgbouncer_auth.pw)" < pgbouncer/auth_setup.sql
|
|
||||||
|
|
||||||
# 2) build userlist.txt from the stored verifier (formats always match this way)
|
|
||||||
docker exec -i "$DB" psql -U postgres -d tracksolid_db -tAc \
|
|
||||||
"SELECT '\"pgbouncer_auth\" \"' || passwd || '\"' \
|
|
||||||
FROM pg_shadow WHERE usename='pgbouncer_auth'" > pgbouncer/userlist.txt
|
|
||||||
|
|
||||||
# 3) set the real DB network name in docker-compose.yml (networks.dbnet.name), then:
|
|
||||||
docker compose -f pgbouncer/docker-compose.yml up -d
|
|
||||||
```
|
|
||||||
|
|
||||||
## Cut services over (incrementally)
|
|
||||||
|
|
||||||
Repoint each app's `DATABASE_URL` host/port from `timescale_db:5432` to
|
|
||||||
`pgbouncer:6432` — **same** dbname, user, and password — and redeploy it.
|
|
||||||
|
|
||||||
**Migrate the superuser app pools first** (`webhook_receiver`, `ingest_worker`,
|
|
||||||
`dashboard_api` backend, `worker`/`cron`/`gateway`) — they are the heaviest
|
|
||||||
consumers. Do them one at a time and watch `SHOW POOLS;` (below).
|
|
||||||
|
|
||||||
## ⚠️ Transaction-pooling caveats — read before cutting over
|
|
||||||
|
|
||||||
`pool_mode = transaction` returns the backend to the pool at every COMMIT/ROLLBACK,
|
|
||||||
so **session-scoped features don't survive across transactions**:
|
|
||||||
|
|
||||||
- **Server-side prepared statements** — the app must not rely on them, or set the
|
|
||||||
driver to not cache them (e.g. asyncpg `statement_cache_size=0`; libpq simple
|
|
||||||
query / psycopg2 default is fine). PgBouncer ≥1.21 supports prepared statements in
|
|
||||||
transaction mode if you set `max_prepared_statements > 0` — enable that if an app
|
|
||||||
needs them.
|
|
||||||
- **`SET`/`RESET` that must persist between transactions**, session `LISTEN/NOTIFY`,
|
|
||||||
advisory locks held across transactions, `WITH HOLD` cursors, session temp tables.
|
|
||||||
- **Per-connection `options` startup GUCs are ignored** (see `ignore_startup_parameters`).
|
|
||||||
Apps that set GUCs via the `options=` DSN param must instead pin them at the **role**
|
|
||||||
level: `ALTER ROLE <app> SET statement_timeout = '...';` etc.
|
|
||||||
|
|
||||||
### The analytics MCP specifically
|
|
||||||
|
|
||||||
The MCP sends `options=-c default_transaction_read_only=on -c statement_timeout=30000`
|
|
||||||
on its DSN and calls `set_session(readonly=True)`. Behind transaction pooling:
|
|
||||||
|
|
||||||
- The `options` GUCs are dropped — **but** `analytics_ro` already has
|
|
||||||
`default_transaction_read_only=on` and `statement_timeout=30s` pinned at the role
|
|
||||||
level (`scripts/analytics_ro_role.sql`), so read-only enforcement is preserved.
|
|
||||||
- `set_session(readonly=True)` issues a `SET` that can leak across pooled clients.
|
|
||||||
Before pointing the MCP at PgBouncer, either drop that call (role default covers it)
|
|
||||||
or run the **MCP only in `session` pooling** (add a second `[databases]` alias with
|
|
||||||
`pool_mode=session`). Given the MCP is a *minor* consumer, the simplest path is to
|
|
||||||
**leave the MCP connecting directly** and pool only the heavy superuser apps.
|
|
||||||
|
|
||||||
## Operating
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# admin console
|
|
||||||
docker exec -it pgbouncer psql -h 127.0.0.1 -p 6432 -U pgbouncer_auth pgbouncer
|
|
||||||
# SHOW POOLS; -- cl_active / sv_active / waiting per pool
|
|
||||||
# SHOW CLIENTS; -- connected clients
|
|
||||||
# SHOW STATS; -- throughput
|
|
||||||
|
|
||||||
# sanity: confirm the DB now sees a small, stable backend count
|
|
||||||
docker exec -i "$DB" psql -U postgres -d tracksolid_db -c \
|
|
||||||
"SELECT usename, count(*) FROM pg_stat_activity GROUP BY 1 ORDER BY 2 DESC;"
|
|
||||||
```
|
|
||||||
|
|
||||||
**Sizing rule:** total backends PgBouncer opens = `Σ(default_pool_size per database) +
|
|
||||||
reserve_pool_size`. Keep that **well under** `max_connections` (100), leaving headroom
|
|
||||||
for superuser/admin/background-worker connections that bypass PgBouncer. The shipped
|
|
||||||
config (20 + 5 reserve, one database) tops out at ~25 backends.
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
-- auth_setup.sql — create the PgBouncer auth_query plumbing on tracksolid_db.
|
|
||||||
-- ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
-- Run ONCE as the postgres SUPERUSER (the SECURITY DEFINER function must be owned by
|
|
||||||
-- a superuser to read pg_shadow). Apply with:
|
|
||||||
-- docker exec -i <timescale_db> psql -U postgres -d tracksolid_db \
|
|
||||||
-- -v ON_ERROR_STOP=1 -v pgb_pw="$(cat ~/.pgbouncer_auth.pw)" < auth_setup.sql
|
|
||||||
--
|
|
||||||
-- This lets PgBouncer authenticate ANY app user by looking its stored SCRAM verifier
|
|
||||||
-- up at connect time — so you never hand-maintain a userlist of every app password.
|
|
||||||
-- Only the pgbouncer_auth role itself needs an entry in userlist.txt.
|
|
||||||
|
|
||||||
\set ON_ERROR_STOP on
|
|
||||||
|
|
||||||
-- 1) A minimal LOGIN role PgBouncer uses to run the lookup. No other privileges.
|
|
||||||
DO $$
|
|
||||||
BEGIN
|
|
||||||
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'pgbouncer_auth') THEN
|
|
||||||
CREATE ROLE pgbouncer_auth LOGIN NOSUPERUSER NOCREATEDB NOCREATEROLE;
|
|
||||||
END IF;
|
|
||||||
END $$;
|
|
||||||
ALTER ROLE pgbouncer_auth WITH LOGIN PASSWORD :'pgb_pw';
|
|
||||||
|
|
||||||
-- 2) The lookup function. SECURITY DEFINER (owned by postgres) so it can read
|
|
||||||
-- pg_shadow; returns exactly (username, scram_verifier) for one user.
|
|
||||||
CREATE SCHEMA IF NOT EXISTS pgbouncer AUTHORIZATION postgres;
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION pgbouncer.user_lookup(
|
|
||||||
IN i_username text, OUT uname text, OUT phash text
|
|
||||||
) RETURNS record
|
|
||||||
LANGUAGE sql
|
|
||||||
SECURITY DEFINER
|
|
||||||
SET search_path = pg_catalog
|
|
||||||
AS $$
|
|
||||||
SELECT usename, passwd FROM pg_catalog.pg_shadow WHERE usename = i_username;
|
|
||||||
$$;
|
|
||||||
|
|
||||||
-- 3) Lock the function down to ONLY pgbouncer_auth.
|
|
||||||
REVOKE ALL ON FUNCTION pgbouncer.user_lookup(text) FROM public;
|
|
||||||
GRANT USAGE ON SCHEMA pgbouncer TO pgbouncer_auth;
|
|
||||||
GRANT EXECUTE ON FUNCTION pgbouncer.user_lookup(text) TO pgbouncer_auth;
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
# docker-compose.yml — PgBouncer in front of timescale_db.
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# Deploy:
|
|
||||||
# 1. Apply auth_setup.sql to the DB as postgres (creates pgbouncer_auth + lookup fn).
|
|
||||||
# 2. Generate pgbouncer/userlist.txt (see userlist.txt.example).
|
|
||||||
# 3. Put this stack on the SAME docker network as timescale_db so `timescale_db`
|
|
||||||
# resolves (the tracksolid stack's network — the one with the 10.0.15.x addrs).
|
|
||||||
# 4. `docker compose -f pgbouncer/docker-compose.yml up -d`
|
|
||||||
# 5. Repoint each app's DSN host:port from timescale_db:5432 → pgbouncer:6432
|
|
||||||
# (same dbname/user/password) and redeploy it. Migrate the SUPERUSER app pools
|
|
||||||
# first — they are the heaviest consumers.
|
|
||||||
services:
|
|
||||||
pgbouncer:
|
|
||||||
image: edoburu/pgbouncer:latest # pin to a digest/tag in prod
|
|
||||||
container_name: pgbouncer
|
|
||||||
restart: unless-stopped
|
|
||||||
networks: [dbnet]
|
|
||||||
ports:
|
|
||||||
- "6432:6432" # drop this if only in-network apps connect
|
|
||||||
volumes:
|
|
||||||
- ./pgbouncer.ini:/etc/pgbouncer/pgbouncer.ini:ro
|
|
||||||
- ./userlist.txt:/etc/pgbouncer/userlist.txt:ro
|
|
||||||
logging:
|
|
||||||
driver: json-file
|
|
||||||
options: { max-size: "10m", max-file: "5" }
|
|
||||||
healthcheck:
|
|
||||||
# `SHOW VERSION` on the admin console proves PgBouncer is accepting connections.
|
|
||||||
test: ["CMD-SHELL", "psql -h 127.0.0.1 -p 6432 -U pgbouncer_auth pgbouncer -tAc 'SHOW VERSION' || exit 1"]
|
|
||||||
interval: 30s
|
|
||||||
timeout: 3s
|
|
||||||
retries: 3
|
|
||||||
start_period: 10s
|
|
||||||
|
|
||||||
networks:
|
|
||||||
# Attach to the EXISTING network that can reach timescale_db (external = pre-created
|
|
||||||
# by the tracksolid/Coolify stack). Set the real name here, e.g. the network shown by
|
|
||||||
# docker inspect timescale_db --format '{{range $k,$v := .NetworkSettings.Networks}}{{$k}}{{"\n"}}{{end}}'
|
|
||||||
dbnet:
|
|
||||||
external: true
|
|
||||||
name: CHANGE_ME_tracksolid_db_network
|
|
||||||
|
|
@ -1,64 +0,0 @@
|
||||||
; pgbouncer.ini — transaction-pooling front for timescale_db (tracksolid_db).
|
|
||||||
; ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
; Purpose: the DB runs at max_connections=100 and ~9 stack services each hold a
|
|
||||||
; persistent pool (several as the postgres superuser, idle for hours), so peaks hit
|
|
||||||
; "too many connections". PgBouncer multiplexes MANY client connections onto a SMALL
|
|
||||||
; set of real backend connections, so the DB connection count stays bounded no matter
|
|
||||||
; how many app pools exist.
|
|
||||||
;
|
|
||||||
; Auth uses auth_query (NOT a hand-maintained userlist of every app): PgBouncer logs
|
|
||||||
; in as `pgbouncer_auth` and looks each user's verifier up via pgbouncer.user_lookup()
|
|
||||||
; — see auth_setup.sql. Only the pgbouncer_auth verifier lives in userlist.txt.
|
|
||||||
|
|
||||||
[databases]
|
|
||||||
; Apps point their DSN host at pgbouncer:6432 with the SAME dbname/user/password.
|
|
||||||
; `host` here is the real DB (the timescale_db container hostname on the DB network).
|
|
||||||
tracksolid_db = host=timescale_db port=5432 dbname=tracksolid_db
|
|
||||||
|
|
||||||
[pgbouncer]
|
|
||||||
listen_addr = 0.0.0.0
|
|
||||||
listen_port = 6432
|
|
||||||
|
|
||||||
; ── Auth (pass-through via auth_query) ──────────────────────────────────────
|
|
||||||
auth_type = scram-sha-256
|
|
||||||
auth_file = /etc/pgbouncer/userlist.txt
|
|
||||||
auth_user = pgbouncer_auth
|
|
||||||
auth_query = SELECT uname, phash FROM pgbouncer.user_lookup($1)
|
|
||||||
|
|
||||||
; ── Pooling ─────────────────────────────────────────────────────────────────
|
|
||||||
; transaction mode = a server connection is returned to the pool at COMMIT/ROLLBACK,
|
|
||||||
; so a handful of backends serve hundreds of clients. See README for the feature
|
|
||||||
; caveats (no session-level prepared statements / SET that persists across txns).
|
|
||||||
pool_mode = transaction
|
|
||||||
|
|
||||||
; Total backend connections PgBouncer will ever open to the DB =
|
|
||||||
; (number of [databases] entries) × default_pool_size + reserve_pool_size.
|
|
||||||
; Keep the SUM across all poolers well under the DB's max_connections (100).
|
|
||||||
; With one database and 20 + 5 reserve = 25 backends max — leaving headroom for
|
|
||||||
; superuser/admin/background-worker connections that bypass PgBouncer.
|
|
||||||
default_pool_size = 20
|
|
||||||
min_pool_size = 0
|
|
||||||
reserve_pool_size = 5
|
|
||||||
reserve_pool_timeout = 3
|
|
||||||
|
|
||||||
; Clients can be plentiful (cheap) — only backends are scarce.
|
|
||||||
max_client_conn = 2000
|
|
||||||
|
|
||||||
; Recycle idle/old server connections so none linger for hours.
|
|
||||||
server_idle_timeout = 300
|
|
||||||
server_lifetime = 3600
|
|
||||||
|
|
||||||
; ── Robustness ──────────────────────────────────────────────────────────────
|
|
||||||
; Apps set per-connection GUCs via the `options` startup param (e.g. the analytics
|
|
||||||
; MCP sends `-c default_transaction_read_only=on -c statement_timeout=...`). In
|
|
||||||
; transaction pooling those can't be honored per-shared-backend, so ignore them and
|
|
||||||
; rely on ROLE-level settings (ALTER ROLE ... SET ...) instead. See README.
|
|
||||||
ignore_startup_parameters = extra_float_digits,options,search_path
|
|
||||||
|
|
||||||
; Admin/stats console (psql -p 6432 pgbouncer) — restricted to the auth role.
|
|
||||||
admin_users = pgbouncer_auth
|
|
||||||
stats_users = pgbouncer_auth
|
|
||||||
|
|
||||||
; Quiet by default; flip to 1 temporarily when debugging.
|
|
||||||
log_connections = 0
|
|
||||||
log_disconnections = 0
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
# userlist.txt — ONLY the pgbouncer_auth role needs an entry; every other user is
|
|
||||||
# resolved at connect time by auth_query (pgbouncer.user_lookup). See README.
|
|
||||||
#
|
|
||||||
# The real userlist.txt is gitignored (it holds a credential). Generate it from the
|
|
||||||
# pgbouncer_auth password you set in auth_setup.sql — PgBouncer accepts the verifier
|
|
||||||
# in SCRAM form. Easiest: copy the stored verifier straight from Postgres so the
|
|
||||||
# formats always match:
|
|
||||||
#
|
|
||||||
# docker exec -i <timescale_db> psql -U postgres -d tracksolid_db -tAc \
|
|
||||||
# "SELECT '\"pgbouncer_auth\" \"' || passwd || '\"' \
|
|
||||||
# FROM pg_shadow WHERE usename='pgbouncer_auth'" > pgbouncer/userlist.txt
|
|
||||||
#
|
|
||||||
# That yields a line of the form (SCRAM-SHA-256 verifier shown abbreviated):
|
|
||||||
"pgbouncer_auth" "SCRAM-SHA-256$4096:....$....:...."
|
|
||||||
|
|
@ -62,3 +62,9 @@ ALTER DEFAULT PRIVILEGES FOR ROLE postgres IN SCHEMA fuel GRANT EX
|
||||||
ALTER ROLE analytics_ro SET default_transaction_read_only = on;
|
ALTER ROLE analytics_ro SET default_transaction_read_only = on;
|
||||||
ALTER ROLE analytics_ro SET statement_timeout = '30s';
|
ALTER ROLE analytics_ro SET statement_timeout = '30s';
|
||||||
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
|
ALTER ROLE analytics_ro SET idle_in_transaction_session_timeout = '60s';
|
||||||
|
-- Cap idle POOLED connections too (these sit idle but NOT in a transaction, so the
|
||||||
|
-- idle_in_transaction timeout never reaps them). On a shared 100-connection DB this
|
||||||
|
-- returns slots the MCP isn't using. Safe with the server's dead-connection handling:
|
||||||
|
-- a reaped pooled connection is discarded + transparently retried, not surfaced as an
|
||||||
|
-- error. (Requires PostgreSQL 14+.)
|
||||||
|
ALTER ROLE analytics_ro SET idle_session_timeout = '5min';
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue