Skip to content

Commit

Permalink
Fix listing metrics as non-admin without creator policy defined
Browse files Browse the repository at this point in the history
When using a minimal RBAC policy that only allows either access
either to a cloud admin or a user that is part of the project
a metric's resource is owned by, e.g.

```yaml
"list metric": "role:admin or project_id:%(resource.project_id)s"
```

There is an issue where `KeystoneAuthHelper.get_metric_policy_filter`
will reject access to the metrics because from its perspective
there are no applicable policies that the user has permission under.

This patch fixes that by adding an option to `get_metric_policy_filter`
that allows an empty policy filter to be returned if an enforcement
check for `resource.project_id` passes. This option is set to `False`
by default, and only enabled where necessary.

This allows the caller to use
`KeystoneAuthHelper.get_resource_policy_filter` to add project
filters to the metric query based on the resource the metric
is associated with.
  • Loading branch information
Callum027 committed Feb 11, 2025
1 parent b28f215 commit 0253e2a
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
2 changes: 1 addition & 1 deletion gnocchi/rest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ def get_all(cls, **kwargs):
attr_filters.append({"=": {k: v}})

policy_filter = pecan.request.auth_helper.get_metric_policy_filter(
pecan.request, "list metric")
pecan.request, "list metric", allow_resource_project_id=True)
resource_policy_filter = (
pecan.request.auth_helper.get_resource_policy_filter(
pecan.request, "list metric", resource_type=None,
Expand Down
24 changes: 23 additions & 1 deletion gnocchi/rest/auth_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ def get_resource_policy_filter(request, rule, resource_type, prefix=None):
return {"or": policy_filter}

@staticmethod
def get_metric_policy_filter(request, rule):
def get_metric_policy_filter(request,
rule,
allow_resource_project_id=False):
try:
# Check if the policy allows the user to list any metric
api.enforce(rule, {})
Expand All @@ -113,6 +115,26 @@ def get_metric_policy_filter(request, rule):
policy_filter.append(
{"like": {"creator": "%:" + project_id}})

resource_project_id_allowed = False
try:
# Check if the policy allows the user to list metrics linked
# to their project via a resource
api.enforce(rule, {"resource.project_id": project_id})
except webob.exc.HTTPForbidden:
pass
else:
if allow_resource_project_id:
resource_project_id_allowed = True

# NOTE(callumdickinson): If allow_resource_project_id is enabled
# and the policy filter is empty, allow an empty policy filter
# to be returned for this case ONLY.
# The caller is expected to use get_resource_policy_filter
# to perform filtering by resource to ensure the client
# only gets metrics for resources they are allowed to access.
if resource_project_id_allowed and not policy_filter:
return {}

if not policy_filter:
# We need to have at least one policy filter in place
api.abort(403, "Insufficient privileges")
Expand Down
77 changes: 65 additions & 12 deletions gnocchi/tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import testscenarios
from testtools import testcase
from unittest import mock
import webob
import webtest

import gnocchi
Expand Down Expand Up @@ -333,20 +334,72 @@ def test_list_metric_with_another_user(self):
self.assertNotIn(metric_id, [m["id"] for m in metric_list.json])

def test_list_metric_with_another_user_allowed(self):
rid = str(uuid.uuid4())
r = self.app.post_json("/v1/resource/generic",
params={
"id": rid,
"project_id": TestingApp.PROJECT_ID_2,
"metrics": {
"disk": {"archive_policy_name": "low"},
}
})
metric_id = r.json['metrics']['disk']
r1id = str(uuid.uuid4())
r1 = self.app.post_json("/v1/resource/generic",
params={
"id": r1id,
"project_id": TestingApp.PROJECT_ID,
"metrics": {
"disk": {
"archive_policy_name": "low"},
}
})
metric_id1 = r1.json['metrics']['disk']
r2id = str(uuid.uuid4())
r2 = self.app.post_json("/v1/resource/generic",
params={
"id": r2id,
"project_id": TestingApp.PROJECT_ID_2,
"metrics": {
"disk": {
"archive_policy_name": "low"},
}
})
metric_id2 = r2.json['metrics']['disk']

with self.app.use_another_user():
metric_list = self.app.get("/v1/metric")
self.assertIn(metric_id, [m["id"] for m in metric_list.json])
metric_list = self.app.get("/v1/metric").json
metric_ids = [m["id"] for m in metric_list]
self.assertNotIn(metric_id1, metric_ids)
self.assertIn(metric_id2, metric_ids)

def test_list_metric_with_another_user_allowed_no_creator_policy(self):
r1id = str(uuid.uuid4())
r1 = self.app.post_json("/v1/resource/generic",
params={
"id": r1id,
"project_id": TestingApp.PROJECT_ID,
"metrics": {
"disk": {
"archive_policy_name": "low"},
}
})
metric_id1 = r1.json['metrics']['disk']
r2id = str(uuid.uuid4())
r2 = self.app.post_json("/v1/resource/generic",
params={
"id": r2id,
"project_id": TestingApp.PROJECT_ID_2,
"metrics": {
"disk": {
"archive_policy_name": "low"},
}
})
metric_id2 = r2.json['metrics']['disk']

orig_enforce = api.enforce

def _enforce(rule, target):
if rule == "list metric" and "created_by_project_id" in target:
raise webob.exc.HTTPForbidden()
return orig_enforce(rule, target)

with mock.patch.object(api, 'enforce', side_effect=_enforce):
with self.app.use_another_user():
metric_list = self.app.get("/v1/metric").json
metric_ids = [m["id"] for m in metric_list]
self.assertNotIn(metric_id1, metric_ids)
self.assertIn(metric_id2, metric_ids)

def test_get_metric_with_another_user(self):
result = self.app.post_json("/v1/metric",
Expand Down

0 comments on commit 0253e2a

Please sign in to comment.