Commit 81ddaff

CRUSOE-23508: add custom node lifecycle controller to delete dangling VolumeAttachments

1 parent 84587a8

6 files changed: +399 −5 lines

Makefile (+1 −1)

@@ -65,7 +65,7 @@ lint: ## Verifies `golangci-lint` passes
 lint-ci: ## Verifies `golangci-lint` passes and outputs in CI-friendly format
 	@echo "==> $@"
 	@golangci-lint version
-	@golangci-lint run ./... --out-format code-climate > golangci-lint.json
+	@golangci-lint run ./... --timeout=10m --out-format code-climate > golangci-lint.json
 
 .PHONY: build
 build: ## Builds the executable and places it in the build dir
cmd/crusoe-cloud-controller-manager/main.go (+10 −0)

@@ -5,10 +5,12 @@ import (
 	"os"
 
 	cloudcontrollermanager "github.com/crusoecloud/crusoe-cloud-controller-manager/internal"
+	"github.com/crusoecloud/crusoe-cloud-controller-manager/internal/node"
 	"k8s.io/apimachinery/pkg/util/wait"
 	cloudprovider "k8s.io/cloud-provider"
 	"k8s.io/cloud-provider/app"
 	"k8s.io/cloud-provider/app/config"
+	"k8s.io/cloud-provider/names"
 	"k8s.io/cloud-provider/options"
 	"k8s.io/component-base/cli/flag"
 	"k8s.io/component-base/logs"
@@ -30,6 +32,14 @@ func main() {
 
 	cloudcontrollermanager.RegisterCloudProvider()
 
+	// Set the default CloudNodeLifecycleController to our custom implementation.
+	app.DefaultInitFuncConstructors[names.CloudNodeLifecycleController] = app.ControllerInitFuncConstructor{
+		InitContext: app.ControllerInitContext{
+			ClientName: "node-controller",
+		},
+		Constructor: node.StartCloudNodeLifecycleControllerWrapper,
+	}
+
 	command := app.NewCloudControllerManagerCommand(
 		opts,
 		doInitializer,
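node.StartCloudNodeLifecycleControllerWrapper is registered above but its definition is not part of the hunks shown in this view. As a rough guide to what the wiring entails, here is a minimal, hypothetical sketch that constructs and runs the controller added by this commit directly; the helper name runNodeLifecycleController, the client/informer setup, and the 5-second monitor period are illustrative assumptions, not code from this commit:

package main

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	cloudprovider "k8s.io/cloud-provider"
	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"

	"github.com/crusoecloud/crusoe-cloud-controller-manager/internal/node"
)

// runNodeLifecycleController is a hypothetical helper showing how the
// controller added in this commit could be constructed and started.
func runNodeLifecycleController(ctx context.Context, cfg *rest.Config, cloud cloudprovider.Interface) error {
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}

	// Shared informer factory; 0 disables periodic resync.
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	nodeInformer := factory.Core().V1().Nodes()

	// A 5s monitor period is an illustrative value; in the real manager it
	// comes from the --node-monitor-period flag.
	ctrl, err := node.NewCloudNodeLifecycleController(nodeInformer, kubeClient, cloud, 5*time.Second)
	if err != nil {
		return err
	}

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	// Run blocks, so start it in a goroutine (per its doc comment).
	go ctrl.Run(ctx, controllersmetrics.NewControllerManagerMetrics("cloud-controller-manager"))

	return nil
}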

go.mod (+2 −2)

@@ -12,6 +12,8 @@ require (
 	k8s.io/client-go v0.31.3
 	k8s.io/cloud-provider v0.30.4
 	k8s.io/component-base v0.30.4
+	k8s.io/component-helpers v0.30.4
+	k8s.io/controller-manager v0.30.4
 	k8s.io/klog/v2 v2.130.1
 )
 
@@ -102,8 +104,6 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/apiserver v0.30.4 // indirect
-	k8s.io/component-helpers v0.30.4 // indirect
-	k8s.io/controller-manager v0.30.4 // indirect
 	k8s.io/kms v0.30.4 // indirect
 	k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
 	k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect

internal/instances/instances.go (+2 −2)

@@ -125,7 +125,7 @@ func (i *Instances) InstanceShutdown(ctx context.Context, node *v1.Node) (bool,
 	return i.InstanceShutdownByProviderID(ctx, providerID)
 }
 
-//nolint:cyclop // must perform all checks before returning instance does not exists
+//nolint:cyclop // must perform all checks before returning instance does not exist
 func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
 	inst, responseBody, err := i.apiClient.GetInstanceByID(ctx, getInstanceIDFromProviderID(providerID))
 	if responseBody != nil {
@@ -152,7 +152,7 @@ func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID s
 	timeDiff := currTime.Sub(firstSeenTime)
 	if inst == nil || (responseBody != nil && responseBody.StatusCode == 404) {
 		if timeDiff < InstanceNotFoundInterval {
-			klog.Infof("timediff: %v", timeDiff)
+			klog.Infof("Node %v last seen: %v", providerID, timeDiff)
 			klog.Infof("Node %v not seen for less than 2 minutes", providerID)
 
 			return true, nil

New file (+297) — package node

@@ -0,0 +1,297 @@
package node

import (
	"context"
	"errors"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	v1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/record"
	cloudprovider "k8s.io/cloud-provider"
	cloudproviderapi "k8s.io/cloud-provider/api"
	cloudnodeutil "k8s.io/cloud-provider/node/helpers"
	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
	nodeutil "k8s.io/component-helpers/node/util"
	"k8s.io/klog/v2"
)

const (
	deleteNodeEvent = "DeletingNode"
)

//nolint:gochecknoglobals // can't construct const structs
var ShutdownTaint = &v1.Taint{
	Key:    cloudproviderapi.TaintNodeShutdown,
	Effect: v1.TaintEffectNoSchedule,
}

var (
	ErrInstancesNotSupported = errors.New("cloud provider does not support instances")
	ErrNoCloudProvider       = errors.New("no cloud provider provided")
	ErrNilKubernetesClient   = errors.New("kubernetes client is nil")
)

// CloudNodeLifecycleController is responsible for deleting/updating Kubernetes
// nodes that have been deleted/shut down on the cloud provider.
type CloudNodeLifecycleController struct {
	kubeClient clientset.Interface
	nodeLister v1lister.NodeLister

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	cloud cloudprovider.Interface

	// nodeMonitorPeriod controls how often the controller checks node status
	// posted from the kubelet. It should be lower than the nodeMonitorGracePeriod
	// set in the controller-manager.
	nodeMonitorPeriod time.Duration
}

func NewCloudNodeLifecycleController(
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	cloud cloudprovider.Interface,
	nodeMonitorPeriod time.Duration,
) (*CloudNodeLifecycleController, error) {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})

	if kubeClient == nil {
		return nil, ErrNilKubernetesClient
	}

	if cloud == nil {
		return nil, ErrNoCloudProvider
	}

	_, instancesSupported := cloud.Instances()
	_, instancesV2Supported := cloud.InstancesV2()
	if !instancesSupported && !instancesV2Supported {
		return nil, ErrInstancesNotSupported
	}

	c := &CloudNodeLifecycleController{
		kubeClient:        kubeClient,
		nodeLister:        nodeInformer.Lister(),
		broadcaster:       eventBroadcaster,
		recorder:          recorder,
		cloud:             cloud,
		nodeMonitorPeriod: nodeMonitorPeriod,
	}

	return c, nil
}

// Run starts the main loop for this controller. Run is blocking, so it should
// be called via a goroutine.
func (c *CloudNodeLifecycleController) Run(ctx context.Context,
	controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics,
) {
	defer utilruntime.HandleCrash()
	controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
	defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")

	// Start event processing pipeline.
	klog.Info("Sending events to api server")
	c.broadcaster.StartStructuredLogging(0)
	c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
	defer c.broadcaster.Shutdown()

	// The following loop communicates with the API server with a worst-case
	// complexity of O(num_nodes) per cycle. This is justified because these
	// events fire very infrequently. DO NOT MODIFY this to perform frequent operations.

	// Start a loop to periodically check whether any nodes have been
	// deleted or shut down in the cloud provider.
	wait.UntilWithContext(ctx, c.MonitorNodes, c.nodeMonitorPeriod)
}

// MonitorNodes checks whether nodes in the cluster have been deleted or shut
// down in the cloud provider. If a node was deleted, it deletes the Node
// resource; if it was shut down, it applies a shutdown taint to the node.
//
//nolint:funlen,cyclop // copied from upstream
func (c *CloudNodeLifecycleController) MonitorNodes(ctx context.Context) {
	nodes, err := c.nodeLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("error listing nodes from cache: %s", err)

		return
	}

	for _, node := range nodes {
		// Default NodeReady status to v1.ConditionUnknown
		status := v1.ConditionUnknown
		if _, c := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
			status = c.Status
		}

		if status == v1.ConditionTrue {
			// if the shutdown taint exists, remove it
			err = cloudnodeutil.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
			if err != nil {
				klog.Errorf("error patching node taints: %v", err)
			}

			continue
		}

		// At this point the node has NotReady status; check whether it has been
		// removed from the cloud provider. If it cannot be found there, delete the node.
		exists, err := c.ensureNodeExistsByProviderID(ctx, node)
		if err != nil {
			klog.Errorf("error checking if node %s exists: %v", node.Name, err)

			continue
		}

		//nolint:nestif // not that complex
		if !exists {
			// The node no longer exists, so delete it; its taints no longer matter.

			klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)

			ref := &v1.ObjectReference{
				Kind:      "Node",
				Name:      node.Name,
				UID:       node.UID,
				Namespace: "",
			}

			c.recorder.Eventf(ref, v1.EventTypeNormal, deleteNodeEvent,
				"Deleting node %s because it does not exist in the cloud provider", node.Name)

			err := CleanUpVolumeAttachmentsForNode(ctx, c.kubeClient, node.Name)
			if err != nil {
				klog.Errorf("failed to clean up volume attachments for node %s: %v", node.Name, err)
			}

			if err := c.kubeClient.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); err != nil {
				klog.Errorf("unable to delete node %q: %v", node.Name, err)
			}
		} else {
			// The node exists. We still need this check so that the shutdown taint
			// behaves consistently across cloud providers: not all providers delete
			// the node from the Kubernetes cluster when the instance is shut down
			// (see issue #46442).
			shutdown, err := c.shutdownInCloudProvider(ctx, node)
			if err != nil {
				klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
			}

			if shutdown && err == nil {
				// if the node is shut down, add the shutdown taint
				err = cloudnodeutil.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
				if err != nil {
					klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
				}
			}
		}
	}
}

// getProviderID returns the provider ID for the node. If the Node object has no
// provider ID, it is fetched from the cloud provider.
func (c *CloudNodeLifecycleController) getProviderID(ctx context.Context, node *v1.Node) (string, error) {
	if node.Spec.ProviderID != "" {
		return node.Spec.ProviderID, nil
	}

	if instanceV2, ok := c.cloud.InstancesV2(); ok {
		metadata, err := instanceV2.InstanceMetadata(ctx, node)
		if err != nil {
			return "", fmt.Errorf("failed to get instance metadata: %w", err)
		}

		return metadata.ProviderID, nil
	}

	providerID, err := cloudprovider.GetInstanceProviderID(ctx, c.cloud, types.NodeName(node.Name))
	if err != nil {
		return "", fmt.Errorf("failed to get instance provider id: %w", err)
	}

	return providerID, nil
}

// shutdownInCloudProvider returns true if the node is shut down on the cloud provider.
func (c *CloudNodeLifecycleController) shutdownInCloudProvider(ctx context.Context, node *v1.Node) (bool, error) {
	if instanceV2, ok := c.cloud.InstancesV2(); ok {
		shutDown, err := instanceV2.InstanceShutdown(ctx, node)
		if err != nil {
			return shutDown, fmt.Errorf("failed to get instance shutdown status: %w", err)
		}

		return shutDown, nil
	}

	instances, ok := c.cloud.Instances()
	if !ok {
		return false, ErrInstancesNotSupported
	}

	providerID, err := c.getProviderID(ctx, node)
	if err != nil {
		if errors.Is(err, cloudprovider.InstanceNotFound) {
			return false, nil
		}

		return false, fmt.Errorf("failed to get instance provider id: %w", err)
	}

	shutdown, err := instances.InstanceShutdownByProviderID(ctx, providerID)
	if err != nil {
		if errors.Is(err, cloudprovider.NotImplemented) {
			return false, nil
		}

		return false, fmt.Errorf("failed to get instance shutdown status: %w", err)
	}

	return shutdown, nil
}

// ensureNodeExistsByProviderID checks whether the instance exists by its provider ID.
func (c *CloudNodeLifecycleController) ensureNodeExistsByProviderID(ctx context.Context, node *v1.Node) (bool, error) {
	if instanceV2, ok := c.cloud.InstancesV2(); ok {
		exists, err := instanceV2.InstanceExists(ctx, node)
		if err != nil {
			return exists, fmt.Errorf("failed to get instance existence: %w", err)
		}

		return exists, nil
	}

	instances, ok := c.cloud.Instances()
	if !ok {
		return false, ErrInstancesNotSupported
	}

	providerID, err := c.getProviderID(ctx, node)
	if err != nil {
		if errors.Is(err, cloudprovider.InstanceNotFound) {
			return false, nil
		}

		return false, fmt.Errorf("failed to get instance provider id: %w", err)
	}

	exists, err := instances.InstanceExistsByProviderID(ctx, providerID)
	if err != nil {
		return exists, fmt.Errorf("failed to get instance existence by provider ID: %w", err)
	}

	return exists, nil
}
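CleanUpVolumeAttachmentsForNode — the function this commit is named for — is called above but defined in one of the changed files not shown in this view. A plausible minimal sketch, assuming it simply deletes every VolumeAttachment whose spec.nodeName references the deleted node; the storage/v1 client calls are real client-go APIs, but the body itself is an illustration, not the committed code:

package node

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)

// CleanUpVolumeAttachmentsForNode deletes VolumeAttachments that still point at
// nodeName. Without this, attachments for a deleted node would dangle until the
// attach/detach controller's forced-detach timeout (~6 minutes) expires.
// (Illustrative sketch; the committed implementation may differ.)
func CleanUpVolumeAttachmentsForNode(ctx context.Context, kubeClient clientset.Interface, nodeName string) error {
	// VolumeAttachments are cluster-scoped, so list them all and filter by node.
	vaList, err := kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list volume attachments: %w", err)
	}

	for i := range vaList.Items {
		va := &vaList.Items[i]
		if va.Spec.NodeName != nodeName {
			continue
		}

		// Delete the dangling attachment so the volume can be attached elsewhere.
		if err := kubeClient.StorageV1().VolumeAttachments().Delete(ctx, va.Name, metav1.DeleteOptions{}); err != nil {
			return fmt.Errorf("failed to delete volume attachment %s: %w", va.Name, err)
		}

		klog.V(2).Infof("deleted dangling VolumeAttachment %s for node %s", va.Name, nodeName)
	}

	return nil
}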
