MCPAgent API Reference

stream

async def stream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncGenerator[tuple[AgentAction, str] | str, None]:
Stream agent execution step-by-step. Yields intermediate steps as (AgentAction, str) tuples, followed by the final result as a string.
Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
Yields:
  • (AgentAction, str): Intermediate steps containing the action and observation
  • str: Final result string
Example:
async for item in agent.stream("What's the weather like?"):
    if isinstance(item, str):
        print(f"Final result: {item}")
    else:
        action, observation = item
        print(f"Tool: {action.tool}, Result: {observation}")

run

async def run(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
    output_schema: type[T] | None = None,
) -> str | T:
Run agent execution and return the final result. Uses the streaming implementation internally.
Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
  • output_schema (type[T], optional): Pydantic model for structured output. If provided, the agent will return an instance of this model.
Returns:
  • str | T: The final result as a string, or a Pydantic model instance if output_schema is provided.
Examples:
Basic Usage
result = await agent.run("What's the weather like?")
print(result)
Structured Output
from pydantic import BaseModel, Field

class WeatherInfo(BaseModel):
    temperature: float = Field(description="Temperature in Celsius")
    condition: str = Field(description="Weather condition")

weather: WeatherInfo = await agent.run(
    "What's the weather like in London?",
    output_schema=WeatherInfo
)
print(f"Temperature: {weather.temperature}°C, Condition: {weather.condition}")

astream

async def astream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncIterator[str]:
Asynchronous streaming interface for low-level agent events. Yields incremental results, tool actions, and intermediate steps as they are generated by the agent.
Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
Yields:
  • str: Streaming chunks of the agent’s output
Example:
async for chunk in agent.astream("hello"):
    print(chunk, end="", flush=True)

Method Comparison

| Method | Use Case | Output Type | Granularity |
|---|---|---|---|
| stream() | Step-by-step workflow tracking | Steps + final result | Tool-level |
| run() | Simple execution | Final result only | Complete |
| astream() | Real-time chat interfaces | Streaming chunks | Token-level |

Conversation Memory

Methods for managing the agent’s conversation history.

get_conversation_history

def get_conversation_history() -> list[BaseMessage]:
Retrieves the current conversation history, which is a list of LangChain BaseMessage objects. This is useful for inspecting the agent’s memory or for passing it to another agent.
Returns:
  • list[BaseMessage]: The list of messages in the conversation history.
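For instance, a sketch of inspecting the memory after a run (the `type` and `content` attributes come from LangChain's BaseMessage):

```python
# Populate the history with one exchange
await agent.run("What's the weather like in London?")

# Inspect the agent's memory; each entry is a LangChain BaseMessage
for message in agent.get_conversation_history():
    print(f"{message.type}: {message.content}")
```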

clear_conversation_history

def clear_conversation_history() -> None:
Clears the agent’s conversation history. This is useful for starting a new conversation without creating a new agent instance. The system message is preserved.
Example:
# Run a query, which populates the history
await agent.run("What is the capital of France?")

# Clear the history
agent.clear_conversation_history()

# The next query will not have the context of the first one
await agent.run("What was the last question I asked?")
# Assistant: I'm sorry, I don't have access to our previous conversation.

Agent Management

Methods for managing the agent’s lifecycle and configuration.

set_system_message

def set_system_message(message: str) -> None:
Set a new system message for the agent, replacing the current one.
Parameters:
  • message (str): The new system message content
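For instance (a sketch; the persona text is illustrative):

```python
# Replace the system message, then run with the new instructions
agent.set_system_message("You are a terse assistant. Answer in one sentence.")
result = await agent.run("What does an MCP server do?")
print(result)
```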

close

async def close() -> None:
Close the MCP connection and clean up resources.
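When managing the connector lifecycle yourself, a common pattern is to pair run() with close() in a try/finally block (a sketch, assuming `agent` was constructed elsewhere):

```python
try:
    result = await agent.run("What's the weather like?")
    print(result)
finally:
    # Release MCP sessions and other resources even if run() raised
    await agent.close()
```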