4
4
Corresponds to TypeScript file: src/server/auth/handlers/authorize.ts
5
5
"""
6
6
7
+ import logging
7
8
from typing import Callable , Literal , Optional , Union
8
- from urllib .parse import parse_qs , urlencode , urlparse , urlunparse
9
+ from urllib .parse import urlencode , urlparse , urlunparse
9
10
10
11
from pydantic import AnyHttpUrl , AnyUrl , BaseModel , Field , RootModel , ValidationError
11
12
from starlette .datastructures import FormData , QueryParams
12
13
from starlette .requests import Request
13
14
from starlette .responses import RedirectResponse , Response
14
15
15
16
from mcp .server .auth .errors import (
16
- InvalidClientError ,
17
17
InvalidRequestError ,
18
18
OAuthError ,
19
19
stringify_pydantic_error ,
20
20
)
21
- from mcp .server .auth .provider import AuthorizationParams , OAuthServerProvider , construct_redirect_uri
22
- from mcp .shared .auth import OAuthClientInformationFull
23
21
from mcp .server .auth .json_response import PydanticJSONResponse
24
-
25
- import logging
22
+ from mcp .server .auth .provider import (
23
+ AuthorizationParams ,
24
+ OAuthServerProvider ,
25
+ construct_redirect_uri ,
26
+ )
27
+ from mcp .shared .auth import OAuthClientInformationFull
26
28
27
29
logger = logging .getLogger (__name__ )
28
30
@@ -80,15 +82,17 @@ def validate_redirect_uri(
80
82
"redirect_uri must be specified when client has multiple registered URIs"
81
83
)
82
84
85
+
83
86
ErrorCode = Literal [
84
- "invalid_request" ,
85
- "unauthorized_client" ,
86
- "access_denied" ,
87
- "unsupported_response_type" ,
88
- "invalid_scope" ,
89
- "server_error" ,
90
- "temporarily_unavailable"
91
- ]
87
+ "invalid_request" ,
88
+ "unauthorized_client" ,
89
+ "access_denied" ,
90
+ "unsupported_response_type" ,
91
+ "invalid_scope" ,
92
+ "server_error" ,
93
+ "temporarily_unavailable" ,
94
+ ]
95
+
92
96
93
97
class ErrorResponse (BaseModel ):
94
98
error : ErrorCode
@@ -97,14 +101,18 @@ class ErrorResponse(BaseModel):
97
101
# must be set if provided in the request
98
102
state : Optional [str ]
99
103
100
- def best_effort_extract_string (key : str , params : None | FormData | QueryParams ) -> Optional [str ]:
104
+
105
+ def best_effort_extract_string (
106
+ key : str , params : None | FormData | QueryParams
107
+ ) -> Optional [str ]:
101
108
if params is None :
102
109
return None
103
110
value = params .get (key )
104
111
if isinstance (value , str ):
105
112
return value
106
113
return None
107
114
115
+
108
116
class AnyHttpUrlModel (RootModel ):
109
117
root : AnyHttpUrl
110
118
@@ -119,18 +127,24 @@ async def authorization_handler(request: Request) -> Response:
119
127
client = None
120
128
params = None
121
129
122
- async def error_response (error : ErrorCode , error_description : str , attempt_load_client : bool = True ):
130
+ async def error_response (
131
+ error : ErrorCode , error_description : str , attempt_load_client : bool = True
132
+ ):
123
133
nonlocal client , redirect_uri , state
124
134
if client is None and attempt_load_client :
125
135
# make last-ditch attempt to load the client
126
136
client_id = best_effort_extract_string ("client_id" , params )
127
- client = client_id and await provider .clients_store .get_client (client_id )
137
+ client = client_id and await provider .clients_store .get_client (
138
+ client_id
139
+ )
128
140
if redirect_uri is None and client :
129
141
# make last-ditch effort to load the redirect uri
130
142
if params is not None and "redirect_uri" not in params :
131
143
raw_redirect_uri = None
132
144
else :
133
- raw_redirect_uri = AnyHttpUrlModel .model_validate (best_effort_extract_string ("redirect_uri" , params )).root
145
+ raw_redirect_uri = AnyHttpUrlModel .model_validate (
146
+ best_effort_extract_string ("redirect_uri" , params )
147
+ ).root
134
148
try :
135
149
redirect_uri = validate_redirect_uri (raw_redirect_uri , client )
136
150
except (ValidationError , InvalidRequestError ):
@@ -147,7 +161,9 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
147
161
148
162
if redirect_uri and client :
149
163
return RedirectResponse (
150
- url = construct_redirect_uri (str (redirect_uri ), ** error_resp .model_dump (exclude_none = True )),
164
+ url = construct_redirect_uri (
165
+ str (redirect_uri ), ** error_resp .model_dump (exclude_none = True )
166
+ ),
151
167
status_code = 302 ,
152
168
headers = {"Cache-Control" : "no-store" },
153
169
)
@@ -157,7 +173,7 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
157
173
content = error_resp ,
158
174
headers = {"Cache-Control" : "no-store" },
159
175
)
160
-
176
+
161
177
try :
162
178
# Parse request parameters
163
179
if request .method == "GET" :
@@ -166,20 +182,22 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
166
182
else :
167
183
# Parse form data for POST requests
168
184
params = await request .form ()
169
-
185
+
170
186
# Save state if it exists, even before validation
171
187
state = best_effort_extract_string ("state" , params )
172
-
188
+
173
189
try :
174
190
auth_request = AuthorizationRequest .model_validate (params )
175
191
state = auth_request .state # Update with validated state
176
192
except ValidationError as validation_error :
177
193
error : ErrorCode = "invalid_request"
178
194
for e in validation_error .errors ():
179
- if e [' loc' ] == (' response_type' ,) and e [' type' ] == ' literal_error' :
195
+ if e [" loc" ] == (" response_type" ,) and e [" type" ] == " literal_error" :
180
196
error = "unsupported_response_type"
181
197
break
182
- return await error_response (error , stringify_pydantic_error (validation_error ))
198
+ return await error_response (
199
+ error , stringify_pydantic_error (validation_error )
200
+ )
183
201
184
202
# Get client information
185
203
client = await provider .clients_store .get_client (auth_request .client_id )
@@ -191,7 +209,6 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
191
209
attempt_load_client = False ,
192
210
)
193
211
194
-
195
212
# Validate redirect_uri against client's registered URIs
196
213
try :
197
214
redirect_uri = validate_redirect_uri (auth_request .redirect_uri , client )
@@ -201,7 +218,7 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
201
218
error = "invalid_request" ,
202
219
error_description = validation_error .message ,
203
220
)
204
-
221
+
205
222
# Validate scope - for scope errors, we can redirect
206
223
try :
207
224
scopes = validate_scope (auth_request .scope , client )
@@ -211,28 +228,30 @@ async def error_response(error: ErrorCode, error_description: str, attempt_load_
211
228
error = "invalid_scope" ,
212
229
error_description = validation_error .message ,
213
230
)
214
-
231
+
215
232
# Setup authorization parameters
216
233
auth_params = AuthorizationParams (
217
234
state = state ,
218
235
scopes = scopes ,
219
236
code_challenge = auth_request .code_challenge ,
220
237
redirect_uri = redirect_uri ,
221
238
)
222
-
239
+
223
240
# Let the provider pick the next URI to redirect to
224
241
response = RedirectResponse (
225
242
url = "" , status_code = 302 , headers = {"Cache-Control" : "no-store" }
226
243
)
227
- response .headers ["location" ] = await provider .authorize (
228
- client , auth_params
229
- )
244
+ response .headers ["location" ] = await provider .authorize (client , auth_params )
230
245
return response
231
-
246
+
232
247
except Exception as validation_error :
233
248
# Catch-all for unexpected errors
234
- logger .exception ("Unexpected error in authorization_handler" , exc_info = validation_error )
235
- return await error_response (error = "server_error" , error_description = "An unexpected error occurred" )
249
+ logger .exception (
250
+ "Unexpected error in authorization_handler" , exc_info = validation_error
251
+ )
252
+ return await error_response (
253
+ error = "server_error" , error_description = "An unexpected error occurred"
254
+ )
236
255
237
256
return authorization_handler
238
257
@@ -241,7 +260,7 @@ def create_error_redirect(
241
260
redirect_uri : AnyUrl , error : Union [Exception , ErrorResponse ]
242
261
) -> str :
243
262
parsed_uri = urlparse (str (redirect_uri ))
244
-
263
+
245
264
if isinstance (error , ErrorResponse ):
246
265
# Convert ErrorResponse to dict
247
266
error_dict = error .model_dump (exclude_none = True )
@@ -252,7 +271,7 @@ def create_error_redirect(
252
271
query_params [key ] = str (value )
253
272
else :
254
273
query_params [key ] = value
255
-
274
+
256
275
elif isinstance (error , OAuthError ):
257
276
query_params = {"error" : error .error_code , "error_description" : str (error )}
258
277
else :
@@ -265,4 +284,4 @@ def create_error_redirect(
265
284
if parsed_uri .query :
266
285
new_query = f"{ parsed_uri .query } &{ new_query } "
267
286
268
- return urlunparse (parsed_uri ._replace (query = new_query ))
287
+ return urlunparse (parsed_uri ._replace (query = new_query ))
0 commit comments