mcp-use supports multiple approaches for streaming agent output, allowing you to receive incremental results, tool actions, and intermediate steps as they are generated by the agent.

Step-by-Step Streaming

The stream method provides a clean interface for receiving intermediate steps during agent execution. Each intermediate item is an (action, observation) tuple representing a tool call and its result; the final item is the agent's answer as a string.

step_streaming.py
import asyncio
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def step_streaming_example():
    # Setup agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's steps
    print("🤖 Agent is working...")
    print("-" * 50)

    async for item in agent.stream("Search for the latest Python news and summarize it"):
        if isinstance(item, str):
            # Final result
            print(f"\n✅ Final Result:\n{item}")
        else:
            # Intermediate step (action, observation)
            action, observation = item
            print(f"\n🔧 Tool: {action.tool}")
            print(f"📝 Input: {action.tool_input}")
            print(f"📄 Result: {observation[:100]}{'...' if len(observation) > 100 else ''}")

    print("\n🎉 Done!")

if __name__ == "__main__":
    asyncio.run(step_streaming_example())

Low-Level Event Streaming

For more granular control, use the stream_events method to get real-time output events:

basic_streaming.py
import asyncio
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def basic_streaming_example():
    # Setup agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's response
    print("Agent is working...")
    async for chunk in agent.stream_events("Search for the latest Python news and summarize it"):
        print(chunk, end="", flush=True)

    print("\n\nDone!")

if __name__ == "__main__":
    asyncio.run(basic_streaming_example())

The streaming API is built on LangChain’s astream_events method. For more details on event types and their data structure, see the LangChain streaming documentation.
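
Before writing event handlers, it helps to see which events a given query actually produces. The following is a minimal sketch (the tallying logic is illustrative, not part of the mcp-use API) that counts event types, assuming each event is a dict with an "event" key following LangChain’s astream_events envelope:

inspect_events.py
import asyncio
from collections import Counter
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def inspect_events():
    config = {"mcpServers": {"playwright": {"command": "npx", "args": ["@playwright/mcp@latest"]}}}
    client = MCPClient.from_dict(config)
    agent = MCPAgent(llm=ChatOpenAI(model="gpt-4"), client=client)

    # Count each event type so you know which handlers are worth writing
    counts = Counter()
    async for event in agent.stream_events("Open example.com and read the main heading"):
        counts[event.get("event")] += 1

    for event_type, count in counts.most_common():
        print(f"{count:5d}  {event_type}")

if __name__ == "__main__":
    asyncio.run(inspect_events())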

Choosing the Right Streaming Method

Use stream() when:

• You want to show step-by-step progress
• You need to process each tool call individually
• You’re building a workflow UI
• You want simple, clean step tracking

Use stream_events() when:

• You need fine-grained control over events
• You’re building real-time chat interfaces
• You want to stream LLM reasoning text
• You need custom event filtering (see the sketch below)
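
For example, a chat interface that only needs the model’s text can keep on_chat_model_stream events and drop everything else. This is a minimal sketch; the event name follows LangChain’s convention, and chunk.content is assumed to carry the token text:

token_filter.py
import asyncio
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def stream_tokens():
    config = {"mcpServers": {"playwright": {"command": "npx", "args": ["@playwright/mcp@latest"]}}}
    client = MCPClient.from_dict(config)
    agent = MCPAgent(llm=ChatOpenAI(model="gpt-4", streaming=True), client=client)

    async for event in agent.stream_events("Summarize the front page of example.com"):
        # Keep only LLM token chunks; ignore tool and chain events
        if event.get("event") == "on_chat_model_stream":
            chunk = event.get("data", {}).get("chunk")
            content = getattr(chunk, "content", None)
            if content:
                print(content, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(stream_tokens())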

Examples

Building a Streaming UI

Here’s an example of how you might build a simple console UI for streaming:

streaming_ui.py
import asyncio
import sys
from datetime import datetime
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient


class StreamingUI:
    def __init__(self):
        self.current_thought = ""
        self.tool_outputs = []
        self.final_answer = ""

    def clear_line(self):
        """Clear the current line in terminal"""
        sys.stdout.write("\r\033[K")

    def print_status(self, status, tool=None):
        """Print colored status updates"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        if tool:
            print(f"\033[94m[{timestamp}] {status}: {tool}\033[0m")
        else:
            print(f"\033[92m[{timestamp}] {status}\033[0m")

    def print_thinking(self, text):
        """Print agent's reasoning in real-time"""
        self.clear_line()
        truncated = text[:80] + "..." if len(text) > 80 else text
        sys.stdout.write(f"\033[93m💭 Thinking: {truncated}\033[0m")
        sys.stdout.flush()

    def print_tool_result(self, tool_name, result):
        """Print tool execution results"""
        print(f"\n\033[96m🔧 {tool_name} result:\033[0m")
        # Truncate long results
        display_result = result[:200] + "..." if len(result) > 200 else result
        print(f"   {display_result}")


async def streaming_ui_example():
    config = {"mcpServers": {"playwright": {"command": "npx", "args": ["@playwright/mcp@latest"]}}}

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    ui = StreamingUI()

    query = "What are the current trending topics on Hacker News?"

    print("🤖 MCP Agent - Interactive Session")
    print("=" * 50)
    print(f"Query: {query}")
    print("=" * 50)

    current_tool = None
    current_reasoning = ""

    async for event in agent.stream_events(query):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            ui.print_status("Starting to plan")

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, "content") and chunk.content:
                current_reasoning += chunk.content
                ui.print_thinking(current_reasoning)

        elif event_type == "on_tool_start":
            # The tool name is carried on the event itself ("name"),
            # not inside the tool's input payload
            current_tool = event.get("name")
            if current_tool:
                print("\n")  # New line after thinking
                ui.print_status("Executing tool", current_tool)
                current_reasoning = ""  # Reset for next iteration

        elif event_type == "on_tool_end":
            output = data.get("output")
            if current_tool and output:
                ui.print_tool_result(current_tool, str(output))

        elif event_type == "on_chain_end":
            print("\n")
            ui.print_status("Task completed!")

            # Extract final answer
            final_output = data.get("output")
            if final_output:
                print(f"\n\033[92m📋 Final Answer:\033[0m")
                print(f"{final_output}")


if __name__ == "__main__":
    asyncio.run(streaming_ui_example())

Web Streaming with FastAPI

For web applications, you can stream agent output using Server-Sent Events:

web_streaming.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import time
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

app = FastAPI()

async def create_agent():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    return MCPAgent(llm=llm, client=client)

@app.get("/stream/{query}")
async def stream_agent_response(query: str):
    """Stream agent response using Server-Sent Events"""

    async def event_generator():
        agent = await create_agent()

        async for event in agent.stream_events(query, version="v1"):
            event_type = event.get("event")
            data = event.get("data", {})

            # Format as SSE; default=str handles payloads that are not
            # JSON-serializable, such as message chunk objects
            sse_data = {
                "type": event_type,
                "timestamp": time.time(),
                "data": data
            }

            yield f"data: {json.dumps(sse_data, default=str)}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

@app.get("/")
async def root():
    return {"message": "MCP Agent Streaming API"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
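
To check the endpoint end to end, consume the stream with any SSE-capable client. The sketch below uses httpx (an assumption; it is not an mcp-use dependency) and expects the server above to be running on localhost:8000:

sse_client.py
import asyncio
import json
import httpx

async def consume_stream(query: str):
    async with httpx.AsyncClient(timeout=None) as http:
        async with http.stream("GET", f"http://localhost:8000/stream/{query}") as response:
            async for line in response.aiter_lines():
                # SSE payload lines are prefixed with "data: "
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                print(event["type"])

if __name__ == "__main__":
    asyncio.run(consume_stream("hello"))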

Next Steps