From fb9751f3b184d8de31106e209cf0e3a69ff0494a Mon Sep 17 00:00:00 2001 From: Adam Dyess Date: Wed, 5 Feb 2025 14:30:20 -0600 Subject: [PATCH] Reconciler will detect and block on kubernetes resource collisions --- requirements.txt | 3 +- src/charm.py | 36 ++++++++-- src/manifests_base.py | 13 +++- src/manifests_cephfs.py | 14 ++-- src/manifests_rbd.py | 5 +- tests/unit/conftest.py | 12 ++++ tests/unit/test_charm.py | 103 +++++++++++++++++++--------- tests/unit/test_manifests_cephfs.py | 15 ++++ 8 files changed, 156 insertions(+), 45 deletions(-) diff --git a/requirements.txt b/requirements.txt index 069a0a5..c4b7e16 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ ops >= 1.2.0 -ops.manifest >= 1.1.3, < 2.0 +# ops.manifest >= 1.1.3, < 2.0 +ops.manifest @ git+https://github.com/canonical/ops-lib-manifest@KU-2644/enhance-list-resources jinja2 pyyaml diff --git a/src/charm.py b/src/charm.py index b1580c5..299c9b6 100755 --- a/src/charm.py +++ b/src/charm.py @@ -20,7 +20,7 @@ from lightkube.core.exceptions import ApiError from lightkube.resources.core_v1 import Namespace from lightkube.resources.storage_v1 import StorageClass -from ops.manifests import Collector, ManifestClientError +from ops.manifests import Collector, ManifestClientError, ResourceAnalysis from manifests_base import Manifests, SafeManifest from manifests_cephfs import CephFilesystem, CephFSManifests, CephStorageClass @@ -84,12 +84,12 @@ def _list_versions(self, event: ops.ActionEvent) -> None: def _list_resources(self, event: ops.ActionEvent) -> None: manifests = event.params.get("manifest", "") resources = event.params.get("resources", "") - return self.collector.list_resources(event, manifests, resources) + self.collector.list_resources(event, manifests, resources) def _scrub_resources(self, event: ops.ActionEvent) -> None: manifests = event.params.get("manifest", "") resources = event.params.get("resources", "") - return self.collector.scrub_resources(event, manifests, resources) + self.collector.scrub_resources(event, manifests, resources) def _sync_resources(self, event: ops.ActionEvent) -> None: manifests = event.params.get("manifest", "") @@ -310,6 +310,27 @@ def enforce_cephfs_enabled(self) -> None: self.unit.status = ops.MaintenanceStatus("Disabling CephFS") self._purge_manifest_by_name("cephfs") + def prevent_collisions(self, event: ops.EventBase) -> None: + """Prevent manifest collisions.""" + if self.unit.is_leader(): + self.unit.status = ops.MaintenanceStatus("Detecting manifest collisions") + analyses: List[ResourceAnalysis] = self.collector.list_resources(event, "", "") + count = sum(len(a.conflicting) for a in analyses) + if count > 0: + msg = f"{count} Kubernetes resource collision{'s'[:count^1]} (action: list-resources)" + logger.error(msg) + for analysis in analyses: + if analysis.conflicting: + logger.error( + " Collision count in '%s' is %d", + analysis.manifest, + len(analysis.conflicting), + ) + for _ in sorted(map(str, analysis.conflicting)): + logger.error(" %s", _) + status.add(ops.BlockedStatus(msg)) + raise status.ReconcilerError(msg) + def evaluate_manifests(self) -> int: """Evaluate all manifests.""" self.unit.status = ops.MaintenanceStatus("Evaluating CephCSI") @@ -351,7 +372,13 @@ def check_ceph_client(self) -> None: :return: `True` if all the data successfully loaded, otherwise `False` """ self.unit.status = ops.MaintenanceStatus("Checking Relations") - if not self.model.get_relation(self.CEPH_CLIENT_RELATION): + try: + relation = self.model.get_relation(self.CEPH_CLIENT_RELATION) + except ops.model.TooManyRelatedAppsError: + status.add(ops.BlockedStatus("Multiple ceph-client relations")) + raise status.ReconcilerError("Multiple ceph-client relations") + + if not relation: status.add(ops.BlockedStatus("Missing relation: ceph-client")) raise status.ReconcilerError("Missing relation: ceph-client") @@ -379,6 +406,7 @@ def reconcile(self, event: ops.EventBase) -> None: self.check_ceph_client() self.configure_ceph_cli() self.enforce_cephfs_enabled() + self.prevent_collisions(event) hash = self.evaluate_manifests() self.install_manifests(config_hash=hash) self._update_status() diff --git a/src/manifests_base.py b/src/manifests_base.py index 19a12f3..f85996d 100644 --- a/src/manifests_base.py +++ b/src/manifests_base.py @@ -6,7 +6,8 @@ from lightkube.codecs import AnyResource from lightkube.core.resource import NamespacedResource from lightkube.models.core_v1 import Toleration -from ops.manifests import Manifests, Patch +from ops.manifests import ManifestLabel, Manifests, Patch +from ops.manifests.literals import APP_LABEL log = logging.getLogger(__name__) @@ -35,6 +36,16 @@ def __call__(self, obj: AnyResource) -> None: obj.metadata.namespace = ns +class ManifestLabelExcluder(ManifestLabel): + """Exclude applying labels to CSIDriver.""" + + def __call__(self, obj: AnyResource) -> None: + super().__call__(obj) + if obj.kind == "CSIDriver" and obj.metadata and obj.metadata.labels: + # Remove the app label from the CSIDriver to disassociate it from the application + obj.metadata.labels.pop(APP_LABEL, None) + + class ConfigureLivenessPrometheus(Patch): """Configure liveness probe for Prometheus.""" diff --git a/src/manifests_cephfs.py b/src/manifests_cephfs.py index 0c56626..e4135db 100644 --- a/src/manifests_cephfs.py +++ b/src/manifests_cephfs.py @@ -4,18 +4,19 @@ import logging from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast from lightkube.codecs import AnyResource from lightkube.resources.core_v1 import Secret from lightkube.resources.storage_v1 import StorageClass -from ops.manifests import Addition, ConfigRegistry, ManifestLabel, Patch +from ops.manifests import Addition, ConfigRegistry, Patch from ops.manifests.manipulations import Subtraction from manifests_base import ( AdjustNamespace, CephToleration, ConfigureLivenessPrometheus, + ManifestLabelExcluder, SafeManifest, update_storage_params, ) @@ -174,6 +175,11 @@ def parameter_list(self) -> List[CephStorageClassParameters]: def __call__(self) -> List[AnyResource]: """Craft the storage class object.""" + if cast(SafeManifest, self.manifests).purgeable: + # If we are purging, we may not be able to create any storage classes + # Just return a fake storage class to satisfy delete_manifests method + # which will look up all storage classes installed by this app/manifest + return [StorageClass.from_dict(dict(metadata={}, provisioner=self.PROVISIONER))] return [self.create(class_param) for class_param in self.parameter_list()] @@ -254,7 +260,7 @@ def __init__(self, charm: "CephCsiCharm"): "upstream/cephfs", [ StorageSecret(self), - ManifestLabel(self), + ManifestLabelExcluder(self), ConfigRegistry(self), ProvisionerAdjustments(self), CephStorageClass(self), @@ -290,7 +296,7 @@ def config(self) -> Dict: config["namespace"] = self.charm.stored.namespace config["release"] = config.pop("release", None) - config["enabled"] = self.purgeable or config.get("cephfs-enable", None) + config["enabled"] = config.get("cephfs-enable", None) return config def evaluate(self) -> Optional[str]: diff --git a/src/manifests_rbd.py b/src/manifests_rbd.py index 5c791d4..f32b42e 100644 --- a/src/manifests_rbd.py +++ b/src/manifests_rbd.py @@ -8,12 +8,13 @@ from lightkube.codecs import AnyResource from lightkube.resources.core_v1 import Secret from lightkube.resources.storage_v1 import StorageClass -from ops.manifests import Addition, ConfigRegistry, ManifestLabel, Manifests, Patch +from ops.manifests import Addition, ConfigRegistry, Manifests, Patch from manifests_base import ( AdjustNamespace, CephToleration, ConfigureLivenessPrometheus, + ManifestLabelExcluder, SafeManifest, update_storage_params, ) @@ -166,7 +167,7 @@ def __init__(self, charm: "CephCsiCharm"): "upstream/rbd", [ StorageSecret(self), - ManifestLabel(self), + ManifestLabelExcluder(self), ConfigRegistry(self), ProvisionerAdjustments(self), CephStorageClass(self, "xfs"), # creates ceph-xfs diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 601b5a9..1d29bef 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -3,6 +3,7 @@ import unittest.mock as mock import pytest +from lightkube import ApiError @pytest.fixture(autouse=True) @@ -11,6 +12,17 @@ def lk_client(): yield mock_lightkube.return_value +@pytest.fixture() +def api_error_klass(): + class TestApiError(ApiError): + status = mock.MagicMock() + + def __init__(self): + pass + + yield TestApiError + + # Autouse to prevent calling out to the k8s API via lightkube client in charm @pytest.fixture(autouse=True) def lk_charm_client(): diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index d996691..dd46f28 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -10,13 +10,15 @@ from subprocess import SubprocessError import charms.operator_libs_linux.v0.apt as apt +import ops import pytest from lightkube.core.exceptions import ApiError from lightkube.resources.storage_v1 import StorageClass -from ops.manifests import ManifestClientError +from ops.manifests import HashableResource, ManifestClientError, ManifestLabel, ResourceAnalysis from ops.testing import Harness from charm import CephCsiCharm, CephFilesystem +from manifests_config import EncryptConfig @pytest.fixture @@ -161,6 +163,7 @@ def mocked_handlers(): "configure_ceph_cli", "enforce_cephfs_enabled", "evaluate_manifests", + "prevent_collisions", "install_manifests", "_update_status", ] @@ -414,7 +417,7 @@ def test_cleanup(_purge_manifest, harness, caplog): def test_action_list_versions(harness): harness.begin() - mock_event = mock.MagicMock() + mock_event = mock.MagicMock(spec=ops.ActionEvent) assert harness.charm._list_versions(mock_event) is None expected_results = { "cephfs-versions": "v3.11.0\nv3.10.2\nv3.10.1\nv3.10.0\nv3.9.0\nv3.8.1\nv3.8.0\nv3.7.2", @@ -427,11 +430,13 @@ def test_action_list_versions(harness): @mock.patch("charm.CephCsiCharm.configure_ceph_cli", mock.MagicMock()) @mock.patch("charm.CephCsiCharm.get_ceph_fsid", mock.MagicMock(return_value="12345")) @mock.patch("charm.CephCsiCharm.get_ceph_fs_list", mock.MagicMock(return_value={})) -def test_action_manifest_resources(harness): - harness.begin() +def test_action_manifest_resources(harness, lk_client, api_error_klass): + harness.begin_with_initial_hooks() + not_found = api_error_klass() + not_found.status.code = 404 + not_found.status.message = "Not Found" + lk_client.get.side_effect = not_found - mock_event = mock.MagicMock() - assert harness.charm._list_resources(mock_event) is None expected_results = { "config-missing": "ConfigMap/default/ceph-csi-encryption-kms-config", "rbd-missing": "\n".join( @@ -454,62 +459,62 @@ def test_action_manifest_resources(harness): ] ), } - mock_event.set_results.assert_called_once_with(expected_results) + action = harness.run_action("list-resources", {}) + assert action.results == expected_results - mock_event.set_results.reset_mock() - assert harness.charm._scrub_resources(mock_event) is None - mock_event.set_results.assert_called_with(expected_results) + action = harness.run_action("scrub-resources", {}) + assert action.results == expected_results - mock_event.set_results.reset_mock() - assert harness.charm._sync_resources(mock_event) is None - mock_event.set_results.assert_called_with(expected_results) + action = harness.run_action("sync-resources", {}) + assert action.results == expected_results @mock.patch("charm.CephCsiCharm.configure_ceph_cli", mock.MagicMock()) @mock.patch("charm.CephCsiCharm.get_ceph_fsid", mock.MagicMock(return_value="12345")) @mock.patch("charm.CephCsiCharm.get_ceph_fs_list", mock.MagicMock(return_value={})) -def test_action_sync_resources_install_failure(harness, lk_client): - harness.begin() +def test_action_sync_resources_install_failure(harness, lk_client, api_error_klass): + harness.begin_with_initial_hooks() + not_found = api_error_klass() + not_found.status.code = 404 + not_found.status.message = "Not Found" + lk_client.get.side_effect = not_found - mock_event = mock.MagicMock() - expected_results = {"result": "Failed to sync missing resources: API Server Unavailable"} lk_client.apply.side_effect = ManifestClientError("API Server Unavailable") - assert harness.charm._sync_resources(mock_event) is None - mock_event.set_results.assert_called_with(expected_results) + action = harness.run_action("sync-resources", {}) + + lk_client.delete.assert_not_called() + assert action.results["result"] == "Failed to sync missing resources: API Server Unavailable" def test_action_delete_storage_class_unknown(harness, lk_client): harness.begin_with_initial_hooks() - mock_event = mock.MagicMock() - expected_results = "Invalid storage class name. Must be one of: cephfs, ceph-xfs, ceph-ext4" lk_client.delete.side_effect = ManifestClientError("API Server Unavailable") - assert harness.charm._delete_storage_class(mock_event) is None - mock_event.fail.assert_called_with(expected_results) + with pytest.raises(ops.testing.ActionFailed) as exc: + harness.run_action("delete-storage-class", {"name": ""}) + lk_client.delete.assert_not_called() + assert ( + str(exc.value) == "Invalid storage class name. Must be one of: cephfs, ceph-xfs, ceph-ext4" + ) def test_action_delete_storage_class_api_error(harness, lk_client): harness.begin_with_initial_hooks() - mock_event = mock.MagicMock() - mock_event.params = {"name": "cephfs"} lk_client.delete.side_effect = ManifestClientError("API Server Unavailable") - assert harness.charm._delete_storage_class(mock_event) is None + with pytest.raises(ops.testing.ActionFailed) as exc: + harness.run_action("delete-storage-class", {"name": "cephfs"}) lk_client.delete.assert_called_once_with(StorageClass, name="cephfs") - mock_event.fail.assert_called_with("Failed to delete storage class: API Server Unavailable") + assert str(exc.value) == "Failed to delete storage class: API Server Unavailable" def test_action_delete_storage_class(harness, lk_client): harness.begin_with_initial_hooks() - mock_event = mock.MagicMock() - mock_event.params = {"name": "cephfs"} - assert harness.charm._delete_storage_class(mock_event) is None + action = harness.run_action("delete-storage-class", {"name": "cephfs"}) + assert {"result": "Successfully deleted StorageClass/cephfs"} == action.results lk_client.delete.assert_called_once_with(StorageClass, name="cephfs") - mock_event.set_results.assert_called_with( - {"result": "Successfully deleted StorageClass/cephfs"} - ) @mock.patch("charm.CephCsiCharm.ceph_cli", mock.MagicMock(side_effect=SubprocessError)) @@ -580,6 +585,38 @@ def test_enforce_cephfs_enabled(mock_purge, harness): assert harness.charm.unit.status.name == "active" +@mock.patch("charm.CephCsiCharm.ceph_context", new_callable=mock.PropertyMock) +def test_prevent_collisions(ceph_context, harness, caplog): + ceph_context.return_value = {} + harness.begin() + + config_manifest = harness.charm.collector.manifests["config"] + encrypt_config = EncryptConfig(config_manifest)() + ManifestLabel(config_manifest)(encrypt_config) + encrypt_config.metadata.labels["juju.io/application"] = "ceph-csi-alternate" + conflict = HashableResource(encrypt_config) + caplog.clear() + + expected_results = [ + ResourceAnalysis("config", {conflict}, set(), set(), set()), + ResourceAnalysis("rbd", set(), set(), set(), set()), + ResourceAnalysis("cephfs", set(), set(), set(), set()), + ] + with mock.patch.object(harness.charm.collector, "list_resources") as mock_list_resources: + mock_list_resources.return_value = expected_results + + with reconcile_this(harness, lambda e: harness.charm.prevent_collisions(e)): + harness.set_leader(True) + assert harness.charm.unit.status.name == "blocked" + assert ( + harness.charm.unit.status.message + == "1 Kubernetes resource collision (action: list-resources)" + ) + assert "1 Kubernetes resource collision (action: list-resources)" in caplog.messages + assert " Collision count in 'config' is 1" in caplog.messages + assert " ConfigMap/ceph-csi-encryption-kms-config" in caplog.messages + + @mock.patch("charm.CephCsiCharm.ceph_context", new_callable=mock.PropertyMock) def test_update_status_waiting(ceph_context, harness): harness.set_leader(True) diff --git a/tests/unit/test_manifests_cephfs.py b/tests/unit/test_manifests_cephfs.py index 98d1b1e..6cbebed 100644 --- a/tests/unit/test_manifests_cephfs.py +++ b/tests/unit/test_manifests_cephfs.py @@ -101,6 +101,7 @@ def test_ceph_provisioner_adjustment_modelled(caplog): def test_ceph_storage_class_modelled(caplog): caplog.set_level(logging.INFO) manifest = mock.MagicMock() + manifest.purgeable = False csc = CephStorageClass(manifest) alt_ns = "diff-ns" @@ -140,6 +141,20 @@ def test_ceph_storage_class_modelled(caplog): assert f"Modelling storage class sc='{TEST_CEPH_FS_ALT.name}'" in caplog.text +def test_ceph_storage_class_purgeable(caplog): + caplog.set_level(logging.INFO) + manifest = mock.MagicMock() + manifest.purgeable = True + csc = CephStorageClass(manifest) + + caplog.clear() + expected = StorageClass( + metadata=ObjectMeta(), + provisioner=CephStorageClass.PROVISIONER, + ) + assert csc() == [expected] + + def test_manifest_evaluation(caplog): caplog.set_level(logging.INFO) charm = mock.MagicMock()