This guide covers performance optimization techniques for mcp_use deployments, from basic tuning to advanced scaling strategies.

Server Management Optimization

Enable Server Manager

The most impactful performance optimization is enabling the server manager:

# ❌ Poor performance - all servers start immediately
agent = MCPAgent(llm=llm, client=client, use_server_manager=False)

# ✅ Better performance - servers start only when needed
agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

Benefits:

  • Lazy loading: Servers start only when their tools are needed
  • Resource efficiency: Lower memory and CPU usage
  • Faster startup: Agent initialization completes quickly
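
To see what the difference looks like in your own deployment, you can time the first query end to end with the manager on and off. The sketch below is a minimal example; the config path, model, and query are placeholders:

import time
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def time_first_run(use_server_manager: bool) -> float:
    """Time client setup plus one query, end to end."""
    start = time.perf_counter()

    client = MCPClient.from_config_file("config.json")
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
    agent = MCPAgent(llm=llm, client=client, use_server_manager=use_server_manager)
    await agent.run("List the files in the current directory")

    elapsed = time.perf_counter() - start
    await client.close_all_sessions()
    return elapsed

# lazy_time = await time_first_run(use_server_manager=True)
# eager_time = await time_first_run(use_server_manager=False)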

Limit Concurrent Servers

Control resource usage by limiting concurrent server connections:

agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,
    max_concurrent_servers=3,  # Limit to 3 active servers
    server_startup_timeout=30  # Fail fast when a server hangs on startup (seconds)
)

Tool Optimization

Restrict Tool Access

Reduce decision complexity by limiting available tools:

# Method 1: Whitelist specific tools
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_tools=["file_read", "file_write", "web_search"],
    use_server_manager=True
)

# Method 2: Blacklist problematic tools
agent = MCPAgent(
    llm=llm,
    client=client,
    disallowed_tools=["system_execute", "dangerous_operation"],
    use_server_manager=True
)

# Method 3: Server-level filtering
agent = MCPAgent(
    llm=llm,
    client=client,
    allowed_servers=["filesystem", "playwright"],  # Only these servers
    use_server_manager=True
)

Tool Caching

Implement tool result caching for expensive operations:

import hashlib

class CachedMCPAgent(MCPAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._tool_cache = {}  # maps cache key -> cached tool result

    @staticmethod
    def _cache_key(tool_name: str, inputs: str) -> str:
        """Generate a deterministic cache key for a tool call."""
        content = f"{tool_name}:{inputs}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _execute_tool_cached(self, tool_name: str, inputs: dict):
        """Execute a tool, returning a cached result when available."""
        cache_key = self._cache_key(tool_name, str(sorted(inputs.items())))

        if cache_key in self._tool_cache:
            return self._tool_cache[cache_key]

        result = await super()._execute_tool(tool_name, inputs)
        self._tool_cache[cache_key] = result
        return result

# Usage
agent = CachedMCPAgent(llm=llm, client=client, use_server_manager=True)

LLM Optimization

Choose Faster Models

Balance capability with speed:

# Fastest
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)

# Balanced
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)

# Most capable (slower)
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

Optimize LLM Parameters

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,  # Lower temperature for more focused responses
    max_tokens=500,   # Limit response length
    streaming=True,   # Enable streaming for perceived speed
    request_timeout=30,  # Reasonable timeout
    max_retries=2     # Limit retry attempts
)

Connection Pooling

For high-throughput scenarios, implement connection pooling:

import asyncio
from langchain_openai import ChatOpenAI

class PooledLLM:
    def __init__(self, model="gpt-4o-mini", pool_size=5):
        self.pool = asyncio.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            llm = ChatOpenAI(model=model, temperature=0.1)
            self.pool.put_nowait(llm)

    async def get_llm(self):
        return await self.pool.get()

    async def return_llm(self, llm):
        await self.pool.put(llm)

# Usage
llm_pool = PooledLLM(pool_size=3)

async def run_with_pooled_llm(query: str):
    llm = await llm_pool.get_llm()
    try:
        agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
        return await agent.run(query)
    finally:
        # Return the LLM only after the agent has finished with it
        await llm_pool.return_llm(llm)

Configuration Optimization

Server Configuration Tuning

Optimize individual server configurations:

{
  "mcpServers": {
    "playwright": {
      "command": "npx",
      "args": [
        "@playwright/mcp@latest",
        "--headless=true",
        "--timeout=10000"
      ],
      "env": {
        "PLAYWRIGHT_BROWSERS_PATH": "/opt/playwright",
        "PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD": "true"
      }
    },
    "filesystem": {
      "command": "mcp-server-filesystem",
      "args": [
        "/workspace",
        "--readonly=false",
        "--max-file-size=10MB"
      ]
    }
  }
}

Environment Variables

Set performance-related environment variables:

# Node.js optimization
export NODE_ENV=production
export NODE_OPTIONS="--max-old-space-size=2048"

# Python optimization
export PYTHONOPTIMIZE=2
export PYTHONDONTWRITEBYTECODE=1

# MCP-specific
export MCP_TIMEOUT=30
export MCP_MAX_RETRIES=2

Memory Management

Monitor Memory Usage

Track memory consumption:

import psutil
import asyncio

class MemoryMonitor:
    def __init__(self, threshold_mb=500):
        self.threshold_mb = threshold_mb
        self.initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

    def check_memory(self):
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024
        memory_used = current_memory - self.initial_memory

        if memory_used > self.threshold_mb:
            print(f"Warning: Memory usage {memory_used:.1f}MB exceeds threshold")

        return memory_used

# Usage
monitor = MemoryMonitor(threshold_mb=300)

async def run_agent_with_monitoring():
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    result = await agent.run("Your query here")
    memory_used = monitor.check_memory()

    return result, memory_used

Garbage Collection

Force garbage collection for long-running processes:

import gc
import asyncio

async def run_multiple_queries(queries):
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    results = []
    for i, query in enumerate(queries):
        result = await agent.run(query)
        results.append(result)

        # Garbage collect every 10 queries
        if (i + 1) % 10 == 0:
            gc.collect()

    return results

Async Optimization

Concurrent Processing

Process multiple queries concurrently:

async def process_queries_concurrently(queries, max_concurrent=3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_query(query):
        async with semaphore:
            agent = MCPAgent(llm=llm, client=client, use_server_manager=True)
            return await agent.run(query)

    tasks = [process_single_query(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return results

# Usage
queries = ["Query 1", "Query 2", "Query 3", "Query 4", "Query 5"]
results = await process_queries_concurrently(queries, max_concurrent=2)

Connection Reuse

Reuse MCP client connections:

class AgentPool:
    def __init__(self, pool_size=3):
        self.pool_size = pool_size
        self.clients = []
        self.available_clients = asyncio.Queue()

    async def initialize(self):
        for _ in range(self.pool_size):
            client = MCPClient.from_config_file("config.json")
            await client.create_all_sessions()  # Pre-create sessions
            self.clients.append(client)
            await self.available_clients.put(client)

    async def get_agent(self):
        client = await self.available_clients.get()
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
        return MCPAgent(llm=llm, client=client, use_server_manager=True)

    async def return_client(self, agent):
        await self.available_clients.put(agent.client)

# Usage
pool = AgentPool(pool_size=3)
await pool.initialize()

agent = await pool.get_agent()
try:
    result = await agent.run("Your query")
finally:
    await pool.return_client(agent)  # return the client even if the run fails

Production Deployment

Docker Optimization

Optimize Docker configuration:

FROM python:3.9-slim

# Install Node.js for MCP servers
RUN apt-get update && apt-get install -y \
    nodejs npm \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Pre-install common MCP servers
RUN npm install -g @playwright/mcp playwright

# Copy application
COPY . .

# Set performance environment variables
ENV NODE_ENV=production
ENV PYTHONOPTIMIZE=2
ENV PYTHONDONTWRITEBYTECODE=1

# Start the application (apply CPU/memory limits at container runtime,
# e.g. via docker run flags or Kubernetes resource requests)
CMD ["python", "-O", "main.py"]

Kubernetes Scaling

Configure horizontal pod autoscaling:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-use-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-use-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
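
For utilization-based metrics to work, the deployment's pod spec must declare CPU and memory resource requests, since the autoscaler computes utilization as a percentage of the requested amounts.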

Monitoring and Profiling

Performance Metrics

Track key performance indicators:

import time
import psutil
from dataclasses import dataclass
from typing import List

@dataclass
class PerformanceMetrics:
    query_time: float
    server_startup_time: float
    tool_execution_time: float
    memory_usage_mb: float
    tools_used: List[str]

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    async def track_agent_run(self, agent, query):
        start_time = time.time()
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

        # Track server startup time
        server_start = time.time()
        if hasattr(agent, 'client'):
            await agent.client.ensure_connected()
        server_startup_time = time.time() - server_start

        # Run query
        result = await agent.run(query)

        # Calculate metrics
        total_time = time.time() - start_time
        final_memory = psutil.Process().memory_info().rss / 1024 / 1024

        metrics = PerformanceMetrics(
            query_time=total_time,
            server_startup_time=server_startup_time,
            tool_execution_time=total_time - server_startup_time,
            memory_usage_mb=final_memory - initial_memory,
            tools_used=getattr(agent, '_tools_used', [])
        )

        self.metrics.append(metrics)
        return result, metrics

    def get_average_metrics(self):
        if not self.metrics:
            return None

        return {
            'avg_query_time': sum(m.query_time for m in self.metrics) / len(self.metrics),
            'avg_memory_usage': sum(m.memory_usage_mb for m in self.metrics) / len(self.metrics),
            'avg_tools_per_query': sum(len(m.tools_used) for m in self.metrics) / len(self.metrics)
        }

# Usage
tracker = PerformanceTracker()
result, metrics = await tracker.track_agent_run(agent, "Your query")
print(f"Query completed in {metrics.query_time:.2f}s using {metrics.memory_usage_mb:.1f}MB")

Troubleshooting Performance Issues

Common Performance Problems
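
Typical issues map directly to the techniques above:

  • Slow agent startup: enable the server manager so servers start lazily, and pre-install server packages (see Docker Optimization).
  • Erratic or slow tool selection: restrict the tool set with allowed_tools, disallowed_tools, or allowed_servers.
  • Growing memory usage: limit concurrent servers, watch it with MemoryMonitor, and trigger garbage collection in long-running processes.
  • Slow LLM responses: switch to a faster model, lower max_tokens, and enable streaming.
  • Repeated expensive tool calls: cache results as shown in Tool Caching.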

Next Steps

Start by enabling the server manager and restricting tools: these two changes alone can improve performance by 50-80% in most cases.
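
A minimal starting point that combines both changes (the config path, model, and tool names here are illustrative):

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

client = MCPClient.from_config_file("config.json")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)

agent = MCPAgent(
    llm=llm,
    client=client,
    use_server_manager=True,                    # lazy-load servers on demand
    allowed_tools=["file_read", "web_search"],  # keep the decision space small
)

result = await agent.run("Your query here")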