Skip to content

Commit 5774862

Browse files
committed
feat: add custom node lifecycle controller to delete dangling VolumeAttachments
1 parent 84587a8 commit 5774862

File tree

4 files changed

+392
-1
lines changed

4 files changed

+392
-1
lines changed

Diff for: cmd/crusoe-cloud-controller-manager/main.go

+10
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import (
55
"os"
66

77
cloudcontrollermanager "github.com/crusoecloud/crusoe-cloud-controller-manager/internal"
8+
"github.com/crusoecloud/crusoe-cloud-controller-manager/internal/node"
89
"k8s.io/apimachinery/pkg/util/wait"
910
cloudprovider "k8s.io/cloud-provider"
1011
"k8s.io/cloud-provider/app"
1112
"k8s.io/cloud-provider/app/config"
13+
"k8s.io/cloud-provider/names"
1214
"k8s.io/cloud-provider/options"
1315
"k8s.io/component-base/cli/flag"
1416
"k8s.io/component-base/logs"
@@ -30,6 +32,14 @@ func main() {
3032

3133
cloudcontrollermanager.RegisterCloudProvider()
3234

35+
// Set the default CloudNodeLifecycleController to our custom implementation
36+
app.DefaultInitFuncConstructors[names.CloudNodeLifecycleController] = app.ControllerInitFuncConstructor{
37+
InitContext: app.ControllerInitContext{
38+
ClientName: "node-controller",
39+
},
40+
Constructor: node.StartCloudNodeLifecycleControllerWrapper,
41+
}
42+
3343
command := app.NewCloudControllerManagerCommand(
3444
opts,
3545
doInitializer,

Diff for: internal/instances/instances.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (i *Instances) InstanceShutdown(ctx context.Context, node *v1.Node) (bool,
125125
return i.InstanceShutdownByProviderID(ctx, providerID)
126126
}
127127

128-
//nolint:cyclop // must perform all checks before returning instance does not exists
128+
//nolint:cyclop // must perform all checks before returning instance does not exist
129129
func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
130130
inst, responseBody, err := i.apiClient.GetInstanceByID(ctx, getInstanceIDFromProviderID(providerID))
131131
if responseBody != nil {

Diff for: internal/node/node_lifecycle_controller.go

+307
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
package node
2+
3+
/*
4+
Copyright 2016 The Kubernetes Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"time"
24+
25+
v1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/labels"
28+
"k8s.io/apimachinery/pkg/types"
29+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30+
"k8s.io/apimachinery/pkg/util/wait"
31+
coreinformers "k8s.io/client-go/informers/core/v1"
32+
clientset "k8s.io/client-go/kubernetes"
33+
"k8s.io/client-go/kubernetes/scheme"
34+
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
35+
v1lister "k8s.io/client-go/listers/core/v1"
36+
"k8s.io/client-go/tools/record"
37+
cloudprovider "k8s.io/cloud-provider"
38+
cloudproviderapi "k8s.io/cloud-provider/api"
39+
cloudnodeutil "k8s.io/cloud-provider/node/helpers"
40+
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
41+
nodeutil "k8s.io/component-helpers/node/util"
42+
"k8s.io/klog/v2"
43+
)
44+
45+
const (
46+
deleteNodeEvent = "DeletingNode"
47+
)
48+
49+
//nolint:gochecknoglobals // can't construct const structs
50+
var ShutdownTaint = &v1.Taint{
51+
Key: cloudproviderapi.TaintNodeShutdown,
52+
Effect: v1.TaintEffectNoSchedule,
53+
}
54+
55+
// Sentinel errors returned by NewCloudNodeLifecycleController when its
// arguments cannot be used to build a controller.
var (
	// ErrInstancesNotSupported means the cloud provider implements neither
	// Instances nor InstancesV2.
	ErrInstancesNotSupported = errors.New("cloud provider does not support instances")
	// ErrNoCloudProvider means a nil cloud provider was supplied.
	ErrNoCloudProvider = errors.New("no cloud provider provided")
	// ErrNilKubernetesClient means a nil Kubernetes client was supplied.
	ErrNilKubernetesClient = errors.New("kubernetes client is nil")
)
61+
62+
// CloudNodeLifecycleController is responsible for deleting/updating kubernetes
63+
// nodes that have been deleted/shutdown on the cloud provider.
64+
type CloudNodeLifecycleController struct {
65+
kubeClient clientset.Interface
66+
nodeLister v1lister.NodeLister
67+
68+
broadcaster record.EventBroadcaster
69+
recorder record.EventRecorder
70+
71+
cloud cloudprovider.Interface
72+
73+
// Value controlling NodeController monitoring period, i.e. how often does NodeController
74+
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
75+
// set in controller-manager
76+
nodeMonitorPeriod time.Duration
77+
}
78+
79+
func NewCloudNodeLifecycleController(
80+
nodeInformer coreinformers.NodeInformer,
81+
kubeClient clientset.Interface,
82+
cloud cloudprovider.Interface,
83+
nodeMonitorPeriod time.Duration,
84+
) (*CloudNodeLifecycleController, error) {
85+
eventBroadcaster := record.NewBroadcaster()
86+
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
87+
88+
if kubeClient == nil {
89+
return nil, ErrNilKubernetesClient
90+
}
91+
92+
if cloud == nil {
93+
return nil, ErrNoCloudProvider
94+
}
95+
96+
_, instancesSupported := cloud.Instances()
97+
_, instancesV2Supported := cloud.InstancesV2()
98+
if !instancesSupported && !instancesV2Supported {
99+
return nil, ErrInstancesNotSupported
100+
}
101+
102+
c := &CloudNodeLifecycleController{
103+
kubeClient: kubeClient,
104+
nodeLister: nodeInformer.Lister(),
105+
broadcaster: eventBroadcaster,
106+
recorder: recorder,
107+
cloud: cloud,
108+
nodeMonitorPeriod: nodeMonitorPeriod,
109+
}
110+
111+
return c, nil
112+
}
113+
114+
// Run starts the main loop for this controller. Run is blocking so should
115+
// be called via a goroutine.
116+
func (c *CloudNodeLifecycleController) Run(ctx context.Context,
117+
controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics,
118+
) {
119+
defer utilruntime.HandleCrash()
120+
controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
121+
defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")
122+
123+
// Start event processing pipeline.
124+
klog.Info("Sending events to api server")
125+
c.broadcaster.StartStructuredLogging(0)
126+
c.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
127+
defer c.broadcaster.Shutdown()
128+
129+
// The following loops run communicate with the APIServer with a worst case complexity
130+
// of O(num_nodes) per cycle. These functions are justified here because these events fire
131+
// very infrequently. DO NOT MODIFY this to perform frequent operations.
132+
133+
// Start a loop to periodically check if any nodes have been
134+
// deleted or shutdown from the cloudprovider
135+
wait.UntilWithContext(ctx, c.MonitorNodes, c.nodeMonitorPeriod)
136+
}
137+
138+
// MonitorNodes checks to see if nodes in the cluster have been deleted
139+
// or shutdown. If deleted, it deletes the node resource. If shutdown it
140+
// applies a shutdown taint to the node.
141+
//
142+
//nolint:funlen,cyclop // copied from upstream
143+
func (c *CloudNodeLifecycleController) MonitorNodes(ctx context.Context) {
144+
nodes, err := c.nodeLister.List(labels.Everything())
145+
if err != nil {
146+
klog.Errorf("error listing nodes from cache: %s", err)
147+
148+
return
149+
}
150+
151+
for _, node := range nodes {
152+
// Default NodeReady status to v1.ConditionUnknown
153+
status := v1.ConditionUnknown
154+
if _, c := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady); c != nil {
155+
status = c.Status
156+
}
157+
158+
if status == v1.ConditionTrue {
159+
// if taint exist remove taint
160+
err = cloudnodeutil.RemoveTaintOffNode(c.kubeClient, node.Name, node, ShutdownTaint)
161+
if err != nil {
162+
klog.Errorf("error patching node taints: %v", err)
163+
}
164+
165+
continue
166+
}
167+
168+
// At this point the node has NotReady status, we need to check if the node has been removed
169+
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node
170+
exists, err := c.ensureNodeExistsByProviderID(ctx, node)
171+
if err != nil {
172+
klog.Errorf("error checking if node %s exists: %v", node.Name, err)
173+
174+
continue
175+
}
176+
177+
//nolint:nestif // not that complex
178+
if !exists {
179+
// Current node does not exist, we should delete it, its taints do not matter anymore
180+
181+
klog.V(2).Infof("deleting node since it is no longer present in cloud provider: %s", node.Name)
182+
183+
ref := &v1.ObjectReference{
184+
Kind: "Node",
185+
Name: node.Name,
186+
UID: node.UID,
187+
Namespace: "",
188+
}
189+
190+
c.recorder.Eventf(ref, v1.EventTypeNormal, deleteNodeEvent,
191+
"Deleting node %s because it does not exist in the cloud provider", node.Name)
192+
193+
err := CleanUpVolumeAttachmentsForNode(ctx, c.kubeClient, node.Name)
194+
if err != nil {
195+
klog.Errorf("failed to clean up volume attachments for node %s: %v", node.Name, err)
196+
}
197+
198+
if err := c.kubeClient.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); err != nil {
199+
klog.Errorf("unable to delete node %q: %v", node.Name, err)
200+
}
201+
} else {
202+
// Node exists. We need to check this to get taint working in similar in all cloudproviders
203+
// current problem is that shutdown nodes are not working in similar way ie. all cloudproviders
204+
// does not delete node from kubernetes cluster when instance it is shutdown see issue #46442
205+
shutdown, err := c.shutdownInCloudProvider(ctx, node)
206+
if err != nil {
207+
klog.Errorf("error checking if node %s is shutdown: %v", node.Name, err)
208+
}
209+
210+
if shutdown && err == nil {
211+
// if node is shutdown add shutdown taint
212+
err = cloudnodeutil.AddOrUpdateTaintOnNode(c.kubeClient, node.Name, ShutdownTaint)
213+
if err != nil {
214+
klog.Errorf("failed to apply shutdown taint to node %s, it may have been deleted.", node.Name)
215+
}
216+
}
217+
}
218+
}
219+
}
220+
221+
// getProviderID returns the provider ID for the node. If Node CR has no provider ID,
222+
// it will be the one from the cloud provider.
223+
func (c *CloudNodeLifecycleController) getProviderID(ctx context.Context, node *v1.Node) (string, error) {
224+
if node.Spec.ProviderID != "" {
225+
return node.Spec.ProviderID, nil
226+
}
227+
228+
if instanceV2, ok := c.cloud.InstancesV2(); ok {
229+
metadata, err := instanceV2.InstanceMetadata(ctx, node)
230+
if err != nil {
231+
return "", fmt.Errorf("failed to get instance metadata: %w", err)
232+
}
233+
234+
return metadata.ProviderID, nil
235+
}
236+
237+
providerID, err := cloudprovider.GetInstanceProviderID(ctx, c.cloud, types.NodeName(node.Name))
238+
if err != nil {
239+
return "", fmt.Errorf("failed to get instance provider id: %w", err)
240+
}
241+
242+
return providerID, nil
243+
}
244+
245+
// shutdownInCloudProvider returns true if the node is shutdown on the cloud provider.
246+
func (c *CloudNodeLifecycleController) shutdownInCloudProvider(ctx context.Context, node *v1.Node) (bool, error) {
247+
if instanceV2, ok := c.cloud.InstancesV2(); ok {
248+
shutDown, err := instanceV2.InstanceShutdown(ctx, node)
249+
250+
return shutDown, fmt.Errorf("failed to get instance shutdown status: %w", err)
251+
}
252+
253+
instances, ok := c.cloud.Instances()
254+
if !ok {
255+
return false, errors.New("cloud provider does not support instances")
256+
}
257+
258+
providerID, err := c.getProviderID(ctx, node)
259+
if err != nil {
260+
if errors.Is(err, cloudprovider.InstanceNotFound) {
261+
return false, nil
262+
}
263+
264+
return false, fmt.Errorf("failed to get instance provider id: %w", err)
265+
}
266+
267+
shutdown, err := instances.InstanceShutdownByProviderID(ctx, providerID)
268+
if err != nil {
269+
if errors.Is(err, cloudprovider.NotImplemented) {
270+
return false, nil
271+
}
272+
273+
return false, fmt.Errorf("failed to get instance shutdown status: %w", err)
274+
}
275+
276+
return shutdown, nil
277+
}
278+
279+
// ensureNodeExistsByProviderID checks if the instance exists by the provider id,.
280+
func (c *CloudNodeLifecycleController) ensureNodeExistsByProviderID(ctx context.Context, node *v1.Node) (bool, error) {
281+
if instanceV2, ok := c.cloud.InstancesV2(); ok {
282+
exists, err := instanceV2.InstanceExists(ctx, node)
283+
284+
return exists, fmt.Errorf("failed to get instance existence: %w", err)
285+
}
286+
287+
instances, ok := c.cloud.Instances()
288+
if !ok {
289+
return false, errors.New("instances interface not supported in the cloud provider")
290+
}
291+
292+
providerID, err := c.getProviderID(ctx, node)
293+
if err != nil {
294+
if errors.Is(err, cloudprovider.InstanceNotFound) {
295+
return false, nil
296+
}
297+
298+
return false, fmt.Errorf("failed to get instance provider id: %w", err)
299+
}
300+
301+
exists, err := instances.InstanceExistsByProviderID(ctx, providerID)
302+
if err != nil {
303+
return exists, fmt.Errorf("failed to get instance existence by provider ID: %w", err)
304+
}
305+
306+
return exists, nil
307+
}

0 commit comments

Comments
 (0)