Streaming Agent Output
Implement real-time streaming of agent actions and responses
MCP-Use supports asynchronous streaming of agent output, allowing you to receive incremental results, tool actions, and intermediate steps as they are generated by the agent.
Basic Streaming
Use the astream method to get real-time output from your agent:
```python
import asyncio

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def basic_streaming_example():
    # Setup agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's response
    print("Agent is working...")
    async for chunk in agent.astream("Search for the latest Python news and summarize it"):
        print(chunk, end="", flush=True)

    print("\n\nDone!")


if __name__ == "__main__":
    asyncio.run(basic_streaming_example())
```
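If you also need the complete response once the stream finishes (for logging, or to return it from another function), you can accumulate the chunks as they arrive. A minimal sketch, assuming the chunks are plain text, as the buffering example later on this page also assumes:

```python
async def stream_and_collect(agent, query):
    # Print incrementally, but keep the full text for later use
    parts = []
    async for chunk in agent.astream(query):
        print(chunk, end="", flush=True)
        parts.append(chunk)
    return "".join(parts)
```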
Advanced Streaming with Event Handling
For more control over the streaming output, you can handle different types of events:
```python
import asyncio

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def advanced_streaming_example():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            },
            "filesystem": {
                "command": "mcp-server-filesystem",
                "args": ["/workspace"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    query = """
    Research the top 3 AI companies, create a comparison report,
    and save it as a markdown file
    """

    print("🤖 Starting agent task...")
    print("-" * 50)

    current_step = 1

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            print(f"\n📝 Step {current_step}: Planning next action...")

        elif event_type == "on_tool_start":
            # The tool name is carried on the event itself
            tool_name = event.get("name", "unknown")
            print(f"\n🔧 Using tool: {tool_name}")

        elif event_type == "on_tool_end":
            print(" ✅ Tool completed")
            current_step += 1

        elif event_type == "on_chat_model_stream":
            # The chunk is a message chunk object; read its content attribute
            chunk = data.get("chunk")
            token = getattr(chunk, "content", "") if chunk else ""
            if token:
                print(token, end="", flush=True)

        elif event_type == "on_chain_end":
            print("\n\n🎉 Task completed successfully!")


if __name__ == "__main__":
    asyncio.run(advanced_streaming_example())
```
Real-time Progress Tracking
Create a progress tracker that shows the agent’s current status:
```python
import asyncio
import time

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


class ProgressTracker:
    def __init__(self):
        self.start_time = time.time()
        self.current_tool = None
        self.step_count = 0
        self.tools_used = []

    def update_tool(self, tool_name):
        if tool_name != self.current_tool:
            self.current_tool = tool_name
            self.step_count += 1
            self.tools_used.append(tool_name)
            elapsed = time.time() - self.start_time
            print(f"\n[{elapsed:.1f}s] Step {self.step_count}: Using {tool_name}")

    def show_summary(self):
        elapsed = time.time() - self.start_time
        print(f"\n{'='*50}")
        print(f"Task completed in {elapsed:.1f} seconds")
        print(f"Steps taken: {self.step_count}")
        print(f"Tools used: {', '.join(set(self.tools_used))}")
        print(f"{'='*50}")


async def streaming_with_progress():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    tracker = ProgressTracker()
    query = "Find information about the latest iPhone model and its key features"

    print("🚀 Starting agent with real-time progress tracking...")

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_tool_start":
            # The tool name is carried on the event itself
            tool_name = event.get("name")
            if tool_name:
                tracker.update_tool(tool_name)

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, 'content') and chunk.content:
                print(chunk.content, end="", flush=True)

        elif event_type == "on_chain_end":
            tracker.show_summary()


if __name__ == "__main__":
    asyncio.run(streaming_with_progress())
```
Building a Streaming UI
Here’s an example of how you might build a simple console UI for streaming:
```python
import asyncio
import sys
from datetime import datetime

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


class StreamingUI:
    def __init__(self):
        self.current_thought = ""
        self.tool_outputs = []
        self.final_answer = ""

    def clear_line(self):
        """Clear the current line in the terminal"""
        sys.stdout.write('\r\033[K')

    def print_status(self, status, tool=None):
        """Print colored status updates"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        if tool:
            print(f"\033[94m[{timestamp}] {status}: {tool}\033[0m")
        else:
            print(f"\033[92m[{timestamp}] {status}\033[0m")

    def print_thinking(self, text):
        """Print the agent's reasoning in real-time"""
        self.clear_line()
        truncated = text[:80] + "..." if len(text) > 80 else text
        sys.stdout.write(f"\033[93m💭 Thinking: {truncated}\033[0m")
        sys.stdout.flush()

    def print_tool_result(self, tool_name, result):
        """Print tool execution results"""
        print(f"\n\033[96m🔧 {tool_name} result:\033[0m")
        # Truncate long results
        display_result = result[:200] + "..." if len(result) > 200 else result
        print(f"  {display_result}")


async def streaming_ui_example():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    ui = StreamingUI()
    query = "What are the current trending topics on Hacker News?"

    print("🤖 MCP Agent - Interactive Session")
    print("=" * 50)
    print(f"Query: {query}")
    print("=" * 50)

    current_tool = None
    current_reasoning = ""

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            ui.print_status("Starting to plan")

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, 'content') and chunk.content:
                current_reasoning += chunk.content
                ui.print_thinking(current_reasoning)

        elif event_type == "on_tool_start":
            # The tool name is carried on the event itself
            current_tool = event.get("name")
            if current_tool:
                print("\n")  # New line after thinking
                ui.print_status("Executing tool", current_tool)
                current_reasoning = ""  # Reset for next iteration

        elif event_type == "on_tool_end":
            output = data.get("output")
            if current_tool and output:
                ui.print_tool_result(current_tool, str(output))

        elif event_type == "on_chain_end":
            print("\n")
            ui.print_status("Task completed!")
            # Extract final answer
            final_output = data.get("output")
            if final_output:
                print("\n\033[92m📋 Final Answer:\033[0m")
                print(f"{final_output}")


if __name__ == "__main__":
    asyncio.run(streaming_ui_example())
```
Web Streaming with FastAPI
For web applications, you can stream agent output using Server-Sent Events:
```python
import json
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

app = FastAPI()


async def create_agent():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    return MCPAgent(llm=llm, client=client)


@app.get("/stream/{query}")
async def stream_agent_response(query: str):
    """Stream agent response using Server-Sent Events"""

    async def event_generator():
        agent = await create_agent()

        async for event in agent.astream_events(query, version="v1"):
            event_type = event.get("event")
            data = event.get("data", {})

            # Format as SSE; default=str keeps non-JSON-serializable payloads
            # (such as message chunk objects) from breaking the stream
            sse_data = {
                "type": event_type,
                "timestamp": time.time(),
                "data": data
            }

            yield f"data: {json.dumps(sse_data, default=str)}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


@app.get("/")
async def root():
    return {"message": "MCP Agent Streaming API"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
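To sanity-check the endpoint, you can consume the stream from Python as well. The following is a minimal sketch, assuming httpx is installed (any HTTP client with response streaming would work) and that the server above is running locally:

```python
import asyncio
import json
from urllib.parse import quote

import httpx


async def consume_stream(query: str):
    # URL-encode the query since it is passed as a path segment
    url = f"http://localhost:8000/stream/{quote(query)}"

    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                print(event["type"])


if __name__ == "__main__":
    asyncio.run(consume_stream("latest Python news"))
```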
Performance Considerations
Buffering Output
For better performance, you can buffer output:
```python
async def buffered_streaming(agent, query, buffer_size=10):
    buffer = []

    async for chunk in agent.astream(query):
        buffer.append(chunk)

        if len(buffer) >= buffer_size:
            yield ''.join(buffer)
            buffer = []

    # Yield remaining buffer
    if buffer:
        yield ''.join(buffer)


# Usage
async for buffered_chunk in buffered_streaming(agent, "Your query here"):
    print(buffered_chunk, end="", flush=True)
```
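The buffer size is a latency/overhead trade-off: with buffer_size=10 the consumer sees output in roughly ten-chunk bursts but performs far fewer writes, while buffer_size=1 behaves like unbuffered streaming. Pick the smallest value that keeps your output sink (terminal, websocket, log) from becoming the bottleneck.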
Filtering Events
Only process events you care about:
```python
async def filtered_streaming(agent, query):
    interesting_events = [
        "on_tool_start",
        "on_tool_end",
        "on_chat_model_stream",
        "on_chain_end"
    ]

    async for event in agent.astream_events(query, version="v1"):
        if event.get("event") in interesting_events:
            yield event


# Usage
async for event in filtered_streaming(agent, "Your query"):
    # Process only relevant events
    handle_event(event)
```
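The handle_event function above is just a placeholder. One way to fill it in is a small dispatch on the event type; the sketch below is hypothetical and only covers the event names used on this page:

```python
def handle_event(event):
    # Hypothetical dispatcher for the filtered event stream above
    event_type = event.get("event")
    data = event.get("data", {})

    if event_type == "on_tool_start":
        print(f"-> tool started: {event.get('name', 'unknown')}")
    elif event_type == "on_tool_end":
        print("<- tool finished")
    elif event_type == "on_chat_model_stream":
        chunk = data.get("chunk")
        text = getattr(chunk, "content", "") if chunk else ""
        if text:
            print(text, end="", flush=True)
    elif event_type == "on_chain_end":
        print("\n[done]")
```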
Troubleshooting Streaming
Ensure your LLM supports streaming:
```python
# Enable streaming in your LLM
llm = ChatOpenAI(model="gpt-4", streaming=True)
```
Consider buffering and filtering events:
```python
# Use smaller buffer sizes for more responsive streaming
# (buffered_streaming is the helper defined under Performance Considerations)
async for chunk in buffered_streaming(agent, query, buffer_size=1):
    print(chunk, end="", flush=True)
```
Process events incrementally without storing everything:
```python
# Don't store all events in memory
async for event in agent.astream_events(query, version="v1"):
    process_event_immediately(event)
    # Don't append to a list
```
Next Steps
- Agent Configuration: Learn more about configuring agents for optimal streaming performance
- Multi-Server Setup: Stream output from agents using multiple MCP servers
- Performance Optimization: Optimize streaming performance for production use
The streaming API is based on LangChain's astream_events method. For more details on event types and data structures, see the LangChain streaming documentation.
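For orientation, each yielded event is a dict roughly along these lines. This is illustrative only: the top-level keys follow LangChain's v1 event schema, while the exact contents of "data" and the tool name shown here are assumptions that vary by event type and LangChain version:

```python
# Illustrative event shape; actual fields depend on the event type
example_event = {
    "event": "on_tool_start",          # e.g. on_chat_model_stream, on_tool_end, on_chain_end
    "name": "browser_navigate",        # name of the tool/runnable that emitted it (hypothetical)
    "run_id": "…",
    "tags": [],
    "metadata": {},
    "data": {"input": {"url": "https://news.ycombinator.com"}},  # "chunk"/"output" for other events
}
```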