From 85914dc8274de2d822af510d86fc09d374404f56 Mon Sep 17 00:00:00 2001 From: hinashi Date: Tue, 14 Feb 2023 13:21:11 +0900 Subject: [PATCH] Fixed permissions for entry operations --- acl/models.py | 2 +- api_v1/tests/test_api.py | 45 ++++++++++++++++++++--- api_v1/views.py | 14 ++++++-- entry/api_v2/views.py | 6 ++-- entry/tests/test_api_v2.py | 74 -------------------------------------- entry/tests/test_view.py | 29 +++++++++++++++ entry/views.py | 4 +-- 7 files changed, 87 insertions(+), 87 deletions(-) diff --git a/acl/models.py b/acl/models.py index 946e79bd6..4b1fb4585 100644 --- a/acl/models.py +++ b/acl/models.py @@ -144,7 +144,7 @@ def writable(self): def full(self): return self._get_permission(ACLType.Full.id) - def _get_permission(self, acltype): + def _get_permission(self, acltype) -> HistoricalPermission: return HistoricalPermission.objects.get(codename="%s.%s" % (self.id, acltype)) def get_subclass_object(self): diff --git a/api_v1/tests/test_api.py b/api_v1/tests/test_api.py index f32159389..48b50000e 100644 --- a/api_v1/tests/test_api.py +++ b/api_v1/tests/test_api.py @@ -480,20 +480,27 @@ def test_post_entry_without_permissoin(self): guest = self.guest_login() # checks that we can't create a new entry because of lack of permission + role = Role.objects.create(name="Role") + role.users.add(guest) params = { "name": "entry", "entity": entity.name, "attrs": {"attr1": "hoge", "attr2": "fuga"}, } + + # permission nothing resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json") self.assertEqual(resp.status_code, 400) self.assertEqual(resp.json()["result"], "Permission denied to create(or update) entry") - # Set permisson to create new entry - role = Role.objects.create(name="Role") - entity.writable.roles.add(role) - role.users.add(guest) + # permission readable + entity.readable.roles.add(role) + resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json") + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.json()["result"], "Permission denied to create(or update) entry") + # permission writable + entity.writable.roles.add(role) # checks that we can create an entry but attr2 doesn't set because # guest doesn't have permission of writable for attr2 params = { @@ -508,6 +515,36 @@ def test_post_entry_without_permissoin(self): self.assertEqual(entry.attrs.count(), 1) self.assertEqual(entry.attrs.last().name, "attr1") + # checks that we can't update entry because of lack of permission + entry = Entry.objects.create( + name="test_entry", schema=entity, created_user=admin, is_public=False + ) + entry.complement_attrs(admin) + params = { + "id": entry.id, + "name": "test_entry", + "entity": entity.name, + "attrs": {"attr1": "hoge", "attr2": "fuga"}, + } + + # permission nothing + resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json") + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.json()["result"], "Permission denied to update entry") + + # permission readable + entry.readable.roles.add(role) + resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json") + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.json()["result"], "Permission denied to update entry") + + # permission writable + entry.writable.roles.add(role) + resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json") + self.assertEqual(resp.status_code, 200) + attr: Attribute = entry.attrs.get(schema__name="attr1") + self.assertEqual(attr.get_latest_value().get_value(), "hoge") + def test_update_entry(self): admin = self.admin_login() diff --git a/api_v1/views.py b/api_v1/views.py index 96c4a517a..f14a82636 100644 --- a/api_v1/views.py +++ b/api_v1/views.py @@ -77,10 +77,20 @@ def _update_entry_name(entry): ) entry = Entry.objects.get(id=sel.validated_data["id"]) + if not request.user.has_permission(entry, ACLType.Writable): + return Response( + {"result": "Permission denied to update entry"}, + status=status.HTTP_400_BAD_REQUEST, + ) will_notify_update_entry = _update_entry_name(entry) elif Entry.objects.filter(**entry_condition).exists(): entry = Entry.objects.get(**entry_condition) + if not request.user.has_permission(entry, ACLType.Writable): + return Response( + {"result": "Permission denied to update entry"}, + status=status.HTTP_400_BAD_REQUEST, + ) will_notify_update_entry = _update_entry_name(entry) else: @@ -207,9 +217,7 @@ def delete(self, request, *args, **kwargs): ) # permission check - if not request.user.has_permission(entry, ACLType.Full) or not request.user.has_permission( - entity, ACLType.Readable - ): + if not request.user.has_permission(entry, ACLType.Writable): return Response("Permission denied to operate", status=status.HTTP_400_BAD_REQUEST) # Delete the specified entry then return its id, if is active diff --git a/entry/api_v2/views.py b/entry/api_v2/views.py index 6c9b684bb..d86655906 100644 --- a/entry/api_v2/views.py +++ b/entry/api_v2/views.py @@ -50,9 +50,9 @@ def has_object_permission(self, request, view, obj): permisson = { "retrieve": ACLType.Readable, "update": ACLType.Writable, - "destroy": ACLType.Full, - "restore": ACLType.Full, - "copy": ACLType.Full, + "destroy": ACLType.Writable, + "restore": ACLType.Writable, + "copy": ACLType.Writable, } if not user.has_permission(obj, permisson.get(view.action)): diff --git a/entry/tests/test_api_v2.py b/entry/tests/test_api_v2.py index c0194d820..1fdd26455 100644 --- a/entry/tests/test_api_v2.py +++ b/entry/tests/test_api_v2.py @@ -1080,18 +1080,6 @@ def test_destroy_entry_without_permission(self): # permission writable entity self.entity.writable.roles.add(self.role) resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json") - self.assertEqual(resp.status_code, 403) - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entity - self.entity.full.roles.add(self.role) - resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json") self.assertEqual(resp.status_code, 204) entry.restore() @@ -1124,18 +1112,6 @@ def test_destroy_entry_without_permission(self): # permission writable entry entry.writable.roles.add(self.role) resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json") - self.assertEqual(resp.status_code, 403) - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entry - entry.full.roles.add(self.role) - resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json") self.assertEqual(resp.status_code, 204) def test_destory_entry_with_invalid_param(self): @@ -1229,17 +1205,6 @@ def test_restore_entry_without_permission(self): # permission writable entity self.entity.writable.roles.add(self.role) resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json") - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entity - self.entity.full.roles.add(self.role) - resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json") self.assertEqual(resp.status_code, 201) entry.delete() @@ -1272,18 +1237,6 @@ def test_restore_entry_without_permission(self): # permission writable entry entry.writable.roles.add(self.role) resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json") - self.assertEqual(resp.status_code, 403) - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entry - entry.full.roles.add(self.role) - resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json") self.assertEqual(resp.status_code, 201) def test_restore_entry_with_invalid_param(self): @@ -1393,19 +1346,6 @@ def test_copy_entry_without_permission(self): resp = self.client.post( "/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json" ) - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entity - self.entity.full.roles.add(self.role) - resp = self.client.post( - "/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json" - ) self.assertEqual(resp.status_code, 200) params = {"copy_entry_names": ["copy2"]} @@ -1444,20 +1384,6 @@ def test_copy_entry_without_permission(self): resp = self.client.post( "/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json" ) - self.assertEqual(resp.status_code, 403) - self.assertEqual( - resp.json(), - { - "code": "AE-210000", - "message": "You do not have permission to perform this action.", - }, - ) - - # permission full entry - entry.full.roles.add(self.role) - resp = self.client.post( - "/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json" - ) self.assertEqual(resp.status_code, 200) def test_copy_entry_with_invalid_param(self): diff --git a/entry/tests/test_view.py b/entry/tests/test_view.py index 7544119ba..2653028a1 100644 --- a/entry/tests/test_view.py +++ b/entry/tests/test_view.py @@ -4428,6 +4428,8 @@ def test_index_deleting_entries(self): def test_restore_entry(self, mock_task): # initialize entries to test user = self.guest_login() + role: Role = Role.objects.create(name="Role") + role.users.add(user) entity = Entity.objects.create(name="entity", created_user=user) entity.attrs.add( EntityAttr.objects.create( @@ -4466,6 +4468,33 @@ def test_restore_entry(self, mock_task): self.assertTrue(entry.name.find("_deleted_") > 0) self.assertFalse(any([x.is_active for x in entry.attrs.all()])) + # nothing permisson + entry.is_public = False + entry.save() + resp = self.client.post( + reverse("entry:do_restore", args=[entry.id]), + json.dumps({}), + "application/json", + ) + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.content, b"You don't have permission to access this object") + entry.refresh_from_db() + self.assertFalse(entry.is_active) + + # readable permission + entry.readable.roles.add(role) + resp = self.client.post( + reverse("entry:do_restore", args=[entry.id]), + json.dumps({}), + "application/json", + ) + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.content, b"You don't have permission to access this object") + entry.refresh_from_db() + self.assertFalse(entry.is_active) + + # writable permission + entry.writable.roles.add(role) resp = self.client.post( reverse("entry:do_restore", args=[entry.id]), json.dumps({}), diff --git a/entry/views.py b/entry/views.py index 4e0a3ca28..4b76abf7c 100644 --- a/entry/views.py +++ b/entry/views.py @@ -665,7 +665,7 @@ def do_copy(request, entry_id, recv_data): @http_get def restore(request, entity_id): - entity, error = get_obj_with_check_perm(request.user, Entity, entity_id, ACLType.Full) + entity, error = get_obj_with_check_perm(request.user, Entity, entity_id, ACLType.Writable) if error: return error @@ -705,7 +705,7 @@ def restore(request, entity_id): @http_post([]) def do_restore(request, entry_id, recv_data): - entry, error = get_obj_with_check_perm(request.user, Entry, entry_id, ACLType.Full) + entry, error = get_obj_with_check_perm(request.user, Entry, entry_id, ACLType.Writable) if error: return error