Security is crucial when working with MCP servers and LLM agents. This guide covers best practices for protecting your applications, data, and infrastructure.

Important: MCP servers can have powerful capabilities including file system access, network requests, and code execution. Always follow security best practices to protect your systems.

API Key Management

Environment Variables

Never hardcode API keys in your source code. Use environment variables:

secure_config.py
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Access API keys securely
openai_key = os.getenv("OPENAI_API_KEY")
anthropic_key = os.getenv("ANTHROPIC_API_KEY")

if not openai_key:
    raise ValueError("OPENAI_API_KEY environment variable is required")
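
If several keys are required, a small helper keeps the checks in one place (a sketch; require_env is not part of mcp_use):

def require_env(name: str) -> str:
    """Return a required environment variable or fail fast."""
    value = os.getenv(name)
    if not value:
        raise ValueError(f"{name} environment variable is required")
    return value

anthropic_key = require_env("ANTHROPIC_API_KEY")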

.env File Security

Create a secure .env file:

.env
# LLM Provider Keys
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GROQ_API_KEY=gsk_...

# MCP Server Configuration
FILESYSTEM_ROOT=/safe/workspace
DATABASE_URL=postgresql://user:pass@localhost/db

# Optional: Security settings
MCP_TIMEOUT=30
MAX_TOOL_CALLS=10
ALLOWED_DOMAINS=example.com,api.service.com

Never commit .env files: Add .env to your .gitignore file to prevent accidentally committing API keys to version control.
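
For example, a minimal .gitignore entry:

.gitignore
# Keep secrets out of version control
.env
.env.*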

Secrets Management

For production environments, use a dedicated secrets manager such as AWS Secrets Manager:

import boto3
from botocore.exceptions import ClientError

def get_secret(secret_name: str, region_name: str = "us-east-1") -> str:
    """Fetch a secret string from AWS Secrets Manager."""
    session = boto3.session.Session()
    client = session.client('secretsmanager', region_name=region_name)

    try:
        response = client.get_secret_value(SecretId=secret_name)
        return response['SecretString']
    except ClientError:
        # Re-raise with the original traceback; callers decide how
        # to handle missing or forbidden secrets
        raise

# Usage
openai_key = get_secret("prod/mcp-use/openai-key")
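
Calling Secrets Manager on every request adds latency and API cost. A minimal caching sketch using functools.lru_cache (cached values live until the process restarts, so add a TTL if you rotate secrets):

from functools import lru_cache

@lru_cache(maxsize=32)
def get_secret_cached(secret_name: str, region_name: str = "us-east-1") -> str:
    # Wraps get_secret() from the example above
    return get_secret(secret_name, region_name)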

MCP Server Security

Filesystem Server Security

When using filesystem servers, restrict access to safe directories:

secure_filesystem_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "mcp-server-filesystem",
      "args": [
        "/workspace/safe-directory",
        "--readonly",
        "--max-file-size", "10MB",
        "--allowed-extensions", ".txt,.md,.json,.py"
      ],
      "env": {
        "FILESYSTEM_READONLY": "true",
        "MAX_FILE_SIZE": "10485760"
      }
    }
  }
}

Network Access Restrictions

Limit network access for web-based MCP servers:

secure_network_config.json
{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": ["@playwright/mcp@latest"],
      "env": {
        "PLAYWRIGHT_HEADLESS": "true",
        "ALLOWED_DOMAINS": "example.com,api.trusted-service.com",
        "BLOCK_PRIVATE_IPS": "true",
        "DISABLE_JAVASCRIPT": "false",
        "TIMEOUT": "30000"
      }
    }
  }
}
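
Whether a server honors these variables depends on the server implementation, so it is worth enforcing an allowlist on your side as well. A minimal sketch (is_url_allowed is a hypothetical helper, not part of mcp_use):

import ipaddress
import socket
from urllib.parse import urlparse

ALLOWED_DOMAINS = {"example.com", "api.trusted-service.com"}

def is_url_allowed(url: str) -> bool:
    """Allow only HTTPS URLs on trusted, publicly routable hosts."""
    parsed = urlparse(url)
    if parsed.scheme != "https" or parsed.hostname is None:
        return False
    if parsed.hostname not in ALLOWED_DOMAINS:
        return False
    # Block hosts that resolve to private/internal addresses (SSRF)
    for info in socket.getaddrinfo(parsed.hostname, None):
        if ipaddress.ip_address(info[4][0]).is_private:
            return False
    return True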

Database Security

Secure database connections with proper credentials and restrictions:

secure_database_config.json
{
  "mcpServers": {
    "postgres": {
      "command": "mcp-server-postgres",
      "env": {
        "DATABASE_URL": "${DATABASE_URL}",
        "CONNECTION_TIMEOUT": "30",
        "MAX_CONNECTIONS": "5",
        "READONLY_MODE": "true",
        "ALLOWED_SCHEMAS": "public,reporting",
        "BLOCKED_TABLES": "users,passwords,secrets"
      }
    }
  }
}

Agent Security Configuration

Restrict Tool Access

Limit which tools the agent can use:

secure_agent.py
from mcp_use import MCPAgent, MCPClient
from langchain_openai import ChatOpenAI

# Define allowed and disallowed tools
ALLOWED_TOOLS = [
    "file_read",
    "file_write",
    "web_search",
    "web_scrape"
]

DISALLOWED_TOOLS = [
    "system_execute",
    "network_request",
    "database_write",
    "file_delete"
]

async def create_secure_agent():
    client = MCPClient.from_config_file("secure_config.json")
    llm = ChatOpenAI(model="gpt-4")

    agent = MCPAgent(
        llm=llm,
        client=client,
        allowed_tools=ALLOWED_TOOLS,
        disallowed_tools=DISALLOWED_TOOLS,
        max_steps=20,  # Limit execution steps
        timeout=300,   # 5-minute timeout
        use_server_manager=True
    )

    return agent
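
A usage sketch (the query string is illustrative):

import asyncio

async def main():
    agent = await create_secure_agent()
    result = await agent.run("Summarize the files in the workspace")
    print(result)

asyncio.run(main())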

Input Validation

Validate user inputs before processing:

input_validation.py
import re
from typing import Optional

class InputValidator:
    def __init__(self):
        self.max_length = 1000
        self.blocked_patterns = [
            r'rm\s+-rf',          # Dangerous commands
            r'sudo',              # Privilege escalation
            r'chmod\s+777',       # Permission changes
            r'\.\./',             # Path traversal
            r'<script',           # XSS attempts
            r'DROP\s+TABLE',      # SQL injection
        ]

    def validate_query(self, query: str) -> tuple[bool, Optional[str]]:
        """Validate user query for security issues"""

        # Check length
        if len(query) > self.max_length:
            return False, f"Query too long (max {self.max_length} characters)"

        # Check for blocked patterns
        for pattern in self.blocked_patterns:
            if re.search(pattern, query, re.IGNORECASE):
                return False, f"Query contains blocked pattern: {pattern}"

        # Check for suspicious characters
        suspicious_chars = ['&', '|', ';', '`', '$']
        if any(char in query for char in suspicious_chars):
            return False, "Query contains potentially dangerous characters"

        return True, None

# Usage
validator = InputValidator()

async def secure_query_handler(user_query: str):
    is_valid, error = validator.validate_query(user_query)

    if not is_valid:
        raise ValueError(f"Invalid query: {error}")

    agent = await create_secure_agent()
    return await agent.run(user_query)

Rate Limiting

Implement rate limiting to prevent abuse:

rate_limiting.py
import time
from collections import defaultdict
from typing import Dict, List, Tuple

class RateLimiter:
    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, List[float]] = defaultdict(list)

    def is_allowed(self, user_id: str) -> Tuple[bool, str]:
        """Check if user is within rate limits"""
        now = time.time()
        window_start = now - self.window_seconds

        # Clean old requests
        self.requests[user_id] = [
            req_time for req_time in self.requests[user_id]
            if req_time > window_start
        ]

        # Check if under limit
        if len(self.requests[user_id]) >= self.max_requests:
            return False, f"Rate limit exceeded: {self.max_requests} requests per {self.window_seconds} seconds"

        # Record this request
        self.requests[user_id].append(now)
        return True, ""

# Usage
rate_limiter = RateLimiter(max_requests=5, window_seconds=60)

async def rate_limited_query(user_id: str, query: str):
    allowed, message = rate_limiter.is_allowed(user_id)

    if not allowed:
        raise ValueError(message)

    agent = await create_secure_agent()
    return await agent.run(query)
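
The in-memory limiter above only covers a single process. For multi-worker deployments, back the counters with a shared store such as Redis; a fixed-window sketch using the redis package's INCR/EXPIRE pattern (assumes a Redis instance on localhost):

import redis

r = redis.Redis(host="localhost", port=6379)

def is_allowed_shared(user_id: str, max_requests: int = 5, window_seconds: int = 60) -> bool:
    """Fixed-window rate limit shared across processes."""
    key = f"rate:{user_id}"
    count = r.incr(key)
    if count == 1:
        r.expire(key, window_seconds)  # start the window on the first request
    return count <= max_requests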

Logging and Monitoring

Security Logging

Implement comprehensive security logging:

security_logging.py
import logging

class SecurityLogger:
    def __init__(self, log_file: str = "security.log"):
        self.logger = logging.getLogger("mcp_security")
        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_agent_start(self, user_id: str, query: str):
        """Log when an agent starts processing"""
        self.logger.info(f"Agent started - User: {user_id}, Query: {query[:100]}...")

    def log_tool_usage(self, user_id: str, tool_name: str, success: bool):
        """Log tool usage attempts"""
        status = "SUCCESS" if success else "FAILED"
        self.logger.info(f"Tool used - User: {user_id}, Tool: {tool_name}, Status: {status}")

    def log_security_violation(self, user_id: str, violation_type: str, details: str):
        """Log security violations"""
        self.logger.warning(f"SECURITY VIOLATION - User: {user_id}, Type: {violation_type}, Details: {details}")

    def log_error(self, user_id: str, error: str):
        """Log errors"""
        self.logger.error(f"Error - User: {user_id}, Error: {error}")

# Usage
security_logger = SecurityLogger()

async def monitored_agent_run(user_id: str, query: str):
    security_logger.log_agent_start(user_id, query)

    try:
        agent = await create_secure_agent()
        result = await agent.run(query)
        security_logger.log_tool_usage(user_id, "agent_complete", True)
        return result
    except Exception as e:
        security_logger.log_error(user_id, str(e))
        raise

Monitoring Dashboard

Track security events with metrics; this example uses the Prometheus Python client:

monitoring.py
from prometheus_client import Counter, Histogram, start_http_server
import time

# Reuses validator, rate_limiter, and create_secure_agent from the
# earlier examples in this guide

# Metrics
REQUEST_COUNT = Counter('mcp_requests_total', 'Total requests', ['user_id', 'status'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request duration')
SECURITY_VIOLATIONS = Counter('mcp_security_violations_total', 'Security violations', ['type'])

async def monitored_agent_execution(user_id: str, query: str):
    start_time = time.time()

    try:
        # Your existing security checks
        is_valid, error = validator.validate_query(query)
        if not is_valid:
            SECURITY_VIOLATIONS.labels(type='invalid_query').inc()
            raise ValueError(error)

        allowed, message = rate_limiter.is_allowed(user_id)
        if not allowed:
            SECURITY_VIOLATIONS.labels(type='rate_limit').inc()
            raise ValueError(message)

        # Execute agent
        agent = await create_secure_agent()
        result = await agent.run(query)

        REQUEST_COUNT.labels(user_id=user_id, status='success').inc()
        return result

    except Exception as e:
        REQUEST_COUNT.labels(user_id=user_id, status='error').inc()
        raise
    finally:
        REQUEST_DURATION.observe(time.time() - start_time)

# Start metrics server
start_http_server(8000)

Production Deployment Security

Container Security

Use secure container configurations:

Dockerfile
FROM python:3.9-slim

# Create non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# Install security updates
RUN apt-get update && apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
    ca-certificates && \
    rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Set ownership and permissions
RUN chown -R mcpuser:mcpuser /app
USER mcpuser

# Expose port
EXPOSE 8000

# Run application
CMD ["python", "main.py"]
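
At runtime you can tighten the container further. A hedged sketch of a Kubernetes container securityContext (a fragment of a Pod spec; field names follow the core/v1 API):

container_security_context.yaml
securityContext:
  runAsNonRoot: true
  readOnlyRootFilesystem: true
  allowPrivilegeEscalation: false
  capabilities:
    drop: ["ALL"]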

Network Security

Configure network policies and firewalls:

kubernetes_network_policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-use-policy
spec:
  podSelector:
    matchLabels:
      app: mcp-use
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: frontend
    ports:
    - protocol: TCP
      port: 8000
  egress:
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS only
  - to:
    - namespaceSelector:
        matchLabels:
          name: database
    ports:
    - protocol: TCP
      port: 5432

Security Checklist

Before deploying, verify that you have:

- Stored API keys in environment variables or a secrets manager, never in source control
- Restricted filesystem servers to safe directories, read-only where possible
- Limited network access to trusted domains and blocked private IPs
- Used read-only database connections and blocked sensitive tables
- Restricted the agent's tool access and capped steps and timeouts
- Validated user input and applied rate limiting
- Enabled security logging and metrics monitoring
- Hardened containers (non-root user) and applied network policies

Common Security Vulnerabilities

Path Traversal Prevention

from pathlib import Path

def secure_file_path(base_dir: str, user_path: str) -> str:
    """Safely resolve user-provided file paths"""
    base = Path(base_dir).resolve()
    target = (base / user_path).resolve()

    # Ensure the target is within the base directory; a plain
    # startswith() check would also accept siblings such as
    # "/workspace-evil" for a "/workspace" base
    if not target.is_relative_to(base):  # Python 3.9+
        raise ValueError("Path traversal attempt detected")

    return str(target)

# Usage in MCP server configuration (user_provided_path comes from the user)
safe_workspace = secure_file_path("/workspace", user_provided_path)

Command Injection Prevention

import shlex
from typing import List

def secure_command_args(command: str, args: List[str]) -> List[str]:
    """Safely construct a command argument list"""
    # Whitelist allowed commands
    allowed_commands = ["node", "python", "npm", "pip"]
    if command not in allowed_commands:
        raise ValueError(f"Command '{command}' not allowed")

    # Pass this list to subprocess.run() with shell=False (the default)
    # so the arguments are never interpreted by a shell
    return [command] + list(args)

def secure_shell_string(command: str, args: List[str]) -> str:
    """Build a single shell string when one is unavoidable"""
    # shlex.quote() escapes each argument so it cannot inject extra commands
    return " ".join(shlex.quote(part) for part in secure_command_args(command, args))

Next Steps

Security is an ongoing process. Regularly review and update your security practices, monitor for new vulnerabilities, and keep all dependencies up to date.