Skip to main content
Connection management for MCP implementations. This module provides an abstract base class for different types of connection managers used in MCP connectors.

ConnectionManager

from mcp_use.task_managers.base import ConnectionManager

method init

Initialize a new connection manager.Signature
def __init__():

method get_streams

Get the current connection streams.Returns
returns
mcp_use.task_managers.base.T | None
The current connection (typically a tuple of read_stream, write_stream) or None if not connected.
Signature
def get_streams():

method start

Start the connection manager and establish a connection.Returns
returns
mcp_use.task_managers.base.T
The established connection.
Signature
def start():

method stop

Stop the connection manager and close the connection.Signature
def stop():
I