4
4
5
5
import contextlib
6
6
import logging
7
+ import threading
7
8
from collections .abc import AsyncIterator
8
9
from http import HTTPStatus
9
10
from typing import Any
@@ -37,6 +38,10 @@ class StreamableHTTPSessionManager:
37
38
3. Connection management and lifecycle
38
39
4. Request handling and transport setup
39
40
41
+ Important: Only one StreamableHTTPSessionManager instance should be created
42
+ per application. The instance cannot be reused after its run() context has
43
+ completed. If you need to restart the manager, create a new instance.
44
+
40
45
Args:
41
46
app: The MCP server instance
42
47
event_store: Optional event store for resumability support.
@@ -67,6 +72,9 @@ def __init__(
67
72
68
73
# The task group will be set during lifespan
69
74
self ._task_group = None
75
+ # Thread-safe tracking of run() calls
76
+ self ._run_lock = threading .Lock ()
77
+ self ._has_started = False
70
78
71
79
@contextlib .asynccontextmanager
72
80
async def run (self ) -> AsyncIterator [None ]:
@@ -75,13 +83,26 @@ async def run(self) -> AsyncIterator[None]:
75
83
76
84
This creates and manages the task group for all session operations.
77
85
86
+ Important: This method can only be called once per instance. The same
87
+ StreamableHTTPSessionManager instance cannot be reused after this
88
+ context manager exits. Create a new instance if you need to restart.
89
+
78
90
Use this in the lifespan context manager of your Starlette app:
79
91
80
92
@contextlib.asynccontextmanager
81
93
async def lifespan(app: Starlette) -> AsyncIterator[None]:
82
94
async with session_manager.run():
83
95
yield
84
96
"""
97
+ # Thread-safe check to ensure run() is only called once
98
+ with self ._run_lock :
99
+ if self ._has_started :
100
+ raise RuntimeError (
101
+ "StreamableHTTPSessionManager .run() can only be called "
102
+ "once per instance. Create a new instance if you need to run again."
103
+ )
104
+ self ._has_started = True
105
+
85
106
async with anyio .create_task_group () as tg :
86
107
# Store the task group for later use
87
108
self ._task_group = tg
@@ -113,9 +134,7 @@ async def handle_request(
113
134
send: ASGI send function
114
135
"""
115
136
if self ._task_group is None :
116
- raise RuntimeError (
117
- "Task group is not initialized. Make sure to use the run()."
118
- )
137
+ raise RuntimeError ("Task group is not initialized. Make sure to use run()." )
119
138
120
139
# Dispatch to the appropriate handler
121
140
if self .stateless :
0 commit comments