Secure, On-Premises Data Analysis with LLM and a Custom MCP Server
Part 3: HTTPS-Based MCP Server with OAuth 2.0 (Azure Entra ID) and Multi-Tenant Architecture
📚 Series Navigation
- Part 1: CSV/Parquet files
- Part 2: CSV/Parquet & Database
- Part 3: HTTPS-Based MCP Server with OAuth 2.0 (Azure Entra ID) (Current)
- Part 4: Debugging MCP Servers (HTTPS) with VS Code
This is Part 3 of the series. Here, we extend the solution to run the MCP server over HTTPS.
Introduction
From a business end-user perspective, the core problem this solution addresses is enabling anyone in the organization to ask questions of our internal data—whether in files or databases—using natural language, without needing to know SQL or technical query languages. By leveraging a large language model (LLM), users can simply describe what they want to know, and the LLM translates those requests into the correct queries, helping us analyze our data more efficiently and intuitively. This democratizes access to analytics, reduces dependency on technical staff, and accelerates business insights.
The technical solution ensures this is done securely and at scale: deploying an MCP server over HTTP(S) with enterprise authentication, so all data remains protected and accessible only to authorized users.
While the stdio-based MCP server from Parts 1 and 2 works well for individual users on Windows desktops, enterprise environments demand more:
- Centralized deployment: One server for all users, not executables on every PC
- Secure authentication: Enterprise SSO via Azure Entra ID (formerly Azure AD)
- Multi-tenant architecture: Support for multiple business functions (IT, HR, Finance) with isolated data access
- Cloud-native deployment: Containerized for Azure Container Instances or Kubernetes
- HTTPS connectivity: Compatible with Claude Desktop (paid tier), web clients, and API integrations
This article shows how to transform the local MCP server into a production-ready enterprise service that:
- Authenticates users via OAuth 2.0 (Azure Entra ID)
- Routes requests to function-specific data catalogs (IT, HR, FIN)
- Runs as a containerized service with TLS offloading
- Integrates with Claude Desktop's custom HTTPS connector
All data and computation remain on-premises or in your Azure tenant—no third-party cloud services involved.
Architecture: From Stdio to HTTPS
Previous Architecture (Parts 1 & 2)
[Claude Desktop] ←→ [stdio] ←→ [analyst.exe] ←→ [Local Files/Database]
New Architecture (Part 3)
[Claude Desktop (Paid)] ←→ [HTTPS/OAuth] ←→ [Azure Container Instance]
↓
[FortiGate TLS]
↓
[MCP Server + Auth]
↓
[Business Function Router (IT/HR/FIN)]
↓ ↓ ↓
[IT Data Catalog] [HR Catalog] [FIN Catalog]
↓ ↓ ↓
[Files + Database per Function]
Key improvements:
- OAuth 2.0 (Azure Entra ID): Handles authentication and authorization
- Multi-tenancy: Single server supports multiple business functions with isolated data
- Cloud deployment: Runs as a container in Azure Container Instances
- TLS offloading: FortiGate/FortiWeb handles SSL/TLS termination
- HTTPS connectivity: Any HTTP client can connect (Claude Desktop, web apps, APIs)
Why HTTPS over Stdio?
| Feature | Stdio (Parts 1 & 2) | HTTPS (Part 3) |
|---|---|---|
| Deployment | Executable per user | Single centralized service |
| Authentication | None | OAuth 2.0 (Azure Entra ID) |
| Client support | Claude Desktop (local) | Claude Desktop (paid), web, mobile, APIs |
| Multi-user | No | Yes, with per-user context |
| Scalability | One user per process | Hundreds of concurrent users |
| Security | Local file access | Enterprise SSO + role-based access |
| Updates | Redeploy to every PC | Deploy once, all users benefit |
Azure Entra App Registration: Setup and Gotchas
Before deploying the MCP server, you must configure Azure Entra ID (formerly Azure AD) for OAuth 2.0 authentication.
Step 1: Create App Registration
- Navigate to Azure Portal → Entra ID → App Registrations → New Registration
- Configure basic settings:
- Name: MCP-Analyst-Server
- Supported account types: Accounts in this organizational directory only (single tenant)
- Redirect URI: Leave blank for now (we'll add it next)
Step 2: Configure API Permissions
- Go to API Permissions → Add a permission → Microsoft Graph
- Add Delegated permissions:
  - User.Read (to read the user profile)
  - email, profile, openid (standard OAuth scopes)
- Grant admin consent for your organization
Step 3: Create Client Secret
- Go to Certificates & secrets → New client secret
- Description: MCP Server Secret
- Expires: 24 months (or per your security policy)
- Copy the secret value immediately (it won't be shown again)
Step 4: Configure Redirect URIs
This is critical for Claude Desktop integration:
- Go to Authentication → Add a platform → Web
- Add the following Redirect URIs:
  - https://mcp.gdenergyproducts.com/oauth/callback
  - https://claude.ai/api/mcp/auth_callback
- First URI: Your MCP server's OAuth callback endpoint
- Second URI: Required for Claude Desktop (paid tier) to act as an OAuth proxy
  - Claude Desktop intercepts the OAuth flow and forwards tokens to your MCP server
  - Without this, Claude Desktop cannot complete authentication
- Enable the ID tokens and Access tokens checkboxes
Step 5: Expose API and Define Scopes
- Go to Expose an API → Add a scope
- Scope name: access_as_user
- Who can consent: Admins and users
- Admin consent display name: Access MCP Analyst as user
- Description: Allows the application to access MCP Analyst on behalf of the signed-in user
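With the default Application ID URI (api://&lt;application-id&gt;), the full scope string that clients request typically takes this form:

```
api://<your-client-id>/access_as_user
```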
🚨 Gotcha #1: OAuth Version Must Be 2.0
By default, Azure Entra app registrations issue v1.0 access tokens (accessTokenAcceptedVersion is null in the manifest), but the MCP server requires v2.0 tokens for proper token handling.
Manual fix required:
- Go to App Registration → Manifest
- Find the line: "accessTokenAcceptedVersion": null (or "accessTokenAcceptedVersion": 1)
- Change it to: "accessTokenAcceptedVersion": 2
- Save the manifest
Why this matters:
- v1.0 access tokens use different claims and formats
- MCP's AzureProvider expects v2 tokens with preferred_username, oid, and name claims
- Without this change, authentication will fail with cryptic token validation errors
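For reference, a decoded v2.0 access token for this app should carry claims along these lines (values are placeholders):

```json
{
  "aud": "<your-client-id>",
  "iss": "https://login.microsoftonline.com/<your-tenant-id>/v2.0",
  "ver": "2.0",
  "preferred_username": "john.doe@company.com",
  "name": "John Doe",
  "oid": "<azure-object-id>",
  "scp": "access_as_user"
}
```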
🚨 Gotcha #2: Claude Desktop Callback URL
When testing with Claude Desktop (paid tier), you must add this specific callback URL:
https://claude.ai/api/mcp/auth_callback
Why this is needed:
- Claude Desktop acts as an OAuth proxy for custom MCP servers
- When a user clicks "Connect" in Claude Desktop, it:
1. Opens the Azure Entra login page in the browser
2. User authenticates
3. Azure redirects to https://claude.ai/api/mcp/auth_callback with auth code
4. Claude Desktop exchanges the code for tokens
5. Claude Desktop forwards tokens to your MCP server
- Without this callback URL, the OAuth flow breaks at step 3
Security note:
- Claude Desktop only proxies the OAuth flow; it does not store or access your data
- All MCP queries and data remain between Claude Desktop → Your MCP Server → Your Data
Step 6: Note Your Configuration
Save these values for later use; they will be stored in Azure Key Vault:
Application (client) ID: <your-client-id>
Client Secret: <your-secret-value>
Directory (tenant) ID: <your-tenant-id>
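One way to seed these values into Key Vault under the secret names the server reads later is a short script like this (a sketch; it assumes your identity has permission to set secrets on the vault):

```python
# Sketch: store the app registration values under the secret names
# the MCP server retrieves at startup (see the Key Vault section below).
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

client = SecretClient(
    vault_url="https://kv-gdep-peus-interfaces.vault.azure.net/",
    credential=DefaultAzureCredential(),
)
client.set_secret("mcp-authentication-clientid", "<your-client-id>")
client.set_secret("mcp-authentication-clientsecret", "<your-secret-value>")
```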
Multi-Tenant Architecture: Business Function Routing
One of the key innovations in this implementation is business function isolation. Instead of one data catalog for everyone, the server dynamically routes users to function-specific catalogs based on URL parameters.
Business Functions
This implementation supports three business functions:
- IT (it): Infrastructure, device inventory, license management, Active Directory
- HR (hr): Employee data, terminations, organizational charts, ADP snapshots
- Finance (fin): Budget, expenses, procurement, SAP exports
Connection URL Examples
- IT users: https://mcp.gdenergyproducts.com/mcp?function=it
- HR users: https://mcp.gdenergyproducts.com/mcp?function=hr
- Finance users: https://mcp.gdenergyproducts.com/mcp?function=fin
Data Isolation Structure
/mnt/azure/mcp/
├── it/
│ ├── data_catalog.json
│ ├── device_inventory.csv
│ ├── licenses.csv
│ └── accounts_to_be_deleted.csv
├── hr/
│ ├── data_catalog.json
│ ├── idp_hr.csv
│ └── terminations.csv
└── fin/
├── data_catalog.json
├── budget.csv
└── sap_expenses.csv
Each function has:
- Its own data_catalog.json with function-specific datasets
- Isolated file storage (IT users can't access HR files)
- Separate database views (e.g., mcp.it_employees, mcp.hr_employees)
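To make the layout concrete, here is a sketch that generates a minimal starter data_catalog.json for the IT function, using only fields the catalog loader (shown later in the walkthrough) actually reads; all descriptions and column names are illustrative:

```python
import json

# Illustrative starter catalog for the IT function; extend per dataset as needed.
catalog = {
    "version": "2.0",
    "last_updated": "2025-01-01",
    "README": {"purpose": "Datasets available to the IT business function"},
    "query_decision_tree": {},
    "anti_patterns": {},
    "categories": {"hardware": "Device and asset data"},
    "datasets": {
        # Key = filename without extension, lowercased
        "device_inventory": {
            "query_tool": "execute_polars_sql",
            "table_reference": "self",
            "category": "hardware",
            "description": "One row per managed device (laptops, desktops, servers)",
            "source_system": "Endpoint management export",
            "delimiter": ",",
            "columns": {"DeviceName": "Host name", "OperatingSystem": "OS family and version"},
            "business_rules": ["Exclude retired devices from active counts"],
            "common_queries": ["Count devices by OperatingSystem"],
            "related_datasets": ["licenses"],
        }
    },
}

with open("/mnt/azure/mcp/it/data_catalog.json", "w", encoding="utf-8") as f:
    json.dump(catalog, f, indent=2)
```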
Full Code Walkthrough: analyst.py (HTTPS Version)
Below is a comprehensive walkthrough of the production code, covering 85%+ of the implementation.
1. Logging and Initialization
import os
import logging
import warnings
from pathlib import Path
from logging.handlers import RotatingFileHandler
LOG_SEPARATOR = "=" * 80
def setup_logger(name, log_file, level=logging.DEBUG):
"""Configure structured logging with rotation"""
logger = logging.getLogger(name)
logger.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - "
"%(funcName)s:%(lineno)d - %(message)s"
)
if not logger.handlers:
# File handler with 10MB rotation, keep 5 backups
handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024,
backupCount=5
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# Console handler for development
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = setup_logger("gdepmcp", "/mnt/azure/logs/gdepmcp.log")
logger.info(LOG_SEPARATOR)
logger.info("GDEP MCP Server Starting")
logger.info(LOG_SEPARATOR)
# Suppress deprecation warnings in production
warnings.filterwarnings("ignore", category=DeprecationWarning)
Key points:
- Structured logging: Every request, tool call, and error is logged with context
- Log rotation: Prevents disk fill-up in production (10MB per file, 5 backups)
- Dual output: File for auditing, console for development/debugging
- Function/line tracking: Easy debugging with function name and line number in logs
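With this formatter, a typical log line looks like this (timestamp and line number are illustrative):

```
2025-06-01 14:02:11,481 - INFO - gdepmcp - <module>:31 - GDEP MCP Server Starting
```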
2. Azure Key Vault Integration
from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient
def get_azure_kv_secret(name):
"""Retrieve secrets from Azure Key Vault"""
try:
vault_url = "https://kv-gdep-peus-interfaces.vault.azure.net/"
# Service principal credentials from environment variables
client_id = os.environ.get("application_interface_clientid")
client_secret = os.environ.get("application_interface_clientsecret")
tenant_id = os.environ.get("application_interface_tenantid")
credential = ClientSecretCredential(
client_id=client_id,
client_secret=client_secret,
tenant_id=tenant_id
)
secret_client = SecretClient(
vault_url=vault_url,
credential=credential
)
return secret_client.get_secret(name).value
except Exception as exc:
logger.exception(f"Error retrieving Azure KV secret '{name}': {exc}")
return None
# Load OAuth credentials from Key Vault
AZURE_CONFIDENTIAL_APP_ID = str(
get_azure_kv_secret('mcp-authentication-clientid')
)
AZURE_CONFIDENTIAL_SECRET = str(
get_azure_kv_secret('mcp-authentication-clientsecret')
)
Why Key Vault?
- Never hardcode secrets: Client IDs and secrets are injected at runtime
- Centralized management: Rotate secrets without redeploying code
- Audit trail: Key Vault logs all secret access
- Role-based access: Only authorized apps/users can read secrets
Required environment variables (set in container):
application_interface_clientid=<service-principal-client-id>
application_interface_clientsecret=<service-principal-secret>
application_interface_tenantid=<azure-tenant-id>
3. Business Function Middleware
from starlette.requests import Request
from starlette.middleware.base import BaseHTTPMiddleware
from contextvars import ContextVar
current_function = ContextVar('current_function', default='it')
class BusinessFunctionExtractionMiddleware(BaseHTTPMiddleware):
"""Extract business function from query string and store in context"""
async def dispatch(self, request: Request, call_next):
host = request.headers.get("host", "")
business_function = "na"
logger.info(
f"Incoming request - Host: {host}, "
f"Path: {request.url.path}, Method: {request.method}"
)
# Extract function from query string
function_via_query_string = request.query_params.get("function")
if function_via_query_string in {"it", "hr", "fin"}:
business_function = function_via_query_string
logger.info(
f"DEBUG MODE: Using business_function from query "
f"parameter: {business_function}"
)
# Store in request context (async-safe via ContextVar)
request.state.business_function = business_function
current_function.set(business_function)
logger.debug(
f"Set request.state.business_function = {business_function}"
)
response = await call_next(request)
logger.info(
f"Response status: {response.status_code} "
f"for business_function: {business_function}"
)
return response
def get_current_business_function() -> str:
"""Get business function from context variable (thread-safe)"""
function = current_function.get()
logger.debug(
f"Retrieved current business function from context: {function}"
)
return function
How this works:
1. Request arrives at /mcp?function=it
2. Middleware intercepts before MCP tools execute
3. Extracts function parameter from query string
4. Validates against allowed functions (it, hr, fin)
5. Stores in context using Python's ContextVar (async-safe)
6. All subsequent tool calls read from context to route to correct data
Security:
- Invalid functions (na) result in "no data found" responses
- No cross-contamination between business functions
- Each user's requests are isolated in async context
- Supports concurrent requests from multiple users
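As a sanity check, the middleware can be exercised on its own with a throwaway Starlette route (a sketch; it assumes the middleware class and helper above are defined in the same module):

```python
# Sketch: confirm that ?function=... from the query string is what tools will
# see via the ContextVar, and that unknown values fall back to "na".
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette.testclient import TestClient

async def whoami(request: Request) -> JSONResponse:
    # Reads the ContextVar populated by BusinessFunctionExtractionMiddleware
    return JSONResponse({"function": get_current_business_function()})

test_app = Starlette(routes=[Route("/mcp", whoami)])
test_app.add_middleware(BusinessFunctionExtractionMiddleware)

client = TestClient(test_app)
print(client.get("/mcp?function=hr").json())     # {'function': 'hr'}
print(client.get("/mcp?function=sales").json())  # {'function': 'na'} (not an allowed function)
```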
4. OAuth 2.0 (Azure Entra ID) Configuration
from fastmcp import FastMCP
from fastmcp.server.auth.providers.azure import AzureProvider
logger.info("Configuring OAuth 2.0 (Azure Entra ID) Provider")
try:
azure_auth = AzureProvider(
client_id=AZURE_CONFIDENTIAL_APP_ID,
client_secret=AZURE_CONFIDENTIAL_SECRET,
tenant_id="00b1a755-0b06-4d05-9a59-259ebf7f9e00",
base_url="https://mcp.gdenergyproducts.com",
required_scopes=["access_as_user"]
)
logger.info("OAuth 2.0 (Azure Entra ID) Provider configured successfully")
except Exception as e:
logger.error(
f"Failed to configure OAuth 2.0 (Azure Entra ID) Provider: {str(e)}",
exc_info=True
)
raise
# Initialize FastMCP with authentication
mcp = FastMCP("gdep-analyst", auth=azure_auth)
logger.info("FastMCP instance initialized successfully")
Configuration breakdown:
- client_id: Your Azure App Registration's Application ID
- client_secret: Secret created in App Registration
- tenant_id: Your organization's Azure tenant ID
- base_url: Public URL of your MCP server (for OAuth callbacks)
- required_scopes: API scopes defined in "Expose an API" section
What happens during OAuth:
1. User connects in Claude Desktop → redirected to Azure login
2. User authenticates with corporate credentials
3. Azure validates and issues tokens with access_as_user scope
4. MCP server validates token signature and claims
5. Request proceeds with user context (email, name, object ID)
5. Database Connection Management
import pyodbc
_connection = None
_connection_string = None
def build_connection_string() -> str:
"""Build SQL Server connection string from Key Vault"""
logger.debug("Building database connection string")
conn_str = str(get_azure_kv_secret('mcp-sql-conn-string'))
logger.info("Database connection string built successfully")
return conn_str
def get_db_connection():
"""Get or create database connection with connection pooling"""
global _connection, _connection_string
logger.debug("Getting database connection")
# Lazy initialization
if _connection_string is None:
logger.info("Connection string not initialized, building new one")
_connection_string = build_connection_string()
if _connection_string is None:
logger.error("Failed to build connection string")
return None
# Test existing connection
try:
if _connection is not None:
logger.debug("Testing existing connection")
_connection.cursor().execute("SELECT 1")
logger.debug("Existing connection is valid")
except Exception as e:
logger.warning(
f"Existing connection test failed: {str(e)}, "
"will create new connection"
)
_connection = None
# Create new connection if needed
if _connection is None:
logger.info("Creating new database connection")
try:
_connection = pyodbc.connect(_connection_string)
logger.info("Database connection established successfully")
except Exception as e:
logger.error(
f"Failed to establish database connection: {str(e)}",
exc_info=True
)
raise
return _connection
def is_database_configured() -> bool:
"""Check if database connection is configured"""
logger.debug("Checking if database is configured")
return True # Always true in this deployment
Connection pooling strategy:
- Singleton pattern: Reuse connections across requests
- Health checks: Test connection before use (SELECT 1)
- Automatic reconnection: If connection dies, create new one
- Secure credentials: Connection string stored in Key Vault
Example connection string (from Key Vault):
Driver={ODBC Driver 18 for SQL Server};
Server=10.27.18.5,61002;
Database=ADP_AD_AAD;
UID=mcp_reader;
PWD=<from-key-vault>;
Encrypt=yes;
TrustServerCertificate=yes;
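For orientation, the tools below all follow the same pattern when talking to the database; here is a minimal usage sketch (the query is just a connectivity check):

```python
# Sketch: reuse the pooled connection and run a trivial query.
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
    "SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'mcp'"
)
print(cursor.fetchone()[0])  # number of tables/views exposed in the mcp schema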
6. File Reading with SAP Support
import polars as pl
from pathlib import Path
from typing import List
def read_file(file_location: str, file_type: str = "csv") -> pl.DataFrame:
"""Read CSV or Parquet file into Polars DataFrame"""
logger.info(f"Reading file: {file_location}, type: {file_type}")
try:
if file_type == "csv":
filename = Path(file_location).name.lower()
# SAP files use semicolon delimiter
separator = ';' if filename.startswith('sap_') else ','
logger.debug(
f"Using separator '{separator}' for file: {filename}"
)
df = pl.read_csv(
file_location,
separator=separator,
truncate_ragged_lines=True, # Handle malformed rows
infer_schema_length=10000, # Sample 10k rows for types
ignore_errors=True # Skip unparseable rows
)
logger.info(
f"Successfully read CSV file: {file_location}, "
f"shape: {df.shape}"
)
return df
if file_type == "parquet":
df = pl.read_parquet(file_location)
logger.info(
f"Successfully read Parquet file: {file_location}, "
f"shape: {df.shape}"
)
return df
logger.error(f"Unsupported file type: {file_type}")
raise ValueError(f"Unsupported file type: {file_type}")
except Exception as e:
logger.exception(f"Failed to read file {file_location}")
raise
def read_file_list(
file_locations: List[str],
file_type: str = "csv"
) -> pl.DataFrame:
"""Read multiple files with same schema and concatenate"""
logger.info(
f"Reading file list: {len(file_locations)} files of type {file_type}"
)
logger.debug(f"Files to read: {file_locations}")
try:
if file_type == "csv":
dfs = []
for i, file_location in enumerate(file_locations):
logger.debug(
f"Reading file {i+1}/{len(file_locations)}: "
f"{file_location}"
)
filename = Path(file_location).name.lower()
separator = ';' if filename.startswith('sap_') else ','
df = pl.read_csv(
file_location,
separator=separator,
truncate_ragged_lines=True,
infer_schema_length=10000,
ignore_errors=True
)
dfs.append(df)
logger.debug(f"File {i+1} shape: {df.shape}")
result = pl.concat(dfs)
logger.info(
f"Successfully concatenated {len(dfs)} CSV files, "
f"final shape: {result.shape}"
)
return result
if file_type == "parquet":
result = pl.read_parquet(file_locations)
logger.info(
f"Successfully read {len(file_locations)} Parquet files, "
f"shape: {result.shape}"
)
return result
logger.error(f"Unsupported file type: {file_type}")
raise ValueError(f"Unsupported file type: {file_type}")
except Exception as e:
logger.exception("Failed to read file list")
raise
Key features:
- SAP detection: Files prefixed with sap_ automatically use ; delimiter
- Robust parsing: Handles malformed CSVs gracefully
- Type inference: Samples 10,000 rows to detect column types accurately
- Performance: Polars is 10-50x faster than Pandas on large files
- Multi-file support: Concatenates files with same schema efficiently
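A usage sketch (file paths are illustrative): combining two SAP exports that share a schema and inspecting the result:

```python
# Sketch: both files start with "sap_", so the semicolon delimiter is applied automatically.
df = read_file_list(
    [
        "/mnt/azure/mcp/fin/sap_expenses_q1.csv",
        "/mnt/azure/mcp/fin/sap_expenses_q2.csv",
    ],
    file_type="csv",
)
print(df.shape)   # (rows, columns) after concatenation
print(df.schema)  # Polars dtypes inferred from up to 10,000 sample rows
```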
7. User Context Extraction
from fastmcp.server.dependencies import get_access_token
def get_current_user_info() -> dict:
"""Returns information about the authenticated Azure user"""
token = get_access_token()
return {
"email": token.claims.get("preferred_username"),
"name": token.claims.get("name"),
"oid": token.claims.get("oid") # Azure object ID
}
Available claims from Azure token:
- preferred_username: User's email (e.g., john.doe@company.com)
- name: Display name (e.g., John Doe)
- oid: Unique Azure object ID (use for database joins/auditing)
Usage example in tools:
@mcp.tool()
def sensitive_tool():
user_info = get_current_user_info()
logger.info(
f"Tool called by: {user_info['name']} ({user_info['email']})"
)
# Implement per-user authorization logic
if user_info['email'] not in AUTHORIZED_USERS:
return {"error": "Unauthorized"}
# ... rest of tool logic
8. MCP Tool: Connection Info
import json
from typing import Dict, Any
from glob import glob
@mcp.tool()
def get_connection_info() -> Dict[str, Any]:
"""
Get information about configured data sources and connection status.
Use to verify available data sources and check connection health.
"""
business_function = get_current_business_function()
logger.info(
f"get_connection_info called for "
f"business_function: {business_function}"
)
file_pattern = f"/mnt/azure/mcp/{business_function}/*.csv"
logger.debug(f"File pattern: {file_pattern}")
available_files = glob(file_pattern)
logger.info(f"Found {len(available_files)} files matching pattern")
info = {
"business_function": business_function,
"file_sources": {
"enabled": True,
"pattern": file_pattern,
"available_files": available_files
},
"database_sources": {
"enabled": is_database_configured(),
"server": "10.27.18.5,61002",
"database": "ADP_AD_AAD"
}
}
# Test database connection and list tables
if is_database_configured():
logger.info("Database is configured, retrieving available objects")
try:
conn = get_db_connection()
cursor = conn.cursor()
# List tables for this business function
db_objects_query = f"""
SELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
AND TABLE_SCHEMA = 'mcp'
AND TABLE_NAME LIKE '{business_function}_%'
ORDER BY TABLE_SCHEMA, TABLE_NAME
"""
cursor.execute(db_objects_query)
db_objects = cursor.fetchall()
objects_list = [
{"schema": row[0], "table": row[1]}
for row in db_objects
]
info["database_sources"]["status"] = "connected"
info["database_sources"]["available_objects"] = objects_list
logger.info(
f"Database connection successful, "
f"found {len(objects_list)} objects"
)
except Exception as e:
info["database_sources"]["status"] = "error"
info["database_sources"]["error"] = str(e)
logger.error(
f"Database object listing failed: {str(e)}",
exc_info=True
)
else:
logger.info("Database not configured")
# Log user for auditing
try:
user_info = get_current_user_info()
logger.info(
f"Tool get_connection_info was called by: "
f"{user_info['name']} with email {user_info['email']}"
)
except Exception as e:
logger.warning(f"Could not retrieve user info: {str(e)}")
logger.debug(f"Returning connection info: {json.dumps(info, indent=2)}")
return info
Returns example:
{
"business_function": "it",
"file_sources": {
"enabled": true,
"pattern": "/mnt/azure/mcp/it/*.csv",
"available_files": [
"/mnt/azure/mcp/it/device_inventory.csv",
"/mnt/azure/mcp/it/licenses.csv"
]
},
"database_sources": {
"enabled": true,
"server": "10.27.18.5,61002",
"database": "ADP_AD_AAD",
"status": "connected",
"available_objects": [
{"schema": "mcp", "table": "it_employees"},
{"schema": "mcp", "table": "it_devices"}
]
}
}
9. MCP Tool: Data Catalog (Auto-Discovery)
import json
import os
@mcp.tool()
def get_data_catalog() -> str:
"""
Get the complete data catalog showing all available datasets.
**ALWAYS CALL THIS TOOL FIRST before answering any data question.**
Returns JSON catalog with:
- "source_type": "file" or "database"
- "query_tool": execute_polars_sql or execute_database_query
- "table_reference": 'self' for files, 'schema.table' for database
- "example_query": Sample query to adapt
- Business descriptions, columns, rules, relationships
The catalog contains all business context needed to write correct queries.
"""
business_function = get_current_business_function()
logger.info(
f"get_data_catalog called for business_function: {business_function}"
)
catalog_path = f"/mnt/azure/mcp/{business_function}/data_catalog.json"
logger.debug(f"Looking for catalog file: {catalog_path}")
# Load catalog file if exists
catalog_json = {}
if os.path.exists(catalog_path):
try:
with open(catalog_path, 'r', encoding='utf-8') as f:
catalog_json = json.load(f)
logger.info(f"Loaded catalog file: {catalog_path}")
except Exception as e:
logger.error(
f"Failed to load catalog file: {str(e)}",
exc_info=True
)
catalog_json = {}
# Build catalog with metadata from JSON + auto-discovery
catalog = {
"version": catalog_json.get("version", "2.0"),
"last_updated": catalog_json.get("last_updated", "auto-generated"),
"README": catalog_json.get("README", {}),
"query_decision_tree": catalog_json.get("query_decision_tree", {}),
"anti_patterns": catalog_json.get("anti_patterns", {}),
"categories": catalog_json.get("categories", {}),
"datasets": {}
}
# Auto-discover files on disk
files = (
glob(f"/mnt/azure/mcp/{business_function}/*.csv") +
glob(f"/mnt/azure/mcp/{business_function}/*.parquet")
)
logger.info(
f"Found {len(files)} files on disk for "
f"business_function {business_function}"
)
file_keys_on_disk = set()
for file in files:
basename = os.path.basename(file)
key = basename.replace('.csv', '').replace('.parquet', '').lower()
file_keys_on_disk.add(key)
# Use metadata from JSON if available, else minimal auto-discovery
meta = catalog_json.get("datasets", {}).get(key, {})
dataset = {
"source_type": "file",
"query_tool": meta.get("query_tool", "execute_polars_sql"),
"table_reference": meta.get("table_reference", "self"),
"filename": basename,
"category": meta.get("category"),
"description": meta.get(
"description",
"Auto-discovered file - no metadata available"
),
"source_system": meta.get("source_system"),
"delimiter": meta.get("delimiter", ","),
"columns": meta.get("columns", {}),
"business_rules": meta.get("business_rules", []),
"common_queries": meta.get("common_queries", []),
"related_datasets": meta.get("related_datasets", []),
"usage_tip": meta.get(
"usage_tip",
f"Use get_schema(file_location='{file}') for detailed schema"
)
}
# Add any extra fields from JSON
for extra_field in meta:
if extra_field not in dataset:
dataset[extra_field] = meta[extra_field]
catalog["datasets"][key] = dataset
logger.debug(f"Added/updated file dataset: {key}")
# Auto-discover database tables
db_keys = set()
if is_database_configured():
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(f"""
SELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
AND TABLE_SCHEMA = 'mcp'
AND TABLE_NAME LIKE '{business_function}_%'
ORDER BY TABLE_SCHEMA, TABLE_NAME
""")
tables = cursor.fetchall()
logger.info(
f"Found {len(tables)} database tables for "
f"business_function {business_function}"
)
for schema, table in tables:
key = f"{schema}_{table}".lower()
db_keys.add(key)
meta = catalog_json.get("datasets", {}).get(key, {})
dataset = {
"source_type": "database",
"query_tool": meta.get(
"query_tool",
"execute_database_query"
),
"table_reference": meta.get(
"table_reference",
f"{schema}.{table}"
),
"database_schema": schema,
"database_table": table,
"category": meta.get("category"),
"description": meta.get(
"description",
"Auto-discovered database table - no metadata available"
),
"source_system": meta.get("source_system"),
"update_frequency": meta.get("update_frequency"),
"columns": meta.get("columns", {}),
"business_rules": meta.get("business_rules", []),
"common_queries": meta.get("common_queries", []),
"related_datasets": meta.get("related_datasets", []),
"example_query": meta.get("example_query"),
"semantic_examples": meta.get("semantic_examples", {}),
"usage_tip": meta.get(
"usage_tip",
f"Use get_schema_db(schema_name='{schema}', "
f"table_name='{table}') for detailed schema"
)
}
for extra_field in meta:
if extra_field not in dataset:
dataset[extra_field] = meta[extra_field]
catalog["datasets"][key] = dataset
logger.debug(f"Added/updated database dataset: {key}")
except Exception as e:
error_msg = f"Database table discovery failed: {str(e)}"
catalog["database_discovery_error"] = error_msg
logger.error(error_msg, exc_info=True)
# Only include datasets that are actually present (on disk or in db)
logger.info(
f"Reconciling catalog: {len(catalog['datasets'])} "
"total before filtering"
)
present_keys = file_keys_on_disk | db_keys
catalog["datasets"] = {
k: v for k, v in catalog["datasets"].items()
if k in present_keys
}
logger.info(
f"Final catalog has {len(catalog['datasets'])} datasets "
"after reconciliation"
)
return json.dumps(catalog, indent=2)
Key features:
- Hybrid auto-discovery: Combines JSON metadata with filesystem/database introspection
- Business function isolation: Only shows datasets for current function
- Graceful degradation: Works even without data_catalog.json file
- Rich metadata: Provides all context LLM needs to write correct queries
- Tool routing: Tells LLM exactly which execution tool to use
10. MCP Tool: Get Schema (Files)
from typing import List
from pydantic import Field
@mcp.tool()
def get_schema(
file_location: str,
file_type: str = Field(default="csv", description="csv or parquet"),
) -> List[Dict[str, Any]]:
"""
Get technical schema of a CSV or Parquet file.
**Use for file-based datasets only.**
For database tables, use get_schema_db().
Returns column names and Polars data types.
Call after get_data_catalog() to get detailed column information.
"""
business_function = get_current_business_function()
logger.info(
f"get_schema called for file: {file_location}, "
f"type: {file_type}, business_function: {business_function}"
)
# Enforce that file_location is within the correct business function
allowed_prefix = f"/mnt/azure/mcp/{business_function}/"
if not os.path.abspath(file_location).startswith(
os.path.abspath(allowed_prefix)
):
error_msg = (
f"Access denied: file_location '{file_location}' is not "
f"within the allowed business_function folder '{allowed_prefix}'. "
"You may only request schema for files in your assigned "
"business_function folder."
)
logger.error(error_msg)
return [{"error": error_msg}]
try:
df = read_file(file_location, file_type)
schema = df.schema
schema_dict = {}
for key, value in schema.items():
schema_dict[key] = str(value)
logger.info(
f"Successfully retrieved schema with {len(schema_dict)} columns"
)
logger.debug(f"Schema: {schema_dict}")
return [schema_dict]
except Exception as e:
error_msg = f"Failed to read schema: {str(e)}"
logger.error(error_msg, exc_info=True)
return [{"error": error_msg}]
Security enforcement:
- Path validation: Prevents directory traversal attacks
- Business function isolation: IT users can't read HR files
- Explicit error messages: Clear feedback on authorization failures
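To illustrate how the guard behaves (paths are illustrative), here is the same startswith check evaluated by hand:

```python
import os

allowed = os.path.abspath("/mnt/azure/mcp/it/")

# Inside the IT folder: allowed
print(os.path.abspath("/mnt/azure/mcp/it/licenses.csv").startswith(allowed))             # True
# Another function's folder: denied
print(os.path.abspath("/mnt/azure/mcp/hr/terminations.csv").startswith(allowed))         # False
# Directory traversal is normalized away by abspath, then denied
print(os.path.abspath("/mnt/azure/mcp/it/../hr/terminations.csv").startswith(allowed))   # False
```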
11. MCP Tool: Get Schema (Database)
@mcp.tool()
def get_schema_db(
schema_name: str = Field(
description="Database schema name (e.g., 'mcp', 'dbo')"
),
table_name: str = Field(description="Database table name"),
) -> List[Dict[str, Any]]:
"""
Get technical schema of a SQL Server database table.
**Use for database tables only.**
For CSV/Parquet files, use get_schema().
Returns column names, data types, nullability, and primary key info.
Call after get_data_catalog() to get detailed column information.
"""
business_function = get_current_business_function()
logger.info(
f"get_schema_db called for {schema_name}.{table_name}, "
f"business_function: {business_function}"
)
if not is_database_configured():
logger.error("Database not configured")
return [{"error": "No database connection configured."}]
# Enforce table_name starts with business_function_
if not table_name.startswith(f"{business_function}_"):
error_msg = (
f"Table name {table_name} does not match "
f"business_function {business_function}"
)
logger.error(error_msg)
return [{"error": error_msg}]
try:
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
c.COLUMN_NAME,
c.DATA_TYPE,
c.CHARACTER_MAXIMUM_LENGTH,
c.NUMERIC_PRECISION,
c.NUMERIC_SCALE,
c.IS_NULLABLE,
c.COLUMN_DEFAULT,
CASE WHEN pk.COLUMN_NAME IS NOT NULL
THEN 'YES' ELSE 'NO' END as IS_PRIMARY_KEY
FROM INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN (
SELECT ku.TABLE_SCHEMA, ku.TABLE_NAME, ku.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku
ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
) pk ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA
AND c.TABLE_NAME = pk.TABLE_NAME
AND c.COLUMN_NAME = pk.COLUMN_NAME
WHERE c.TABLE_SCHEMA = ? AND c.TABLE_NAME = ?
ORDER BY c.ORDINAL_POSITION
"""
logger.debug(f"Executing schema query for {schema_name}.{table_name}")
cursor.execute(query, (schema_name, table_name))
columns = cursor.fetchall()
if not columns:
error_msg = f"Table not found: {schema_name}.{table_name}"
logger.error(error_msg)
return [{"error": error_msg}]
logger.info(
f"Found {len(columns)} columns in {schema_name}.{table_name}"
)
schema = {}
for col in columns:
col_name = col[0]
data_type = col[1]
char_len = col[2]
num_precision = col[3]
num_scale = col[4]
nullable = col[5] == "YES"
default = col[6]
is_pk = col[7] == "YES"
# Format type string
type_str = data_type
if char_len:
type_str += f"({char_len})"
elif num_precision:
if num_scale:
type_str += f"({num_precision},{num_scale})"
else:
type_str += f"({num_precision})"
schema[col_name] = {
"data_type": type_str,
"nullable": nullable,
"is_primary_key": is_pk,
"default": default if default else None
}
logger.debug(
f"Column: {col_name}, type: {type_str}, "
f"nullable: {nullable}, pk: {is_pk}"
)
return [schema]
except Exception as e:
error_msg = f"Error retrieving schema: {str(e)}"
logger.error(error_msg, exc_info=True)
return [{"error": error_msg}]
Comprehensive schema information:
- Data types: Including length/precision/scale
- Nullability: Whether NULL values allowed
- Primary keys: Identifies join columns
- Defaults: Column default values
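For a table like mcp.it_devices (column names assumed for illustration), the return value has this shape:

```python
# Illustrative return value of get_schema_db("mcp", "it_devices")
[
    {
        "DeviceName":      {"data_type": "nvarchar(255)", "nullable": False, "is_primary_key": True,  "default": None},
        "OperatingSystem": {"data_type": "nvarchar(100)", "nullable": True,  "is_primary_key": False, "default": None},
        "LastSeen":        {"data_type": "datetime2",     "nullable": True,  "is_primary_key": False, "default": None},
    }
]
```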
12. MCP Tool: Execute Polars SQL (Files)
query_description = """Polars SQL query. MUST use 'self' as table name.
Supports: avg, count, sum, max, min, where, group by, order by, joins, etc."""
@mcp.tool()
def execute_polars_sql(
file_locations: List[str],
query: str = Field(description=query_description),
file_type: str = Field(default="csv", description="csv or parquet"),
) -> List[Dict[str, Any]]:
"""
Execute SQL query on CSV or Parquet files using Polars.
**Use for file-based datasets only.**
For database tables, use execute_database_query().
**CRITICAL:** Query MUST use 'self' as table name, NOT the filename.
Example: execute_polars_sql(
file_locations=["data/employees.csv"],
query="SELECT COUNT(*) FROM self WHERE IsActive = true"
)
"""
business_function = get_current_business_function()
logger.info(
f"execute_polars_sql called for business_function: {business_function}"
)
logger.info(f"File locations ({len(file_locations)}): {file_locations}")
logger.info(f"Query: {query}")
logger.debug(f"File type: {file_type}")
# Validation: Check for common mistakes
query_lower = query.lower()
# Check if query references database schema (wrong tool)
if "mcp." in query or "dbo." in query or any(
schema in query for schema in ["mcp.", "dbo.", "hr.", "sales."]
):
error_msg = (
"WRONG TOOL: This query references a database table "
"(schema.table format). Use execute_database_query() instead."
)
logger.error(error_msg)
return [{
"error": error_msg,
"hint": "File queries must use 'self' as table name, "
"not schema.table",
"your_query": query
}]
# Check if query uses 'self' (required for Polars)
if "self" not in query_lower:
error_msg = (
"INVALID QUERY: Polars SQL queries must use 'self' "
"as the table name."
)
logger.error(error_msg)
return [{
"error": error_msg,
"hint": "Change your query to: SELECT ... FROM self WHERE ...",
"your_query": query
}]
# Enforce that all file_locations are within business function folder
allowed_prefix = os.path.abspath(
f"/mnt/azure/mcp/{business_function}/"
)
for file_location in file_locations:
if not os.path.abspath(file_location).startswith(allowed_prefix):
error_msg = (
f"Access denied: file_location '{file_location}' is not "
f"within the allowed business_function folder "
f"'{allowed_prefix}'. You may only query files in your "
"assigned business_function folder."
)
logger.error(error_msg)
return [{"error": error_msg, "your_query": query}]
try:
logger.debug("Reading files into DataFrame")
df = read_file_list(file_locations, file_type)
logger.info(f"DataFrame loaded, shape: {df.shape}")
logger.debug("Executing SQL query on DataFrame")
op_df = df.sql(query)
logger.info(f"Query executed successfully, result shape: {op_df.shape}")
output_records = op_df.to_dicts()
logger.info(f"Returning {len(output_records)} records")
logger.debug(
f"First record (if any): "
f"{output_records[0] if output_records else 'No records'}"
)
return output_records
except Exception as e:
error_msg = f"Query execution failed: {str(e)}"
logger.error(error_msg, exc_info=True)
return [{"error": error_msg, "your_query": query}]
Validation and security:
- Tool detection: Catches wrong tool usage (database vs file queries)
- Syntax validation: Ensures 'self' is used as table name
- Path validation: Prevents access to files outside business function
- Error handling: Returns helpful error messages to LLM
13. MCP Tool: Execute Database Query (SQL Server)
@mcp.tool()
def execute_database_query(
query: str = Field(
description="T-SQL query to execute on SQL Server database"
),
max_rows: int = Field(
default=1000,
description="Maximum rows to return (default: 1000, set to 0 for unlimited)"
)
) -> List[Dict[str, Any]]:
"""
Execute T-SQL queries on SQL Server database tables.
**Use for database tables only.**
For CSV/Parquet files, use execute_polars_sql().
**CRITICAL:** Use schema.table format (e.g., mcp.employees), NOT 'self'.
Example: execute_database_query(
query="SELECT COUNT(*) FROM mcp.vw_ADP_Employee_Daily_Snapshot
WHERE Position_Status = 'Active'"
)
Default limit: 1000 rows. Set max_rows=0 for unlimited.
"""
business_function = get_current_business_function()
logger.info(
f"execute_database_query called for "
f"business_function: {business_function}"
)
logger.info(f"Query: {query}")
logger.debug(f"Max rows: {max_rows}")
if not is_database_configured():
logger.error("Database not configured")
return [{"error": "No database connection configured."}]
# Validation: Check for common mistakes
query_lower = query.lower()
# Check if query uses 'self' (wrong tool)
if " self" in query_lower or "from self" in query_lower or \
"join self" in query_lower:
error_msg = (
"WRONG TOOL: This query uses 'self' as table name. "
"Use execute_polars_sql() for file-based queries."
)
logger.error(error_msg)
return [{
"error": error_msg,
"hint": "Database queries must use schema.table format "
"(e.g., mcp.employees), not 'self'",
"your_query": query
}]
# Check if query has schema.table format
if "mcp." not in query_lower and "dbo." not in query_lower:
error_msg = (
"INVALID QUERY: Database queries must use schema.table format."
)
logger.error(error_msg)
return [{
"error": error_msg,
"hint": "Use: SELECT ... FROM mcp.table_name or dbo.table_name",
"your_query": query
}]
# Validate business_function in table name
if f"{business_function}_" not in query_lower:
logger.warning(
f"Query does not appear to reference business_function "
f"'{business_function}' tables"
)
try:
conn = get_db_connection()
cursor = conn.cursor()
# Add TOP clause if not present and max_rows is set
modified_query = query
if max_rows > 0:
query_upper = query.strip().upper()
if not query_upper.startswith("SELECT TOP") and \
query_upper.startswith("SELECT"):
modified_query = query.strip()
# Insert TOP clause after SELECT
select_pos = modified_query.upper().find("SELECT")
modified_query = (
modified_query[:select_pos + 6] +
f" TOP {max_rows}" +
modified_query[select_pos + 6:]
)
logger.debug(f"Added TOP {max_rows} clause to query")
logger.debug(f"Executing modified query: {modified_query}")
cursor.execute(modified_query)
# Get column names
columns = [column[0] for column in cursor.description]
logger.debug(f"Result columns ({len(columns)}): {columns}")
# Fetch results
results = []
row_count = 0
for row in cursor.fetchall():
row_count += 1
# Convert values to JSON-serializable types
row_dict = {}
for col_name, value in zip(columns, row):
# Handle datetime and other types
if hasattr(value, 'isoformat'):
row_dict[col_name] = value.isoformat()
elif value is None:
row_dict[col_name] = None
else:
row_dict[col_name] = value
results.append(row_dict)
logger.info(f"Query executed successfully, returned {row_count} rows")
if results:
logger.debug(f"First row: {results[0]}")
return results
except Exception as e:
error_msg = f"Query execution error: {str(e)}"
logger.error(error_msg, exc_info=True)
return [{"error": error_msg, "your_query": query}]
Key features:
- Automatic row limiting: Adds TOP clause to prevent massive result sets
- JSON serialization: Converts datetime and other types properly
- Tool validation: Catches wrong tool usage (file vs database queries)
- Security: Validates business function table naming
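The row-limiting step in isolation (the query is illustrative):

```python
# Sketch of the TOP-insertion logic above, applied to a sample query.
query = "SELECT DeviceName FROM mcp.it_devices WHERE OperatingSystem = 'Windows 11'"
max_rows = 1000

stripped = query.strip()
if max_rows > 0 and stripped.upper().startswith("SELECT") \
        and not stripped.upper().startswith("SELECT TOP"):
    pos = stripped.upper().find("SELECT")
    stripped = stripped[:pos + 6] + f" TOP {max_rows}" + stripped[pos + 6:]

print(stripped)
# SELECT TOP 1000 DeviceName FROM mcp.it_devices WHERE OperatingSystem = 'Windows 11'
```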
14. Main Entry Point and Server Startup
import uvicorn
def main():
"""Main entry point for MCP server"""
logger.info("=" * 80)
logger.info("MAIN: Starting single MCP Server on port 8000")
logger.info("=" * 80)
try:
# Create the FastMCP HTTP app
app = mcp.http_app()
logger.info("FastMCP HTTP app created successfully")
# Add the business function extraction middleware
logger.info("Adding BusinessFunctionExtractionMiddleware to app")
app.add_middleware(BusinessFunctionExtractionMiddleware)
logger.info("Middleware added successfully")
logger.info("Server configuration:")
logger.info(" - Host: 0.0.0.0")
logger.info(" - Port: 8000")
logger.info(" - Functions supported: IT, HR, FIN")
logger.info(" - Functions extraction: Via Query String (mcp?function=XXX)")
logger.info("=" * 80)
# Run the server - this blocks indefinitely
logger.info("Starting Uvicorn server...")
config = uvicorn.Config(
app,
host="0.0.0.0",
port=8000,
log_level="info",
access_log=True,
loop="asyncio",
timeout_keep_alive=75
)
server = uvicorn.Server(config)
server.run()
except Exception as e:
logger.critical(f"Failed to start server: {str(e)}", exc_info=True)
raise
finally:
logger.info("=" * 80)
logger.info("MAIN: MCP Server shutdown complete")
logger.info("=" * 80)
if __name__ == "__main__":
logger.info("=" * 80)
logger.info("Script execution started - Single Process Mode")
logger.info("=" * 80)
try:
main()
except KeyboardInterrupt:
logger.info("\n" + "=" * 80)
logger.info("MAIN: Keyboard interrupt received, shutting down...")
logger.info("=" * 80)
except Exception as e:
logger.critical(f"Fatal error in main execution: {str(e)}", exc_info=True)
raise
Server configuration:
- Port 8000: Standard HTTP port for MCP servers
- Host 0.0.0.0: Listen on all network interfaces
- Async event loop: Supports concurrent requests
- Keep-alive timeout: 75 seconds for long-running queries
- Access logging: All requests logged for auditing
Deployment: Azure Container Instance with TLS Offloading
Container Build and Deployment
1. Dockerfile:
FROM mcr.microsoft.com/devcontainers/python:3
# Keeps Python from generating .pyc files in the container
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
# Install only necessary packages for MCP Analyst
RUN apt-get update && \
apt-get install -y \
build-essential \
python3-dev \
git \
curl \
cifs-utils && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Copy and install Python dependencies
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
python -m pip install -r requirements.txt
# Add Microsoft GPG key
RUN curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg
# Add Microsoft SQL Server repository
RUN curl https://packages.microsoft.com/config/debian/12/prod.list | tee /etc/apt/sources.list.d/mssql-release.list
# Install Microsoft SQL Server related packages
RUN apt-get update && \
ACCEPT_EULA=Y apt-get install -y \
msodbcsql18 \
mssql-tools18
# Add MS SQL Server tools to PATH
RUN echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc
# Clean up
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Build argument passed in via the command line
ARG GITHUB_PAT
# Clone the private repository using the build argument
RUN git clone https://$GITHUB_PAT@github.com/GDEnergyproducts/GDEP-MCP-ANALYST.git /app
WORKDIR /app
RUN chmod +x scripts/container/mountstorage.sh
# Create a startup script that runs mcp server
RUN echo '#!/bin/bash\n\
set -e\n\
echo "Starting container initialization..."\n\
echo "Starting analyst.py..."\n\
python /app/src/analyst.py\n\
' > /app/start.sh && chmod +x /app/start.sh
CMD ["/app/start.sh"]
2. requirements.txt:
# Core MCP dependencies
polars>=0.20.0
pydantic>=2.0.0
# Azure dependencies (from your existing setup)
azure-identity
azure-keyvault-secrets
# Data processing
pandas
pyarrow>=14.0.0
pyodbc>=5.3.0
# Development/debugging tools
pytest
ipython
# MCP server framework with OAuth support
fastmcp
TLS Offloading with FortiGate
Why TLS offloading at the edge?
- SSL/TLS termination: FortiGate handles all encryption/decryption
- Certificate management: Centralized SSL cert renewal
- Performance: Offload crypto operations from container
- Security: Web Application Firewall (WAF) protection
- Inspection: Deep packet inspection for threats
FortiGate configuration (simplified):
config firewall vip
edit "mcp-analyst-vip"
set extip 203.0.113.10
set mappedip 10.0.1.100 # Container internal IP
set extintf "wan1"
set portforward enable
set extport 443
set mappedport 8000
set ssl-certificate "wildcard-cert"
next
end
config firewall policy
edit 1
set name "Allow-MCP-HTTPS"
set srcintf "wan1"
set dstintf "internal"
set srcaddr "all"
set dstaddr "mcp-analyst-vip"
set action accept
set schedule "always"
set service "HTTPS"
set utm-status enable
set ssl-ssh-profile "certificate-inspection"
set av-profile "default"
set webfilter-profile "default"
set ips-sensor "default"
next
end
Result:
- External URL: https://mcp.gdenergyproducts.com (port 443)
- Internal traffic: HTTP to container (port 8000)
- TLS handled by FortiGate with corporate SSL certificate
Claude Desktop Configuration
To connect Claude Desktop to your MCP server:
- Open Claude Desktop and go to File → Settings.
- Select Connector, then click Add Connector.
- Enter the following URL (replace function=it as needed): https://mcp.gdenergyproducts.com/mcp?function=it
- Click Connect and go through the OAuth authentication process.
- If Claude Desktop does not restart automatically, restart it manually.
- After the restart, go back to Configure and allow access as appropriate; otherwise, you will be prompted for access each time a tool is used.
- Start asking questions!
  - "How many Windows devices do we have?"
  - "Show me employees terminated in the last 90 days"
  - "What's our Office 365 license utilization?"
Security Best Practices
1. Authentication and Authorization
- ✅ OAuth 2.0 (Azure Entra ID) only: No API keys or basic auth
- ✅ Per-user context: Track who queries what
- ✅ Business function isolation: Users can't cross boundaries
- ✅ Audit logging: Every tool call logged with user identity
2. Data Protection
- ✅ TLS in transit: All external traffic encrypted
- ✅ No data exfiltration: Results returned only to authenticated user
- ✅ Path validation: Prevents directory traversal
- ✅ Query validation: Catches SQL injection attempts
3. Secrets Management
- ✅ Azure Key Vault: All secrets stored securely
- ✅ No hardcoded credentials: Injected at runtime
- ✅ Least privilege: Service principals with minimal permissions
- ✅ Secret rotation: Automated via Key Vault
4. Network Security
- ✅ TLS offloading: FortiGate terminates SSL/TLS
- ✅ WAF protection: Web Application Firewall blocks attacks
- ✅ Internal-only database: No public internet exposure
- ✅ VNet integration: Container in private virtual network
Performance and Scalability
Performance Characteristics
File queries (Polars):
- 1 million rows: ~1-2 seconds
- 10 million rows: ~5-10 seconds
- Concurrent users: 20-50 simultaneous queries

Database queries (SQL Server):
- Simple SELECT: <100ms
- Complex joins: 1-5 seconds
- Limited by database performance, not the MCP server

Memory usage:
- Base: ~200MB
- Per concurrent query: +50-500MB (depends on data size)
- Recommended: 4GB RAM for 10+ concurrent users
Scaling Strategies
Vertical scaling (single container):
- Increase CPU/RAM allocation
- Good for up to 50 concurrent users

Horizontal scaling (multiple containers):
- Deploy behind Azure Load Balancer
- Session affinity not required (stateless)
- Scale to 100+ users

Database optimization:
- Create indexed views for common queries
- Partition large tables by business function
- Use columnstore indexes for analytics
Monitoring and Troubleshooting
Key Metrics to Monitor
1. Application logs (/mnt/azure/logs/gdepmcp.log):
- Tool call frequency and duration
- Authentication failures
- Query errors
- User activity patterns
2. Container metrics (Azure Monitor):
- CPU utilization
- Memory usage
- Network throughput
- Container restart count

3. Database metrics (SQL Server DMVs):
- Long-running queries
- Connection pool usage
- Lock contention
- Index usage
Common Issues and Solutions
Issue: OAuth authentication fails
Solution:
- Verify accessTokenAcceptedVersion: 2 in manifest
- Check redirect URIs include Claude callback URL
- Ensure access_as_user scope is granted admin consent
Issue: "Access denied" errors for files
Solution:
- Verify business function in URL matches data folder
- Check file permissions in Azure File Share
- Validate path in get_schema() or execute_polars_sql()
Issue: Database queries timeout
Solution:
- Check network connectivity to SQL Server
- Verify firewall allows container IP
- Optimize query with indexes
- Reduce the result size with a lower max_rows value or a WHERE clause
Issue: Container crashes with OOM (Out of Memory)
Solution:
- Increase container memory allocation
- Add pagination to large queries
- Limit infer_schema_length in Polars
- Review concurrent user load
Lessons Learned
What Worked Well
- OAuth 2.0 (Azure Entra ID) integration: Seamless enterprise SSO
- Multi-tenant architecture: Single server for all business functions
- Polars performance: Handles millions of rows effortlessly
- Data catalog: LLM generates correct queries consistently
- TLS offloading: Simplified container deployment
- Key Vault: Secure, centralized secrets management
Challenges and Solutions
- OAuth version gotcha: Manually edit manifest to v2.0
  - Solution: Document clearly in setup guide
- Claude Desktop callback: Not in standard OAuth flow
  - Solution: Add claude.ai/api/mcp/auth_callback to redirect URIs
- SAP delimiter detection: Semicolon vs comma
  - Solution: Detect sap_ prefix in filename
- Connection pooling: Database connections timing out
  - Solution: Implement health checks and reconnection logic
- Log file growth: Filled disk in production
  - Solution: Use RotatingFileHandler with size limits
Future Enhancements
- Row-level security: Filter data based on user's Azure groups
- Query caching: Redis cache for repeated queries
- Rate limiting: Prevent abuse by individual users
- Advanced analytics: Support for Python/R code execution
- Mobile app: Native iOS/Android client with OAuth
- Real-time data: WebSocket streaming for live dashboards
Conclusion
The evolution from stdio-based to HTTPS-based MCP server represents a quantum leap in enterprise readiness:
- From single-user to multi-tenant: One server, many users, isolated data
- From no auth to OAuth 2.0 (Azure Entra ID): Enterprise-grade security and SSO
- From local to cloud: Scalable, containerized deployment
- From manual to automated: Zero per-user setup, instant access
This architecture enables true self-service analytics for business users while maintaining the security, governance, and scalability requirements of enterprise IT. By combining Azure Entra ID, containerization, TLS offloading, and multi-tenant data isolation, you can deliver LLM-powered analytics that business users love and IT teams trust.
The result: Business leaders can ask any question, across any dataset, with natural language—securely, instantly, and without waiting for IT.
References and Credits
- FastMCP Documentation – MCP server framework with OAuth support
- Polars DataFrame Library – High-performance data processing
- Claude Desktop – LLM client with custom HTTPS connectors