Skip to content

Commit 9dfc925

Browse files
authored
StreamableHttp client transport (#573)
1 parent 46523af commit 9dfc925

File tree

5 files changed

+498
-15
lines changed

5 files changed

+498
-15
lines changed

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import click
88
import mcp.types as types
99
from mcp.server.lowlevel import Server
10-
from mcp.server.streamableHttp import (
10+
from mcp.server.streamable_http import (
1111
MCP_SESSION_ID_HEADER,
1212
StreamableHTTPServerTransport,
1313
)

src/mcp/client/streamable_http.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
"""
2+
StreamableHTTP Client Transport Module
3+
4+
This module implements the StreamableHTTP transport for MCP clients,
5+
providing support for HTTP POST requests with optional SSE streaming responses
6+
and session management.
7+
"""
8+
9+
import logging
10+
from contextlib import asynccontextmanager
11+
from datetime import timedelta
12+
from typing import Any
13+
14+
import anyio
15+
import httpx
16+
from httpx_sse import EventSource, aconnect_sse
17+
18+
from mcp.types import (
19+
ErrorData,
20+
JSONRPCError,
21+
JSONRPCMessage,
22+
JSONRPCNotification,
23+
JSONRPCRequest,
24+
)
25+
26+
logger = logging.getLogger(__name__)
27+
28+
# Header names
29+
MCP_SESSION_ID_HEADER = "mcp-session-id"
30+
LAST_EVENT_ID_HEADER = "last-event-id"
31+
32+
# Content types
33+
CONTENT_TYPE_JSON = "application/json"
34+
CONTENT_TYPE_SSE = "text/event-stream"
35+
36+
37+
@asynccontextmanager
38+
async def streamablehttp_client(
39+
url: str,
40+
headers: dict[str, Any] | None = None,
41+
timeout: timedelta = timedelta(seconds=30),
42+
sse_read_timeout: timedelta = timedelta(seconds=60 * 5),
43+
):
44+
"""
45+
Client transport for StreamableHTTP.
46+
47+
`sse_read_timeout` determines how long (in seconds) the client will wait for a new
48+
event before disconnecting. All other HTTP operations are controlled by `timeout`.
49+
50+
Yields:
51+
Tuple of (read_stream, write_stream, terminate_callback)
52+
"""
53+
54+
read_stream_writer, read_stream = anyio.create_memory_object_stream[
55+
JSONRPCMessage | Exception
56+
](0)
57+
write_stream, write_stream_reader = anyio.create_memory_object_stream[
58+
JSONRPCMessage
59+
](0)
60+
61+
async def get_stream():
62+
"""
63+
Optional GET stream for server-initiated messages
64+
"""
65+
nonlocal session_id
66+
try:
67+
# Only attempt GET if we have a session ID
68+
if not session_id:
69+
return
70+
71+
get_headers = request_headers.copy()
72+
get_headers[MCP_SESSION_ID_HEADER] = session_id
73+
74+
async with aconnect_sse(
75+
client,
76+
"GET",
77+
url,
78+
headers=get_headers,
79+
timeout=httpx.Timeout(timeout.seconds, read=sse_read_timeout.seconds),
80+
) as event_source:
81+
event_source.response.raise_for_status()
82+
logger.debug("GET SSE connection established")
83+
84+
async for sse in event_source.aiter_sse():
85+
if sse.event == "message":
86+
try:
87+
message = JSONRPCMessage.model_validate_json(sse.data)
88+
logger.debug(f"GET message: {message}")
89+
await read_stream_writer.send(message)
90+
except Exception as exc:
91+
logger.error(f"Error parsing GET message: {exc}")
92+
await read_stream_writer.send(exc)
93+
else:
94+
logger.warning(f"Unknown SSE event from GET: {sse.event}")
95+
except Exception as exc:
96+
# GET stream is optional, so don't propagate errors
97+
logger.debug(f"GET stream error (non-fatal): {exc}")
98+
99+
async def post_writer(client: httpx.AsyncClient):
100+
nonlocal session_id
101+
try:
102+
async with write_stream_reader:
103+
async for message in write_stream_reader:
104+
# Add session ID to headers if we have one
105+
post_headers = request_headers.copy()
106+
if session_id:
107+
post_headers[MCP_SESSION_ID_HEADER] = session_id
108+
109+
logger.debug(f"Sending client message: {message}")
110+
111+
# Handle initial initialization request
112+
is_initialization = (
113+
isinstance(message.root, JSONRPCRequest)
114+
and message.root.method == "initialize"
115+
)
116+
if (
117+
isinstance(message.root, JSONRPCNotification)
118+
and message.root.method == "notifications/initialized"
119+
):
120+
tg.start_soon(get_stream)
121+
122+
async with client.stream(
123+
"POST",
124+
url,
125+
json=message.model_dump(
126+
by_alias=True, mode="json", exclude_none=True
127+
),
128+
headers=post_headers,
129+
) as response:
130+
if response.status_code == 202:
131+
logger.debug("Received 202 Accepted")
132+
continue
133+
# Check for 404 (session expired/invalid)
134+
if response.status_code == 404:
135+
if isinstance(message.root, JSONRPCRequest):
136+
jsonrpc_error = JSONRPCError(
137+
jsonrpc="2.0",
138+
id=message.root.id,
139+
error=ErrorData(
140+
code=32600,
141+
message="Session terminated",
142+
),
143+
)
144+
await read_stream_writer.send(
145+
JSONRPCMessage(jsonrpc_error)
146+
)
147+
continue
148+
response.raise_for_status()
149+
150+
# Extract session ID from response headers
151+
if is_initialization:
152+
new_session_id = response.headers.get(MCP_SESSION_ID_HEADER)
153+
if new_session_id:
154+
session_id = new_session_id
155+
logger.info(f"Received session ID: {session_id}")
156+
157+
# Handle different response types
158+
content_type = response.headers.get("content-type", "").lower()
159+
160+
if content_type.startswith(CONTENT_TYPE_JSON):
161+
try:
162+
content = await response.aread()
163+
json_message = JSONRPCMessage.model_validate_json(
164+
content
165+
)
166+
await read_stream_writer.send(json_message)
167+
except Exception as exc:
168+
logger.error(f"Error parsing JSON response: {exc}")
169+
await read_stream_writer.send(exc)
170+
171+
elif content_type.startswith(CONTENT_TYPE_SSE):
172+
# Parse SSE events from the response
173+
try:
174+
event_source = EventSource(response)
175+
async for sse in event_source.aiter_sse():
176+
if sse.event == "message":
177+
try:
178+
await read_stream_writer.send(
179+
JSONRPCMessage.model_validate_json(
180+
sse.data
181+
)
182+
)
183+
except Exception as exc:
184+
logger.exception("Error parsing message")
185+
await read_stream_writer.send(exc)
186+
else:
187+
logger.warning(f"Unknown event: {sse.event}")
188+
189+
except Exception as e:
190+
logger.exception("Error reading SSE stream:")
191+
await read_stream_writer.send(e)
192+
193+
else:
194+
# For 202 Accepted with no body
195+
if response.status_code == 202:
196+
logger.debug("Received 202 Accepted")
197+
continue
198+
199+
error_msg = f"Unexpected content type: {content_type}"
200+
logger.error(error_msg)
201+
await read_stream_writer.send(ValueError(error_msg))
202+
203+
except Exception as exc:
204+
logger.error(f"Error in post_writer: {exc}")
205+
finally:
206+
await read_stream_writer.aclose()
207+
await write_stream.aclose()
208+
209+
async def terminate_session():
210+
"""
211+
Terminate the session by sending a DELETE request.
212+
"""
213+
nonlocal session_id
214+
if not session_id:
215+
return # No session to terminate
216+
217+
try:
218+
delete_headers = request_headers.copy()
219+
delete_headers[MCP_SESSION_ID_HEADER] = session_id
220+
221+
response = await client.delete(
222+
url,
223+
headers=delete_headers,
224+
)
225+
226+
if response.status_code == 405:
227+
# Server doesn't allow client-initiated termination
228+
logger.debug("Server does not allow session termination")
229+
elif response.status_code != 200:
230+
logger.warning(f"Session termination failed: {response.status_code}")
231+
except Exception as exc:
232+
logger.warning(f"Session termination failed: {exc}")
233+
234+
async with anyio.create_task_group() as tg:
235+
try:
236+
logger.info(f"Connecting to StreamableHTTP endpoint: {url}")
237+
# Set up headers with required Accept header
238+
request_headers = {
239+
"Accept": f"{CONTENT_TYPE_JSON}, {CONTENT_TYPE_SSE}",
240+
"Content-Type": CONTENT_TYPE_JSON,
241+
**(headers or {}),
242+
}
243+
# Track session ID if provided by server
244+
session_id: str | None = None
245+
246+
async with httpx.AsyncClient(
247+
headers=request_headers,
248+
timeout=httpx.Timeout(timeout.seconds, read=sse_read_timeout.seconds),
249+
follow_redirects=True,
250+
) as client:
251+
tg.start_soon(post_writer, client)
252+
try:
253+
yield read_stream, write_stream, terminate_session
254+
finally:
255+
tg.cancel_scope.cancel()
256+
finally:
257+
await read_stream_writer.aclose()
258+
await write_stream.aclose()
File renamed without changes.

0 commit comments

Comments
 (0)