Security is crucial when working with MCP servers and LLM agents. This guide covers best practices for protecting your applications, data, and infrastructure.
Important : MCP servers can have powerful capabilities including file system access, network requests, and code execution. Always follow security best practices to protect your systems.
API Key Management
Environment Variables
Never hardcode API keys in your source code. Use environment variables:
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Access API keys securely
openai_key = os.getenv( "OPENAI_API_KEY" )
anthropic_key = os.getenv( "ANTHROPIC_API_KEY" )
if not openai_key:
raise ValueError ( "OPENAI_API_KEY environment variable is required" )
.env File Security
Create a secure .env
file:
# LLM Provider Keys
OPENAI_API_KEY = sk-...
ANTHROPIC_API_KEY = sk-ant-...
GROQ_API_KEY = gsk_...
# MCP Server Configuration
FILESYSTEM_ROOT = /safe/workspace
DATABASE_URL = postgresql://user:pass@localhost/db
# Optional: Security settings
MCP_TIMEOUT = 30
MAX_TOOL_CALLS = 10
ALLOWED_DOMAINS = example.com,api.service.com
Never commit .env files : Add .env
to your .gitignore
file to prevent accidentally committing API keys to version control.
Secrets Management
For production environments, use proper secrets management:
AWS Secrets Manager Azure Key Vault HashiCorp Vault import boto3
from botocore.exceptions import ClientError
def get_secret ( secret_name , region_name = "us-east-1" ):
session = boto3.session.Session()
client = session.client( 'secretsmanager' , region_name = region_name)
try :
response = client.get_secret_value( SecretId = secret_name)
return response[ 'SecretString' ]
except ClientError as e:
raise e
# Usage
openai_key = get_secret( "prod/mcp-use/openai-key" )
import boto3
from botocore.exceptions import ClientError
def get_secret ( secret_name , region_name = "us-east-1" ):
session = boto3.session.Session()
client = session.client( 'secretsmanager' , region_name = region_name)
try :
response = client.get_secret_value( SecretId = secret_name)
return response[ 'SecretString' ]
except ClientError as e:
raise e
# Usage
openai_key = get_secret( "prod/mcp-use/openai-key" )
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
client = SecretClient(
vault_url = "https://vault-name.vault.azure.net/" ,
credential = credential
)
# Usage
openai_key = client.get_secret( "openai-api-key" ).value
import hvac
client = hvac.Client( url = 'https://vault.example.com' )
client.token = os.getenv( 'VAULT_TOKEN' )
# Read secret
response = client.secrets.kv.v2.read_secret_version(
path = 'mcp-use/api-keys'
)
openai_key = response[ 'data' ][ 'data' ][ 'openai_key' ]
MCP Server Security
Filesystem Server Security
When using filesystem servers, restrict access to safe directories:
secure_filesystem_config.json
{
"mcpServers" : {
"filesystem" : {
"command" : "mcp-server-filesystem" ,
"args" : [
"/workspace/safe-directory" ,
"--readonly" ,
"--max-file-size" , "10MB" ,
"--allowed-extensions" , ".txt,.md,.json,.py"
],
"env" : {
"FILESYSTEM_READONLY" : "true" ,
"MAX_FILE_SIZE" : "10485760"
}
}
}
}
Network Access Restrictions
Limit network access for web-based MCP servers:
secure_network_config.json
{
"mcpServers" : {
"playwright" : {
"command" : "npx" ,
"args" : [ "@playwright/mcp@latest" ],
"env" : {
"PLAYWRIGHT_HEADLESS" : "true" ,
"ALLOWED_DOMAINS" : "example.com,api.trusted-service.com" ,
"BLOCK_PRIVATE_IPS" : "true" ,
"DISABLE_JAVASCRIPT" : "false" ,
"TIMEOUT" : "30000"
}
}
}
}
Database Security
Secure database connections with proper credentials and restrictions:
secure_database_config.json
{
"mcpServers" : {
"postgres" : {
"command" : "mcp-server-postgres" ,
"env" : {
"DATABASE_URL" : "${DATABASE_URL}" ,
"CONNECTION_TIMEOUT" : "30" ,
"MAX_CONNECTIONS" : "5" ,
"READONLY_MODE" : "true" ,
"ALLOWED_SCHEMAS" : "public,reporting" ,
"BLOCKED_TABLES" : "users,passwords,secrets"
}
}
}
}
Agent Security Configuration
Limit which tools the agent can use:
from mcp_use import MCPAgent, MCPClient
from langchain_openai import ChatOpenAI
# Define allowed and disallowed tools
ALLOWED_TOOLS = [
"file_read" ,
"file_write" ,
"web_search" ,
"web_scrape"
]
DISALLOWED_TOOLS = [
"system_execute" ,
"network_request" ,
"database_write" ,
"file_delete"
]
async def create_secure_agent ():
client = MCPClient.from_config_file( "secure_config.json" )
llm = ChatOpenAI( model = "gpt-4" )
agent = MCPAgent(
llm = llm,
client = client,
allowed_tools = ALLOWED_TOOLS ,
disallowed_tools = DISALLOWED_TOOLS ,
max_steps = 20 , # Limit execution steps
timeout = 300 , # 5-minute timeout
use_server_manager = True
)
return agent
Validate user inputs before processing:
import re
from typing import List, Optional
class InputValidator :
def __init__ ( self ):
self .max_length = 1000
self .blocked_patterns = [
r 'rm \s + -rf' , # Dangerous commands
r 'sudo' , # Privilege escalation
r 'chmod \s + 777' , # Permission changes
r ' \.\. /' , # Path traversal
r '<script' , # XSS attempts
r 'DROP \s + TABLE' , # SQL injection
]
def validate_query ( self , query : str ) -> tuple[ bool , Optional[ str ]]:
"""Validate user query for security issues"""
# Check length
if len (query) > self .max_length:
return False , f "Query too long (max { self .max_length } characters)"
# Check for blocked patterns
for pattern in self .blocked_patterns:
if re.search(pattern, query, re. IGNORECASE ):
return False , f "Query contains blocked pattern: { pattern } "
# Check for suspicious characters
suspicious_chars = [ '&' , '|' , ';' , '`' , '$' ]
if any (char in query for char in suspicious_chars):
return False , "Query contains potentially dangerous characters"
return True , None
# Usage
validator = InputValidator()
async def secure_query_handler ( user_query : str ):
is_valid, error = validator.validate_query(user_query)
if not is_valid:
raise ValueError ( f "Invalid query: { error } " )
agent = await create_secure_agent()
return await agent.run(user_query)
Rate Limiting
Implement rate limiting to prevent abuse:
import time
from collections import defaultdict
from typing import Dict, Tuple
class RateLimiter :
def __init__ ( self , max_requests : int = 10 , window_seconds : int = 60 ):
self .max_requests = max_requests
self .window_seconds = window_seconds
self .requests: Dict[ str , List[ float ]] = defaultdict( list )
def is_allowed ( self , user_id : str ) -> Tuple[ bool , str ]:
"""Check if user is within rate limits"""
now = time.time()
window_start = now - self .window_seconds
# Clean old requests
self .requests[user_id] = [
req_time for req_time in self .requests[user_id]
if req_time > window_start
]
# Check if under limit
if len ( self .requests[user_id]) >= self .max_requests:
return False , f "Rate limit exceeded: { self .max_requests } requests per { self .window_seconds } seconds"
# Record this request
self .requests[user_id].append(now)
return True , ""
# Usage
rate_limiter = RateLimiter( max_requests = 5 , window_seconds = 60 )
async def rate_limited_query ( user_id : str , query : str ):
allowed, message = rate_limiter.is_allowed(user_id)
if not allowed:
raise ValueError (message)
agent = await create_secure_agent()
return await agent.run(query)
Logging and Monitoring
Security Logging
Implement comprehensive security logging:
import logging
import json
from datetime import datetime
from typing import Any, Dict
class SecurityLogger :
def __init__ ( self , log_file : str = "security.log" ):
self .logger = logging.getLogger( "mcp_security" )
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
' %(asctime)s - %(levelname)s - %(message)s '
)
handler.setFormatter(formatter)
self .logger.addHandler(handler)
self .logger.setLevel(logging. INFO )
def log_agent_start ( self , user_id : str , query : str ):
"""Log when an agent starts processing"""
self .logger.info( f "Agent started - User: { user_id } , Query: { query[: 100 ] } ..." )
def log_tool_usage ( self , user_id : str , tool_name : str , success : bool ):
"""Log tool usage attempts"""
status = "SUCCESS" if success else "FAILED"
self .logger.info( f "Tool used - User: { user_id } , Tool: { tool_name } , Status: { status } " )
def log_security_violation ( self , user_id : str , violation_type : str , details : str ):
"""Log security violations"""
self .logger.warning( f "SECURITY VIOLATION - User: { user_id } , Type: { violation_type } , Details: { details } " )
def log_error ( self , user_id : str , error : str ):
"""Log errors"""
self .logger.error( f "Error - User: { user_id } , Error: { error } " )
# Usage
security_logger = SecurityLogger()
async def monitored_agent_run ( user_id : str , query : str ):
security_logger.log_agent_start(user_id, query)
try :
agent = await create_secure_agent()
result = await agent.run(query)
security_logger.log_tool_usage(user_id, "agent_complete" , True )
return result
except Exception as e:
security_logger.log_error(user_id, str (e))
raise
Monitoring Dashboard
Create monitoring for security events:
from prometheus_client import Counter, Histogram, start_http_server
import time
# Metrics
REQUEST_COUNT = Counter( 'mcp_requests_total' , 'Total requests' , [ 'user_id' , 'status' ])
REQUEST_DURATION = Histogram( 'mcp_request_duration_seconds' , 'Request duration' )
SECURITY_VIOLATIONS = Counter( 'mcp_security_violations_total' , 'Security violations' , [ 'type' ])
async def monitored_agent_execution ( user_id : str , query : str ):
start_time = time.time()
try :
# Your existing security checks
is_valid, error = validator.validate_query(query)
if not is_valid:
SECURITY_VIOLATIONS .labels( type = 'invalid_query' ).inc()
raise ValueError (error)
allowed, message = rate_limiter.is_allowed(user_id)
if not allowed:
SECURITY_VIOLATIONS .labels( type = 'rate_limit' ).inc()
raise ValueError (message)
# Execute agent
agent = await create_secure_agent()
result = await agent.run(query)
REQUEST_COUNT .labels( user_id = user_id, status = 'success' ).inc()
return result
except Exception as e:
REQUEST_COUNT .labels( user_id = user_id, status = 'error' ).inc()
raise
finally :
REQUEST_DURATION .observe(time.time() - start_time)
# Start metrics server
start_http_server( 8000 )
Production Deployment Security
Container Security
Use secure container configurations:
FROM python:3.9-slim
# Create non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser
# Install security updates
RUN apt-get update && apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
ca-certificates && \
rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Set ownership and permissions
RUN chown -R mcpuser:mcpuser /app
USER mcpuser
# Expose port
EXPOSE 8000
# Run application
CMD [ "python" , "main.py" ]
Network Security
Configure network policies and firewalls:
kubernetes_network_policy.yaml
apiVersion : networking.k8s.io/v1
kind : NetworkPolicy
metadata :
name : mcp-use-policy
spec :
podSelector :
matchLabels :
app : mcp-use
policyTypes :
- Ingress
- Egress
ingress :
- from :
- podSelector :
matchLabels :
app : frontend
ports :
- protocol : TCP
port : 8000
egress :
- to : []
ports :
- protocol : TCP
port : 443 # HTTPS only
- to :
- namespaceSelector :
matchLabels :
name : database
ports :
- protocol : TCP
port : 5432
Security Checklist
Common Security Vulnerabilities
Path Traversal Prevention
import os
from pathlib import Path
def secure_file_path ( base_dir : str , user_path : str ) -> str :
"""Safely resolve user-provided file paths"""
base = Path(base_dir).resolve()
target = (base / user_path).resolve()
# Ensure the target is within the base directory
if not str (target).startswith( str (base)):
raise ValueError ( "Path traversal attempt detected" )
return str (target)
# Usage in MCP server configuration
safe_workspace = secure_file_path( "/workspace" , user_provided_path)
Command Injection Prevention
import shlex
def secure_command_args ( command : str , args : List[ str ]) -> List[ str ]:
"""Safely construct command arguments"""
# Whitelist allowed commands
allowed_commands = [ "node" , "python" , "npm" , "pip" ]
if command not in allowed_commands:
raise ValueError ( f "Command ' { command } ' not allowed" )
# Escape arguments
safe_args = [shlex.quote(arg) for arg in args]
return [command] + safe_args
Next Steps
Security is an ongoing process. Regularly review and update your security practices, monitor for new vulnerabilities, and keep all dependencies up to date.