Skip to content

Commit 0661b32

Browse files
committed
Merge branch 'main' into ihrpr/shttp-usability
2 parents 01f3f05 + 58c5e72 commit 0661b32

File tree

40 files changed

+2893
-832
lines changed

40 files changed

+2893
-832
lines changed

examples/servers/simple-prompt/mcp_simple_prompt/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ async def get_prompt(
9090
if transport == "sse":
9191
from mcp.server.sse import SseServerTransport
9292
from starlette.applications import Starlette
93+
from starlette.responses import Response
9394
from starlette.routing import Mount, Route
9495

9596
sse = SseServerTransport("/messages/")
@@ -101,6 +102,7 @@ async def handle_sse(request):
101102
await app.run(
102103
streams[0], streams[1], app.create_initialization_options()
103104
)
105+
return Response()
104106

105107
starlette_app = Starlette(
106108
debug=True,

examples/servers/simple-resource/mcp_simple_resource/server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ async def read_resource(uri: FileUrl) -> str | bytes:
4646
if transport == "sse":
4747
from mcp.server.sse import SseServerTransport
4848
from starlette.applications import Starlette
49+
from starlette.responses import Response
4950
from starlette.routing import Mount, Route
5051

5152
sse = SseServerTransport("/messages/")
@@ -57,11 +58,12 @@ async def handle_sse(request):
5758
await app.run(
5859
streams[0], streams[1], app.create_initialization_options()
5960
)
61+
return Response()
6062

6163
starlette_app = Starlette(
6264
debug=True,
6365
routes=[
64-
Route("/sse", endpoint=handle_sse),
66+
Route("/sse", endpoint=handle_sse, methods=["GET"]),
6567
Mount("/messages/", app=sse.handle_post_message),
6668
],
6769
)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# MCP Simple StreamableHttp Stateless Server Example
2+
3+
A stateless MCP server example demonstrating the StreamableHttp transport without maintaining session state. This example is ideal for understanding how to deploy MCP servers in multi-node environments where requests can be routed to any instance.
4+
5+
## Features
6+
7+
- Uses the StreamableHTTP transport in stateless mode (mcp_session_id=None)
8+
- Each request creates a new ephemeral connection
9+
- No session state maintained between requests
10+
- Task lifecycle scoped to individual requests
11+
- Suitable for deployment in multi-node environments
12+
13+
14+
## Usage
15+
16+
Start the server:
17+
18+
```bash
19+
# Using default port 3000
20+
uv run mcp-simple-streamablehttp-stateless
21+
22+
# Using custom port
23+
uv run mcp-simple-streamablehttp-stateless --port 3000
24+
25+
# Custom logging level
26+
uv run mcp-simple-streamablehttp-stateless --log-level DEBUG
27+
28+
# Enable JSON responses instead of SSE streams
29+
uv run mcp-simple-streamablehttp-stateless --json-response
30+
```
31+
32+
The server exposes a tool named "start-notification-stream" that accepts three arguments:
33+
34+
- `interval`: Time between notifications in seconds (e.g., 1.0)
35+
- `count`: Number of notifications to send (e.g., 5)
36+
- `caller`: Identifier string for the caller
37+
38+
39+
## Client
40+
41+
You can connect to this server using an HTTP client. For now, only the TypeScript SDK has streamable HTTP client examples, or you can use [Inspector](https://github.com/modelcontextprotocol/inspector) for testing.

examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .server import main
2+
3+
if __name__ == "__main__":
4+
main()
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import contextlib
2+
import logging
3+
4+
import anyio
5+
import click
6+
import mcp.types as types
7+
from mcp.server.lowlevel import Server
8+
from mcp.server.streamableHttp import (
9+
StreamableHTTPServerTransport,
10+
)
11+
from starlette.applications import Starlette
12+
from starlette.routing import Mount
13+
14+
logger = logging.getLogger(__name__)
15+
# Global task group that will be initialized in the lifespan
16+
task_group = None
17+
18+
19+
@contextlib.asynccontextmanager
20+
async def lifespan(app):
21+
"""Application lifespan context manager for managing task group."""
22+
global task_group
23+
24+
async with anyio.create_task_group() as tg:
25+
task_group = tg
26+
logger.info("Application started, task group initialized!")
27+
try:
28+
yield
29+
finally:
30+
logger.info("Application shutting down, cleaning up resources...")
31+
if task_group:
32+
tg.cancel_scope.cancel()
33+
task_group = None
34+
logger.info("Resources cleaned up successfully.")
35+
36+
37+
@click.command()
38+
@click.option("--port", default=3000, help="Port to listen on for HTTP")
39+
@click.option(
40+
"--log-level",
41+
default="INFO",
42+
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
43+
)
44+
@click.option(
45+
"--json-response",
46+
is_flag=True,
47+
default=False,
48+
help="Enable JSON responses instead of SSE streams",
49+
)
50+
def main(
51+
port: int,
52+
log_level: str,
53+
json_response: bool,
54+
) -> int:
55+
# Configure logging
56+
logging.basicConfig(
57+
level=getattr(logging, log_level.upper()),
58+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
59+
)
60+
61+
app = Server("mcp-streamable-http-stateless-demo")
62+
63+
@app.call_tool()
64+
async def call_tool(
65+
name: str, arguments: dict
66+
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
67+
ctx = app.request_context
68+
interval = arguments.get("interval", 1.0)
69+
count = arguments.get("count", 5)
70+
caller = arguments.get("caller", "unknown")
71+
72+
# Send the specified number of notifications with the given interval
73+
for i in range(count):
74+
await ctx.session.send_log_message(
75+
level="info",
76+
data=f"Notification {i+1}/{count} from caller: {caller}",
77+
logger="notification_stream",
78+
related_request_id=ctx.request_id,
79+
)
80+
if i < count - 1: # Don't wait after the last notification
81+
await anyio.sleep(interval)
82+
83+
return [
84+
types.TextContent(
85+
type="text",
86+
text=(
87+
f"Sent {count} notifications with {interval}s interval"
88+
f" for caller: {caller}"
89+
),
90+
)
91+
]
92+
93+
@app.list_tools()
94+
async def list_tools() -> list[types.Tool]:
95+
return [
96+
types.Tool(
97+
name="start-notification-stream",
98+
description=(
99+
"Sends a stream of notifications with configurable count"
100+
" and interval"
101+
),
102+
inputSchema={
103+
"type": "object",
104+
"required": ["interval", "count", "caller"],
105+
"properties": {
106+
"interval": {
107+
"type": "number",
108+
"description": "Interval between notifications in seconds",
109+
},
110+
"count": {
111+
"type": "number",
112+
"description": "Number of notifications to send",
113+
},
114+
"caller": {
115+
"type": "string",
116+
"description": (
117+
"Identifier of the caller to include in notifications"
118+
),
119+
},
120+
},
121+
},
122+
)
123+
]
124+
125+
# ASGI handler for stateless HTTP connections
126+
async def handle_streamable_http(scope, receive, send):
127+
logger.debug("Creating new transport")
128+
# Use lock to prevent race conditions when creating new sessions
129+
http_transport = StreamableHTTPServerTransport(
130+
mcp_session_id=None,
131+
is_json_response_enabled=json_response,
132+
)
133+
async with http_transport.connect() as streams:
134+
read_stream, write_stream = streams
135+
136+
if not task_group:
137+
raise RuntimeError("Task group is not initialized")
138+
139+
async def run_server():
140+
await app.run(
141+
read_stream,
142+
write_stream,
143+
app.create_initialization_options(),
144+
# Runs in standalone mode for stateless deployments
145+
# where clients perform initialization with any node
146+
standalone_mode=True,
147+
)
148+
149+
# Start server task
150+
task_group.start_soon(run_server)
151+
152+
# Handle the HTTP request and return the response
153+
await http_transport.handle_request(scope, receive, send)
154+
155+
# Create an ASGI application using the transport
156+
starlette_app = Starlette(
157+
debug=True,
158+
routes=[
159+
Mount("/mcp", app=handle_streamable_http),
160+
],
161+
lifespan=lifespan,
162+
)
163+
164+
import uvicorn
165+
166+
uvicorn.run(starlette_app, host="0.0.0.0", port=port)
167+
168+
return 0
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[project]
2+
name = "mcp-simple-streamablehttp-stateless"
3+
version = "0.1.0"
4+
description = "A simple MCP server exposing a StreamableHttp transport in stateless mode"
5+
readme = "README.md"
6+
requires-python = ">=3.10"
7+
authors = [{ name = "Anthropic, PBC." }]
8+
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable", "stateless"]
9+
license = { text = "MIT" }
10+
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"]
11+
12+
[project.scripts]
13+
mcp-simple-streamablehttp-stateless = "mcp_simple_streamablehttp_stateless.server:main"
14+
15+
[build-system]
16+
requires = ["hatchling"]
17+
build-backend = "hatchling.build"
18+
19+
[tool.hatch.build.targets.wheel]
20+
packages = ["mcp_simple_streamablehttp_stateless"]
21+
22+
[tool.pyright]
23+
include = ["mcp_simple_streamablehttp_stateless"]
24+
venvPath = "."
25+
venv = ".venv"
26+
27+
[tool.ruff.lint]
28+
select = ["E", "F", "I"]
29+
ignore = []
30+
31+
[tool.ruff]
32+
line-length = 88
33+
target-version = "py310"
34+
35+
[tool.uv]
36+
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

examples/servers/simple-streamablehttp/README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ A simple MCP server example demonstrating the StreamableHttp transport, which en
99
- Task management with anyio task groups
1010
- Ability to send multiple notifications over time to the client
1111
- Proper resource cleanup and lifespan management
12+
- Resumability support via InMemoryEventStore
1213

1314
## Usage
1415

@@ -32,6 +33,23 @@ The server exposes a tool named "start-notification-stream" that accepts three a
3233
- `count`: Number of notifications to send (e.g., 5)
3334
- `caller`: Identifier string for the caller
3435

36+
## Resumability Support
37+
38+
This server includes resumability support through the InMemoryEventStore. This enables clients to:
39+
40+
- Reconnect to the server after a disconnection
41+
- Resume event streaming from where they left off using the Last-Event-ID header
42+
43+
44+
The server will:
45+
- Generate unique event IDs for each SSE message
46+
- Store events in memory for later replay
47+
- Replay missed events when a client reconnects with a Last-Event-ID header
48+
49+
Note: The InMemoryEventStore is designed for demonstration purposes only. For production use, consider implementing a persistent storage solution.
50+
51+
52+
3553
## Client
3654

37-
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]
55+
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use [Inspector](https://github.com/modelcontextprotocol/inspector)

0 commit comments

Comments
 (0)