4
4
Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
5
"""
6
6
7
- from typing import Literal
8
- from urllib .parse import urlencode , urlparse , urlunparse
7
+ from typing import Callable , Literal , Optional , Union
8
+ from urllib .parse import parse_qs , urlencode , urlparse , urlunparse
9
9
10
- from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , ValidationError
10
+ from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , RootModel , ValidationError
11
+ from starlette .datastructures import FormData , QueryParams
11
12
from starlette .requests import Request
12
13
from starlette .responses import RedirectResponse , Response
13
14
14
15
from mcp .server .auth .errors import (
15
16
InvalidClientError ,
16
17
InvalidRequestError ,
17
18
OAuthError ,
19
+ stringify_pydantic_error ,
18
20
)
19
- from mcp .server .auth .handlers .types import HandlerFn
20
- from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider
21
+ from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider , construct_redirect_uri
22
+ from mcp .shared .auth import OAuthClientInformationFull
23
+ from mcp .server .auth .json_response import PydanticJSONResponse
21
24
25
+ import logging
22
26
23
- class AuthorizationRequest (BaseModel ):
24
- """
25
- Model for the authorization request parameters.
27
+ logger = logging .getLogger (__name__ )
26
28
27
- Corresponds to request schema in authorizationHandler in
28
- src/server/auth/handlers/authorize.ts
29
- """
30
29
30
+ class AuthorizationRequest (BaseModel ):
31
+ # See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1
31
32
client_id : str = Field (..., description = "The client ID" )
32
33
redirect_uri : AnyHttpUrl | None = Field (
33
- ... , description = "URL to redirect to after authorization"
34
+ None , description = "URL to redirect to after authorization"
34
35
)
35
36
37
+ # see OAuthClientMetadata; we only support `code`
36
38
response_type : Literal ["code" ] = Field (
37
39
..., description = "Must be 'code' for authorization code flow"
38
40
)
39
41
code_challenge : str = Field (..., description = "PKCE code challenge" )
40
42
code_challenge_method : Literal ["S256" ] = Field (
41
- "S256" , description = "PKCE code challenge method"
43
+ "S256" , description = "PKCE code challenge method, must be S256"
44
+ )
45
+ state : Optional [str ] = Field (None , description = "Optional state parameter" )
46
+ scope : Optional [str ] = Field (
47
+ None ,
48
+ description = "Optional scope; if specified, should be "
49
+ "a space-separated list of scope strings" ,
42
50
)
43
- state : str | None = Field (None , description = "Optional state parameter" )
44
- scope : str | None = Field (None , description = "Optional scope parameter" )
45
-
46
- class Config :
47
- extra = "ignore"
48
51
49
52
50
- def validate_scope (requested_scope : str | None , scope : str | None ) -> list [str ] | None :
53
+ def validate_scope (
54
+ requested_scope : str | None , client : OAuthClientInformationFull
55
+ ) -> list [str ] | None :
51
56
if requested_scope is None :
52
57
return None
53
58
requested_scopes = requested_scope .split (" " )
54
- allowed_scopes = [] if scope is None else scope .split (" " )
59
+ allowed_scopes = [] if client . scope is None else client . scope .split (" " )
55
60
for scope in requested_scopes :
56
61
if scope not in allowed_scopes :
57
62
raise InvalidRequestError (f"Client was not registered with scope { scope } " )
58
63
return requested_scopes
59
64
60
65
61
66
def validate_redirect_uri (
62
- redirect_uri : AnyHttpUrl | None , redirect_uris : list [ AnyHttpUrl ]
67
+ redirect_uri : AnyHttpUrl | None , client : OAuthClientInformationFull
63
68
) -> AnyHttpUrl :
64
- if not redirect_uris :
65
- raise InvalidClientError ("Client has no registered redirect URIs" )
66
-
67
69
if redirect_uri is not None :
68
70
# Validate redirect_uri against client's registered redirect URIs
69
- if redirect_uri not in redirect_uris :
71
+ if redirect_uri not in client . redirect_uris :
70
72
raise InvalidRequestError (
71
73
f"Redirect URI '{ redirect_uri } ' not registered for client"
72
74
)
73
75
return redirect_uri
74
- elif len (redirect_uris ) == 1 :
75
- return redirect_uris [0 ]
76
+ elif len (client . redirect_uris ) == 1 :
77
+ return client . redirect_uris [0 ]
76
78
else :
77
79
raise InvalidRequestError (
78
80
"redirect_uri must be specified when client has multiple registered URIs"
79
81
)
80
82
83
+ ErrorCode = Literal [
84
+ "invalid_request" ,
85
+ "unauthorized_client" ,
86
+ "access_denied" ,
87
+ "unsupported_response_type" ,
88
+ "invalid_scope" ,
89
+ "server_error" ,
90
+ "temporarily_unavailable"
91
+ ]
92
+
93
+ class ErrorResponse (BaseModel ):
94
+ error : ErrorCode
95
+ error_description : str
96
+ error_uri : Optional [AnyUrl ] = None
97
+ # must be set if provided in the request
98
+ state : Optional [str ]
99
+
100
+ def best_effort_extract_string (key : str , params : None | FormData | QueryParams ) -> Optional [str ]:
101
+ if params is None :
102
+ return None
103
+ value = params .get (key )
104
+ if isinstance (value , str ):
105
+ return value
106
+ return None
81
107
82
- def create_authorization_handler (provider : OAuthServerProvider ) -> HandlerFn :
83
- """
84
- Create a handler for the OAuth 2.0 Authorization endpoint.
85
-
86
- Corresponds to authorizationHandler in src/server/auth/handlers/authorize.ts
108
+ class AnyHttpUrlModel (RootModel ):
109
+ root : AnyHttpUrl
87
110
88
- """
89
111
112
+ def create_authorization_handler (provider : OAuthServerProvider ) -> Callable :
90
113
async def authorization_handler (request : Request ) -> Response :
91
- """
92
- Handler for the OAuth 2.0 Authorization endpoint.
93
- """
94
- # Validate request parameters
114
+ # implements authorization requests for grant_type=code;
115
+ # see https://datatracker.ietf.org/doc/html/rfc6749#section-4.1.1
116
+
117
+ state = None
118
+ redirect_uri = None
119
+ client = None
120
+ params = None
121
+
122
+ async def error_response (error : ErrorCode , error_description : str , attempt_load_client : bool = True ):
123
+ nonlocal client , redirect_uri , state
124
+ if client is None and attempt_load_client :
125
+ # make last-ditch attempt to load the client
126
+ client_id = best_effort_extract_string ("client_id" , params )
127
+ client = client_id and await provider .clients_store .get_client (client_id )
128
+ if redirect_uri is None and client :
129
+ # make last-ditch effort to load the redirect uri
130
+ if params is not None and "redirect_uri" not in params :
131
+ raw_redirect_uri = None
132
+ else :
133
+ raw_redirect_uri = AnyHttpUrlModel .model_validate (best_effort_extract_string ("redirect_uri" , params )).root
134
+ try :
135
+ redirect_uri = validate_redirect_uri (raw_redirect_uri , client )
136
+ except (ValidationError , InvalidRequestError ):
137
+ pass
138
+ if state is None :
139
+ # make last-ditch effort to load state
140
+ state = best_effort_extract_string ("state" , params )
141
+
142
+ error_resp = ErrorResponse (
143
+ error = error ,
144
+ error_description = error_description ,
145
+ state = state ,
146
+ )
147
+
148
+ if redirect_uri and client :
149
+ return RedirectResponse (
150
+ url = construct_redirect_uri (str (redirect_uri ), ** error_resp .model_dump (exclude_none = True )),
151
+ status_code = 302 ,
152
+ headers = {"Cache-Control" : "no-store" },
153
+ )
154
+ else :
155
+ return PydanticJSONResponse (
156
+ status_code = 400 ,
157
+ content = error_resp ,
158
+ headers = {"Cache-Control" : "no-store" },
159
+ )
160
+
95
161
try :
162
+ # Parse request parameters
96
163
if request .method == "GET" :
97
164
# Convert query_params to dict for pydantic validation
98
- params = dict (request .query_params )
99
- auth_request = AuthorizationRequest .model_validate (params )
165
+ params = request .query_params
100
166
else :
101
167
# Parse form data for POST requests
102
- form_data = await request .form ()
103
- params = dict (form_data )
168
+ params = await request .form ()
169
+
170
+ # Save state if it exists, even before validation
171
+ state = best_effort_extract_string ("state" , params )
172
+
173
+ try :
104
174
auth_request = AuthorizationRequest .model_validate (params )
105
- except ValidationError as e :
106
- raise InvalidRequestError (str (e ))
107
-
108
- # Get client information
109
- client = await provider .clients_store .get_client (auth_request .client_id )
110
-
111
- if not client :
112
- raise InvalidClientError (f"Client ID '{ auth_request .client_id } ' not found" )
113
-
114
- # do validation which is dependent on the client configuration
115
- redirect_uri = validate_redirect_uri (
116
- auth_request .redirect_uri , client .redirect_uris
117
- )
118
- scopes = validate_scope (auth_request .scope , client .scope )
119
-
120
- auth_params = AuthorizationParams (
121
- state = auth_request .state ,
122
- scopes = scopes ,
123
- code_challenge = auth_request .code_challenge ,
124
- redirect_uri = redirect_uri ,
125
- )
126
-
127
- response = RedirectResponse (
128
- url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
129
- )
130
-
131
- try :
132
- # Let the provider handle the authorization flow
133
- await provider .authorize (client , auth_params , response )
134
-
135
- return response
136
- except Exception as e :
137
- return RedirectResponse (
138
- url = create_error_redirect (redirect_uri , e , auth_request .state ),
139
- status_code = 302 ,
140
- headers = {"Cache-Control" : "no-store" },
175
+ state = auth_request .state # Update with validated state
176
+ except ValidationError as validation_error :
177
+ error : ErrorCode = "invalid_request"
178
+ for e in validation_error .errors ():
179
+ if e ['loc' ] == ('response_type' ,) and e ['type' ] == 'literal_error' :
180
+ error = "unsupported_response_type"
181
+ break
182
+ return await error_response (error , stringify_pydantic_error (validation_error ))
183
+
184
+ # Get client information
185
+ client = await provider .clients_store .get_client (auth_request .client_id )
186
+ if not client :
187
+ # For client_id validation errors, return direct error (no redirect)
188
+ return await error_response (
189
+ error = "invalid_request" ,
190
+ error_description = f"Client ID '{ auth_request .client_id } ' not found" ,
191
+ attempt_load_client = False ,
192
+ )
193
+
194
+
195
+ # Validate redirect_uri against client's registered URIs
196
+ try :
197
+ redirect_uri = validate_redirect_uri (auth_request .redirect_uri , client )
198
+ except InvalidRequestError as validation_error :
199
+ # For redirect_uri validation errors, return direct error (no redirect)
200
+ return await error_response (
201
+ error = "invalid_request" ,
202
+ error_description = validation_error .message ,
203
+ )
204
+
205
+ # Validate scope - for scope errors, we can redirect
206
+ try :
207
+ scopes = validate_scope (auth_request .scope , client )
208
+ except InvalidRequestError as validation_error :
209
+ # For scope errors, redirect with error parameters
210
+ return await error_response (
211
+ error = "invalid_scope" ,
212
+ error_description = validation_error .message ,
213
+ )
214
+
215
+ # Setup authorization parameters
216
+ auth_params = AuthorizationParams (
217
+ state = state ,
218
+ scopes = scopes ,
219
+ code_challenge = auth_request .code_challenge ,
220
+ redirect_uri = redirect_uri ,
141
221
)
222
+
223
+ # Let the provider pick the next URI to redirect to
224
+ response = RedirectResponse (
225
+ url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
226
+ )
227
+ response .headers ["location" ] = await provider .authorize (
228
+ client , auth_params
229
+ )
230
+ return response
231
+
232
+ except Exception as validation_error :
233
+ # Catch-all for unexpected errors
234
+ logger .exception ("Unexpected error in authorization_handler" , exc_info = validation_error )
235
+ return await error_response (error = "server_error" , error_description = "An unexpected error occurred" )
142
236
143
237
return authorization_handler
144
238
145
239
146
240
def create_error_redirect (
147
- redirect_uri : AnyUrl , error : Exception , state : str | None
241
+ redirect_uri : AnyUrl , error : Union [ Exception , ErrorResponse ]
148
242
) -> str :
149
243
parsed_uri = urlparse (str (redirect_uri ))
150
- if isinstance (error , OAuthError ):
244
+
245
+ if isinstance (error , ErrorResponse ):
246
+ # Convert ErrorResponse to dict
247
+ error_dict = error .model_dump (exclude_none = True )
248
+ query_params = {}
249
+ for key , value in error_dict .items ():
250
+ if value is not None :
251
+ if key == "error_uri" and hasattr (value , "__str__" ):
252
+ query_params [key ] = str (value )
253
+ else :
254
+ query_params [key ] = value
255
+
256
+ elif isinstance (error , OAuthError ):
151
257
query_params = {"error" : error .error_code , "error_description" : str (error )}
152
258
else :
153
259
query_params = {
154
- "error" : "internal_error " ,
260
+ "error" : "server_error " ,
155
261
"error_description" : "An unknown error occurred" ,
156
262
}
157
- # TODO: should we add error_uri?
158
- # if error.error_uri:
159
- # query_params["error_uri"] = str(error.error_uri)
160
- if state :
161
- query_params ["state" ] = state
162
263
163
264
new_query = urlencode (query_params )
164
265
if parsed_uri .query :
165
266
new_query = f"{ parsed_uri .query } &{ new_query } "
166
267
167
- return urlunparse (parsed_uri ._replace (query = new_query ))
268
+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments