1
1
"""Basic authentication & authorization related tools for the Ralph API."""
2
2
3
3
import logging
4
+ import os
4
5
from functools import lru_cache
5
6
from pathlib import Path
6
7
from threading import Lock
10
11
from cachetools import TTLCache , cached
11
12
from fastapi import Depends , HTTPException , status
12
13
from fastapi .security import HTTPBasic , HTTPBasicCredentials
13
- from pydantic import BaseModel , root_validator
14
+ from pydantic import RootModel , model_validator
14
15
from starlette .authentication import AuthenticationError
15
16
16
17
from ralph .api .auth .user import AuthenticatedUser
@@ -40,45 +41,42 @@ class UserCredentials(AuthenticatedUser):
40
41
username : str
41
42
42
43
43
- class ServerUsersCredentials (BaseModel ):
44
+ class ServerUsersCredentials (RootModel [ List [ UserCredentials ]] ):
44
45
"""Custom root pydantic model.
45
46
46
47
Describe expected list of all server users credentials as stored in
47
48
the credentials file.
48
49
49
50
Attributes:
50
- __root__ (List): Custom root consisting of the
51
+ root (List): Custom root consisting of the
51
52
list of all server users credentials.
52
53
"""
53
54
54
- __root__ : List [UserCredentials ]
55
-
56
55
def __add__ (self , other ) -> Any : # noqa: D105
57
- return ServerUsersCredentials .parse_obj (self .__root__ + other .__root__ )
56
+ return ServerUsersCredentials .model_validate (self .root + other .root )
58
57
59
58
def __getitem__ (self , item : int ) -> UserCredentials : # noqa: D105
60
- return self .__root__ [item ]
59
+ return self .root [item ]
61
60
62
61
def __len__ (self ) -> int : # noqa: D105
63
- return len (self .__root__ )
62
+ return len (self .root )
64
63
65
64
def __iter__ (self ) -> Iterator [UserCredentials ]: # noqa: D105
66
- return iter (self .__root__ )
65
+ return iter (self .root )
67
66
68
- @root_validator
69
- @classmethod
70
- def ensure_unique_username (cls , values : Any ) -> Any :
67
+ @model_validator (mode = "after" )
68
+ def ensure_unique_username (self ) -> Any :
71
69
"""Every username should be unique among registered users."""
72
- usernames = [entry .username for entry in values . get ( "__root__" ) ]
70
+ usernames = [entry .username for entry in self . root ]
73
71
if len (usernames ) != len (set (usernames )):
74
72
raise ValueError (
75
73
"You cannot create multiple credentials with the same username"
76
74
)
77
- return values
75
+ return self
78
76
79
77
80
78
@lru_cache ()
81
- def get_stored_credentials (auth_file : Path ) -> ServerUsersCredentials :
79
+ def get_stored_credentials (auth_file : os . PathLike ) -> ServerUsersCredentials :
82
80
"""Helper to read the credentials/scopes file.
83
81
84
82
Read credentials from JSON file and stored them to avoid reloading them with every
@@ -96,7 +94,9 @@ def get_stored_credentials(auth_file: Path) -> ServerUsersCredentials:
96
94
msg = "Credentials file <%s> not found."
97
95
logger .warning (msg , auth_file )
98
96
raise AuthenticationError (msg .format (auth_file ))
99
- return ServerUsersCredentials .parse_file (auth_file )
97
+
98
+ with open (auth_file , encoding = settings .LOCALE_ENCODING ) as f :
99
+ return ServerUsersCredentials .model_validate_json (f .read ())
100
100
101
101
102
102
@cached (
0 commit comments