Published on October 7, 2025 by Vincenzo Reina · Middleware

Middleware Pipeline PR

Check out the pull request that introduced this architecture.
For a long time, we handled cross-cutting concerns (logging, metrics, caching, auth) the way most growing projects do: a little logic here, a try/except block there, all sprinkled across the methods of our project. It worked, but it was fragile, and we knew we were piling up technical debt that our future selves would have to pay off. It was time to pay up. We needed a way to decouple our core feature logic from the operational wiring that held it all together. This is the story of how we did that by building a middleware pipeline from the ground up.

From Tangled Logic to a Clean Pipeline

The breakthrough came when we stopped thinking about a request as a single, monolithic function call and started thinking of it as a message flowing through a pipeline. Each stage in the pipeline gets a chance to inspect the message, add data to it, change it, or stop it dead in its tracks. Then, after the core work is done, the response flows back through those same stages in reverse order. This “middleware” pattern isn’t new, but applying it to our system changed how we work. It let us untangle the operational logic (e.g., “how long did this take?”) from the business logic (e.g., “call this tool”), so we could finally build these concerns as independent, reusable, and testable components. Our design goals were born from past pain:
  1. No, you don’t have to rewrite your code: The new system had to be a drop-in replacement. Developers using our existing ClientSession shouldn’t have to change a single line of their code.
  2. Make the right thing easy (and type-safe): Writing new middleware should be straightforward, with full auto-complete and static analysis support. No more guessing what’s in the context object.
  3. Be both specific and general: We needed a way to write middleware that runs on every single request, but also middleware that only targets a specific method like tools/call.
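The request-in, response-out flow described above is often called the “onion” model. Below is a minimal sketch in plain Python. It assumes each stage is an async function that receives the message and a `call_next` callable. The names `Handler`, `make_stage`, `deny_all`, and `build_pipeline` are hypothetical and are not the project’s API. The sketch shows the ordering, and also shows that a stage which never calls `call_next` stops the request before it reaches the core.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable

# Hypothetical names for illustration only; not the project's API.
Handler = Callable[[dict], Awaitable[Any]]
Stage = Callable[[dict, Handler], Awaitable[Any]]

trace: list[str] = []

def make_stage(name: str) -> Stage:
    async def stage(message: dict, call_next: Handler) -> Any:
        trace.append(f"{name}: before")    # request on the way in
        result = await call_next(message)  # pass control down the pipeline
        trace.append(f"{name}: after")     # response on the way back out
        return result
    return stage

async def deny_all(message: dict, call_next: Handler) -> Any:
    # Never calls call_next, so later stages and the core are skipped.
    return "denied"

async def core(message: dict) -> str:
    trace.append("core")
    return f"handled {message['method']}"

def build_pipeline(stages: list[Stage], final: Handler) -> Handler:
    handler = final
    for stage in reversed(stages):  # wrap from the inside out
        handler = partial(stage, call_next=handler)
    return handler

pipeline = build_pipeline([make_stage("timing"), make_stage("auth")], core)
result = asyncio.run(pipeline({"method": "tools/call"}))
print(result)  # handled tools/call
print(trace)   # ['timing: before', 'auth: before', 'core', 'auth: after', 'timing: after']

blocked = build_pipeline([make_stage("timing"), deny_all], core)
print(asyncio.run(blocked({"method": "tools/call"})))  # denied
```

The outermost stage sees the request first and the response last. This is why it is a natural spot for timing.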

The Core Components

Our architecture boils down to four key components that work in concert.

1. The MiddlewareContext: Our Universal Passport

For a message to travel through the pipeline, it needs a passport: a standard document carrying all its vital information. That’s our MiddlewareContext, a generic dataclass that holds a request’s unique ID, the RPC method name, and, crucially, its strongly typed parameters. It also carries a connection ID, a timestamp, and a metadata dictionary where middleware can keep per-request state.
```python
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")

@dataclass
class MiddlewareContext(Generic[T]):
    """Unified, typed context for all middleware operations."""
    id: str
    method: str      # e.g., "tools/call"
    params: T        # parameterized per request type, e.g., MiddlewareContext[CallToolRequestParams]
    connection_id: str
    timestamp: float
    metadata: dict[str, Any] = field(default_factory=dict)  # a spot for middleware to stash per-request state
```
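By using Generic[T], we give the type checker the information it needs. A handler that receives a MiddlewareContext[CallToolRequestParams], which is the context type for a "tools/call" request, knows that context.params is a CallToolRequestParams object. Both the IDE and the type checker can then verify every attribute access. This has saved us from countless AttributeError bugs that would otherwise have surfaced only at runtime.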
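Below is a short usage sketch. `ToolCallParams` is a hypothetical stand-in for CallToolRequestParams, and `make_context` is a hypothetical helper, since the post does not show how contexts are created. The sketch shows how an annotated parameter lets a type checker reason about `params`. It also shows how `metadata` gives each request its own scratch space.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")

@dataclass
class MiddlewareContext(Generic[T]):
    id: str
    method: str
    params: T
    connection_id: str
    timestamp: float
    metadata: dict[str, Any] = field(default_factory=dict)

# Hypothetical stand-in for CallToolRequestParams, so this sketch runs on its own.
@dataclass
class ToolCallParams:
    name: str
    arguments: dict[str, Any]

def make_context(method: str, params: T, connection_id: str) -> MiddlewareContext[T]:
    # Hypothetical helper: fills in a unique ID and a creation timestamp.
    return MiddlewareContext(
        id=str(uuid.uuid4()),
        method=method,
        params=params,
        connection_id=connection_id,
        timestamp=time.time(),
    )

def tool_name(context: MiddlewareContext[ToolCallParams]) -> str:
    # A type checker knows context.params is a ToolCallParams here, so a typo
    # such as context.params.nmae is flagged before the code runs.
    return context.params.name

ctx = make_context("tools/call", ToolCallParams("search", {"q": "mcp"}), "conn-1")
ctx.metadata["cache_hit"] = False  # middleware can stash per-request state
print(tool_name(ctx))  # search
```

`field(default_factory=dict)` matters here. Each context gets its own fresh dictionary, so state stashed for one request never leaks into another.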

2. The Middleware Base Class: A Template for Behavior

This is the contract for anyone wanting to plug into the pipeline. Instead of forcing developers into a massive if/elif/else chain to check the request type, we use a bit of dispatch magic to route the context to the right handler.
```python
# A simplified view of the Middleware base class.
# MiddlewareContext is the dataclass above; CallToolRequestParams is the
# params type for "tools/call" requests.
from typing import Any, Awaitable, Callable

# NextFunctionT is a type alias for the next link in the chain
NextFunctionT = Callable[[MiddlewareContext], Awaitable[Any]]

class Middleware:
    async def __call__(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        # The magic is here: find a specific handler like 'on_call_tool' or
        # fall back to the generic 'on_request'.
        handler = self._get_handler_for_method(context.method)
        return await handler(context, call_next)

    # The generic fallback for all requests
    async def on_request(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        return await call_next(context)

    # A specific, type-safe hook developers can override
    async def on_call_tool(
        self, context: MiddlewareContext[CallToolRequestParams], call_next: NextFunctionT
    ) -> Any:
        # Default behavior is to just continue the chain
        return await call_next(context)
```
Want to log every request? Override on_request. Need to add a cache check only for read_resource calls? Just implement on_read_resource. The base class handles the routing; you just write the logic.
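The post does not show `_get_handler_for_method`. Below is one possible sketch. It uses a hypothetical lookup table from RPC method names to hook names, a simplified non-generic context, and an assumed method name `"resources/read"` for read_resource calls. The sketch also includes the two middleware described above: one logs every request, and one caches only read_resource results.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class MiddlewareContext:  # simplified, non-generic stand-in for this sketch
    method: str
    params: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)

NextFunctionT = Callable[[MiddlewareContext], Awaitable[Any]]

# Hypothetical lookup table from RPC method names to hook names.
_HOOKS = {"tools/call": "on_call_tool", "resources/read": "on_read_resource"}

class Middleware:
    async def __call__(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        handler = self._get_handler_for_method(context.method)
        return await handler(context, call_next)

    def _get_handler_for_method(self, method: str) -> Callable[..., Awaitable[Any]]:
        # Use the method-specific hook when one exists, else the generic fallback.
        hook_name = _HOOKS.get(method)
        return getattr(self, hook_name) if hook_name else self.on_request

    async def on_request(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        return await call_next(context)  # default: just continue the chain

    # Specific hooks delegate to on_request by default, so a subclass that
    # overrides only on_request still sees every method.
    async def on_call_tool(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        return await self.on_request(context, call_next)

    async def on_read_resource(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        return await self.on_request(context, call_next)

class LoggingMiddleware(Middleware):
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def on_request(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        self.log.append(context.method)  # runs for every method
        return await call_next(context)

class ResourceCacheMiddleware(Middleware):
    def __init__(self) -> None:
        self.cache: dict[str, Any] = {}

    async def on_read_resource(self, context: MiddlewareContext, call_next: NextFunctionT) -> Any:
        uri = context.params["uri"]
        if uri not in self.cache:  # only a cache miss reaches the rest of the chain
            self.cache[uri] = await call_next(context)
        return self.cache[uri]

backend_calls: list[str] = []

async def backend(context: MiddlewareContext) -> str:
    # Stand-in for the real RPC; records each call that actually gets through.
    backend_calls.append(context.method)
    return f"result of {context.method}"

log: list[str] = []
asyncio.run(LoggingMiddleware(log)(MiddlewareContext("tools/call", {"name": "search"}), backend))

cache = ResourceCacheMiddleware()
read_ctx = MiddlewareContext("resources/read", {"uri": "file:///notes.txt"})
asyncio.run(cache(read_ctx, backend))
asyncio.run(cache(read_ctx, backend))  # second read is served from the cache
print(log)            # ['tools/call']
print(backend_calls)  # ['tools/call', 'resources/read']
```

One design choice in this sketch is worth calling out. If a specific hook’s default just called `call_next`, as `on_call_tool` does in the simplified base class, then a middleware that overrides only `on_request` would silently skip tool calls. Delegating to `on_request` avoids that. The post does not say how the real base class resolves this.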

3. The MiddlewareManager: The Pipeline Conductor

This is the brain of the operation. It holds the list of registered middleware and stitches the pipeline together for each request. Its process_request method uses a neat bit of functional composition to wrap the original function call, layer by layer.
```python
from functools import partial
from typing import Any, Awaitable, Callable

# In the MiddlewareManager class (self.middlewares is in registration order)...
async def process_request(self, context: MiddlewareContext, original_call: Callable[[], Awaitable[Any]]) -> Any:
    # The final link in the chain is always the original function we intercepted.
    async def execute_call(_: MiddlewareContext) -> Any:
        return await original_call()

    # Start with the end of the chain and wrap it backwards.
    call_chain = execute_call
    for middleware in reversed(self.middlewares):
        # Wrapping in reverse makes the first registered middleware the
        # outermost layer, so it is the first one to run.
        call_chain = partial(middleware, call_next=call_chain)

    # Kick off the whole chain.
    return await call_chain(context)
```
Because process_request is the single entry point for every request, it also gives us one top-level place to wrap the entire execution in a try...finally block (left out of the simplified snippet above). That is how we capture timing, results, and errors for every request in a structured way. Centralized error handling was a massive win for our on-call engineers.
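The post does not show the try...finally itself. Below is a sketch of how it might look. It assumes the manager appends one record per request to an in-memory list. The `records` list and the `add` method are hypothetical; the real implementation may log or emit metrics instead.

```python
import asyncio
import time
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Awaitable, Callable

@dataclass
class MiddlewareContext:  # simplified stand-in for this sketch
    method: str
    metadata: dict[str, Any] = field(default_factory=dict)

class MiddlewareManager:
    def __init__(self) -> None:
        self.middlewares: list[Callable[..., Awaitable[Any]]] = []
        # Hypothetical sink: one record per request. Real code might log or emit metrics.
        self.records: list[dict[str, Any]] = []

    def add(self, middleware: Callable[..., Awaitable[Any]]) -> None:
        self.middlewares.append(middleware)

    async def process_request(
        self, context: MiddlewareContext, original_call: Callable[[], Awaitable[Any]]
    ) -> Any:
        async def execute_call(_: MiddlewareContext) -> Any:
            return await original_call()

        call_chain: Callable[[MiddlewareContext], Awaitable[Any]] = execute_call
        for middleware in reversed(self.middlewares):
            call_chain = partial(middleware, call_next=call_chain)

        start = time.perf_counter()
        result: Any = None
        error: BaseException | None = None
        try:
            result = await call_chain(context)
            return result
        except BaseException as exc:  # also covers asyncio.CancelledError
            error = exc
            raise  # record the failure, then let the caller see it unchanged
        finally:
            # Runs exactly once per request, whether it succeeded or failed.
            self.records.append({
                "method": context.method,
                "duration_s": time.perf_counter() - start,
                "ok": error is None,
                "result": result,
                "error": repr(error) if error is not None else None,
            })

async def ok_tool() -> str:
    return "42"

async def failing_tool() -> str:
    raise RuntimeError("backend unavailable")

manager = MiddlewareManager()
print(asyncio.run(manager.process_request(MiddlewareContext("tools/call"), ok_tool)))  # 42
try:
    asyncio.run(manager.process_request(MiddlewareContext("tools/call"), failing_tool))
except RuntimeError:
    pass  # the caller still sees the original exception
print([r["ok"] for r in manager.records])  # [True, False]
```

Re-raising after recording keeps the manager purely observational: callers get exactly the exception they would have gotten without the pipeline.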

4. The CallbackClientSession: The Invisible Adapter

This was the secret sauce for our “zero-intrusion” goal. How do you rewire a factory without the workers noticing? You build an adapter that looks and feels exactly like the old tool. The CallbackClientSession is a wrapper around the original ClientSession. It exposes the same methods (call_tool, read_resource), so from the user’s perspective nothing has changed. On the inside, though, it doesn’t execute the call directly. Instead, it packages the arguments into a MiddlewareContext and hands it off to the MiddlewareManager.
```python
# A peek inside the adapter
class CallbackClientSession:
    def __init__(self, real_session: ClientSession, ..., manager: MiddlewareManager):
        self._session = real_session
        self._manager = manager
        # ... other setup ...

    async def call_tool(self, name: str, arguments: dict, ...) -> CallToolResult:
        params = CallToolRequestParams(name=name, arguments=arguments)

        # We wrap the original call in a lambda to defer its execution.
        # It won't run until the middleware chain decides to call it.
        original_call = lambda: self._session.call_tool(name, arguments, ...)

        # Now, hand it off to the pipeline.
        return await self._manager.process_request(
            context=self._create_context("tools/call", params),
            original_call=original_call
        )
```
This adapter is the bridge between the old world and the new. It intercepts calls at the boundary and shunts them into our pipeline, completely transparently.
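Below is a self-contained sketch of the adapter end to end. It uses three stand-ins:

- `FakeSession` is a hypothetical substitute for ClientSession.
- The manager is a cut-down version of the composition shown earlier.
- `block_dangerous` is an invented policy middleware.

The sketch illustrates two points. Caller code is identical for the wrapped and unwrapped session. And because the real call is deferred, a middleware that never calls `call_next` prevents it from running at all.

```python
import asyncio
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Awaitable, Callable

@dataclass
class MiddlewareContext:  # simplified stand-in for this sketch
    method: str
    params: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)

class MiddlewareManager:  # minimal version of the composition shown earlier
    def __init__(self, middlewares: list[Callable[..., Awaitable[Any]]]) -> None:
        self.middlewares = middlewares

    async def process_request(self, context: MiddlewareContext, original_call) -> Any:
        async def execute_call(_: MiddlewareContext) -> Any:
            return await original_call()
        call_chain = execute_call
        for middleware in reversed(self.middlewares):
            call_chain = partial(middleware, call_next=call_chain)
        return await call_chain(context)

class FakeSession:
    """Hypothetical stand-in for ClientSession that records real calls."""
    def __init__(self) -> None:
        self.calls: list[str] = []

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> str:
        self.calls.append(name)
        return f"{name} ran"

class CallbackClientSession:
    def __init__(self, real_session: FakeSession, manager: MiddlewareManager) -> None:
        self._session = real_session
        self._manager = manager

    async def call_tool(self, name: str, arguments: dict[str, Any]) -> str:
        context = MiddlewareContext("tools/call", {"name": name, "arguments": arguments})

        async def original_call() -> str:
            # Deferred: runs only if the chain reaches its end.
            return await self._session.call_tool(name, arguments)

        return await self._manager.process_request(context, original_call)

async def block_dangerous(context: MiddlewareContext, call_next) -> Any:
    # Hypothetical policy middleware: short-circuits one tool by name.
    if context.params["name"] == "delete_everything":
        return "blocked by policy"
    return await call_next(context)

real = FakeSession()
session = CallbackClientSession(real, MiddlewareManager([block_dangerous]))

async def user_code(s) -> list[str]:
    # Caller code is identical whether s is the real or the wrapped session.
    return [await s.call_tool("search", {"q": "x"}),
            await s.call_tool("delete_everything", {})]

print(asyncio.run(user_code(session)))  # ['search ran', 'blocked by policy']
print(real.calls)                       # ['search']
```

A nested `async def` is used here instead of the post’s lambda. Both defer the work equally: no coroutine is created until `original_call()` is invoked at the end of the chain.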

So, Was It Worth It?

Absolutely. This wasn’t just code cleanup; it was an investment in our platform’s future. With a solid foundation in place, our entire team can build better, safer, and more observable features, faster. It turned a source of chronic pain into a point of pride. We’re always keen to hear how others are solving similar problems, so feel free to share your own war stories or ideas.