Skip to content

Commit

Permalink
Reconciler will detect and block on kubernetes resource collisions
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess committed Feb 5, 2025
1 parent 161dfbc commit fb9751f
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 45 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
ops >= 1.2.0
ops.manifest >= 1.1.3, < 2.0
# ops.manifest >= 1.1.3, < 2.0
ops.manifest @ git+https://github.com/canonical/ops-lib-manifest@KU-2644/enhance-list-resources
jinja2
pyyaml

Expand Down
36 changes: 32 additions & 4 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from lightkube.core.exceptions import ApiError
from lightkube.resources.core_v1 import Namespace
from lightkube.resources.storage_v1 import StorageClass
from ops.manifests import Collector, ManifestClientError
from ops.manifests import Collector, ManifestClientError, ResourceAnalysis

from manifests_base import Manifests, SafeManifest
from manifests_cephfs import CephFilesystem, CephFSManifests, CephStorageClass
Expand Down Expand Up @@ -84,12 +84,12 @@ def _list_versions(self, event: ops.ActionEvent) -> None:
def _list_resources(self, event: ops.ActionEvent) -> None:
manifests = event.params.get("manifest", "")
resources = event.params.get("resources", "")
return self.collector.list_resources(event, manifests, resources)
self.collector.list_resources(event, manifests, resources)

def _scrub_resources(self, event: ops.ActionEvent) -> None:
manifests = event.params.get("manifest", "")
resources = event.params.get("resources", "")
return self.collector.scrub_resources(event, manifests, resources)
self.collector.scrub_resources(event, manifests, resources)

def _sync_resources(self, event: ops.ActionEvent) -> None:
manifests = event.params.get("manifest", "")
Expand Down Expand Up @@ -310,6 +310,27 @@ def enforce_cephfs_enabled(self) -> None:
self.unit.status = ops.MaintenanceStatus("Disabling CephFS")
self._purge_manifest_by_name("cephfs")

def prevent_collisions(self, event: ops.EventBase) -> None:
    """Block the unit when installed Kubernetes resources collide.

    Runs only on the leader. Asks the collector to analyse every manifest's
    resources; if any are conflicting, log each collision, set BlockedStatus,
    and abort the reconcile.

    Args:
        event: the event driving the current reconcile.

    Raises:
        status.ReconcilerError: if at least one resource collision is found.
    """
    if not self.unit.is_leader():
        return
    self.unit.status = ops.MaintenanceStatus("Detecting manifest collisions")
    analyses: List[ResourceAnalysis] = self.collector.list_resources(event, "", "")
    count = sum(len(a.conflicting) for a in analyses)
    if count == 0:
        return
    # Pluralise "collision" only when more than one conflict was found
    # (replaces the opaque "'s'[:count^1]" slice hack with the same output).
    plural = "" if count == 1 else "s"
    msg = f"{count} Kubernetes resource collision{plural} (action: list-resources)"
    logger.error(msg)
    for analysis in analyses:
        if not analysis.conflicting:
            continue
        logger.error(
            " Collision count in '%s' is %d",
            analysis.manifest,
            len(analysis.conflicting),
        )
        for resource in sorted(map(str, analysis.conflicting)):
            logger.error(" %s", resource)
    status.add(ops.BlockedStatus(msg))
    raise status.ReconcilerError(msg)

def evaluate_manifests(self) -> int:
"""Evaluate all manifests."""
self.unit.status = ops.MaintenanceStatus("Evaluating CephCSI")
Expand Down Expand Up @@ -351,7 +372,13 @@ def check_ceph_client(self) -> None:
:return: `True` if all the data successfully loaded, otherwise `False`
"""
self.unit.status = ops.MaintenanceStatus("Checking Relations")
if not self.model.get_relation(self.CEPH_CLIENT_RELATION):
try:
relation = self.model.get_relation(self.CEPH_CLIENT_RELATION)
except ops.model.TooManyRelatedAppsError:
status.add(ops.BlockedStatus("Multiple ceph-client relations"))
raise status.ReconcilerError("Multiple ceph-client relations")

if not relation:
status.add(ops.BlockedStatus("Missing relation: ceph-client"))
raise status.ReconcilerError("Missing relation: ceph-client")

Expand Down Expand Up @@ -379,6 +406,7 @@ def reconcile(self, event: ops.EventBase) -> None:
self.check_ceph_client()
self.configure_ceph_cli()
self.enforce_cephfs_enabled()
self.prevent_collisions(event)
hash = self.evaluate_manifests()
self.install_manifests(config_hash=hash)
self._update_status()
Expand Down
13 changes: 12 additions & 1 deletion src/manifests_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from lightkube.codecs import AnyResource
from lightkube.core.resource import NamespacedResource
from lightkube.models.core_v1 import Toleration
from ops.manifests import Manifests, Patch
from ops.manifests import ManifestLabel, Manifests, Patch
from ops.manifests.literals import APP_LABEL

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -35,6 +36,16 @@ def __call__(self, obj: AnyResource) -> None:
obj.metadata.namespace = ns


class ManifestLabelExcluder(ManifestLabel):
    """Apply the standard manifest labels, but keep CSIDrivers unlabelled.

    Behaves exactly like ManifestLabel except that the app label is removed
    from CSIDriver objects to disassociate them from the application.
    """

    def __call__(self, obj: AnyResource) -> None:
        """Label the resource, then strip the app label from any CSIDriver."""
        super().__call__(obj)
        if obj.kind != "CSIDriver":
            return
        meta = obj.metadata
        if meta and meta.labels:
            # Remove the app label to disassociate the CSIDriver from the application.
            meta.labels.pop(APP_LABEL, None)


class ConfigureLivenessPrometheus(Patch):
"""Configure liveness probe for Prometheus."""

Expand Down
14 changes: 10 additions & 4 deletions src/manifests_cephfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast

from lightkube.codecs import AnyResource
from lightkube.resources.core_v1 import Secret
from lightkube.resources.storage_v1 import StorageClass
from ops.manifests import Addition, ConfigRegistry, ManifestLabel, Patch
from ops.manifests import Addition, ConfigRegistry, Patch
from ops.manifests.manipulations import Subtraction

from manifests_base import (
AdjustNamespace,
CephToleration,
ConfigureLivenessPrometheus,
ManifestLabelExcluder,
SafeManifest,
update_storage_params,
)
Expand Down Expand Up @@ -174,6 +175,11 @@ def parameter_list(self) -> List[CephStorageClassParameters]:

def __call__(self) -> List[AnyResource]:
    """Craft the storage class object.

    Returns:
        The StorageClass resources built from each parameter set, or a single
        placeholder StorageClass when the owning manifest is being purged.
    """
    if cast(SafeManifest, self.manifests).purgeable:
        # If we are purging, we may not be able to create any storage classes
        # Just return a fake storage class to satisfy delete_manifests method
        # which will look up all storage classes installed by this app/manifest
        return [StorageClass.from_dict(dict(metadata={}, provisioner=self.PROVISIONER))]
    return [self.create(class_param) for class_param in self.parameter_list()]


Expand Down Expand Up @@ -254,7 +260,7 @@ def __init__(self, charm: "CephCsiCharm"):
"upstream/cephfs",
[
StorageSecret(self),
ManifestLabel(self),
ManifestLabelExcluder(self),
ConfigRegistry(self),
ProvisionerAdjustments(self),
CephStorageClass(self),
Expand Down Expand Up @@ -290,7 +296,7 @@ def config(self) -> Dict:

config["namespace"] = self.charm.stored.namespace
config["release"] = config.pop("release", None)
config["enabled"] = self.purgeable or config.get("cephfs-enable", None)
config["enabled"] = config.get("cephfs-enable", None)
return config

def evaluate(self) -> Optional[str]:
Expand Down
5 changes: 3 additions & 2 deletions src/manifests_rbd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from lightkube.codecs import AnyResource
from lightkube.resources.core_v1 import Secret
from lightkube.resources.storage_v1 import StorageClass
from ops.manifests import Addition, ConfigRegistry, ManifestLabel, Manifests, Patch
from ops.manifests import Addition, ConfigRegistry, Manifests, Patch

from manifests_base import (
AdjustNamespace,
CephToleration,
ConfigureLivenessPrometheus,
ManifestLabelExcluder,
SafeManifest,
update_storage_params,
)
Expand Down Expand Up @@ -166,7 +167,7 @@ def __init__(self, charm: "CephCsiCharm"):
"upstream/rbd",
[
StorageSecret(self),
ManifestLabel(self),
ManifestLabelExcluder(self),
ConfigRegistry(self),
ProvisionerAdjustments(self),
CephStorageClass(self, "xfs"), # creates ceph-xfs
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import unittest.mock as mock

import pytest
from lightkube import ApiError


@pytest.fixture(autouse=True)
Expand All @@ -11,6 +12,17 @@ def lk_client():
yield mock_lightkube.return_value


@pytest.fixture()
def api_error_klass():
    """Yield an ApiError subclass that can be raised without a real API response."""

    class FakeApiError(ApiError):
        # Mocked status so tests may freely set .code and .message on it.
        status = mock.MagicMock()

        def __init__(self):
            # Skip the parent __init__, which expects a real response object.
            pass

    yield FakeApiError


# Autouse to prevent calling out to the k8s API via lightkube client in charm
@pytest.fixture(autouse=True)
def lk_charm_client():
Expand Down
103 changes: 70 additions & 33 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from subprocess import SubprocessError

import charms.operator_libs_linux.v0.apt as apt
import ops
import pytest
from lightkube.core.exceptions import ApiError
from lightkube.resources.storage_v1 import StorageClass
from ops.manifests import ManifestClientError
from ops.manifests import HashableResource, ManifestClientError, ManifestLabel, ResourceAnalysis
from ops.testing import Harness

from charm import CephCsiCharm, CephFilesystem
from manifests_config import EncryptConfig


@pytest.fixture
Expand Down Expand Up @@ -161,6 +163,7 @@ def mocked_handlers():
"configure_ceph_cli",
"enforce_cephfs_enabled",
"evaluate_manifests",
"prevent_collisions",
"install_manifests",
"_update_status",
]
Expand Down Expand Up @@ -414,7 +417,7 @@ def test_cleanup(_purge_manifest, harness, caplog):
def test_action_list_versions(harness):
harness.begin()

mock_event = mock.MagicMock()
mock_event = mock.MagicMock(spec=ops.ActionEvent)
assert harness.charm._list_versions(mock_event) is None
expected_results = {
"cephfs-versions": "v3.11.0\nv3.10.2\nv3.10.1\nv3.10.0\nv3.9.0\nv3.8.1\nv3.8.0\nv3.7.2",
Expand All @@ -427,11 +430,13 @@ def test_action_list_versions(harness):
@mock.patch("charm.CephCsiCharm.configure_ceph_cli", mock.MagicMock())
@mock.patch("charm.CephCsiCharm.get_ceph_fsid", mock.MagicMock(return_value="12345"))
@mock.patch("charm.CephCsiCharm.get_ceph_fs_list", mock.MagicMock(return_value={}))
def test_action_manifest_resources(harness):
harness.begin()
def test_action_manifest_resources(harness, lk_client, api_error_klass):
harness.begin_with_initial_hooks()
not_found = api_error_klass()
not_found.status.code = 404
not_found.status.message = "Not Found"
lk_client.get.side_effect = not_found

mock_event = mock.MagicMock()
assert harness.charm._list_resources(mock_event) is None
expected_results = {
"config-missing": "ConfigMap/default/ceph-csi-encryption-kms-config",
"rbd-missing": "\n".join(
Expand All @@ -454,62 +459,62 @@ def test_action_manifest_resources(harness):
]
),
}
mock_event.set_results.assert_called_once_with(expected_results)
action = harness.run_action("list-resources", {})
assert action.results == expected_results

mock_event.set_results.reset_mock()
assert harness.charm._scrub_resources(mock_event) is None
mock_event.set_results.assert_called_with(expected_results)
action = harness.run_action("scrub-resources", {})
assert action.results == expected_results

mock_event.set_results.reset_mock()
assert harness.charm._sync_resources(mock_event) is None
mock_event.set_results.assert_called_with(expected_results)
action = harness.run_action("sync-resources", {})
assert action.results == expected_results


@mock.patch("charm.CephCsiCharm.configure_ceph_cli", mock.MagicMock())
@mock.patch("charm.CephCsiCharm.get_ceph_fsid", mock.MagicMock(return_value="12345"))
@mock.patch("charm.CephCsiCharm.get_ceph_fs_list", mock.MagicMock(return_value={}))
def test_action_sync_resources_install_failure(harness, lk_client):
harness.begin()
def test_action_sync_resources_install_failure(harness, lk_client, api_error_klass):
harness.begin_with_initial_hooks()
not_found = api_error_klass()
not_found.status.code = 404
not_found.status.message = "Not Found"
lk_client.get.side_effect = not_found

mock_event = mock.MagicMock()
expected_results = {"result": "Failed to sync missing resources: API Server Unavailable"}
lk_client.apply.side_effect = ManifestClientError("API Server Unavailable")
assert harness.charm._sync_resources(mock_event) is None
mock_event.set_results.assert_called_with(expected_results)
action = harness.run_action("sync-resources", {})

lk_client.delete.assert_not_called()
assert action.results["result"] == "Failed to sync missing resources: API Server Unavailable"


def test_action_delete_storage_class_unknown(harness, lk_client):
harness.begin_with_initial_hooks()

mock_event = mock.MagicMock()
expected_results = "Invalid storage class name. Must be one of: cephfs, ceph-xfs, ceph-ext4"
lk_client.delete.side_effect = ManifestClientError("API Server Unavailable")
assert harness.charm._delete_storage_class(mock_event) is None
mock_event.fail.assert_called_with(expected_results)
with pytest.raises(ops.testing.ActionFailed) as exc:
harness.run_action("delete-storage-class", {"name": ""})

lk_client.delete.assert_not_called()
assert (
str(exc.value) == "Invalid storage class name. Must be one of: cephfs, ceph-xfs, ceph-ext4"
)


def test_action_delete_storage_class_api_error(harness, lk_client):
harness.begin_with_initial_hooks()

mock_event = mock.MagicMock()
mock_event.params = {"name": "cephfs"}
lk_client.delete.side_effect = ManifestClientError("API Server Unavailable")
assert harness.charm._delete_storage_class(mock_event) is None
with pytest.raises(ops.testing.ActionFailed) as exc:
harness.run_action("delete-storage-class", {"name": "cephfs"})
lk_client.delete.assert_called_once_with(StorageClass, name="cephfs")
mock_event.fail.assert_called_with("Failed to delete storage class: API Server Unavailable")
assert str(exc.value) == "Failed to delete storage class: API Server Unavailable"


def test_action_delete_storage_class(harness, lk_client):
harness.begin_with_initial_hooks()

mock_event = mock.MagicMock()
mock_event.params = {"name": "cephfs"}
assert harness.charm._delete_storage_class(mock_event) is None
action = harness.run_action("delete-storage-class", {"name": "cephfs"})
assert {"result": "Successfully deleted StorageClass/cephfs"} == action.results
lk_client.delete.assert_called_once_with(StorageClass, name="cephfs")
mock_event.set_results.assert_called_with(
{"result": "Successfully deleted StorageClass/cephfs"}
)


@mock.patch("charm.CephCsiCharm.ceph_cli", mock.MagicMock(side_effect=SubprocessError))
Expand Down Expand Up @@ -580,6 +585,38 @@ def test_enforce_cephfs_enabled(mock_purge, harness):
assert harness.charm.unit.status.name == "active"


@mock.patch("charm.CephCsiCharm.ceph_context", new_callable=mock.PropertyMock)
def test_prevent_collisions(ceph_context, harness, caplog):
    """Leader unit goes blocked when a resource is owned by another application."""
    ceph_context.return_value = {}
    harness.begin()

    # Build a resource labelled as owned by a *different* application so the
    # analysis below can report it as a conflict.
    config_manifest = harness.charm.collector.manifests["config"]
    encrypt_config = EncryptConfig(config_manifest)()
    ManifestLabel(config_manifest)(encrypt_config)
    encrypt_config.metadata.labels["juju.io/application"] = "ceph-csi-alternate"
    conflict = HashableResource(encrypt_config)
    caplog.clear()

    # Only the "config" manifest reports a conflicting resource.
    expected_results = [
        ResourceAnalysis("config", {conflict}, set(), set(), set()),
        ResourceAnalysis("rbd", set(), set(), set(), set()),
        ResourceAnalysis("cephfs", set(), set(), set(), set()),
    ]
    with mock.patch.object(harness.charm.collector, "list_resources") as mock_list_resources:
        mock_list_resources.return_value = expected_results

        # Becoming leader triggers a reconcile that runs prevent_collisions.
        with reconcile_this(harness, lambda e: harness.charm.prevent_collisions(e)):
            harness.set_leader(True)
        assert harness.charm.unit.status.name == "blocked"
        assert (
            harness.charm.unit.status.message
            == "1 Kubernetes resource collision (action: list-resources)"
        )
        # The collision summary and each conflicting resource are logged.
        assert "1 Kubernetes resource collision (action: list-resources)" in caplog.messages
        assert " Collision count in 'config' is 1" in caplog.messages
        assert " ConfigMap/ceph-csi-encryption-kms-config" in caplog.messages


@mock.patch("charm.CephCsiCharm.ceph_context", new_callable=mock.PropertyMock)
def test_update_status_waiting(ceph_context, harness):
harness.set_leader(True)
Expand Down
Loading

0 comments on commit fb9751f

Please sign in to comment.