MCP-Use supports asynchronous streaming of agent output, allowing you to receive incremental results, tool actions, and intermediate steps as they are generated by the agent.

Basic Streaming

Use the astream method to get real-time output from your agent:

basic_streaming.py
import asyncio
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def basic_streaming_example():
    # Setup agent
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client)

    # Stream the agent's response
    print("Agent is working...")
    async for chunk in agent.astream("Search for the latest Python news and summarize it"):
        print(chunk, end="", flush=True)

    print("\n\nDone!")

if __name__ == "__main__":
    asyncio.run(basic_streaming_example())

Advanced Streaming with Event Handling

For more control over the streaming output, you can handle different types of events:

advanced_streaming.py
import asyncio
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

async def advanced_streaming_example():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            },
            "filesystem": {
                "command": "mcp-server-filesystem",
                "args": ["/workspace"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4")
    agent = MCPAgent(llm=llm, client=client, use_server_manager=True)

    query = """
    Research the top 3 AI companies, create a comparison report,
    and save it as a markdown file
    """

    print("🤖 Starting agent task...")
    print("-" * 50)

    current_step = 1

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            print(f"\n📝 Step {current_step}: Planning next action...")

        elif event_type == "on_tool_start":
            tool_name = data.get("input", {}).get("tool_name", "unknown")
            print(f"\n🔧 Using tool: {tool_name}")

        elif event_type == "on_tool_end":
            print(" ✅ Tool completed")
            current_step += 1

        elif event_type == "on_chat_model_stream":
            token = data.get("chunk", {}).get("content", "")
            if token:
                print(token, end="", flush=True)

        elif event_type == "on_chain_end":
            print(f"\n\n🎉 Task completed successfully!")

if __name__ == "__main__":
    asyncio.run(advanced_streaming_example())

Real-time Progress Tracking

Create a progress tracker that shows the agent’s current status:

progress_tracking.py
import asyncio
import time
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

class ProgressTracker:
    def __init__(self):
        self.start_time = time.time()
        self.current_tool = None
        self.step_count = 0
        self.tools_used = []

    def update_tool(self, tool_name):
        if tool_name != self.current_tool:
            self.current_tool = tool_name
            self.step_count += 1
            self.tools_used.append(tool_name)
            elapsed = time.time() - self.start_time
            print(f"\n[{elapsed:.1f}s] Step {self.step_count}: Using {tool_name}")

    def show_summary(self):
        elapsed = time.time() - self.start_time
        print(f"\n{'='*50}")
        print(f"Task completed in {elapsed:.1f} seconds")
        print(f"Steps taken: {self.step_count}")
        print(f"Tools used: {', '.join(set(self.tools_used))}")
        print(f"{'='*50}")

async def streaming_with_progress():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    tracker = ProgressTracker()

    query = "Find information about the latest iPhone model and its key features"

    print("🚀 Starting agent with real-time progress tracking...")

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_tool_start":
            tool_name = data.get("input", {}).get("tool_name")
            if tool_name:
                tracker.update_tool(tool_name)

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, 'content') and chunk.content:
                print(chunk.content, end="", flush=True)

        elif event_type == "on_chain_end":
            tracker.show_summary()

if __name__ == "__main__":
    asyncio.run(streaming_with_progress())

Building a Streaming UI

Here’s an example of how you might build a simple console UI for streaming:

streaming_ui.py
import asyncio
import sys
from datetime import datetime
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

class StreamingUI:
    def __init__(self):
        self.current_thought = ""
        self.tool_outputs = []
        self.final_answer = ""

    def clear_line(self):
        """Clear the current line in terminal"""
        sys.stdout.write('\r\033[K')

    def print_status(self, status, tool=None):
        """Print colored status updates"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        if tool:
            print(f"\033[94m[{timestamp}] {status}: {tool}\033[0m")
        else:
            print(f"\033[92m[{timestamp}] {status}\033[0m")

    def print_thinking(self, text):
        """Print agent's reasoning in real-time"""
        self.clear_line()
        truncated = text[:80] + "..." if len(text) > 80 else text
        sys.stdout.write(f"\033[93m💭 Thinking: {truncated}\033[0m")
        sys.stdout.flush()

    def print_tool_result(self, tool_name, result):
        """Print tool execution results"""
        print(f"\n\033[96m🔧 {tool_name} result:\033[0m")
        # Truncate long results
        display_result = result[:200] + "..." if len(result) > 200 else result
        print(f"   {display_result}")

async def streaming_ui_example():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }

    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    agent = MCPAgent(llm=llm, client=client)

    ui = StreamingUI()

    query = "What are the current trending topics on Hacker News?"

    print("🤖 MCP Agent - Interactive Session")
    print("=" * 50)
    print(f"Query: {query}")
    print("=" * 50)

    current_tool = None
    current_reasoning = ""

    async for event in agent.astream_events(query, version="v1"):
        event_type = event.get("event")
        data = event.get("data", {})

        if event_type == "on_chat_model_start":
            ui.print_status("Starting to plan")

        elif event_type == "on_chat_model_stream":
            chunk = data.get("chunk", {})
            if hasattr(chunk, 'content') and chunk.content:
                current_reasoning += chunk.content
                ui.print_thinking(current_reasoning)

        elif event_type == "on_tool_start":
            current_tool = data.get("input", {}).get("tool_name")
            if current_tool:
                print("\n")  # New line after thinking
                ui.print_status("Executing tool", current_tool)
                current_reasoning = ""  # Reset for next iteration

        elif event_type == "on_tool_end":
            output = data.get("output")
            if current_tool and output:
                ui.print_tool_result(current_tool, str(output))

        elif event_type == "on_chain_end":
            print("\n")
            ui.print_status("Task completed!")

            # Extract final answer
            final_output = data.get("output")
            if final_output:
                print(f"\n\033[92m📋 Final Answer:\033[0m")
                print(f"{final_output}")

if __name__ == "__main__":
    asyncio.run(streaming_ui_example())

Web Streaming with FastAPI

For web applications, you can stream agent output using Server-Sent Events:

web_streaming.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
from langchain_openai import ChatOpenAI
from mcp_use import MCPAgent, MCPClient

app = FastAPI()

async def create_agent():
    config = {
        "mcpServers": {
            "playwright": {
                "command": "npx",
                "args": ["@playwright/mcp@latest"]
            }
        }
    }
    client = MCPClient.from_dict(config)
    llm = ChatOpenAI(model="gpt-4", streaming=True)
    return MCPAgent(llm=llm, client=client)

@app.get("/stream/{query}")
async def stream_agent_response(query: str):
    """Stream agent response using Server-Sent Events"""

    async def event_generator():
        agent = await create_agent()

        async for event in agent.astream_events(query, version="v1"):
            event_type = event.get("event")
            data = event.get("data", {})

            # Format as SSE
            sse_data = {
                "type": event_type,
                "timestamp": time.time(),
                "data": data
            }

            yield f"data: {json.dumps(sse_data)}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

@app.get("/")
async def root():
    return {"message": "MCP Agent Streaming API"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Performance Considerations

Buffering Output

For better performance, you can buffer output:

async def buffered_streaming(agent, query, buffer_size=10):
    buffer = []

    async for chunk in agent.astream(query):
        buffer.append(chunk)

        if len(buffer) >= buffer_size:
            yield ''.join(buffer)
            buffer = []

    # Yield remaining buffer
    if buffer:
        yield ''.join(buffer)

# Usage
async for buffered_chunk in buffered_streaming(agent, "Your query here"):
    print(buffered_chunk, end="", flush=True)

Filtering Events

Only process events you care about:

async def filtered_streaming(agent, query):
    interesting_events = [
        "on_tool_start",
        "on_tool_end",
        "on_chat_model_stream",
        "on_chain_end"
    ]

    async for event in agent.astream_events(query, version="v1"):
        if event.get("event") in interesting_events:
            yield event

# Usage
async for event in filtered_streaming(agent, "Your query"):
    # Process only relevant events
    handle_event(event)

Troubleshooting Streaming

Next Steps

The streaming API is based on LangChain’s astream_events method. For more details on event types and data structure, check the LangChain streaming documentation.