Add message queue for SSE messages POST endpoint #459
Conversation
src/mcp/server/sse.py
```
@@ -96,7 +101,12 @@ async def connect_sse(self, scope: Scope, receive: Receive, send: Send):

        session_id = uuid4()
        session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"
        self._read_stream_writers[session_id] = read_stream_writer

        async def message_callback(message: types.JSONRPCMessage | Exception) -> None:
```
Right now, this will block if the MCP server implementation doesn't consume `read_stream` quickly enough. For the in-memory dispatch implementation, this is basically fine, because it just causes the corresponding `POST /message` call to block. However, in the Redis implementation, it will block anything else from getting consumed from the Redis pubsub stream.

Fundamentally, something has to have a sized buffer (with size > 0; the channels we have here are size 0, so messages block immediately), and writes to that buffer must fail once it fills up. Moreover, we need to signal an error to the client when this happens, and probably shut down. I'm not 100% sure what the right way to signal that error to the client is. If the message is a request, we can return an error response... but if it's a notification or a response, it's not clear to me how to signal to the client that something went wrong. `LoggingMessageNotification` is the closest thing I can find in the spec: https://github.com/modelcontextprotocol/modelcontextprotocol/blob/main/schema/2025-03-26/schema.ts. That doesn't seem like quite what it was meant for, though. I guess we could send an error log notification, then cancel the task group / close the connection, but it feels a little bit wrong.

@jerome3o-anthropic: do you have thoughts on how the server can best signal to the client "ahh, fatal error, shutting down"?

As for how to implement this: I think what would make the most sense is to give the `read_stream` buffer some size (say, 32, but that's totally arbitrary), make `message_callback` a synchronous function, and call `read_stream_writer.send_nowait()`. If it errors out, we can then push an error message to the SSE stream writer. (This should also use nowait; we'll need to add a buffer to the SSE channel to accommodate it. If that nowait call fails, we should just silently move on and skip sending the error - the client is probably jammed up somehow, and isn't going to get our message anyway.) Once that's done, we can cancel the TG (though we'll need to split the TG used to send the SSE stream from the one that wraps the `yield` from this function).

This logic is a bit fiddly, so it's probably best to implement it here, rather than in the message dispatch we expect clients might implement variants of.
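For illustration, a rough sketch of that `send_nowait` approach, assuming anyio memory object streams (the helper and its parameters are made up for this example, not the PR's actual code):

```python
import anyio
from anyio.streams.memory import MemoryObjectSendStream

def make_message_callback(
    read_stream_writer: MemoryObjectSendStream,
    sse_stream_writer: MemoryObjectSendStream,
    close_connection,
):
    # read_stream_writer would come from a stream with a nonzero buffer, e.g.:
    #   read_stream_writer, read_stream = anyio.create_memory_object_stream(32)
    def message_callback(message) -> None:
        try:
            # Fails fast with WouldBlock once the buffer is full, instead of
            # blocking the dispatcher (important for the Redis listener).
            read_stream_writer.send_nowait(message)
        except anyio.WouldBlock:
            try:
                # Best-effort error notification back over the SSE stream.
                sse_stream_writer.send_nowait("server overloaded; shutting down")
            except anyio.WouldBlock:
                pass  # Client is backed up anyway; skip the error message.
            close_connection()  # e.g. cancel the connection's task group

    return message_callback
```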
Is the observed functionality here different between the Redis implementation vs InMemory? Wouldn't we block at `message_callback` in both cases? Or are you saying this is fine in the InMemory case because we are also blocking the `POST` response? Not sure what guarantees there are in the SSE protocol for how fast the response needs to be after the `POST`.
I think I have a potential solution here: we have a higher-level task group take care of the handler logic, separate from the listener task group. Once the listener task group completes, we still allow all the handlers to finish out. This allows us to follow the same behavior as the original in-memory implementation, with an essentially 'infinite'-sized buffer.

Let me know what you think; alternatively, we can go to a fixed-size stream buffer and throw errors back to the message POST if we exceed it. I initially attempted that approach, but there wasn't a straightforward way to plumb through the ACKs on handler completion.
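A minimal sketch of this nested task-group arrangement, assuming anyio task groups (the function and parameter names are illustrative, not the PR's actual code):

```python
import anyio

async def run_session(listen_for_messages, handle_message):
    # Outer task group owns the message handlers.
    async with anyio.create_task_group() as handler_tg:
        # Inner task group owns the listener.
        async with anyio.create_task_group() as listener_tg:

            async def listener() -> None:
                async for message in listen_for_messages():
                    # Handlers are started on the *outer* group, so they keep
                    # running after the listener group winds down.
                    handler_tg.start_soon(handle_message, message)

            listener_tg.start_soon(listener)
        # The listener group has exited here; handler_tg still waits for any
        # in-flight handlers before run_session returns.
```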
Yeah, this is more elegant. I'm a little wary of queueing tasks arbitrarily deeply, but I think it's probably fine. We can always come back to this later if it becomes a problem in practice.
```python
    @asynccontextmanager
    async def active_for_request(self, session_id: UUID, callback: MessageCallback):
        """Request-scoped context manager that ensures the listener task is running."""
        await self._redis.sadd(self._active_sessions_key, session_id.hex)
```
I think it's somewhat preferable to use separate keys for tracking the active sessions, rather than one big set value, so the set doesn't grow unboundedly.
It seems like the max # of redis keys & the max number of members in a set are equal, so I think there should be no difference in practice? https://redis.io/docs/latest/develop/data-types/sets/
Huh, I guess that seems fine to me, then.
```python
        try:
            yield
        finally:
            tg.cancel_scope.cancel()
```
One kinda squirrelly part of this is that it's extremely important that the cancel doesn't happen after we've read a message off the Redis connection, otherwise the data will just get entirely lost. Right now, I think there's a failure mode here where one SSE stream being closed at the wrong moment will cause messages from another SSE stream to be dropped.

Looking through the Redis implementation, there are definitely multiple `async` calls within `get_message`, so it's possible for the bytes to be read off the connection, then the context to be cancelled, and the message dropped.

I think the way we have to structure this is either with multiple connections, or by wrapping the `get_message()` + dispatch calls in a `shield=True` scope: https://anyio.readthedocs.io/en/stable/cancellation.html#shielding. We'd then need to set a timeout on the `get_message` call, and just silently retry if the read times out. (This essentially gives us known safe points that we're OK being cancelled at.)
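A rough sketch of this shield-plus-timeout pattern, assuming redis.asyncio's `PubSub.get_message` and anyio cancel scopes (not the PR's actual code):

```python
import anyio

async def listen(pubsub, dispatch) -> None:
    # pubsub: a redis.asyncio PubSub; dispatch: routes a message to its session.
    while True:
        # Shield the read + dispatch so a cancellation can't land after bytes
        # have been pulled off the Redis connection but before dispatch.
        with anyio.CancelScope(shield=True):
            message = await pubsub.get_message(
                ignore_subscribe_messages=True, timeout=1.0
            )
            if message is not None:
                await dispatch(message)
        # Outside the shield is the known-safe point where cancellation
        # (e.g. the SSE connection closing) is allowed to take effect.
        await anyio.lowlevel.checkpoint()
```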
This is looking great! All of the code looks logically right to me - I think the one remaining thing that we should probably do is to add some test coverage for the Redis dispatch code. This logic is fairly complicated, and having to test it manually isn't ideal. FakeRedis is a pretty good way to write these sorts of tests, so we don't actually have to run a separate Redis container.
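For illustration, a hedged sketch of what such a test could look like using fakeredis's async client (the test and channel names are made up, it assumes the anyio pytest plugin, and it only exercises pub/sub rather than the PR's dispatch class):

```python
import pytest
from fakeredis import aioredis as fake_aioredis

@pytest.mark.anyio
async def test_publish_reaches_subscriber() -> None:
    redis = fake_aioredis.FakeRedis()
    pubsub = redis.pubsub()
    await pubsub.subscribe("session:1234")

    await redis.publish("session:1234", b"hello")

    message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
    assert message is not None
    assert message["data"] == b"hello"
```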
Hey folks - thanks for the awesome efforts here! I've tested the latest commit and can confirm it has fixed the "Could not find session for ID: [session-id]" issue I was running into with an earlier commit.

I'm seeing another issue that would be useful to try and figure out. If you have clients connected to your MCP server (running the code from this latest commit) and then restart the MCP server (for example, as would happen when you deploy an update to the containers running it), then any subsequent tool request from a previously connected client results in this error:

And the client shows a "tool result" error with the message:

Steps to reproduce:

I've also recorded a quick run-through of reproducing the error here: https://supercut.video/share/safety-cybersecurity/OXRer1Pj4jjTvSO8FtPgzl Let me know if you need any help reproducing or if it'd be useful to hop on a quick call to discuss.
Thanks for reporting the bug! I did some debugging and I think I have the root cause figured out.

When you shut down the MCP server and restart it, MCP Inspector attempts to auto-reconnect the SSE session on a cadence. When this happens, it initiates a new SSE connection (getting a new session ID), but it doesn't perform a re-initialization of the SSE connection. In the in-memory (non-Redis) implementation, this causes an error when you try to do a tool call/query:

However, in the Redis implementation there's a race condition. The same RuntimeError on initialization completion triggers the

The fact that we haven't noticed this before also exposes a separate bug: with the current `connect_sse` method, it never actually exits. Even when the EventSourceResponse task closes on SSE disconnect, the request just stays active because we have no mechanism to kill it.

That's what I've found so far - will try to have a fix out soon.

For a quick explainer on the exception swallowing, here's a repro which I think explains it pretty well:

```python
from contextlib import asynccontextmanager

import anyio

@asynccontextmanager
async def test():
    async with anyio.create_task_group() as tg:
        try:
            yield
        finally:
            tg.cancel_scope.cancel()
            await anyio.sleep(0.01)
            print("In finally block - cancelling task group...")

async def main():
    async with anyio.create_task_group() as tg:
        async with test():
            await anyio.sleep(0.5)
            raise Exception("Exception in main")
    print("Done!")

if __name__ == "__main__":
    anyio.run(main)
```

The result of this is the program completing successfully, but not printing the
Reverting in #649
Motivation and Context
This PR decouples session state from individual server instances by introducing a message dispatch abstraction. The previous implementation required client messages to be handled by the same server instance that initialized the session. Now, servers poll a shared message dispatch (with an in-memory or Redis backend) to receive client messages, allowing any server instance to handle POST requests to the message endpoint for any session.
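A hypothetical sketch of the shape such a dispatch abstraction could take (`active_for_request` appears in the PR's diff above; the other names and signatures here are illustrative only):

```python
from contextlib import AbstractAsyncContextManager
from typing import Awaitable, Callable, Protocol
from uuid import UUID

# Callback invoked with each client message (or exception) for a session.
MessageCallback = Callable[[object], Awaitable[None]]

class MessageDispatch(Protocol):
    """Decouples the message POST endpoint from the server holding the SSE stream."""

    def active_for_request(
        self, session_id: UUID, callback: MessageCallback
    ) -> AbstractAsyncContextManager[None]:
        """Deliver messages published for session_id to callback while the request is active."""
        ...

    async def publish_message(self, session_id: UUID, message: object) -> None:
        """Called by whichever server instance receives the POST for this session."""
        ...
```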
How Has This Been Tested?
All unit tests are passing, and the Redis implementation was tested locally. Unit and integration tests were also added for the Redis implementation.
Breaking Changes
N/A
Types of changes
Checklist
Additional context