Add message queue for SSE messages POST endpoint #459

Conversation

@akash329d akash329d commented Apr 8, 2025

Motivation and Context

This PR decouples session state from individual server instances by introducing a message dispatch abstraction. The previous implementation required client messages to be handled by the same server that initialized the session. Now, servers poll a shared message dispatch (with in-memory or Redis backend) to receive client messages, allowing any server instance to handle POST requests for any session to the message endpoint.
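
As a rough sketch of the shape of that abstraction (the method names and signatures below are illustrative guesses, not necessarily the PR's actual API):

from contextlib import AbstractAsyncContextManager
from typing import Awaitable, Callable, Protocol
from uuid import UUID

import mcp.types as types

# Callback invoked for each message published to a session (see the diff below).
MessageCallback = Callable[[types.JSONRPCMessage | Exception], Awaitable[None]]

class MessageDispatch(Protocol):
    """Any server instance can publish a client's POSTed message for a session;
    the instance holding that session's SSE stream subscribes to receive it."""

    async def publish_message(
        self, session_id: UUID, message: types.JSONRPCMessage
    ) -> bool:
        """Route a message to whichever instance owns the session."""
        ...

    def subscribe(
        self, session_id: UUID, callback: MessageCallback
    ) -> AbstractAsyncContextManager[None]:
        """Keep the callback registered (in an in-memory dict or on a Redis
        pub/sub channel) for as long as the SSE connection is open."""
        ...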

How Has This Been Tested?

All unit tests are passing, and the Redis implementation has been tested locally. Unit and integration tests were also added for the Redis implementation.

Breaking Changes

N/A

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

@akash329d akash329d requested a review from praboud-ant April 14, 2025 20:19
@@ -96,7 +101,12 @@ async def connect_sse(self, scope: Scope, receive: Receive, send: Send):

session_id = uuid4()
session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"
self._read_stream_writers[session_id] = read_stream_writer

async def message_callback(message: types.JSONRPCMessage | Exception) -> None:
Contributor

Right now, this will block if the MCP server implementation doesn't consume read_stream quickly enough. For the in-memory dispatch implementation, this is basically fine, because that just causes the corresponding POST /message call to block. However, in the Redis implementation, it will block anything else from getting consumed from the Redis pubsub stream.

Fundamentally, something has to have a sized buffer (with size > 0; the channels we have here are size = 0, so messages block immediately), and writes to that buffer must fail once that buffer fills up. Moreover, we need to signal the error to the client when this happens, and probably shut down. I'm not 100% sure what the right way to signal that error to the client is. If the message is a request, we can return an error response... but if it's a notification or a response, it's not clear to me how to signal to the client that something went wrong. LoggingMessageNotification is the closest thing I can find in the spec: https://github.com/modelcontextprotocol/modelcontextprotocol/blob/main/schema/2025-03-26/schema.ts. That doesn't seem like quite what this was meant for, though. I guess we could send an error log notification, then cancel the taskgroup / close the connection, but it feels a little bit wrong.

@jerome3o-anthropic: do you have thoughts on how the server can best signal to the client "ahh, fatal error, shutting down"?

As for how to implement this: I think what would make the most sense is to give the read_stream buffer some size (say, 32, but that's totally arbitrary), make message_callback a synchronous function, and call read_stream_writer.send_nowait(). If that errors out, we can then push an error message to the SSE stream writer. (This should also be with nowait; we'll need to add a buffer to the SSE channel to accommodate this. If the nowait call fails, we should just silently move on and skip sending the error - the client is probably jammed up reading messages and isn't going to get our message anyway.) Once that's done, we can cancel the TG (though we'll need to split the TG used to send the SSE stream from the one that wraps the yield from this function).

This logic is a bit fiddly, so it's probably best to implement it here, rather than in the message dispatch, which we expect clients might implement variants of.
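
A minimal sketch of that bounded-buffer idea, assuming anyio memory streams and an SSE writer that accepts event dicts (the helper name, the error payload, and the cancel-scope plumbing are made up):

import anyio
import mcp.types as types
from anyio.streams.memory import MemoryObjectSendStream


def make_nonblocking_callback(
    read_stream_writer: MemoryObjectSendStream,  # created with max_buffer_size=32 (arbitrary)
    sse_stream_writer: MemoryObjectSendStream,   # also needs a small buffer for the error event
    sse_cancel_scope: anyio.CancelScope,         # scope wrapping the SSE-sending tasks
):
    """Hypothetical helper showing the send_nowait() approach described above."""

    def message_callback(message: types.JSONRPCMessage | Exception) -> None:
        try:
            # Non-blocking write; raises anyio.WouldBlock once the buffer is full.
            read_stream_writer.send_nowait(message)
        except anyio.WouldBlock:
            try:
                # Best-effort "fatal error" event to the client over the SSE stream.
                sse_stream_writer.send_nowait(
                    {"event": "error", "data": "server overloaded; closing session"}
                )
            except anyio.WouldBlock:
                pass  # client is backed up too; skip the error and just shut down
            sse_cancel_scope.cancel()

    return message_callback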

Contributor Author

Is the observed functionality here different between the Redis implementation vs InMemory?

Wouldn't we block at message_callback in both cases? Or are you saying this is fine in the InMemory case because we are also blocking the POST response? Not sure what guarantees there are in the SSE protocol for how fast the response needs to be after the POST.

Contributor Author

I think I have a potential solution here: we have a higher-level task group take care of the handler logic (separate from the listener task group). So once the listener task group completes, we still allow all the handlers to finish out. This allows us to follow the same behavior as the original in-memory implementation, with an essentially 'infinite'-sized buffer.

Let me know what you think; alternatively, we can go to a fixed-size stream_buffer and throw errors back to the message POST if we exceed it. I initially attempted this approach, but there wasn't a straightforward way to plumb through the ACKs on handler completion.
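
A minimal sketch of that structure, assuming anyio task groups (listen and handle_message are placeholders for the dispatch listener and the per-message handler):

import anyio


async def connect(listen, handle_message):
    """Illustrative structure only: the outer task group owns message handlers,
    the inner one owns the dispatch listener. Cancelling the listener group does
    not kill in-flight handlers, which gives an effectively unbounded buffer."""
    async with anyio.create_task_group() as handler_tg:  # outlives the listener

        async def on_message(message) -> None:
            # Each message from the dispatch becomes a task on the longer-lived group.
            handler_tg.start_soon(handle_message, message)

        async with anyio.create_task_group() as listener_tg:
            listener_tg.start_soon(listen, on_message)
            # ... serve the SSE response here; when listener_tg is cancelled on
            # disconnect, handlers already started in handler_tg still finish out.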

Contributor

@praboud-ant praboud-ant Apr 23, 2025

Yeah, this is more elegant. I'm a little wary of arbitrarily deeply queueing tasks, but I think in practice it's probably fine. We can always come back to this later if it becomes a problem in practice.

@asynccontextmanager
async def active_for_request(self, session_id: UUID, callback: MessageCallback):
    """Request-scoped context manager that ensures the listener task is running."""
    await self._redis.sadd(self._active_sessions_key, session_id.hex)
Contributor

I think it's somewhat preferable to use separate keys for tracking the active sessions, rather than one big set value, so the set doesn't grow unboundedly.

Contributor Author

It seems like the max # of redis keys & the max number of members in a set are equal, so I think there should be no difference in practice? https://redis.io/docs/latest/develop/data-types/sets/

Contributor

Huh, I guess that seems fine to me, then.

try:
    yield
finally:
    tg.cancel_scope.cancel()
Contributor

One kinda squirrelly part of this is that it's extremely important that the cancel doesn't happen after we've read a message off the Redis connection, otherwise the data will just get entirely lost. Right now, I think there's a failure mode here where one SSE stream being closed at the wrong moment will cause messages from another SSE stream to be dropped.

Looking through the Redis implementation, there are definitely multiple async calls within get_message, so it's possible for the bytes to be read off the connection, then the context to be cancelled, and the message dropped.

I think the way we have to structure this is either with multiple connections, or by wrapping the get_message() + dispatch calls in a shield=True scope: https://anyio.readthedocs.io/en/stable/cancellation.html#shielding. We'd then need to set a timeout on the get_message call, and just silently retry if the read times out. (This essentially gives us known safe points that we're OK being cancelled at.)
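
A rough sketch of that shielded read loop, assuming redis-py's asyncio PubSub API; dispatch here is a placeholder for handing the message to the session callback:

import anyio
from redis import asyncio as aioredis


async def listen(pubsub: aioredis.client.PubSub, dispatch) -> None:
    """Sketch only: read + dispatch are shielded so a cancellation can never
    land after bytes have already been pulled off the Redis connection."""
    while True:
        with anyio.CancelScope(shield=True):
            # Timed-out reads return None, giving us a known-safe retry point.
            message = await pubsub.get_message(
                ignore_subscribe_messages=True, timeout=1.0
            )
            if message is not None:
                await dispatch(message)
        # The only place cancellation can take effect is this checkpoint,
        # between one shielded read and the next.
        await anyio.sleep(0)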

@akash329d akash329d requested a review from praboud-ant April 22, 2025 21:50
Contributor

@praboud-ant praboud-ant left a comment

This is looking great! All of the code looks logically right to me - I think the one remaining thing that we should probably do is to add some test coverage for the Redis dispatch code. This logic is fairly complicated, and having to test it manually isn't ideal. FakeRedis is a pretty good way to write these sorts of tests, so we don't actually have to run a separate Redis container.
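
For what it's worth, a rough sketch of the kind of test this enables, assuming the fakeredis package and the repo's anyio pytest mode (the actual dispatch class would be wired in on top of these primitives):

import pytest
from fakeredis import FakeAsyncRedis  # in-process stand-in for a Redis server


@pytest.mark.anyio
async def test_fakeredis_supports_dispatch_primitives():
    """Checks the primitives the Redis dispatch relies on (sets and pub/sub)
    without running a real Redis container."""
    redis = FakeAsyncRedis()

    # Session-tracking set, as used by active_for_request above.
    await redis.sadd("active_sessions", "abc123")
    assert await redis.sismember("active_sessions", "abc123")

    # Pub/sub round-trip for a session channel.
    pubsub = redis.pubsub()
    await pubsub.subscribe("session:abc123")
    await redis.publish("session:abc123", '{"jsonrpc": "2.0", "method": "ping"}')
    message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)

    assert message is not None
    assert b"ping" in message["data"]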

@nickste

nickste commented Apr 24, 2025

Hey folks - thanks for the awesome efforts here! I've tested the latest commit and can confirm it has fixed the "Could not find session for ID: [session-id]" issue I was running into with an earlier commit.

I'm seeing another issue that would be useful to try to figure out. If you have clients connected to your MCP server (running the code from this latest commit) and then restart the MCP server (for example, as would happen when you deploy an update to the containers running it), then any subsequent tool requests from those previously connected clients result in this error:

2025-04-24 04:55:14,432 - INFO - 192.168.65.1:18181 - "POST /messages/?session_id=4f2c010cb2cd403fae6109db75cfc143 HTTP/1.1" 202
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.13/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        self.scope, self.receive, self.send
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/usr/local/lib/python3.13/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/base.py", line 183, in __call__
    raise app_exc
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/base.py", line 141, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/base.py", line 183, in __call__
    raise app_exc
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/base.py", line 141, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 714, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 734, in app
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 460, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/errors.py", line 165, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.13/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 714, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 734, in app
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 288, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 76, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/local/lib/python3.13/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/local/lib/python3.13/site-packages/starlette/routing.py", line 74, in app
    await response(scope, receive, send)
          ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not callable

And the client shows a "tool result" error with the message: "MCP error -32001: Request timed out".

Steps to reproduce:

  1. Use docker to run MCP server (using python-sdk), connecting to a redis container.
  2. Use MCP Inspector to connect to the docker MCP server.
  3. Run a tool call.
  4. Shutdown the docker MCP server and start it up again.
  5. Run a tool call.
  6. Check docker logs for error.

I've also recorded a quick run through of reproducing the error here: https://supercut.video/share/safety-cybersecurity/OXRer1Pj4jjTvSO8FtPgzl

Let me know if you need any help reproducing or if it'd be useful to hop on a quick call to discuss.

@akash329d
Contributor Author

akash329d commented May 1, 2025

Thanks for reporting the bug! I did some debugging and I think I have the root cause figured out.

When you shut down the MCP server and restart it, MCP Inspector attempts to auto-reconnect the SSE session on a cadence. When this happens, it initiates a new SSE connection (getting a new session ID), but it doesn't re-initialize the session on that new connection.

In the in-memory (non-Redis) implementation, this causes an error when you try to do a tool call/query: Received request before initialization was complete.

However, in the Redis implementation there's a race condition. The same RuntimeError about incomplete initialization triggers the finally clause in the subscribe context manager, which cancels the task group. This ends up swallowing that exception, and instead we get the TypeError: 'NoneType' object is not callable exception (which happens because we're not returning anything from the connect_sse method).

The fact that we hadn't noticed this before also exposes a separate bug: the current connect_sse method never actually exits. Even when the EventSourceResponse task closes on SSE disconnect, the request just stays active because we have no mechanism to kill it.

That's what I've found so far - will try to have a fix out soon.

For a quick explainer on the exception swallowing, here's a repro that I think illustrates it pretty well:

from contextlib import asynccontextmanager
import anyio

@asynccontextmanager
async def test():
    async with anyio.create_task_group() as tg:
        try:
            yield
        finally:
            tg.cancel_scope.cancel()
            await anyio.sleep(0.01)
            print("In finally block - cancelling task group...")

async def main():
    async with anyio.create_task_group() as tg:
        async with test():
            await anyio.sleep(0.5)
            raise Exception("Exception in main")

    print("Done!")

if __name__ == "__main__":
    anyio.run(main)

The result is that the program completes successfully, but never prints "In finally block - cancelling task group...". With my prior understanding I expected that print to happen, since I thought the finally block wasn't running inside the task group's cancel scope.

@akash329d akash329d requested a review from praboud-ant May 6, 2025 19:53
@praboud-ant praboud-ant merged commit 3b1b213 into modelcontextprotocol:main May 7, 2025
7 checks passed
@ihrpr
Contributor

ihrpr commented May 7, 2025

Reverting in #649

  • In spec version 2025-03-26, Streamable HTTP transport supersedes the HTTP+SSE transport from version 2024-11-05
  • We're bringing the Python SDK up to the new spec version (2025-03-26), which means focusing on Streamable HTTP rather than enhancing the legacy transport, except for fixes
  • Adding Redis integration to SSE creates a maintenance burden for a transport that is being superseded. It would still be useful to have an MCP server with Redis, but as an example that users can reference.
