mcp-use supports multiple approaches for streaming agent output, allowing you to receive incremental results, tool actions, and intermediate steps as they are generated by the agent.
Step-by-Step Streaming
The stream() method provides a clean interface for receiving intermediate steps during agent execution. Each step represents a tool call and its result.
import asyncio

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def step_streaming_example():
    # Set up the agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's steps
    print("🤖 Agent is working...")
    print("-" * 50)

    async for item in agent.stream("Search for the latest Python news and summarize it"):
        if isinstance(item, str):
            # Final result
            print(f"\n✅ Final Result:\n{item}")
        else:
            # Intermediate step (action, observation)
            action, observation = item
            print(f"\n🔧 Tool: {action.tool}")
            print(f"📝 Input: {action.tool_input}")
            print(f"📄 Result: {observation[:100]}{'...' if len(observation) > 100 else ''}")

    print("\n🎉 Done!")


if __name__ == "__main__":
    asyncio.run(step_streaming_example())
Low-Level Event Streaming
For more granular control, use the stream_events() method to get real-time output events:
import asyncio

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


async def basic_streaming_example():
    # Set up the agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's response
    print("Agent is working...")
    async for chunk in agent.stream_events("Search for the latest Python news and summarize it"):
        print(chunk, end="", flush=True)

    print("\n\nDone!")


if __name__ == "__main__":
    asyncio.run(basic_streaming_example())
Choosing the Right Streaming Method

Use stream() when:
• You want to show step-by-step progress
• You need to process each tool call individually
• You’re building a workflow UI
• You want simple, clean step tracking

Use stream_events() when:
• You need fine-grained control over events
• You’re building real-time chat interfaces
• You want to stream LLM reasoning text
• You need custom event filtering (see the sketch below)
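Where you need custom filtering, a thin wrapper around stream_events() is usually enough. The sketch below assumes stream_events() yields event dictionaries with "event" and "data" keys (the same shape the UI example below relies on); treat it as a starting point rather than the library's canonical API:

import asyncio

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

# Keep only the event types we care about (an illustrative choice)
INTERESTING_EVENTS = {"on_chat_model_stream", "on_tool_end"}


async def filtered_streaming_example():
    config = {"mcpServers": {"playwright": {"command": "npx", "args": ["@playwright/mcp@latest"]}}}
    client = MCPClient.from_dict(config)
    agent = MCPAgent(llm=ChatOpenAI(model="gpt-4"), client=client)

    async for event in agent.stream_events("Summarize the latest Python news"):
        if event.get("event") not in INTERESTING_EVENTS:
            continue  # drop everything except token chunks and tool results
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"].get("chunk")
            if chunk is not None and getattr(chunk, "content", ""):
                print(chunk.content, end="", flush=True)  # LLM reasoning text
        else:  # on_tool_end
            print(f"\n[tool finished] {event['data'].get('output')}")


if __name__ == "__main__":
    asyncio.run(filtered_streaming_example())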
Examples
Building a Streaming UI
Here’s an example of how you might build a simple console UI for streaming:
import asyncio
import sys
from datetime import datetime

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


class StreamingUI:
    def __init__(self):
        self.current_thought = ""
        self.tool_outputs = []
        self.final_answer = ""

    def clear_line(self):
        """Clear the current line in the terminal"""
        sys.stdout.write("\r\033[K")

    def print_status(self, status, tool=None):
        """Print colored status updates"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        if tool:
            print(f"\033[94m[{timestamp}] {status}: {tool}\033[0m")
        else:
            print(f"\033[92m[{timestamp}] {status}\033[0m")

    def print_thinking(self, text):
        """Print the agent's reasoning in real time"""
        self.clear_line()
        truncated = text[:80] + "..." if len(text) > 80 else text
        sys.stdout.write(f"\033[93m💭 Thinking: {truncated}\033[0m")
        sys.stdout.flush()

    def print_tool_result(self, tool_name, result):
        """Print tool execution results"""
        print(f"\n\033[96m🔧 {tool_name} result:\033[0m")
        # Truncate long results
        display_result = result[:200] + "..." if len(result) > 200 else result
        print(f"  {display_result}")


async def streaming_ui_example():
    config = {"mcpServers": {"playwright": {"command": "npx", "args": ["@playwright/mcp@latest"]}}}
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    ui = StreamingUI()
    query = "What are the current trending topics on Hacker News?"

    print("🤖 MCP Agent - Interactive Session")
    print("=" * 50)
    print(f"Query: {query}")
    print("=" * 50)

    current_tool = None
    current_reasoning = ""

    async for event in agent.stream_events(query):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            ui.print_status("Starting to plan")

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, "content") and chunk.content:
                current_reasoning += chunk.content
                ui.print_thinking(current_reasoning)
        elif event_type == "on_tool_start":
            # The tool name travels on the event itself, not inside data["input"]
            current_tool = event.get("name")
            if current_tool:
                print("\n")  # New line after thinking
                ui.print_status("Executing tool", current_tool)
                current_reasoning = ""  # Reset for the next iteration

        elif event_type == "on_tool_end":
            output = data.get("output")
            if current_tool and output:
                ui.print_tool_result(current_tool, str(output))

        elif event_type == "on_chain_end":
            print("\n")
            ui.print_status("Task completed!")
            # Extract the final answer
            final_output = data.get("output")
            if final_output:
                print("\n\033[92m📋 Final Answer:\033[0m")
                print(f"{final_output}")


if __name__ == "__main__":
    asyncio.run(streaming_ui_example())
Web Streaming with FastAPI
For web applications, you can stream agent output using Server-Sent Events:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import time

from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

app = FastAPI()


async def create_agent():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    return MCPAgent(llm=llm, client=client)


@app.get("/stream/{query}")
async def stream_agent_response(query: str):
    """Stream agent response using Server-Sent Events"""
    async def event_generator():
        agent = await create_agent()
        async for event in agent.stream_events(query, version="v1"):
            event_type = event.get("event")
            data = event.get("data", {})

            # Format as SSE; event payloads may contain message objects,
            # so fall back to their string form when serializing
            sse_data = {
                "type": event_type,
                "timestamp": time.time(),
                "data": data,
            }
            yield f"data: {json.dumps(sse_data, default=str)}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",  # the standard SSE media type
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


@app.get("/")
async def root():
    return {"message": "MCP Agent Streaming API"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
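On the client side, the stream can be consumed line by line. Here is a minimal sketch using httpx (the client library and the localhost URL are assumptions; any HTTP client that can stream responses works):

import asyncio
import json

import httpx


async def consume_stream(query: str):
    # Assumes the FastAPI app above is running locally on port 8000
    url = f"http://localhost:8000/stream/{query}"
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip the blank lines between SSE events
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                print(event["type"], "-", str(event["data"])[:80])


if __name__ == "__main__":
    asyncio.run(consume_stream("Summarize the latest Python news"))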
Next Steps