Optimize mcp_use performance for production workloads
```python
# ❌ Poor performance - all servers start immediately
agent = MCPAgent(llm=llm, client=client, use_server_manager=False)

# ✅ Better performance - servers start only when needed
agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
```
```python
agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,
    max_concurrent_servers=3,   # Limit to 3 active servers
    server_startup_timeout=30   # Faster timeout for stuck servers
)
```
```python
# Method 1: Whitelist specific tools
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_tools=["file_read", "file_write", "web_search"],
    use_server_manager=True
)

# Method 2: Blacklist problematic tools
agent = MCPAgent(
    llm=llm,
    client=client,
    disallowed_tools=["system_execute", "dangerous_operation"],
    use_server_manager=True
)

# Method 3: Server-level filtering
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_servers=["filesystem", "playwright"],  # Only these servers
    use_server_manager=True
)
```
```python
from functools import lru_cache
import hashlib

class CachedMCPAgent(MCPAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._tool_cache = {}

    @lru_cache(maxsize=100)
    def _cache_key(self, tool_name: str, inputs: str) -> str:
        """Generate cache key for tool execution"""
        content = f"{tool_name}:{inputs}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _execute_tool_cached(self, tool_name: str, inputs: dict):
        """Execute tool with caching"""
        cache_key = self._cache_key(tool_name, str(sorted(inputs.items())))

        if cache_key in self._tool_cache:
            return self._tool_cache[cache_key]

        result = await super()._execute_tool(tool_name, inputs)
        self._tool_cache[cache_key] = result
        return result

# Usage
agent = CachedMCPAgent(llm=llm, client=client, use_server_manager=True)
```
```python
from langchain_openai import ChatOpenAI

# Fastest
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)

# Balanced
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)

# Most capable (slower)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
```
```python
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,     # Lower temperature for more focused responses
    max_tokens=500,      # Limit response length
    streaming=True,      # Enable streaming for perceived speed
    request_timeout=30,  # Reasonable timeout
    max_retries=2        # Limit retry attempts
)
```
```python
import asyncio
from langchain_openai import ChatOpenAI

class PooledLLM:
    def __init__(self, model="gpt-4o-mini", pool_size=5):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            llm = ChatOpenAI(model=model, temperature=0.1)
            self.pool.put_nowait(llm)

    async def get_llm(self):
        return await self.pool.get()

    async def return_llm(self, llm):
        await self.pool.put(llm)

# Usage
llm_pool = PooledLLM(pool_size=3)

async def run_with_pooled_llm(query):
    llm = await llm_pool.get_llm()
    try:
        # Keep the LLM checked out for the whole run, then hand it back
        agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
        return await agent.run(query)
    finally:
        await llm_pool.return_llm(llm)
```
{ "mcpServers": { "playwright": { "command": "npx", "args": [ "@playwright/mcp@latest", "--headless=true", "--timeout=10000" ], "env": { "PLAYWRIGHT_BROWSERS_PATH": "/opt/playwright", "PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD": "true" } }, "filesystem": { "command": "mcp-server-filesystem", "args": [ "/workspace", "--readonly=false", "--max-file-size=10MB" ] } } }
```bash
# Node.js optimization
export NODE_ENV=production
export NODE_OPTIONS="--max-old-space-size=2048"

# Python optimization
export PYTHONOPTIMIZE=2
export PYTHONDONTWRITEBYTECODE=1

# MCP-specific
export MCP_TIMEOUT=30
export MCP_MAX_RETRIES=2
```
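These variables can also be scoped to individual servers through the config's `env` block instead of being exported globally. A minimal sketch, assuming your mcp_use version exposes `MCPClient.from_dict`; otherwise save the same structure as JSON and load it with `from_config_file` as in the other examples:

```python
from mcp_use import MCPClient

# Per-server equivalent of the exports above: the variables are injected
# only into the playwright server's process environment.
config = {
    "mcpServers": {
        "playwright": {
            "command": "npx",
            "args": ["@playwright/mcp@latest", "--headless=true"],
            "env": {
                "NODE_ENV": "production",
                "NODE_OPTIONS": "--max-old-space-size=2048",
            },
        }
    }
}

client = MCPClient.from_dict(config)
```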
```python
import psutil
import asyncio

class MemoryMonitor:
    def __init__(self, threshold_mb=500):
        self.threshold_mb = threshold_mb
        self.initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

    def check_memory(self):
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024
        memory_used = current_memory - self.initial_memory

        if memory_used > self.threshold_mb:
            print(f"Warning: Memory usage {memory_used:.1f}MB exceeds threshold")

        return memory_used

# Usage
monitor = MemoryMonitor(threshold_mb=300)

async def run_agent_with_monitoring():
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
    result = await agent.run("Your query here")

    memory_used = monitor.check_memory()
    return result, memory_used
```
```python
import gc
import asyncio

async def run_multiple_queries(queries):
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    results = []
    for i, query in enumerate(queries):
        result = await agent.run(query)
        results.append(result)

        # Garbage collect every 10 queries
        if (i + 1) % 10 == 0:
            gc.collect()

    return results
```
```python
async def process_queries_concurrently(queries, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_query(query):
        async with semaphore:
            agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
            return await agent.run(query)

    tasks = [process_single_query(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# Usage
queries = ["Query 1", "Query 2", "Query 3", "Query 4", "Query 5"]
results = await process_queries_concurrently(queries, max_concurrent=2)
```
```python
class AgentPool:
    def __init__(self, pool_size=3):
        self.pool_size = pool_size
        self.clients = []
        self.available_clients = asyncio.Queue()

    async def initialize(self):
        for _ in range(self.pool_size):
            client = MCPClient.from_config_file("config.json")
            await client.create_all_sessions()  # Pre-create sessions
            self.clients.append(client)
            await self.available_clients.put(client)

    async def get_agent(self):
        client = await self.available_clients.get()
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
        return MCPAgent(llm=llm, client=client, use_server_manager=True)

    async def return_client(self, agent):
        await self.available_clients.put(agent.client)

# Usage
pool = AgentPool(pool_size=3)
await pool.initialize()

agent = await pool.get_agent()
result = await agent.run("Your query")
await pool.return_client(agent)
```
```dockerfile
FROM python:3.9-slim

# Install Node.js for MCP servers
RUN apt-get update && apt-get install -y \
    nodejs npm \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Pre-install common MCP servers
RUN npm install -g @playwright/mcp playwright

# Copy application
COPY . .

# Set performance environment variables
ENV NODE_ENV=production
ENV PYTHONOPTIMIZE=2
ENV PYTHONDONTWRITEBYTECODE=1

# Run with limited resources
CMD ["python", "-O", "main.py"]
```
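If you keep the playwright server settings from the configuration earlier in this guide (`PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD=true` and `PLAYWRIGHT_BROWSERS_PATH=/opt/playwright`), bake the browser binaries into the image at build time, for example with Playwright's browser install command, and point `PLAYWRIGHT_BROWSERS_PATH` at that location so the server never tries to download browsers at runtime.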
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-use-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-use-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
```
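For the `Utilization` targets above to work, the `mcp-use-app` Deployment must declare CPU and memory `resources.requests` on its pods; the autoscaler computes utilization as a percentage of those requests.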
```python
import time
import psutil
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    query_time: float
    server_startup_time: float
    tool_execution_time: float
    memory_usage_mb: float
    tools_used: List[str]

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    async def track_agent_run(self, agent, query):
        start_time = time.time()
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

        # Track server startup time
        server_start = time.time()
        if hasattr(agent, 'client'):
            await agent.client.ensure_connected()
        server_startup_time = time.time() - server_start

        # Run query
        result = await agent.run(query)

        # Calculate metrics
        total_time = time.time() - start_time
        final_memory = psutil.Process().memory_info().rss / 1024 / 1024

        metrics = PerformanceMetrics(
            query_time=total_time,
            server_startup_time=server_startup_time,
            tool_execution_time=total_time - server_startup_time,
            memory_usage_mb=final_memory - initial_memory,
            tools_used=getattr(agent, '_tools_used', [])
        )

        self.metrics.append(metrics)
        return result, metrics

    def get_average_metrics(self):
        if not self.metrics:
            return None

        return {
            'avg_query_time': sum(m.query_time for m in self.metrics) / len(self.metrics),
            'avg_memory_usage': sum(m.memory_usage_mb for m in self.metrics) / len(self.metrics),
            'avg_tools_per_query': sum(len(m.tools_used) for m in self.metrics) / len(self.metrics)
        }

# Usage
tracker = PerformanceTracker()
result, metrics = await tracker.track_agent_run(agent, "Your query")
print(f"Query completed in {metrics.query_time:.2f}s using {metrics.memory_usage_mb:.1f}MB")
```
| Issue | Solution |
|---|---|
| Slow agent startup | `use_server_manager=True` |
| High memory usage | `max_concurrent_servers=3` |
| Tool execution timeouts | `timeout=60` |
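As a rough sketch, the first two fixes map directly onto the agent options shown earlier in this guide; for tool execution timeouts, raise the relevant timeout wherever it is configured in your setup (for example the `--timeout` argument in the playwright server config or the LLM's `request_timeout`). Exact parameter names may differ between mcp_use versions, so treat this as illustrative:

```python
# Illustrative only: applies the troubleshooting fixes using parameter names
# from the earlier examples; verify them against your mcp_use version.
agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,    # slow agent startup -> start servers on demand
    max_concurrent_servers=3,   # high memory usage -> cap active servers
)
```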