MCPAgent API Reference

stream

async def stream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncGenerator[tuple[AgentAction, str] | str, None]:
Stream agent execution step-by-step. Yields intermediate steps as (AgentAction, str) tuples, followed by the final result as a string. Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
Yields:
  • (AgentAction, str): Intermediate steps containing the action and observation
  • str: Final result string
Example:
async for item in agent.stream("What's the weather like?"):
    if isinstance(item, str):
        print(f"Final result: {item}")
    else:
        action, observation = item
        print(f"Tool: {action.tool}, Result: {observation}")

run

async def run(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> str:
Run agent execution and return the final result. Uses the streaming implementation internally. Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
Returns:
  • str: The final result
Example:
result = await agent.run("What's the weather like?")
print(result)

astream

async def astream(
    query: str,
    max_steps: int | None = None,
    manage_connector: bool = True,
    external_history: list[BaseMessage] | None = None,
) -> AsyncIterator[str]:
Asynchronous streaming interface for low-level agent events. Yields incremental results, tool actions, and intermediate steps as they are generated by the agent. Parameters:
  • query (str): The query to execute
  • max_steps (int, optional): Maximum number of steps to take
  • manage_connector (bool): Whether to handle connector lifecycle
  • external_history (list[BaseMessage], optional): External conversation history
Yields:
  • str: Streaming chunks of the agent’s output
Example:
async for chunk in agent.astream("hello"):
    print(chunk, end="", flush=True)

Method Comparison

MethodUse CaseOutput TypeGranularity
stream()Step-by-step workflow trackingSteps + final resultTool-level
run()Simple executionFinal result onlyComplete
astream()Real-time chat interfacesStreaming chunksToken-level

Configuration Methods

get_conversation_history

def get_conversation_history() -> list[BaseMessage]:
Get the current conversation history.

clear_conversation_history

def clear_conversation_history() -> None:
Clear the conversation history.

set_system_message

def set_system_message(message: str) -> None:
Set a new system message for the agent.

close

async def close() -> None:
Close the MCP connection and clean up resources.