Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add duplicated role binding check rule #144

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/spaceone/identity/error/error_role.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,14 @@ class ERROR_NOT_ALLOWED_ROLE_TYPE(ERROR_INVALID_ARGUMENT):
"Role type is not allowed. (request_role_id = {request_role_id}, "
"request_role_type = {request_role_type}, supported_role_type = {supported_role_type})"
)


class ERROR_DUPLICATED_ROLE_BINDING(ERROR_INVALID_ARGUMENT):
_message = (
"Role type is duplicated. (role_type = {role_type}), "
"Only one {role_type} is allowed per user in the domain."
)


class ERROR_DUPLICATED_WORKSPACE_ROLE_BINDING(ERROR_INVALID_ARGUMENT):
_message = "Only one role binding is allowed per user in then same workspace. (choices = {allowed_role_type})"
2 changes: 1 addition & 1 deletion src/spaceone/identity/model/user/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class UserDisableMFARequest(BaseModel):

class UserSetRequiredActionsRequest(BaseModel):
user_id: str
actions: List[str]
required_actions: List[str]
domain_id: str


Expand Down
1 change: 0 additions & 1 deletion src/spaceone/identity/model/workspace_user/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class WorkspaceUserResponse(BaseModel):
role_type: Union[RoleType, None] = None
language: Union[str, None] = None
timezone: Union[str, None] = None
api_key_count: Union[int, None] = None
tags: Union[dict, None] = None
role_binding_info: Union[RoleBindingResponse, None] = None
domain_id: Union[str, None] = None
Expand Down
33 changes: 30 additions & 3 deletions src/spaceone/identity/service/role_binding_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from spaceone.identity.manager.role_manager import RoleManager
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.workspace_manager import WorkspaceManager
from spaceone.identity.error.error_role import ERROR_NOT_ALLOWED_ROLE_TYPE
from spaceone.identity.error.error_role import *

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,6 +69,7 @@ def create_role_binding(self, params: dict):
workspace_mgr.get_workspace(workspace_id, domain_id)
else:
params["workspace_id"] = "*"
workspace_id = "*"

# Check role
role_mgr = RoleManager()
Expand All @@ -81,15 +82,17 @@ def create_role_binding(self, params: dict):
request_role_type=role_vo.role_type,
supported_role_type="DOMAIN_ADMIN",
)
self.check_duplicate_domain_admin_role(
domain_id, user_id, role_vo.role_type
)
else:
if role_vo.role_type not in ["WORKSPACE_OWNER", "WORKSPACE_MEMBER"]:
raise ERROR_NOT_ALLOWED_ROLE_TYPE(
request_role_id=role_vo.role_id,
request_role_type=role_vo.role_type,
supported_role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"],
)

# TODO: check duplication of role_binding
self.check_duplicate_workspace_role(domain_id, workspace_id, user_id)

params["role_type"] = role_vo.role_type

Expand Down Expand Up @@ -304,6 +307,30 @@ def stat(self, params: RoleBindingStatQueryRequest) -> dict:
query = params.query or {}
return self.role_binding_manager.stat_role_bindings(query)

def check_duplicate_domain_admin_role(
self, domain_id: str, user_id: str, role_type: str
):
rb_vos = self.role_binding_manager.filter_role_bindings(
domain_id=domain_id,
user_id=user_id,
role_type=role_type,
)

if rb_vos.count() > 0:
raise ERROR_DUPLICATED_ROLE_BINDING(role_type=role_type)

def check_duplicate_workspace_role(
self, domain_id: str, workspace_id: str, user_id: str
):
rb_vos = self.role_binding_manager.filter_role_bindings(
domain_id=domain_id, workspace_id=workspace_id, user_id=user_id
)

if rb_vos.count() >= 1:
raise ERROR_DUPLICATED_WORKSPACE_ROLE_BINDING(
allowed_role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"]
)

@staticmethod
def _get_latest_role_type(before: str, after: str) -> str:
priority = {
Expand Down
2 changes: 1 addition & 1 deletion src/spaceone/identity/service/role_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create(self, params: RoleCreateRequest) -> Union[RoleResponse, dict]:
params (RoleCreateRequest): {
'name': 'str', # required
'role_type': 'list', # required
'permissions': 'list', # required
'permissions': 'list',
'page_access': 'list',
'tags': 'dict',
'domain_id': 'str' # injected from auth (required)
Expand Down
8 changes: 4 additions & 4 deletions src/spaceone/identity/service/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ def set_required_actions(

Args:
params (UserSetRequiredActionsRequest): {
'user_id': 'str', # required
'actions': 'list', # required
'domain_id': 'str' # injected from auth (required)
'user_id': 'str', # required
'required_actions': 'list', # required
'domain_id': 'str' # injected from auth (required)
}
Returns:
UserResponse:
"""
new_actions = params.actions or []
new_actions = params.required_actions or []
user_vo = self.user_mgr.get_user(params.user_id, params.domain_id)

if "UPDATE_PASSWORD" in new_actions:
Expand Down