Skip to content

Commit

Permalink
Merge pull request #773 from hinashi/fixed/entry_permission
Browse files Browse the repository at this point in the history
Fixed permissions for entry operations
  • Loading branch information
userlocalhost authored Feb 14, 2023
2 parents b9fdc52 + 85914dc commit 7b94a76
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 87 deletions.
2 changes: 1 addition & 1 deletion acl/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def writable(self):
def full(self):
return self._get_permission(ACLType.Full.id)

def _get_permission(self, acltype):
def _get_permission(self, acltype) -> HistoricalPermission:
return HistoricalPermission.objects.get(codename="%s.%s" % (self.id, acltype))

def get_subclass_object(self):
Expand Down
45 changes: 41 additions & 4 deletions api_v1/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,20 +482,27 @@ def test_post_entry_without_permissoin(self):
guest = self.guest_login()

# checks that we can't create a new entry because of lack of permission
role = Role.objects.create(name="Role")
role.users.add(guest)
params = {
"name": "entry",
"entity": entity.name,
"attrs": {"attr1": "hoge", "attr2": "fuga"},
}

# permission nothing
resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.json()["result"], "Permission denied to create(or update) entry")

# Set permisson to create new entry
role = Role.objects.create(name="Role")
entity.writable.roles.add(role)
role.users.add(guest)
# permission readable
entity.readable.roles.add(role)
resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.json()["result"], "Permission denied to create(or update) entry")

# permission writable
entity.writable.roles.add(role)
# checks that we can create an entry but attr2 doesn't set because
# guest doesn't have permission of writable for attr2
params = {
Expand All @@ -510,6 +517,36 @@ def test_post_entry_without_permissoin(self):
self.assertEqual(entry.attrs.count(), 1)
self.assertEqual(entry.attrs.last().name, "attr1")

# checks that we can't update entry because of lack of permission
entry = Entry.objects.create(
name="test_entry", schema=entity, created_user=admin, is_public=False
)
entry.complement_attrs(admin)
params = {
"id": entry.id,
"name": "test_entry",
"entity": entity.name,
"attrs": {"attr1": "hoge", "attr2": "fuga"},
}

# permission nothing
resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.json()["result"], "Permission denied to update entry")

# permission readable
entry.readable.roles.add(role)
resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json")
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.json()["result"], "Permission denied to update entry")

# permission writable
entry.writable.roles.add(role)
resp = self.client.post("/api/v1/entry", json.dumps(params), "application/json")
self.assertEqual(resp.status_code, 200)
attr: Attribute = entry.attrs.get(schema__name="attr1")
self.assertEqual(attr.get_latest_value().get_value(), "hoge")

def test_update_entry(self):
admin = self.admin_login()

Expand Down
14 changes: 11 additions & 3 deletions api_v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,20 @@ def _update_entry_name(entry):
)

entry = Entry.objects.get(id=sel.validated_data["id"])
if not request.user.has_permission(entry, ACLType.Writable):
return Response(
{"result": "Permission denied to update entry"},
status=status.HTTP_400_BAD_REQUEST,
)
will_notify_update_entry = _update_entry_name(entry)

elif Entry.objects.filter(**entry_condition).exists():
entry = Entry.objects.get(**entry_condition)
if not request.user.has_permission(entry, ACLType.Writable):
return Response(
{"result": "Permission denied to update entry"},
status=status.HTTP_400_BAD_REQUEST,
)
will_notify_update_entry = _update_entry_name(entry)

else:
Expand Down Expand Up @@ -208,9 +218,7 @@ def delete(self, request, *args, **kwargs):
)

# permission check
if not request.user.has_permission(entry, ACLType.Full) or not request.user.has_permission(
entity, ACLType.Readable
):
if not request.user.has_permission(entry, ACLType.Writable):
return Response("Permission denied to operate", status=status.HTTP_400_BAD_REQUEST)

if custom_view.is_custom("delete_entry_api", entry.schema.name):
Expand Down
6 changes: 3 additions & 3 deletions entry/api_v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ def has_object_permission(self, request, view, obj):
permisson = {
"retrieve": ACLType.Readable,
"update": ACLType.Writable,
"destroy": ACLType.Full,
"restore": ACLType.Full,
"copy": ACLType.Full,
"destroy": ACLType.Writable,
"restore": ACLType.Writable,
"copy": ACLType.Writable,
}

if not user.has_permission(obj, permisson.get(view.action)):
Expand Down
74 changes: 0 additions & 74 deletions entry/tests/test_api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,18 +1080,6 @@ def test_destroy_entry_without_permission(self):
# permission writable entity
self.entity.writable.roles.add(self.role)
resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 403)
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entity
self.entity.full.roles.add(self.role)
resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 204)

entry.restore()
Expand Down Expand Up @@ -1124,18 +1112,6 @@ def test_destroy_entry_without_permission(self):
# permission writable entry
entry.writable.roles.add(self.role)
resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 403)
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entry
entry.full.roles.add(self.role)
resp = self.client.delete("/entry/api/v2/%s/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 204)

def test_destory_entry_with_invalid_param(self):
Expand Down Expand Up @@ -1229,17 +1205,6 @@ def test_restore_entry_without_permission(self):
# permission writable entity
self.entity.writable.roles.add(self.role)
resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json")
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entity
self.entity.full.roles.add(self.role)
resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 201)

entry.delete()
Expand Down Expand Up @@ -1272,18 +1237,6 @@ def test_restore_entry_without_permission(self):
# permission writable entry
entry.writable.roles.add(self.role)
resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 403)
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entry
entry.full.roles.add(self.role)
resp = self.client.post("/entry/api/v2/%s/restore/" % entry.id, None, "application/json")
self.assertEqual(resp.status_code, 201)

def test_restore_entry_with_invalid_param(self):
Expand Down Expand Up @@ -1393,19 +1346,6 @@ def test_copy_entry_without_permission(self):
resp = self.client.post(
"/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json"
)
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entity
self.entity.full.roles.add(self.role)
resp = self.client.post(
"/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json"
)
self.assertEqual(resp.status_code, 200)

params = {"copy_entry_names": ["copy2"]}
Expand Down Expand Up @@ -1444,20 +1384,6 @@ def test_copy_entry_without_permission(self):
resp = self.client.post(
"/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json"
)
self.assertEqual(resp.status_code, 403)
self.assertEqual(
resp.json(),
{
"code": "AE-210000",
"message": "You do not have permission to perform this action.",
},
)

# permission full entry
entry.full.roles.add(self.role)
resp = self.client.post(
"/entry/api/v2/%s/copy/" % entry.id, json.dumps(params), "application/json"
)
self.assertEqual(resp.status_code, 200)

def test_copy_entry_with_invalid_param(self):
Expand Down
29 changes: 29 additions & 0 deletions entry/tests/test_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4428,6 +4428,8 @@ def test_index_deleting_entries(self):
def test_restore_entry(self, mock_task):
# initialize entries to test
user = self.guest_login()
role: Role = Role.objects.create(name="Role")
role.users.add(user)
entity = Entity.objects.create(name="entity", created_user=user)
entity.attrs.add(
EntityAttr.objects.create(
Expand Down Expand Up @@ -4466,6 +4468,33 @@ def test_restore_entry(self, mock_task):
self.assertTrue(entry.name.find("_deleted_") > 0)
self.assertFalse(any([x.is_active for x in entry.attrs.all()]))

# nothing permisson
entry.is_public = False
entry.save()
resp = self.client.post(
reverse("entry:do_restore", args=[entry.id]),
json.dumps({}),
"application/json",
)
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.content, b"You don't have permission to access this object")
entry.refresh_from_db()
self.assertFalse(entry.is_active)

# readable permission
entry.readable.roles.add(role)
resp = self.client.post(
reverse("entry:do_restore", args=[entry.id]),
json.dumps({}),
"application/json",
)
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.content, b"You don't have permission to access this object")
entry.refresh_from_db()
self.assertFalse(entry.is_active)

# writable permission
entry.writable.roles.add(role)
resp = self.client.post(
reverse("entry:do_restore", args=[entry.id]),
json.dumps({}),
Expand Down
4 changes: 2 additions & 2 deletions entry/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ def do_copy(request, entry_id, recv_data):

@http_get
def restore(request, entity_id):
entity, error = get_obj_with_check_perm(request.user, Entity, entity_id, ACLType.Full)
entity, error = get_obj_with_check_perm(request.user, Entity, entity_id, ACLType.Writable)
if error:
return error

Expand Down Expand Up @@ -705,7 +705,7 @@ def restore(request, entity_id):

@http_post([])
def do_restore(request, entry_id, recv_data):
entry, error = get_obj_with_check_perm(request.user, Entry, entry_id, ACLType.Full)
entry, error = get_obj_with_check_perm(request.user, Entry, entry_id, ACLType.Writable)
if error:
return error

Expand Down

0 comments on commit 7b94a76

Please sign in to comment.