This guide covers performance optimization techniques for mcp_use deployments, from basic tuning to advanced scaling strategies.
Server Management Optimization
Enable Server Manager
The most impactful performance optimization is enabling the server manager:
# ❌ Poor performance - all servers start immediately
agent = MCPAgent(llm=llm, client=client, use_server_manager=False)
# ✅ Better performance - servers start only when needed
agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
Benefits:
- Lazy loading: Servers start only when their tools are needed
- Resource efficiency: Lower memory and CPU usage
- Faster startup: Agent initialization completes quickly
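To see the lazy-loading effect yourself, you can time eager session creation and compare it against the managed path. This is a rough sketch that reuses the MCPClient.from_config_file and create_all_sessions calls shown later in this guide; the actual numbers depend on which servers your config starts.

import time
from mcp_use import MCPClient

client = MCPClient.from_config_file("config.json")

# Eager path: every configured server starts now
start = time.perf_counter()
await client.create_all_sessions()
print(f"Eager startup took {time.perf_counter() - start:.2f}s")

# With use_server_manager=True the agent skips this step entirely;
# servers start lazily the first time one of their tools is called.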
Limit Concurrent Servers
Control resource usage by limiting concurrent server connections:
agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,
    max_concurrent_servers=3,  # Limit to 3 active servers
    server_startup_timeout=30  # Faster timeout for stuck servers
)
Restrict Tool Access
Reduce decision complexity by limiting available tools:
# Method 1: Whitelist specific tools
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_tools=["file_read", "file_write", "web_search"],
    use_server_manager=True
)

# Method 2: Blacklist problematic tools
agent = MCPAgent(
    llm=llm,
    client=client,
    disallowed_tools=["system_execute", "dangerous_operation"],
    use_server_manager=True
)

# Method 3: Server-level filtering
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_servers=["filesystem", "playwright"],  # Only these servers
    use_server_manager=True
)
Tool Result Caching
Implement tool result caching for expensive operations:
import hashlib

class CachedMCPAgent(MCPAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._tool_cache = {}

    @staticmethod
    def _cache_key(tool_name: str, inputs: str) -> str:
        """Generate a cache key for a tool execution"""
        content = f"{tool_name}:{inputs}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _execute_tool_cached(self, tool_name: str, inputs: dict):
        """Execute a tool, reusing cached results for identical inputs"""
        cache_key = self._cache_key(tool_name, str(sorted(inputs.items())))
        if cache_key in self._tool_cache:
            return self._tool_cache[cache_key]
        # Assumes the parent agent exposes an internal _execute_tool hook;
        # adapt this call to however your version dispatches tool calls.
        result = await super()._execute_tool(tool_name, inputs)
        self._tool_cache[cache_key] = result
        return result
# Usage
agent = CachedMCPAgent(llm=llm, client=client, use_server_manager=True)
LLM Optimization
Choose Faster Models
Balance capability with speed:
# OpenAI
# Fastest
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)
# Balanced
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
# Most capable (slower)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# Anthropic
# Fastest
llm = ChatAnthropic(model="claude-3-haiku-20240307", temperature=0.1)
# Balanced
llm = ChatAnthropic(model="claude-3-sonnet-20240229", temperature=0.1)
# Most capable (slower)
llm = ChatAnthropic(model="claude-3-opus-20240229", temperature=0.1)
# Groq
# Very fast inference
llm = ChatGroq(
    model="llama3-8b-8192",  # Smaller, faster
    temperature=0.1,
    max_tokens=1000  # Limit response length
)
# Balanced
llm = ChatGroq(model="llama3-70b-8192", temperature=0.1)
Optimize LLM Parameters
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,     # Lower temperature for more focused responses
    max_tokens=500,      # Limit response length
    streaming=True,      # Enable streaming for perceived speed
    request_timeout=30,  # Reasonable timeout
    max_retries=2        # Limit retry attempts
)
Connection Pooling
For high-throughput scenarios, implement connection pooling:
import asyncio
from langchain_openai import ChatOpenAI
class PooledLLM:
    def __init__(self, model="gpt-4o-mini", pool_size=5):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            llm = ChatOpenAI(model=model, temperature=0.1)
            self.pool.put_nowait(llm)

    async def get_llm(self):
        return await self.pool.get()

    async def return_llm(self, llm):
        await self.pool.put(llm)

# Usage: hold the LLM for the duration of the work, then return it to the pool
llm_pool = PooledLLM(pool_size=3)

async def run_with_pooled_llm(query):
    llm = await llm_pool.get_llm()
    try:
        agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
        return await agent.run(query)
    finally:
        await llm_pool.return_llm(llm)
Configuration Optimization
Server Configuration Tuning
Optimize individual server configurations:
{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": [
        "@playwright/mcp@latest",
        "--headless=true",
        "--timeout=10000"
      ],
      "env": {
        "PLAYWRIGHT_BROWSERS_PATH": "/opt/playwright",
        "PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD": "true"
      }
    },
    "filesystem": {
      "command": "mcp-server-filesystem",
      "args": [
        "/workspace",
        "--readonly=false",
        "--max-file-size=10MB"
      ]
    }
  }
}
Environment Variables
Set performance-related environment variables:
# Node.js optimization
export NODE_ENV=production
export NODE_OPTIONS="--max-old-space-size=2048"
# Python optimization
export PYTHONOPTIMIZE=2
export PYTHONDONTWRITEBYTECODE=1
# MCP-specific
export MCP_TIMEOUT=30
export MCP_MAX_RETRIES=2
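If you would rather scope process-level settings to a single server instead of exporting them shell-wide, the same values can go in that server's env block in the MCP config, mirroring the env usage shown in the server configuration example above. A minimal sketch for a Node-based server:

{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": ["@playwright/mcp@latest", "--headless=true"],
      "env": {
        "NODE_ENV": "production",
        "NODE_OPTIONS": "--max-old-space-size=2048"
      }
    }
  }
}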
Memory Management
Monitor Memory Usage
Track memory consumption:
import psutil
import asyncio
class MemoryMonitor:
    def __init__(self, threshold_mb=500):
        self.threshold_mb = threshold_mb
        self.initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

    def check_memory(self):
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024
        memory_used = current_memory - self.initial_memory
        if memory_used > self.threshold_mb:
            print(f"Warning: Memory usage {memory_used:.1f}MB exceeds threshold")
        return memory_used

# Usage
monitor = MemoryMonitor(threshold_mb=300)

async def run_agent_with_monitoring():
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
    result = await agent.run("Your query here")
    memory_used = monitor.check_memory()
    return result, memory_used
Garbage Collection
Force garbage collection for long-running processes:
import gc
import asyncio
async def run_multiple_queries(queries):
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
    results = []
    for i, query in enumerate(queries):
        result = await agent.run(query)
        results.append(result)
        # Garbage collect every 10 queries (i starts at 0, so trigger on the 10th, 20th, ...)
        if (i + 1) % 10 == 0:
            gc.collect()
    return results
Async Optimization
Concurrent Processing
Process multiple queries concurrently:
async def process_queries_concurrently(queries, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_query(query):
        async with semaphore:
            agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
            return await agent.run(query)

    tasks = [process_single_query(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
queries = ["Query 1", "Query 2", "Query 3", "Query 4", "Query 5"]
results = await process_queries_concurrently(queries, max_concurrent=2)
Connection Reuse
Reuse MCP client connections:
class AgentPool:
    def __init__(self, pool_size=3):
        self.pool_size = pool_size
        self.clients = []
        self.available_clients = asyncio.Queue()

    async def initialize(self):
        for _ in range(self.pool_size):
            client = MCPClient.from_config_file("config.json")
            await client.create_all_sessions()  # Pre-create sessions
            self.clients.append(client)
            await self.available_clients.put(client)

    async def get_agent(self):
        client = await self.available_clients.get()
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
        return MCPAgent(llm=llm, client=client, use_server_manager=True)

    async def return_client(self, agent):
        await self.available_clients.put(agent.client)

# Usage
pool = AgentPool(pool_size=3)
await pool.initialize()

agent = await pool.get_agent()
result = await agent.run("Your query")
await pool.return_client(agent)
Production Deployment
Docker Optimization
Optimize Docker configuration:
FROM python:3.9-slim
# Install Node.js for MCP servers
RUN apt-get update && apt-get install -y \
nodejs npm \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Pre-install common MCP servers
RUN npm install -g @playwright/mcp playwright
# Copy application
COPY . .
# Set performance environment variables
ENV NODE_ENV=production
ENV PYTHONOPTIMIZE=2
ENV PYTHONDONTWRITEBYTECODE=1
# Run with Python optimizations enabled (-O strips asserts and debug checks)
CMD ["python", "-O", "main.py"]
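The Dockerfile itself does not cap resources; if you want hard limits, apply them when starting the container. The image name mcp-use-app and the limit values below are illustrative:

docker run --rm --memory=2g --cpus=2 mcp-use-app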
Kubernetes Scaling
Configure horizontal pod autoscaling:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-use-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-use-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
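Utilization targets are computed against each pod's resource requests, so the mcp-use-app Deployment needs requests set or the autoscaler has nothing to measure against. A minimal sketch of the container-level block (the values are illustrative, not tuned recommendations):

# Inside the mcp-use-app Deployment's container spec
resources:
  requests:
    cpu: "500m"
    memory: "512Mi"
  limits:
    cpu: "1000m"
    memory: "1Gi"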
Monitoring and Profiling
Track key performance indicators:
import time
import psutil
from dataclasses import dataclass
from typing import List
@dataclass
class PerformanceMetrics:
    query_time: float
    server_startup_time: float
    tool_execution_time: float
    memory_usage_mb: float
    tools_used: List[str]

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    async def track_agent_run(self, agent, query):
        start_time = time.time()
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

        # Track server startup time (warm up connections if the client supports it)
        server_start = time.time()
        if hasattr(agent, 'client') and hasattr(agent.client, 'ensure_connected'):
            await agent.client.ensure_connected()
        server_startup_time = time.time() - server_start

        # Run query
        result = await agent.run(query)

        # Calculate metrics
        total_time = time.time() - start_time
        final_memory = psutil.Process().memory_info().rss / 1024 / 1024

        metrics = PerformanceMetrics(
            query_time=total_time,
            server_startup_time=server_startup_time,
            tool_execution_time=total_time - server_startup_time,
            memory_usage_mb=final_memory - initial_memory,
            tools_used=getattr(agent, '_tools_used', [])
        )
        self.metrics.append(metrics)
        return result, metrics

    def get_average_metrics(self):
        if not self.metrics:
            return None
        return {
            'avg_query_time': sum(m.query_time for m in self.metrics) / len(self.metrics),
            'avg_memory_usage': sum(m.memory_usage_mb for m in self.metrics) / len(self.metrics),
            'avg_tools_per_query': sum(len(m.tools_used) for m in self.metrics) / len(self.metrics)
        }

# Usage
tracker = PerformanceTracker()
result, metrics = await tracker.track_agent_run(agent, "Your query")
print(f"Query completed in {metrics.query_time:.2f}s using {metrics.memory_usage_mb:.1f}MB")
Next Steps
Start by enabling the server manager and restricting the available tools: these two changes alone can improve performance by 50-80% in most cases.
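As a concrete starting point, here is that combination in one place, reusing the placeholder tool names from the filtering examples above (substitute your own):

agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,
    allowed_tools=["file_read", "file_write", "web_search"]
)
result = await agent.run("Your query here")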