From 0253e2ad859ed0e2d09ae53446ed8b5d1edcc7fe Mon Sep 17 00:00:00 2001 From: Callum Dickinson Date: Tue, 11 Feb 2025 18:49:33 +1300 Subject: [PATCH] Fix listing metrics as non-admin without creator policy defined When using a minimal RBAC policy that only allows either access either to a cloud admin or a user that is part of the project a metric's resource is owned by, e.g. ```yaml "list metric": "role:admin or project_id:%(resource.project_id)s" ``` There is an issue where `KeystoneAuthHelper.get_metric_policy_filter` will reject access to the metrics because from its perspective there are no applicable policies that the user has permission under. This patch fixes that by adding an option to `get_metric_policy_filter` that allows an empty policy filter to be returned if an enforcement check for `resource.project_id` passes. This option is set to `False` by default, and only enabled where necessary. This allows the caller to use `KeystoneAuthHelper.get_resource_policy_filter` to add project filters to the metric query based on the resource the metric is associated with. --- gnocchi/rest/api.py | 2 +- gnocchi/rest/auth_helper.py | 24 +++++++++++- gnocchi/tests/test_rest.py | 77 +++++++++++++++++++++++++++++++------ 3 files changed, 89 insertions(+), 14 deletions(-) diff --git a/gnocchi/rest/api.py b/gnocchi/rest/api.py index ad0ff9130..1ca467f63 100644 --- a/gnocchi/rest/api.py +++ b/gnocchi/rest/api.py @@ -698,7 +698,7 @@ def get_all(cls, **kwargs): attr_filters.append({"=": {k: v}}) policy_filter = pecan.request.auth_helper.get_metric_policy_filter( - pecan.request, "list metric") + pecan.request, "list metric", allow_resource_project_id=True) resource_policy_filter = ( pecan.request.auth_helper.get_resource_policy_filter( pecan.request, "list metric", resource_type=None, diff --git a/gnocchi/rest/auth_helper.py b/gnocchi/rest/auth_helper.py index 6f7a0fbe3..529e4dacd 100644 --- a/gnocchi/rest/auth_helper.py +++ b/gnocchi/rest/auth_helper.py @@ -94,7 +94,9 @@ def get_resource_policy_filter(request, rule, resource_type, prefix=None): return {"or": policy_filter} @staticmethod - def get_metric_policy_filter(request, rule): + def get_metric_policy_filter(request, + rule, + allow_resource_project_id=False): try: # Check if the policy allows the user to list any metric api.enforce(rule, {}) @@ -113,6 +115,26 @@ def get_metric_policy_filter(request, rule): policy_filter.append( {"like": {"creator": "%:" + project_id}}) + resource_project_id_allowed = False + try: + # Check if the policy allows the user to list metrics linked + # to their project via a resource + api.enforce(rule, {"resource.project_id": project_id}) + except webob.exc.HTTPForbidden: + pass + else: + if allow_resource_project_id: + resource_project_id_allowed = True + + # NOTE(callumdickinson): If allow_resource_project_id is enabled + # and the policy filter is empty, allow an empty policy filter + # to be returned for this case ONLY. + # The caller is expected to use get_resource_policy_filter + # to perform filtering by resource to ensure the client + # only gets metrics for resources they are allowed to access. + if resource_project_id_allowed and not policy_filter: + return {} + if not policy_filter: # We need to have at least one policy filter in place api.abort(403, "Insufficient privileges") diff --git a/gnocchi/tests/test_rest.py b/gnocchi/tests/test_rest.py index 118f7ae01..84720115e 100644 --- a/gnocchi/tests/test_rest.py +++ b/gnocchi/tests/test_rest.py @@ -29,6 +29,7 @@ import testscenarios from testtools import testcase from unittest import mock +import webob import webtest import gnocchi @@ -333,20 +334,72 @@ def test_list_metric_with_another_user(self): self.assertNotIn(metric_id, [m["id"] for m in metric_list.json]) def test_list_metric_with_another_user_allowed(self): - rid = str(uuid.uuid4()) - r = self.app.post_json("/v1/resource/generic", - params={ - "id": rid, - "project_id": TestingApp.PROJECT_ID_2, - "metrics": { - "disk": {"archive_policy_name": "low"}, - } - }) - metric_id = r.json['metrics']['disk'] + r1id = str(uuid.uuid4()) + r1 = self.app.post_json("/v1/resource/generic", + params={ + "id": r1id, + "project_id": TestingApp.PROJECT_ID, + "metrics": { + "disk": { + "archive_policy_name": "low"}, + } + }) + metric_id1 = r1.json['metrics']['disk'] + r2id = str(uuid.uuid4()) + r2 = self.app.post_json("/v1/resource/generic", + params={ + "id": r2id, + "project_id": TestingApp.PROJECT_ID_2, + "metrics": { + "disk": { + "archive_policy_name": "low"}, + } + }) + metric_id2 = r2.json['metrics']['disk'] with self.app.use_another_user(): - metric_list = self.app.get("/v1/metric") - self.assertIn(metric_id, [m["id"] for m in metric_list.json]) + metric_list = self.app.get("/v1/metric").json + metric_ids = [m["id"] for m in metric_list] + self.assertNotIn(metric_id1, metric_ids) + self.assertIn(metric_id2, metric_ids) + + def test_list_metric_with_another_user_allowed_no_creator_policy(self): + r1id = str(uuid.uuid4()) + r1 = self.app.post_json("/v1/resource/generic", + params={ + "id": r1id, + "project_id": TestingApp.PROJECT_ID, + "metrics": { + "disk": { + "archive_policy_name": "low"}, + } + }) + metric_id1 = r1.json['metrics']['disk'] + r2id = str(uuid.uuid4()) + r2 = self.app.post_json("/v1/resource/generic", + params={ + "id": r2id, + "project_id": TestingApp.PROJECT_ID_2, + "metrics": { + "disk": { + "archive_policy_name": "low"}, + } + }) + metric_id2 = r2.json['metrics']['disk'] + + orig_enforce = api.enforce + + def _enforce(rule, target): + if rule == "list metric" and "created_by_project_id" in target: + raise webob.exc.HTTPForbidden() + return orig_enforce(rule, target) + + with mock.patch.object(api, 'enforce', side_effect=_enforce): + with self.app.use_another_user(): + metric_list = self.app.get("/v1/metric").json + metric_ids = [m["id"] for m in metric_list] + self.assertNotIn(metric_id1, metric_ids) + self.assertIn(metric_id2, metric_ids) def test_get_metric_with_another_user(self): result = self.app.post_json("/v1/metric",