
Secure, On-Premises Data Analysis with LLM and a Custom MCP Server

Part 3: HTTPS-Based MCP Server with OAuth 2.0 (Azure Entra ID) and Multi-Tenant Architecture

📚 Series Navigation


This is Part 3 of the series. Here, we extend the solution to run the MCP server over HTTPS.

Introduction

From a business end-user perspective, the core problem this solution addresses is enabling anyone in the organization to ask questions of our internal data—whether in files or databases—using natural language, without needing to know SQL or technical query languages. By leveraging a large language model (LLM), users can simply describe what they want to know, and the LLM translates those requests into the correct queries, helping us analyze our data more efficiently and intuitively. This democratizes access to analytics, reduces dependency on technical staff, and accelerates business insights.

The technical solution ensures this is done securely and at scale: deploying an MCP server over HTTP(S) with enterprise authentication, so all data remains protected and accessible only to authorized users.

While the stdio-based MCP server from Parts 1 and 2 works well for individual users on Windows desktops, enterprise environments demand more:

  • Centralized deployment: One server for all users, not executables on every PC
  • Secure authentication: Enterprise SSO via Azure Entra ID (formerly Azure AD)
  • Multi-tenant architecture: Support for multiple business functions (IT, HR, Finance) with isolated data access
  • Cloud-native deployment: Containerized for Azure Container Instances or Kubernetes
  • HTTPS connectivity: Compatible with Claude Desktop (paid tier), web clients, and API integrations

This article shows how to transform the local MCP server into a production-ready enterprise service that:

  • Authenticates users via OAuth 2.0 (Azure Entra ID)
  • Routes requests to function-specific data catalogs (IT, HR, FIN)
  • Runs as a containerized service with TLS offloading
  • Integrates with Claude Desktop's custom HTTPS connector

All data and computation remain on-premises or in your Azure tenant—no third-party cloud services involved.


Architecture: From Stdio to HTTPS

Previous Architecture (Parts 1 & 2)

[Claude Desktop] ←→ [stdio] ←→ [analyst.exe] ←→ [Local Files/Database]
  • Simple, single-user, Windows-only
  • No authentication
  • Manual executable deployment per user

New Architecture (Part 3)

[Claude Desktop (Paid)] ←→ [HTTPS/OAuth] ←→ [FortiGate TLS Termination]
                                                    ↓
                                        [Azure Container Instance]
                                                    ↓
                                            [MCP Server + Auth]
                                                    ↓
                               [Business Function Router (IT/HR/FIN)]
                                    ↓              ↓            ↓
                          [IT Data Catalog] [HR Catalog] [FIN Catalog]
                                    ↓              ↓            ↓
                            [Files + Database per Function]

Key improvements:

  • OAuth 2.0 (Azure Entra ID): Handles authentication and authorization
  • Multi-tenancy: Single server supports multiple business functions with isolated data
  • Cloud deployment: Runs as a container in Azure Container Instances
  • TLS offloading: FortiGate/FortiWeb handles SSL/TLS termination
  • HTTPS connectivity: Any HTTP client can connect (Claude Desktop, web apps, APIs)


Why HTTPS over Stdio?

| Feature | Stdio (Parts 1 & 2) | HTTPS (Part 3) |
| --- | --- | --- |
| Deployment | Executable per user | Single centralized service |
| Authentication | None | OAuth 2.0 (Azure Entra ID) |
| Client support | Claude Desktop (local) | Claude Desktop (paid), web, mobile, APIs |
| Multi-user | No | Yes, with per-user context |
| Scalability | One user per process | Hundreds of concurrent users |
| Security | Local file access | Enterprise SSO + role-based access |
| Updates | Redeploy to every PC | Deploy once, all users benefit |

Azure Entra App Registration: Setup and Gotchas

Before deploying the MCP server, you must configure Azure Entra ID (formerly Azure AD) for OAuth 2.0 authentication.

Step 1: Create App Registration

  1. Navigate to Azure Portal → Entra ID → App Registrations → New Registration
  2. Configure basic settings:
    • Name: MCP-Analyst-Server
    • Supported account types: Accounts in this organizational directory only (single tenant)
    • Redirect URI: Leave blank for now (we'll add it next)

Step 2: Configure API Permissions

  1. Go to API Permissions → Add a permission → Microsoft Graph
  2. Add Delegated permissions:
    • User.Read (to read user profile)
    • email, profile, openid (standard OAuth scopes)
  3. Grant admin consent for your organization

Step 3: Create Client Secret

  1. Go to Certificates & secrets → New client secret:
    • Description: MCP Server Secret
    • Expires: 24 months (or per your security policy)
  2. Copy the secret value immediately (it won't be shown again)

Step 4: Configure Redirect URIs

This is critical for Claude Desktop integration:

  1. Go to Authentication → Add a platform → Web
  2. Add the following Redirect URIs:
    https://mcp.gdenergyproducts.com/oauth/callback
    https://claude.ai/api/mcp/auth_callback
    
  3. First URI: Your MCP server's OAuth callback endpoint
  4. Second URI: Required for Claude Desktop (paid tier) to act as OAuth proxy

    • Claude Desktop intercepts the OAuth flow and forwards tokens to your MCP server
    • Without this, Claude Desktop cannot complete authentication
  5. Enable ID tokens and Access tokens checkboxes

Step 5: Expose API and Define Scopes

  1. Go to Expose an API → Add a scope
  2. Configure the scope:
    • Scope name: access_as_user
    • Who can consent: Admins and users
    • Admin consent display name: Access MCP Analyst as user
    • Description: Allows the application to access MCP Analyst on behalf of the signed-in user

🚨 Gotcha #1: OAuth Version Must Be 2.0

By default, Azure Entra app registrations accept v1.0 access tokens, but the MCP server requires v2.0 tokens for proper token handling.

Manual fix required:

  1. Go to App Registration → Manifest
  2. Find the line: "accessTokenAcceptedVersion": null or "accessTokenAcceptedVersion": 1
  3. Change it to: "accessTokenAcceptedVersion": 2
  4. Save the manifest

Why this matters:

  • v1.0 access tokens use different claims and issuer formats than v2.0 tokens
  • MCP's AzureProvider expects v2.0 tokens with preferred_username, oid, and name claims
  • Without this change, authentication will fail with cryptic token validation errors
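After the change, the relevant manifest fragment reads as follows (everything else in the manifest stays untouched):

"accessTokenAcceptedVersion": 2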

🚨 Gotcha #2: Claude Desktop Callback URL

When testing with Claude Desktop (paid tier), you must add this specific callback URL:

https://claude.ai/api/mcp/auth_callback

Why this is needed:

  • Claude Desktop acts as an OAuth proxy for custom MCP servers
  • When a user clicks "Connect" in Claude Desktop, it:
    1. Opens the Azure Entra login page in the browser
    2. Waits for the user to authenticate
    3. Azure redirects to https://claude.ai/api/mcp/auth_callback with the authorization code
    4. Claude Desktop exchanges the code for tokens
    5. Claude Desktop forwards the tokens to your MCP server
  • Without this callback URL, the OAuth flow breaks at step 3

Security note:

  • Claude Desktop only proxies the OAuth flow; it does not store or access your data
  • All MCP queries and data remain between Claude Desktop → your MCP server → your data

Step 6: Note Your Configuration

Save these values for later use (will be stored in Azure Key Vault):

Application (client) ID: <your-client-id>
Client Secret: <your-secret-value>
Directory (tenant) ID: <your-tenant-id>

Multi-Tenant Architecture: Business Function Routing

One of the key innovations in this implementation is business function isolation. Instead of one data catalog for everyone, the server dynamically routes users to function-specific catalogs based on URL parameters.

Business Functions

This implementation supports three business functions:

  1. IT (it): Infrastructure, device inventory, license management, Active Directory
  2. HR (hr): Employee data, terminations, organizational charts, ADP snapshots
  3. Finance (fin): Budget, expenses, procurement, SAP exports

Connection URL Examples

  • IT users: https://mcp.gdenergyproducts.com/mcp?function=it
  • HR users: https://mcp.gdenergyproducts.com/mcp?function=hr
  • Finance users: https://mcp.gdenergyproducts.com/mcp?function=fin

Data Isolation Structure

/mnt/azure/mcp/
├── it/
│   ├── data_catalog.json
│   ├── device_inventory.csv
│   ├── licenses.csv
│   └── accounts_to_be_deleted.csv
├── hr/
│   ├── data_catalog.json
│   ├── idp_hr.csv
│   └── terminations.csv
└── fin/
    ├── data_catalog.json
    ├── budget.csv
    └── sap_expenses.csv

Each function has:

  • Its own data_catalog.json with function-specific datasets (example below)
  • Isolated file storage (IT users can't access HR files)
  • Separate database views (e.g., mcp.it_employees, mcp.hr_employees)
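To make this concrete, here is a minimal, hypothetical data_catalog.json sketch for the IT function. The field names match what the get_data_catalog tool (shown later) reads; the dataset, columns, and business rule are illustrative only:

{
  "version": "2.0",
  "last_updated": "auto-generated",
  "datasets": {
    "device_inventory": {
      "source_type": "file",
      "query_tool": "execute_polars_sql",
      "table_reference": "self",
      "filename": "device_inventory.csv",
      "category": "infrastructure",
      "description": "One row per managed device",
      "delimiter": ",",
      "columns": {
        "DeviceName": "Hostname of the device",
        "OS": "Operating system",
        "LastSeen": "Date of last check-in"
      },
      "business_rules": [
        "A device is considered stale if LastSeen is older than 90 days"
      ],
      "example_query": "SELECT OS, COUNT(*) AS devices FROM self GROUP BY OS"
    }
  }
}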


Full Code Walkthrough: analyst.py (HTTPS Version)

Below is a comprehensive walkthrough of the production code, covering 85%+ of the implementation.

1. Logging and Initialization

import os
import logging
import warnings
from pathlib import Path
from logging.handlers import RotatingFileHandler

LOG_SEPARATOR = "=" * 80

def setup_logger(name, log_file, level=logging.DEBUG):
    """Configure structured logging with rotation"""
    logger = logging.getLogger(name)
    logger.setLevel(level)

    formatter = logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - "
        "%(funcName)s:%(lineno)d - %(message)s"
    )

    if not logger.handlers:
        # File handler with 10MB rotation, keep 5 backups
        handler = RotatingFileHandler(
            log_file, 
            maxBytes=10*1024*1024, 
            backupCount=5
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Console handler for development
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    return logger

logger = setup_logger("gdepmcp", "/mnt/azure/logs/gdepmcp.log")
logger.info(LOG_SEPARATOR)
logger.info("GDEP MCP Server Starting")
logger.info(LOG_SEPARATOR)

# Suppress deprecation warnings in production
warnings.filterwarnings("ignore", category=DeprecationWarning)

Key points:

  • Structured logging: Every request, tool call, and error is logged with context
  • Log rotation: Prevents disk fill-up in production (10MB per file, 5 backups)
  • Dual output: File for auditing, console for development/debugging
  • Function/line tracking: Easy debugging with function name and line number in logs


2. Azure Key Vault Integration

from azure.identity import ClientSecretCredential
from azure.keyvault.secrets import SecretClient

def get_azure_kv_secret(name):
    """Retrieve secrets from Azure Key Vault"""
    try:
        vault_url = "https://kv-gdep-peus-interfaces.vault.azure.net/"

        # Service principal credentials from environment variables
        client_id = os.environ.get("application_interface_clientid")
        client_secret = os.environ.get("application_interface_clientsecret")
        tenant_id = os.environ.get("application_interface_tenantid")

        credential = ClientSecretCredential(
            client_id=client_id,
            client_secret=client_secret,
            tenant_id=tenant_id
        )

        secret_client = SecretClient(
            vault_url=vault_url, 
            credential=credential
        )
        return secret_client.get_secret(name).value
    except Exception as exc:
        logger.exception(f"Error retrieving Azure KV secret '{name}': {exc}")
        return None

# Load OAuth credentials from Key Vault
AZURE_CONFIDENTIAL_APP_ID = str(
    get_azure_kv_secret('mcp-authentication-clientid')
)
AZURE_CONFIDENTIAL_SECRET = str(
    get_azure_kv_secret('mcp-authentication-clientsecret')
)

Why Key Vault?

  • Never hardcode secrets: Client IDs and secrets are injected at runtime
  • Centralized management: Rotate secrets without redeploying code
  • Audit trail: Key Vault logs all secret access
  • Role-based access: Only authorized apps/users can read secrets

Required environment variables (set in container):

application_interface_clientid=<service-principal-client-id>
application_interface_clientsecret=<service-principal-secret>
application_interface_tenantid=<azure-tenant-id>


3. Business Function Middleware

from starlette.requests import Request
from starlette.middleware.base import BaseHTTPMiddleware
from contextvars import ContextVar

current_function = ContextVar('current_function', default='it')

class BusinessFunctionExtractionMiddleware(BaseHTTPMiddleware):
    """Extract business function from query string and store in context"""

    async def dispatch(self, request: Request, call_next):
        host = request.headers.get("host", "")
        business_function = "na"

        logger.info(
            f"Incoming request - Host: {host}, "
            f"Path: {request.url.path}, Method: {request.method}"
        )

        # Extract function from query string
        function_via_query_string = request.query_params.get("function")

        if function_via_query_string in {"it", "hr", "fin"}:
            business_function = function_via_query_string
            logger.info(
                f"DEBUG MODE: Using business_function from query "
                f"parameter: {business_function}"
            )

        # Store in request context (async-safe via ContextVar)
        request.state.business_function = business_function
        current_function.set(business_function)

        logger.debug(
            f"Set request.state.business_function = {business_function}"
        )

        response = await call_next(request)

        logger.info(
            f"Response status: {response.status_code} "
            f"for business_function: {business_function}"
        )

        return response

def get_current_business_function() -> str:
    """Get business function from context variable (thread-safe)"""
    function = current_function.get()
    logger.debug(
        f"Retrieved current business function from context: {function}"
    )
    return function

How this works:

  1. A request arrives at /mcp?function=it
  2. The middleware intercepts it before any MCP tools execute
  3. It extracts the function parameter from the query string
  4. It validates the value against the allowed functions (it, hr, fin)
  5. It stores the value using Python's ContextVar (async-safe)
  6. All subsequent tool calls read the context to route to the correct data

Security:

  • Invalid functions (na) result in "no data found" responses
  • No cross-contamination between business functions
  • Each user's requests are isolated in the async context (demonstrated below)
  • Supports concurrent requests from multiple users
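The async isolation that makes this safe is easy to demonstrate. The standalone sketch below (independent of the server code) runs three concurrent tasks that each set a different value on a ContextVar; every task sees only its own value, which is exactly what keeps concurrent IT, HR, and FIN requests from leaking into each other:

import asyncio
from contextvars import ContextVar

current_function = ContextVar("current_function", default="na")

async def handle_request(function: str) -> str:
    # Each asyncio task runs in its own copy of the context,
    # so this set() cannot affect any other in-flight request.
    current_function.set(function)
    await asyncio.sleep(0.1)  # simulate I/O while other requests run
    return f"request for '{function}' saw '{current_function.get()}'"

async def main():
    results = await asyncio.gather(
        handle_request("it"),
        handle_request("hr"),
        handle_request("fin"),
    )
    for line in results:
        print(line)  # each request reports only its own business function

asyncio.run(main())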


4. OAuth 2.0 (Azure Entra ID) Configuration

from fastmcp import FastMCP
from fastmcp.server.auth.providers.azure import AzureProvider

logger.info("Configuring OAuth 2.0 (Azure Entra ID) Provider")
try:
    azure_auth = AzureProvider(
        client_id=AZURE_CONFIDENTIAL_APP_ID,
        client_secret=AZURE_CONFIDENTIAL_SECRET,
        tenant_id="00b1a755-0b06-4d05-9a59-259ebf7f9e00",
        base_url="https://mcp.gdenergyproducts.com",
        required_scopes=["access_as_user"]
    )
    logger.info("OAuth 2.0 (Azure Entra ID) Provider configured successfully")
except Exception as e:
    logger.error(
    f"Failed to configure OAuth 2.0 (Azure Entra ID) Provider: {str(e)}", 
        exc_info=True
    )
    raise

# Initialize FastMCP with authentication
mcp = FastMCP("gdep-analyst", auth=azure_auth)
logger.info("FastMCP instance initialized successfully")

Configuration breakdown:

  • client_id: Your Azure App Registration's Application ID
  • client_secret: Secret created in the App Registration
  • tenant_id: Your organization's Azure tenant ID
  • base_url: Public URL of your MCP server (for OAuth callbacks)
  • required_scopes: API scopes defined in the "Expose an API" section

What happens during OAuth:

  1. The user connects in Claude Desktop and is redirected to the Azure login page
  2. The user authenticates with corporate credentials
  3. Azure validates the login and issues tokens with the access_as_user scope
  4. The MCP server validates the token signature and claims
  5. The request proceeds with user context (email, name, object ID)
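If you want to sanity-check the token flow outside Claude Desktop, a short script along these lines can acquire a token interactively and inspect its claims. This is a hedged sketch, not part of the server code: it assumes the msal package, placeholder tenant and client IDs, the api://<client-id>/access_as_user scope exposed in Step 5, and a localhost redirect URI registered for desktop testing.

import base64
import json

import msal

TENANT_ID = "<your-tenant-id>"   # placeholder
CLIENT_ID = "<your-client-id>"   # placeholder: the app registration's client ID
SCOPES = [f"api://{CLIENT_ID}/access_as_user"]

app = msal.PublicClientApplication(
    CLIENT_ID,
    authority=f"https://login.microsoftonline.com/{TENANT_ID}",
)

# Opens a browser window for the corporate Azure Entra login
result = app.acquire_token_interactive(scopes=SCOPES)
if "access_token" not in result:
    raise SystemExit(f"Token acquisition failed: {result.get('error_description')}")

# Decode the JWT payload for inspection only (no signature verification)
payload = result["access_token"].split(".")[1]
payload += "=" * (-len(payload) % 4)  # restore base64 padding
claims = json.loads(base64.urlsafe_b64decode(payload))

# AzureProvider relies on these claims; "ver" should report "2.0"
print(claims.get("preferred_username"), claims.get("name"), claims.get("oid"))
print("token version:", claims.get("ver"))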


5. Database Connection Management

import pyodbc

_connection = None
_connection_string = None

def build_connection_string() -> str:
    """Build SQL Server connection string from Key Vault"""
    logger.debug("Building database connection string")
    conn_str = str(get_azure_kv_secret('mcp-sql-conn-string'))
    logger.info("Database connection string built successfully")
    return conn_str

def get_db_connection():
    """Get or create database connection with connection pooling"""
    global _connection, _connection_string

    logger.debug("Getting database connection")

    # Lazy initialization
    if _connection_string is None:
        logger.info("Connection string not initialized, building new one")
        _connection_string = build_connection_string()

    if _connection_string is None:
        logger.error("Failed to build connection string")
        return None

    # Test existing connection
    try:
        if _connection is not None:
            logger.debug("Testing existing connection")
            _connection.cursor().execute("SELECT 1")
            logger.debug("Existing connection is valid")
    except Exception as e:
        logger.warning(
            f"Existing connection test failed: {str(e)}, "
            "will create new connection"
        )
        _connection = None

    # Create new connection if needed
    if _connection is None:
        logger.info("Creating new database connection")
        try:
            _connection = pyodbc.connect(_connection_string)
            logger.info("Database connection established successfully")
        except Exception as e:
            logger.error(
                f"Failed to establish database connection: {str(e)}",
                exc_info=True
            )
            raise

    return _connection

def is_database_configured() -> bool:
    """Check if database connection is configured"""
    logger.debug("Checking if database is configured")
    return True  # Always true in this deployment

Connection pooling strategy:

  • Singleton pattern: Reuse connections across requests
  • Health checks: Test the connection before use (SELECT 1)
  • Automatic reconnection: If the connection dies, create a new one
  • Secure credentials: Connection string stored in Key Vault

Example connection string (from Key Vault):

Driver={ODBC Driver 18 for SQL Server};
Server=10.27.18.5,61002;
Database=ADP_AD_AAD;
UID=mcp_reader;
PWD=<from-key-vault>;
Encrypt=yes;
TrustServerCertificate=yes;


6. File Reading with SAP Support

from pathlib import Path
from typing import List

import polars as pl

def read_file(file_location: str, file_type: str = "csv") -> pl.DataFrame:
    """Read CSV or Parquet file into Polars DataFrame"""
    logger.info(f"Reading file: {file_location}, type: {file_type}")

    try:
        if file_type == "csv":
            filename = Path(file_location).name.lower()

            # SAP files use semicolon delimiter
            separator = ';' if filename.startswith('sap_') else ','
            logger.debug(
                f"Using separator '{separator}' for file: {filename}"
            )

            df = pl.read_csv(
                file_location,
                separator=separator,
                truncate_ragged_lines=True,  # Handle malformed rows
                infer_schema_length=10000,   # Sample 10k rows for types
                ignore_errors=True           # Skip unparseable rows
            )
            logger.info(
                f"Successfully read CSV file: {file_location}, "
                f"shape: {df.shape}"
            )
            return df

        if file_type == "parquet":
            df = pl.read_parquet(file_location)
            logger.info(
                f"Successfully read Parquet file: {file_location}, "
                f"shape: {df.shape}"
            )
            return df

        logger.error(f"Unsupported file type: {file_type}")
        raise ValueError(f"Unsupported file type: {file_type}")

    except Exception as e:
        logger.exception(f"Failed to read file {file_location}")
        raise

def read_file_list(
    file_locations: List[str], 
    file_type: str = "csv"
) -> pl.DataFrame:
    """Read multiple files with same schema and concatenate"""
    logger.info(
        f"Reading file list: {len(file_locations)} files of type {file_type}"
    )
    logger.debug(f"Files to read: {file_locations}")

    try:
        if file_type == "csv":
            dfs = []
            for i, file_location in enumerate(file_locations):
                logger.debug(
                    f"Reading file {i+1}/{len(file_locations)}: "
                    f"{file_location}"
                )
                filename = Path(file_location).name.lower()
                separator = ';' if filename.startswith('sap_') else ','

                df = pl.read_csv(
                    file_location,
                    separator=separator,
                    truncate_ragged_lines=True,
                    infer_schema_length=10000,
                    ignore_errors=True
                )
                dfs.append(df)
                logger.debug(f"File {i+1} shape: {df.shape}")

            result = pl.concat(dfs)
            logger.info(
                f"Successfully concatenated {len(dfs)} CSV files, "
                f"final shape: {result.shape}"
            )
            return result

        if file_type == "parquet":
            result = pl.read_parquet(file_locations)
            logger.info(
                f"Successfully read {len(file_locations)} Parquet files, "
                f"shape: {result.shape}"
            )
            return result

        logger.error(f"Unsupported file type: {file_type}")
        raise ValueError(f"Unsupported file type: {file_type}")

    except Exception as e:
        logger.exception("Failed to read file list")
        raise

Key features:

  • SAP detection: Files prefixed with sap_ automatically use the ; delimiter (illustrated below)
  • Robust parsing: Handles malformed CSVs gracefully
  • Type inference: Samples 10,000 rows to detect column types accurately
  • Performance: Polars is 10-50x faster than Pandas on large files
  • Multi-file support: Concatenates files with the same schema efficiently
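As a quick illustration of the SAP handling, the standalone sketch below (a temporary file, not one of the production datasets) writes a semicolon-delimited file with the sap_ prefix, loads it through the read_file helper above, and runs a Polars SQL aggregation using 'self' as the table name:

import os
import tempfile

# assumes read_file from the walkthrough above is importable in this module

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sap_expenses.csv")
    with open(path, "w", encoding="utf-8") as f:
        f.write("CostCenter;Amount\nIT-100;250.50\nIT-100;99.00\nHR-200;10.00\n")

    df = read_file(path, "csv")  # semicolon delimiter chosen from the sap_ prefix
    result = df.sql(
        "SELECT CostCenter, SUM(Amount) AS total FROM self GROUP BY CostCenter"
    )
    print(result)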


7. User Context Extraction

from fastmcp.server.dependencies import get_access_token

def get_current_user_info() -> dict:
    """Returns information about the authenticated Azure user"""
    token = get_access_token()

    return {
        "email": token.claims.get("preferred_username"),
        "name": token.claims.get("name"),
        "oid": token.claims.get("oid")  # Azure object ID
    }

Available claims from the Azure token:

  • preferred_username: User's email (e.g., john.doe@company.com)
  • name: Display name (e.g., John Doe)
  • oid: Unique Azure object ID (use for database joins/auditing)

Usage example in tools:

@mcp.tool()
def sensitive_tool():
    user_info = get_current_user_info()
    logger.info(
        f"Tool called by: {user_info['name']} ({user_info['email']})"
    )

    # Implement per-user authorization logic
    if user_info['email'] not in AUTHORIZED_USERS:
        return {"error": "Unauthorized"}

    # ... rest of tool logic


8. MCP Tool: Connection Info

import json
from glob import glob
from typing import Dict, Any

@mcp.tool()
def get_connection_info() -> Dict[str, Any]:
    """
    Get information about configured data sources and connection status.
    Use to verify available data sources and check connection health.
    """
    business_function = get_current_business_function()
    logger.info(
        f"get_connection_info called for "
        f"business_function: {business_function}"
    )

    file_pattern = f"/mnt/azure/mcp/{business_function}/*.csv"
    logger.debug(f"File pattern: {file_pattern}")

    available_files = glob(file_pattern)
    logger.info(f"Found {len(available_files)} files matching pattern")

    info = {
        "business_function": business_function,
        "file_sources": {
            "enabled": True,
            "pattern": file_pattern,
            "available_files": available_files
        },
        "database_sources": {
            "enabled": is_database_configured(),
            "server": "10.27.18.5,61002",
            "database": "ADP_AD_AAD"
        }
    }

    # Test database connection and list tables
    if is_database_configured():
        logger.info("Database is configured, retrieving available objects")
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            # List tables for this business function
            db_objects_query = f"""
                SELECT TABLE_SCHEMA, TABLE_NAME 
                FROM INFORMATION_SCHEMA.TABLES 
                WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
                AND TABLE_SCHEMA = 'mcp' 
                AND TABLE_NAME LIKE '{business_function}_%'
                ORDER BY TABLE_SCHEMA, TABLE_NAME
            """
            cursor.execute(db_objects_query)
            db_objects = cursor.fetchall()

            objects_list = [
                {"schema": row[0], "table": row[1]} 
                for row in db_objects
            ]

            info["database_sources"]["status"] = "connected"
            info["database_sources"]["available_objects"] = objects_list
            logger.info(
                f"Database connection successful, "
                f"found {len(objects_list)} objects"
            )
        except Exception as e:
            info["database_sources"]["status"] = "error"
            info["database_sources"]["error"] = str(e)
            logger.error(
                f"Database object listing failed: {str(e)}", 
                exc_info=True
            )
    else:
        logger.info("Database not configured")

    # Log user for auditing
    try:
        user_info = get_current_user_info()
        logger.info(
            f"Tool get_connection_info was called by: "
            f"{user_info['name']} with email {user_info['email']}"
        )
    except Exception as e:
        logger.warning(f"Could not retrieve user info: {str(e)}")

    logger.debug(f"Returning connection info: {json.dumps(info, indent=2)}")
    return info

Returns example:

{
  "business_function": "it",
  "file_sources": {
    "enabled": true,
    "pattern": "/mnt/azure/mcp/it/*.csv",
    "available_files": [
      "/mnt/azure/mcp/it/device_inventory.csv",
      "/mnt/azure/mcp/it/licenses.csv"
    ]
  },
  "database_sources": {
    "enabled": true,
    "server": "10.27.18.5,61002",
    "database": "ADP_AD_AAD",
    "status": "connected",
    "available_objects": [
      {"schema": "mcp", "table": "it_employees"},
      {"schema": "mcp", "table": "it_devices"}
    ]
  }
}


9. MCP Tool: Data Catalog (Auto-Discovery)

import json
import os

@mcp.tool()
def get_data_catalog() -> str:
    """
    Get the complete data catalog showing all available datasets.

    **ALWAYS CALL THIS TOOL FIRST before answering any data question.**

    Returns JSON catalog with:
    - "source_type": "file" or "database"
    - "query_tool": execute_polars_sql or execute_database_query
    - "table_reference": 'self' for files, 'schema.table' for database
    - "example_query": Sample query to adapt
    - Business descriptions, columns, rules, relationships

    The catalog contains all business context needed to write correct queries.
    """
    business_function = get_current_business_function()
    logger.info(
        f"get_data_catalog called for business_function: {business_function}"
    )

    catalog_path = f"/mnt/azure/mcp/{business_function}/data_catalog.json"
    logger.debug(f"Looking for catalog file: {catalog_path}")

    # Load catalog file if exists
    catalog_json = {}
    if os.path.exists(catalog_path):
        try:
            with open(catalog_path, 'r', encoding='utf-8') as f:
                catalog_json = json.load(f)
            logger.info(f"Loaded catalog file: {catalog_path}")
        except Exception as e:
            logger.error(
                f"Failed to load catalog file: {str(e)}", 
                exc_info=True
            )
            catalog_json = {}

    # Build catalog with metadata from JSON + auto-discovery
    catalog = {
        "version": catalog_json.get("version", "2.0"),
        "last_updated": catalog_json.get("last_updated", "auto-generated"),
        "README": catalog_json.get("README", {}),
        "query_decision_tree": catalog_json.get("query_decision_tree", {}),
        "anti_patterns": catalog_json.get("anti_patterns", {}),
        "categories": catalog_json.get("categories", {}),
        "datasets": {}
    }

    # Auto-discover files on disk
    files = (
        glob(f"/mnt/azure/mcp/{business_function}/*.csv") + 
        glob(f"/mnt/azure/mcp/{business_function}/*.parquet")
    )
    logger.info(
        f"Found {len(files)} files on disk for "
        f"business_function {business_function}"
    )

    file_keys_on_disk = set()
    for file in files:
        basename = os.path.basename(file)
        key = basename.replace('.csv', '').replace('.parquet', '').lower()
        file_keys_on_disk.add(key)

        # Use metadata from JSON if available, else minimal auto-discovery
        meta = catalog_json.get("datasets", {}).get(key, {})

        dataset = {
            "source_type": "file",
            "query_tool": meta.get("query_tool", "execute_polars_sql"),
            "table_reference": meta.get("table_reference", "self"),
            "filename": basename,
            "category": meta.get("category"),
            "description": meta.get(
                "description", 
                "Auto-discovered file - no metadata available"
            ),
            "source_system": meta.get("source_system"),
            "delimiter": meta.get("delimiter", ","),
            "columns": meta.get("columns", {}),
            "business_rules": meta.get("business_rules", []),
            "common_queries": meta.get("common_queries", []),
            "related_datasets": meta.get("related_datasets", []),
            "usage_tip": meta.get(
                "usage_tip", 
                f"Use get_schema(file_location='{file}') for detailed schema"
            )
        }

        # Add any extra fields from JSON
        for extra_field in meta:
            if extra_field not in dataset:
                dataset[extra_field] = meta[extra_field]

        catalog["datasets"][key] = dataset
        logger.debug(f"Added/updated file dataset: {key}")

    # Auto-discover database tables
    db_keys = set()
    if is_database_configured():
        try:
            conn = get_db_connection()
            cursor = conn.cursor()

            cursor.execute(f"""
                SELECT TABLE_SCHEMA, TABLE_NAME 
                FROM INFORMATION_SCHEMA.TABLES 
                WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
                AND TABLE_SCHEMA = 'mcp' 
                AND TABLE_NAME LIKE '{business_function}_%'
                ORDER BY TABLE_SCHEMA, TABLE_NAME
            """)
            tables = cursor.fetchall()

            logger.info(
                f"Found {len(tables)} database tables for "
                f"business_function {business_function}"
            )

            for schema, table in tables:
                key = f"{schema}_{table}".lower()
                db_keys.add(key)

                meta = catalog_json.get("datasets", {}).get(key, {})

                dataset = {
                    "source_type": "database",
                    "query_tool": meta.get(
                        "query_tool", 
                        "execute_database_query"
                    ),
                    "table_reference": meta.get(
                        "table_reference", 
                        f"{schema}.{table}"
                    ),
                    "database_schema": schema,
                    "database_table": table,
                    "category": meta.get("category"),
                    "description": meta.get(
                        "description", 
                        "Auto-discovered database table - no metadata available"
                    ),
                    "source_system": meta.get("source_system"),
                    "update_frequency": meta.get("update_frequency"),
                    "columns": meta.get("columns", {}),
                    "business_rules": meta.get("business_rules", []),
                    "common_queries": meta.get("common_queries", []),
                    "related_datasets": meta.get("related_datasets", []),
                    "example_query": meta.get("example_query"),
                    "semantic_examples": meta.get("semantic_examples", {}),
                    "usage_tip": meta.get(
                        "usage_tip", 
                        f"Use get_schema_db(schema_name='{schema}', "
                        f"table_name='{table}') for detailed schema"
                    )
                }

                for extra_field in meta:
                    if extra_field not in dataset:
                        dataset[extra_field] = meta[extra_field]

                catalog["datasets"][key] = dataset
                logger.debug(f"Added/updated database dataset: {key}")

        except Exception as e:
            error_msg = f"Database table discovery failed: {str(e)}"
            catalog["database_discovery_error"] = error_msg
            logger.error(error_msg, exc_info=True)

    # Only include datasets that are actually present (on disk or in db)
    logger.info(
        f"Reconciling catalog: {len(catalog['datasets'])} "
        "total before filtering"
    )
    present_keys = file_keys_on_disk | db_keys
    catalog["datasets"] = {
        k: v for k, v in catalog["datasets"].items() 
        if k in present_keys
    }
    logger.info(
        f"Final catalog has {len(catalog['datasets'])} datasets "
        "after reconciliation"
    )

    return json.dumps(catalog, indent=2)

Key features:

  • Hybrid auto-discovery: Combines JSON metadata with filesystem/database introspection
  • Business function isolation: Only shows datasets for the current function
  • Graceful degradation: Works even without a data_catalog.json file
  • Rich metadata: Provides all the context the LLM needs to write correct queries
  • Tool routing: Tells the LLM exactly which execution tool to use


10. MCP Tool: Get Schema (Files)

from typing import List

from pydantic import Field

@mcp.tool()
def get_schema(
    file_location: str,
    file_type: str = Field(default="csv", description="csv or parquet"),
) -> List[Dict[str, Any]]:
    """
    Get technical schema of a CSV or Parquet file.

    **Use for file-based datasets only.** 
    For database tables, use get_schema_db().

    Returns column names and Polars data types.
    Call after get_data_catalog() to get detailed column information.
    """
    business_function = get_current_business_function()
    logger.info(
        f"get_schema called for file: {file_location}, "
        f"type: {file_type}, business_function: {business_function}"
    )

    # Enforce that file_location is within the correct business function
    allowed_prefix = f"/mnt/azure/mcp/{business_function}/"
    if not os.path.abspath(file_location).startswith(
        os.path.abspath(allowed_prefix)
    ):
        error_msg = (
            f"Access denied: file_location '{file_location}' is not "
            f"within the allowed business_function folder '{allowed_prefix}'. "
            "You may only request schema for files in your assigned "
            "business_function folder."
        )
        logger.error(error_msg)
        return [{"error": error_msg}]

    try:
        df = read_file(file_location, file_type)
        schema = df.schema
        schema_dict = {}
        for key, value in schema.items():
            schema_dict[key] = str(value)

        logger.info(
            f"Successfully retrieved schema with {len(schema_dict)} columns"
        )
        logger.debug(f"Schema: {schema_dict}")
        return [schema_dict]
    except Exception as e:
        error_msg = f"Failed to read schema: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return [{"error": error_msg}]

Security enforcement:

  • Path validation: Prevents directory traversal attacks
  • Business function isolation: IT users can't read HR files
  • Explicit error messages: Clear feedback on authorization failures


11. MCP Tool: Get Schema (Database)

@mcp.tool()
def get_schema_db(
    schema_name: str = Field(
        description="Database schema name (e.g., 'mcp', 'dbo')"
    ),
    table_name: str = Field(description="Database table name"),
) -> List[Dict[str, Any]]:
    """
    Get technical schema of a SQL Server database table.

    **Use for database tables only.** 
    For CSV/Parquet files, use get_schema().

    Returns column names, data types, nullability, and primary key info.
    Call after get_data_catalog() to get detailed column information.
    """
    business_function = get_current_business_function()
    logger.info(
        f"get_schema_db called for {schema_name}.{table_name}, "
        f"business_function: {business_function}"
    )

    if not is_database_configured():
        logger.error("Database not configured")
        return [{"error": "No database connection configured."}]

    # Enforce table_name starts with business_function_
    if not table_name.startswith(f"{business_function}_"):
        error_msg = (
            f"Table name {table_name} does not match "
            f"business_function {business_function}"
        )
        logger.error(error_msg)
        return [{"error": error_msg}]

    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        query = """
            SELECT 
                c.COLUMN_NAME, 
                c.DATA_TYPE,
                c.CHARACTER_MAXIMUM_LENGTH,
                c.NUMERIC_PRECISION,
                c.NUMERIC_SCALE,
                c.IS_NULLABLE,
                c.COLUMN_DEFAULT,
                CASE WHEN pk.COLUMN_NAME IS NOT NULL 
                     THEN 'YES' ELSE 'NO' END as IS_PRIMARY_KEY
            FROM INFORMATION_SCHEMA.COLUMNS c
            LEFT JOIN (
                SELECT ku.TABLE_SCHEMA, ku.TABLE_NAME, ku.COLUMN_NAME
                FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
                JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku 
                    ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME
                WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
            ) pk ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA 
                AND c.TABLE_NAME = pk.TABLE_NAME 
                AND c.COLUMN_NAME = pk.COLUMN_NAME
            WHERE c.TABLE_SCHEMA = ? AND c.TABLE_NAME = ?
            ORDER BY c.ORDINAL_POSITION
        """
        logger.debug(f"Executing schema query for {schema_name}.{table_name}")

        cursor.execute(query, (schema_name, table_name))
        columns = cursor.fetchall()

        if not columns:
            error_msg = f"Table not found: {schema_name}.{table_name}"
            logger.error(error_msg)
            return [{"error": error_msg}]

        logger.info(
            f"Found {len(columns)} columns in {schema_name}.{table_name}"
        )

        schema = {}
        for col in columns:
            col_name = col[0]
            data_type = col[1]
            char_len = col[2]
            num_precision = col[3]
            num_scale = col[4]
            nullable = col[5] == "YES"
            default = col[6]
            is_pk = col[7] == "YES"

            # Format type string
            type_str = data_type
            if char_len:
                type_str += f"({char_len})"
            elif num_precision:
                if num_scale:
                    type_str += f"({num_precision},{num_scale})"
                else:
                    type_str += f"({num_precision})"

            schema[col_name] = {
                "data_type": type_str,
                "nullable": nullable,
                "is_primary_key": is_pk,
                "default": default if default else None
            }
            logger.debug(
                f"Column: {col_name}, type: {type_str}, "
                f"nullable: {nullable}, pk: {is_pk}"
            )

        return [schema]
    except Exception as e:
        error_msg = f"Error retrieving schema: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return [{"error": error_msg}]

Comprehensive schema information:

  • Data types: Including length/precision/scale
  • Nullability: Whether NULL values are allowed
  • Primary keys: Identifies join columns
  • Defaults: Column default values
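An example return value, following the shape produced by the code above (the table and column names are illustrative):

[
  {
    "EmployeeID": {
      "data_type": "int",
      "nullable": false,
      "is_primary_key": true,
      "default": null
    },
    "LastName": {
      "data_type": "nvarchar(100)",
      "nullable": true,
      "is_primary_key": false,
      "default": null
    }
  }
]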


12. MCP Tool: Execute Polars SQL (Files)

query_description = """Polars SQL query. MUST use 'self' as table name. 
Supports: avg, count, sum, max, min, where, group by, order by, joins, etc."""

@mcp.tool()
def execute_polars_sql(
    file_locations: List[str],
    query: str = Field(description=query_description),
    file_type: str = Field(default="csv", description="csv or parquet"),
) -> List[Dict[str, Any]]:
    """
    Execute SQL query on CSV or Parquet files using Polars.

    **Use for file-based datasets only.** 
    For database tables, use execute_database_query().

    **CRITICAL:** Query MUST use 'self' as table name, NOT the filename.

    Example: execute_polars_sql(
        file_locations=["data/employees.csv"],
        query="SELECT COUNT(*) FROM self WHERE IsActive = true"
    )
    """
    business_function = get_current_business_function()
    logger.info(
        f"execute_polars_sql called for business_function: {business_function}"
    )
    logger.info(f"File locations ({len(file_locations)}): {file_locations}")
    logger.info(f"Query: {query}")
    logger.debug(f"File type: {file_type}")

    # Validation: Check for common mistakes
    query_lower = query.lower()

    # Check if query references database schema (wrong tool)
    if "mcp." in query or "dbo." in query or any(
        schema in query for schema in ["mcp.", "dbo.", "hr.", "sales."]
    ):
        error_msg = (
            "WRONG TOOL: This query references a database table "
            "(schema.table format). Use execute_database_query() instead."
        )
        logger.error(error_msg)
        return [{
            "error": error_msg,
            "hint": "File queries must use 'self' as table name, "
                    "not schema.table",
            "your_query": query
        }]

    # Check if query uses 'self' (required for Polars)
    if "self" not in query_lower:
        error_msg = (
            "INVALID QUERY: Polars SQL queries must use 'self' "
            "as the table name."
        )
        logger.error(error_msg)
        return [{
            "error": error_msg,
            "hint": "Change your query to: SELECT ... FROM self WHERE ...",
            "your_query": query
        }]

    # Enforce that all file_locations are within business function folder
    allowed_prefix = os.path.abspath(
        f"/mnt/azure/mcp/{business_function}/"
    )
    for file_location in file_locations:
        if not os.path.abspath(file_location).startswith(allowed_prefix):
            error_msg = (
                f"Access denied: file_location '{file_location}' is not "
                f"within the allowed business_function folder "
                f"'{allowed_prefix}'. You may only query files in your "
                "assigned business_function folder."
            )
            logger.error(error_msg)
            return [{"error": error_msg, "your_query": query}]

    try:
        logger.debug("Reading files into DataFrame")
        df = read_file_list(file_locations, file_type)
        logger.info(f"DataFrame loaded, shape: {df.shape}")

        logger.debug("Executing SQL query on DataFrame")
        op_df = df.sql(query)
        logger.info(f"Query executed successfully, result shape: {op_df.shape}")

        output_records = op_df.to_dicts()
        logger.info(f"Returning {len(output_records)} records")
        logger.debug(
            f"First record (if any): "
            f"{output_records[0] if output_records else 'No records'}"
        )

        return output_records
    except Exception as e:
        error_msg = f"Query execution failed: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return [{"error": error_msg, "your_query": query}]

Validation and security:

  • Tool detection: Catches wrong tool usage (database vs. file queries)
  • Syntax validation: Ensures 'self' is used as the table name
  • Path validation: Prevents access to files outside the business function folder
  • Error handling: Returns helpful error messages to the LLM


13. MCP Tool: Execute Database Query (SQL Server)

@mcp.tool()
def execute_database_query(
    query: str = Field(
        description="T-SQL query to execute on SQL Server database"
    ),
    max_rows: int = Field(
        default=1000, 
        description="Maximum rows to return (default: 1000, set to 0 for unlimited)"
    )
) -> List[Dict[str, Any]]:
    """
    Execute T-SQL queries on SQL Server database tables.

    **Use for database tables only.** 
    For CSV/Parquet files, use execute_polars_sql().

    **CRITICAL:** Use schema.table format (e.g., mcp.employees), NOT 'self'.

    Example: execute_database_query(
        query="SELECT COUNT(*) FROM mcp.vw_ADP_Employee_Daily_Snapshot 
               WHERE Position_Status = 'Active'"
    )

    Default limit: 1000 rows. Set max_rows=0 for unlimited.
    """
    business_function = get_current_business_function()
    logger.info(
        f"execute_database_query called for "
        f"business_function: {business_function}"
    )
    logger.info(f"Query: {query}")
    logger.debug(f"Max rows: {max_rows}")

    if not is_database_configured():
        logger.error("Database not configured")
        return [{"error": "No database connection configured."}]

    # Validation: Check for common mistakes
    query_lower = query.lower()

    # Check if query uses 'self' (wrong tool)
    if " self" in query_lower or "from self" in query_lower or \
       "join self" in query_lower:
        error_msg = (
            "WRONG TOOL: This query uses 'self' as table name. "
            "Use execute_polars_sql() for file-based queries."
        )
        logger.error(error_msg)
        return [{
            "error": error_msg,
            "hint": "Database queries must use schema.table format "
                    "(e.g., mcp.employees), not 'self'",
            "your_query": query
        }]

    # Check if query has schema.table format
    if "mcp." not in query_lower and "dbo." not in query_lower:
        error_msg = (
            "INVALID QUERY: Database queries must use schema.table format."
        )
        logger.error(error_msg)
        return [{
            "error": error_msg,
            "hint": "Use: SELECT ... FROM mcp.table_name or dbo.table_name",
            "your_query": query
        }]

    # Validate business_function in table name
    if f"{business_function}_" not in query_lower:
        logger.warning(
            f"Query does not appear to reference business_function "
            f"'{business_function}' tables"
        )

    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Add TOP clause if not present and max_rows is set
        modified_query = query
        if max_rows > 0:
            query_upper = query.strip().upper()
            if not query_upper.startswith("SELECT TOP") and \
               query_upper.startswith("SELECT"):
                modified_query = query.strip()
                # Insert TOP clause after SELECT
                select_pos = modified_query.upper().find("SELECT")
                modified_query = (
                    modified_query[:select_pos + 6] + 
                    f" TOP {max_rows}" + 
                    modified_query[select_pos + 6:]
                )
                logger.debug(f"Added TOP {max_rows} clause to query")

        logger.debug(f"Executing modified query: {modified_query}")
        cursor.execute(modified_query)

        # Get column names
        columns = [column[0] for column in cursor.description]
        logger.debug(f"Result columns ({len(columns)}): {columns}")

        # Fetch results
        results = []
        row_count = 0
        for row in cursor.fetchall():
            row_count += 1
            # Convert values to JSON-serializable types
            row_dict = {}
            for col_name, value in zip(columns, row):
                # Handle datetime and other types
                if hasattr(value, 'isoformat'):
                    row_dict[col_name] = value.isoformat()
                elif value is None:
                    row_dict[col_name] = None
                else:
                    row_dict[col_name] = value
            results.append(row_dict)

        logger.info(f"Query executed successfully, returned {row_count} rows")
        if results:
            logger.debug(f"First row: {results[0]}")

        return results
    except Exception as e:
        error_msg = f"Query execution error: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return [{"error": error_msg, "your_query": query}]

Key features:

  • Automatic row limiting: Adds a TOP clause to prevent massive result sets
  • JSON serialization: Converts datetime and other types properly
  • Tool validation: Catches wrong tool usage (file vs. database queries)
  • Security: Validates business function table naming


14. Main Entry Point and Server Startup

import uvicorn

def main():
    """Main entry point for MCP server"""
    logger.info("=" * 80)
    logger.info("MAIN: Starting single MCP Server on port 8000")
    logger.info("=" * 80)

    try:
        # Create the FastMCP HTTP app
        app = mcp.http_app()
        logger.info("FastMCP HTTP app created successfully")

        # Add the business function extraction middleware
        logger.info("Adding BusinessFunctionExtractionMiddleware to app")
        app.add_middleware(BusinessFunctionExtractionMiddleware)
        logger.info("Middleware added successfully")

        logger.info("Server configuration:")
        logger.info("  - Host: 0.0.0.0")
        logger.info("  - Port: 8000")
        logger.info("  - Functions supported: IT, HR, FIN")
        logger.info("  - Functions extraction: Via Query String (mcp?function=XXX)")
        logger.info("=" * 80)

        # Run the server - this blocks indefinitely
        logger.info("Starting Uvicorn server...")
        config = uvicorn.Config(
            app,
            host="0.0.0.0",
            port=8000,
            log_level="info",
            access_log=True,
            loop="asyncio",
            timeout_keep_alive=75
        )
        server = uvicorn.Server(config)
        server.run()

    except Exception as e:
        logger.critical(f"Failed to start server: {str(e)}", exc_info=True)
        raise
    finally:
        logger.info("=" * 80)
        logger.info("MAIN: MCP Server shutdown complete")
        logger.info("=" * 80)

if __name__ == "__main__":
    logger.info("=" * 80)
    logger.info("Script execution started - Single Process Mode")
    logger.info("=" * 80)
    try:
        main()
    except KeyboardInterrupt:
        logger.info("\n" + "=" * 80)
        logger.info("MAIN: Keyboard interrupt received, shutting down...")
        logger.info("=" * 80)
    except Exception as e:
        logger.critical(f"Fatal error in main execution: {str(e)}", exc_info=True)
        raise

Server configuration:

  • Port 8000: Standard HTTP port for MCP servers
  • Host 0.0.0.0: Listen on all network interfaces
  • Async event loop: Supports concurrent requests
  • Keep-alive timeout: 75 seconds for long-running queries
  • Access logging: All requests logged for auditing
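If your container platform expects a liveness probe, one option is to attach a plain HTTP health route to the Starlette app before handing it to Uvicorn. This is a hedged sketch (the /health path and its placement in main() are assumptions, not part of the original code):

from starlette.responses import JSONResponse

async def health(request):
    # Lightweight liveness check; does not touch the database or file shares
    return JSONResponse({"status": "ok"})

# in main(), after `app = mcp.http_app()` and before the Uvicorn server starts:
app.add_route("/health", health, methods=["GET"])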


Deployment: Azure Container Instance with TLS Offloading

Container Build and Deployment

1. Dockerfile:

FROM mcr.microsoft.com/devcontainers/python:3

# Keeps Python from generating .pyc files in the container
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

# Install only necessary packages for MCP Analyst
RUN apt-get update && \
    apt-get install -y \
    build-essential \
    python3-dev \
    git \
    curl \
    cifs-utils && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
    python -m pip install -r requirements.txt

# Add Microsoft GPG key
RUN curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg

# Add Microsoft SQL Server repository
RUN curl https://packages.microsoft.com/config/debian/12/prod.list | tee /etc/apt/sources.list.d/mssql-release.list

# Install Microsoft SQL Server related packages
RUN apt-get update && \
    ACCEPT_EULA=Y apt-get install -y \
    msodbcsql18 \
    mssql-tools18

# Add MS SQL Server tools to PATH
RUN echo 'export PATH="$PATH:/opt/mssql-tools18/bin"' >> ~/.bashrc 

# Clean up
RUN apt-get clean && \
    rm -rf /var/lib/apt/lists/*


#Argument passed in via Command line 
ARG GITHUB_PAT

# Clone the private repository using the build argument
RUN git clone https://$GITHUB_PAT@github.com/GDEnergyproducts/GDEP-MCP-ANALYST.git /app

WORKDIR /app

RUN chmod +x scripts/container/mountstorage.sh

# Create a startup script that runs mcp server
RUN echo '#!/bin/bash\n\
set -e\n\
echo "Starting container initialization..."\n\
echo "Starting analyst.py..."\n\
python /app/src/analyst.py\n\
' > /app/start.sh && chmod +x /app/start.sh

CMD ["/app/start.sh"]

2. requirements.txt:

# Core MCP dependencies
polars>=0.20.0
pydantic>=2.0.0

# Azure dependencies (from your existing setup)
azure-identity
azure-keyvault-secrets

# Data processing
pandas
pyarrow>=14.0.0
pyodbc>=5.3.0

# Development/debugging tools
pytest
ipython

# Need this !!
fastmcp

TLS Offloading with FortiGate

Why TLS offloading at the edge?

  • SSL/TLS termination: FortiGate handles all encryption/decryption
  • Certificate management: Centralized SSL certificate renewal
  • Performance: Offloads crypto operations from the container
  • Security: Web Application Firewall (WAF) protection
  • Inspection: Deep packet inspection for threats

FortiGate configuration (simplified):

config firewall vip
    edit "mcp-analyst-vip"
        set extip 203.0.113.10
        set mappedip 10.0.1.100  # Container internal IP
        set extintf "wan1"
        set portforward enable
        set extport 443
        set mappedport 8000
        set ssl-certificate "wildcard-cert"
    next
end

config firewall policy
    edit 1
        set name "Allow-MCP-HTTPS"
        set srcintf "wan1"
        set dstintf "internal"
        set srcaddr "all"
        set dstaddr "mcp-analyst-vip"
        set action accept
        set schedule "always"
        set service "HTTPS"
        set utm-status enable
        set ssl-ssh-profile "certificate-inspection"
        set av-profile "default"
        set webfilter-profile "default"
        set ips-sensor "default"
    next
end

Result:

  • External URL: https://mcp.gdenergyproducts.com (port 443)
  • Internal traffic: HTTP to the container (port 8000)
  • TLS handled by FortiGate with the corporate SSL certificate


Claude Desktop Configuration

To connect Claude Desktop to your MCP server:

  1. Open Claude Desktop and go to File → Settings.
  2. Select Connector, then click Add Connector.
  3. Enter the following URL (replace function=it as needed):
    https://mcp.gdenergyproducts.com/mcp?function=it
    
  4. Click Connect and go through the OAuth authentication process.
  5. If Claude Desktop does not restart automatically, restart it manually.
  6. After the restart, go back to Configure and grant tool access as appropriate; otherwise you will be prompted for access the first time each tool is used.
  7. Start asking questions:
    • "How many Windows devices do we have?"
    • "Show me employees terminated in the last 90 days"
    • "What's our Office 365 license utilization?"


Security Best Practices

1. Authentication and Authorization

  • OAuth 2.0 (Azure Entra ID) only: No API keys or basic auth
  • Per-user context: Track who queries what
  • Business function isolation: Users can't cross boundaries
  • Audit logging: Every tool call logged with user identity

2. Data Protection

  • TLS in transit: All external traffic encrypted
  • No data exfiltration: Results returned only to authenticated user
  • Path validation: Prevents directory traversal
  • Query validation: Catches SQL injection attempts

3. Secrets Management

  • Azure Key Vault: All secrets stored securely
  • No hardcoded credentials: Injected at runtime
  • Least privilege: Service principals with minimal permissions
  • Secret rotation: Automated via Key Vault

4. Network Security

  • TLS offloading: FortiGate terminates SSL/TLS
  • WAF protection: Web Application Firewall blocks attacks
  • Internal-only database: No public internet exposure
  • VNet integration: Container in private virtual network

Performance and Scalability

Performance Characteristics

File queries (Polars):
  • 1 million rows: ~1-2 seconds
  • 10 million rows: ~5-10 seconds
  • Concurrent users: 20-50 simultaneous queries

Database queries (SQL Server):
  • Simple SELECT: <100ms
  • Complex joins: 1-5 seconds
  • Limited by database performance, not the MCP server

Memory usage:
  • Base: ~200MB
  • Per concurrent query: +50-500MB (depends on data size)
  • Recommended: 4GB RAM for 10+ concurrent users

Scaling Strategies

Vertical scaling (single container):
  • Increase CPU/RAM allocation
  • Good for up to 50 concurrent users

Horizontal scaling (multiple containers):
  • Deploy behind Azure Load Balancer
  • Session affinity not required (stateless)
  • Scales to 100+ users

Database optimization:
  • Create indexed views for common queries
  • Partition large tables by business function
  • Use columnstore indexes for analytics


Monitoring and Troubleshooting

Key Metrics to Monitor

1. Application logs (/mnt/azure/logs/gdepmcp.log):
  • Tool call frequency and duration
  • Authentication failures
  • Query errors
  • User activity patterns

2. Container metrics (Azure Monitor):
  • CPU utilization
  • Memory usage
  • Network throughput
  • Container restart count

3. Database metrics (SQL Server DMVs):
  • Long-running queries
  • Connection pool usage
  • Lock contention
  • Index usage

Common Issues and Solutions

Issue: OAuth authentication fails
Solution:
  • Verify accessTokenAcceptedVersion: 2 in the manifest
  • Check that the redirect URIs include the Claude callback URL
  • Ensure the access_as_user scope has admin consent granted

Issue: "Access denied" errors for files
Solution:
  • Verify the business function in the URL matches the data folder
  • Check file permissions in the Azure File Share
  • Validate the path passed to get_schema() or execute_polars_sql()

Issue: Database queries time out
Solution:
  • Check network connectivity to SQL Server
  • Verify the firewall allows the container IP
  • Optimize the query with indexes
  • Reduce the result set size (lower max_rows or add a WHERE clause)

Issue: Container crashes with OOM (Out of Memory)
Solution:
  • Increase the container memory allocation
  • Add pagination to large queries
  • Limit infer_schema_length in Polars
  • Review concurrent user load


Lessons Learned

What Worked Well

  1. OAuth 2.0 (Azure Entra ID) integration: Seamless enterprise SSO
  2. Multi-tenant architecture: Single server for all business functions
  3. Polars performance: Handles millions of rows effortlessly
  4. Data catalog: LLM generates correct queries consistently
  5. TLS offloading: Simplified container deployment
  6. Key Vault: Secure, centralized secrets management

Challenges and Solutions

  1. OAuth token version gotcha: Azure defaults to v1.0 access tokens
     Solution: Set accessTokenAcceptedVersion to 2 in the manifest and document it clearly in the setup guide

  2. Claude Desktop callback: Not part of the standard OAuth flow
     Solution: Add https://claude.ai/api/mcp/auth_callback to the redirect URIs

  3. SAP delimiter detection: Semicolon vs comma
     Solution: Detect the sap_ prefix in the filename

  4. Connection pooling: Database connections timing out
     Solution: Implement health checks and reconnection logic

  5. Log file growth: Filled the disk in production
     Solution: Use RotatingFileHandler with size limits

Future Enhancements

  1. Row-level security: Filter data based on user's Azure groups
  2. Query caching: Redis cache for repeated queries
  3. Rate limiting: Prevent abuse by individual users
  4. Advanced analytics: Support for Python/R code execution
  5. Mobile app: Native iOS/Android client with OAuth
  6. Real-time data: WebSocket streaming for live dashboards

Conclusion

The evolution from stdio-based to HTTPS-based MCP server represents a quantum leap in enterprise readiness:

  • From single-user to multi-tenant: One server, many users, isolated data
  • From no auth to OAuth 2.0 (Azure Entra ID): Enterprise-grade security and SSO
  • From local to cloud: Scalable, containerized deployment
  • From manual to automated: Zero per-user setup, instant access

This architecture enables true self-service analytics for business users while maintaining the security, governance, and scalability requirements of enterprise IT. By combining Azure Entra ID, containerization, TLS offloading, and multi-tenant data isolation, you can deliver LLM-powered analytics that business users love and IT teams trust.

The result: Business leaders can ask any question, across any dataset, with natural language—securely, instantly, and without waiting for IT.


References and Credits