
Commit c87cfdf

Extract operations into internal pkg
This reduces the number of global variables and regroups all the operations in a single place. It will allow further refactoring to represent all the k8s operations kured needs on a single node.
Signed-off-by: Jean-Philippe Evrard <open-source@a.spamming.party>
1 parent 6114490 commit c87cfdf
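
The pattern of this change is to stop reading kured's package-level globals inside the node operations and to pass everything the operations need explicitly into the new internal package; the call sites in the diff below (k8soperations.Drain, k8soperations.Uncordon, k8soperations.AddNodeAnnotations, ...) follow that shape. A toy, self-contained sketch of the same move, with names invented purely for illustration and not taken from kured:

package main

import "fmt"

// Before: the operation depends on a package-level global (illustrative only).
var drainTimeout = 30

func drainOld(node string) {
    fmt.Printf("draining %s with timeout %ds\n", node, drainTimeout)
}

// After: the operation lives in its own unit and receives its configuration
// as parameters, so the global can eventually be removed.
func drainNew(node string, timeoutSeconds int) {
    fmt.Printf("draining %s with timeout %ds\n", node, timeoutSeconds)
}

func main() {
    drainOld("worker-1")
    drainNew("worker-1", drainTimeout)
}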

File tree

5 files changed: 241 additions & 208 deletions

cmd/kured/main.go

Lines changed: 9 additions & 205 deletions
@@ -2,11 +2,10 @@ package main
 
 import (
     "context"
-    "encoding/json"
     "fmt"
     "github.com/kubereboot/kured/internal/daemonsetlock"
+    "github.com/kubereboot/kured/internal/k8soperations"
     "github.com/kubereboot/kured/internal/notifications"
-    "github.com/kubereboot/kured/internal/taints"
     "github.com/kubereboot/kured/internal/timewindow"
     "github.com/kubereboot/kured/pkg/blockers"
     "github.com/kubereboot/kured/pkg/checkers"
@@ -17,10 +16,8 @@ import (
     flag "github.com/spf13/pflag"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
-    kubectldrain "k8s.io/kubectl/pkg/drain"
     "log"
     "log/slog"
     "net/http"
@@ -92,8 +89,7 @@ var (
 
 const (
     // KuredNodeLockAnnotation is the canonical string value for the kured node-lock annotation
-    KuredNodeLockAnnotation                        string = "kured.dev/kured-node-lock"
-    KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain"
+    KuredNodeLockAnnotation string = "kured.dev/kured-node-lock"
     // KuredRebootInProgressAnnotation is the canonical string value for the kured reboot-in-progress annotation
     KuredRebootInProgressAnnotation string = "kured.dev/kured-reboot-in-progress"
     // KuredMostRecentRebootNeededAnnotation is the canonical string value for the kured most-recent-reboot-needed annotation
@@ -338,202 +334,9 @@ func LoadFromEnv() {
 
 }
 
-type slogWriter struct {
-    stream  string
-    message string
-}
-
-func (sw slogWriter) Write(p []byte) (n int, err error) {
-    output := string(p)
-    switch sw.stream {
-    case "stdout":
-        slog.Info(sw.message, "node", nodeID, "stdout", output)
-    case "stderr":
-        slog.Info(sw.message, "node", nodeID, "stderr", output)
-    }
-    return len(p), nil
-}
-
-func drain(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error {
-    nodename := node.GetName()
-
-    if preRebootNodeLabels != nil {
-        err := updateNodeLabels(client, node, preRebootNodeLabels)
-        if err != nil {
-            return fmt.Errorf("stopping drain due to problem with node labels %v", err)
-        }
-    }
-
-    if drainDelay > 0 {
-        slog.Debug("Delaying drain", "delay", drainDelay, "node", nodename)
-        time.Sleep(drainDelay)
-    }
-
-    slog.Info("Starting drain", "node", nodename)
-
-    notifier.Send(fmt.Sprintf(messageTemplateDrain, nodename), "Starting drain")
-
-    kubectlStdOutLogger := &slogWriter{message: "draining: results", stream: "stdout"}
-    kubectlStdErrLogger := &slogWriter{message: "draining: results", stream: "stderr"}
-
-    drainer := &kubectldrain.Helper{
-        Client:                          client,
-        Ctx:                             context.Background(),
-        GracePeriodSeconds:              drainGracePeriod,
-        PodSelector:                     drainPodSelector,
-        SkipWaitForDeleteTimeoutSeconds: skipWaitForDeleteTimeoutSeconds,
-        Force:                           true,
-        DeleteEmptyDirData:              true,
-        IgnoreAllDaemonSets:             true,
-        ErrOut:                          kubectlStdErrLogger,
-        Out:                             kubectlStdOutLogger,
-        Timeout:                         drainTimeout,
-    }
-
-    // Add previous state of the node Spec.Unschedulable into an annotation
-    // If an annotation was present, it means that either the cordon or drain failed,
-    // hence it does not need to reapply: It might override what the user has set
-    // (for example if the cordon succeeded but the drain failed)
-    if _, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]; !ok {
-        // Store State of the node before cordon changes it
-        annotations := map[string]string{KuredNodeWasUnschedulableBeforeDrainAnnotation: strconv.FormatBool(node.Spec.Unschedulable)}
-        // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
-        err := addNodeAnnotations(client, nodeID, annotations)
-        if err != nil {
-            return fmt.Errorf("error saving state of the node %s, %v", nodename, err)
-        }
-    }
-
-    if err := kubectldrain.RunCordonOrUncordon(drainer, node, true); err != nil {
-        return fmt.Errorf("error cordonning node %s, %v", nodename, err)
-    }
-
-    if err := kubectldrain.RunNodeDrain(drainer, nodename); err != nil {
-        return fmt.Errorf("error draining node %s: %v", nodename, err)
-    }
-    return nil
-}
-
-func uncordon(client *kubernetes.Clientset, node *v1.Node, notifier notifications.Notifier) error {
-    // Revert cordon spec change with the help of node annotation
-    annotationContent, ok := node.Annotations[KuredNodeWasUnschedulableBeforeDrainAnnotation]
-    if !ok {
-        // If no node annotations, uncordon will not act.
-        // Do not uncordon if you do not know previous state, it could bring nodes under maintenance online!
-        return nil
-    }
-
-    wasUnschedulable, err := strconv.ParseBool(annotationContent)
-    if err != nil {
-        return fmt.Errorf("annotation was edited and cannot be converted back to bool %v, cannot uncordon (unrecoverable)", err)
-    }
-
-    if wasUnschedulable {
-        // Just delete the annotation, keep Cordonned
-        err := deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation)
-        if err != nil {
-            return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in cordonned state forever %v", err)
-        }
-        return nil
-    }
-
-    nodeName := node.GetName()
-    kubectlStdOutLogger := &slogWriter{message: "uncordon: results", stream: "stdout"}
-    kubectlStdErrLogger := &slogWriter{message: "uncordon: results", stream: "stderr"}
-
-    drainer := &kubectldrain.Helper{
-        Client: client,
-        ErrOut: kubectlStdErrLogger,
-        Out:    kubectlStdOutLogger,
-        Ctx:    context.Background(),
-    }
-    if err := kubectldrain.RunCordonOrUncordon(drainer, node, false); err != nil {
-        return fmt.Errorf("error uncordonning node %s: %v", nodeName, err)
-    } else if postRebootNodeLabels != nil {
-        err := updateNodeLabels(client, node, postRebootNodeLabels)
-        return fmt.Errorf("error updating node (%s) labels, needs manual intervention %v", nodeName, err)
-    }
-
-    err = deleteNodeAnnotation(client, nodeID, KuredNodeWasUnschedulableBeforeDrainAnnotation)
-    if err != nil {
-        return fmt.Errorf("error removing the WasUnschedulable annotation, keeping the node stuck in current state forever %v", err)
-    }
-    notifier.Send(fmt.Sprintf(messageTemplateUncordon, nodeID), "Node uncordonned successfully")
-    return nil
-}
-
-func addNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
-    node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
-    if err != nil {
-        return fmt.Errorf("error retrieving node object via k8s API: %v", err)
-    }
-    for k, v := range annotations {
-        node.Annotations[k] = v
-        slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName())
-    }
-
-    bytes, err := json.Marshal(node)
-    if err != nil {
-        return fmt.Errorf("error marshalling node object into JSON: %v", err)
-    }
-
-    _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
-    if err != nil {
-        var annotationsErr string
-        for k, v := range annotations {
-            annotationsErr += fmt.Sprintf("%s=%s ", k, v)
-        }
-        return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err)
-    }
-    return nil
-}
-
-func deleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
-    // JSON Patch takes as path input a JSON Pointer, defined in RFC6901
-    // So we replace all instances of "/" with "~1" as per:
-    // https://tools.ietf.org/html/rfc6901#section-3
-    patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
-    _, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
-    if err != nil {
-        return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err)
-    }
-    return nil
-}
-
-func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error {
-    labelsMap := make(map[string]string)
-    for _, label := range labels {
-        k := strings.Split(label, "=")[0]
-        v := strings.Split(label, "=")[1]
-        labelsMap[k] = v
-        slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, v), "node", node.GetName())
-    }
-
-    bytes, err := json.Marshal(map[string]interface{}{
-        "metadata": map[string]interface{}{
-            "labels": labelsMap,
-        },
-    })
-    if err != nil {
-        return fmt.Errorf("error marshalling node object into JSON: %v", err)
-    }
-
-    _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
-    if err != nil {
-        var labelsErr string
-        for _, label := range labels {
-            k := strings.Split(label, "=")[0]
-            v := strings.Split(label, "=")[1]
-            labelsErr += fmt.Sprintf("%s=%s ", k, v)
-        }
-        return fmt.Errorf("error updating node labels %s via k8s API: %v", labelsErr, err)
-    }
-    return nil
-}
-
 func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset, period time.Duration, notifier notifications.Notifier) {
 
-    preferNoScheduleTaint := taints.New(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
+    preferNoScheduleTaint := k8soperations.NewTaint(client, nodeID, preferNoScheduleTaintName, v1.TaintEffectPreferNoSchedule)
 
     // No reason to delay the first ticks.
     // On top of it, we used to leak a goroutine, which was never garbage collected.
@@ -558,7 +361,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
            continue
        }
 
-       err = uncordon(client, node, notifier)
+       err = k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon)
        if err != nil {
            // Might be a transient API issue or a real problem. Inform the admin
            slog.Info("unable to uncordon needs investigation", "node", nodeID, "error", err)
@@ -578,7 +381,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
        if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
            // Who reads this? I hope nobody bothers outside real debug cases
            slog.Debug(fmt.Sprintf("Deleting node %s annotation %s", nodeID, KuredRebootInProgressAnnotation), "node", nodeID)
-           err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
+           err := k8soperations.DeleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
            if err != nil {
                continue
            }
@@ -626,7 +429,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
            annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
            // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
            annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
-           err := addNodeAnnotations(client, nodeID, annotations)
+           err := k8soperations.AddNodeAnnotations(client, nodeID, annotations)
            if err != nil {
                continue
            }
@@ -663,7 +466,8 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
        // }
        //}
 
-       err = drain(client, node, notifier)
+       err = k8soperations.Drain(client, node, preRebootNodeLabels, drainTimeout, drainGracePeriod, skipWaitForDeleteTimeoutSeconds, drainPodSelector, drainDelay, messageTemplateDrain, notifier)
+
        if err != nil {
            if !forceReboot {
                slog.Debug(fmt.Sprintf("Unable to cordon or drain %s: %v, will force-reboot by releasing lock and uncordon until next success", node.GetName(), err), "node", nodeID, "error", err)
@@ -676,7 +480,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
                // If shown, it is helping understand the uncordonning. If the admin seems the node as cordonned
                // with this, it needs to take action (for example if the node was previously cordonned!)
                slog.Info("Performing a best-effort uncordon after failed cordon and drain", "node", nodeID)
-               err := uncordon(client, node, notifier)
+               err := k8soperations.Uncordon(client, node, notifier, postRebootNodeLabels, messageTemplateUncordon)
                if err != nil {
                    slog.Info("Uncordon failed", "error", err)
                }
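
The slogWriter removed above is a small io.Writer adapter that forwards the output of kubectl's drain helper into slog. The following standalone sketch reproduces that pattern for readers skimming the diff; it drops kured-specific attributes such as the nodeID global and is an illustration, not part of the commit:

package main

import (
    "fmt"
    "log/slog"
)

// slogWriter mirrors the adapter deleted from cmd/kured/main.go above:
// it satisfies io.Writer so a drain helper can stream its stdout/stderr
// into structured slog records.
type slogWriter struct {
    stream  string // "stdout" or "stderr"
    message string // log message attached to every chunk of output
}

func (sw slogWriter) Write(p []byte) (n int, err error) {
    slog.Info(sw.message, sw.stream, string(p))
    return len(p), nil
}

func main() {
    out := slogWriter{stream: "stdout", message: "draining: results"}
    // "default/example" is a made-up pod name, purely for the demo.
    fmt.Fprintln(out, "evicting pod default/example")
}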

internal/k8soperations/annotations.go

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+package k8soperations
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/types"
+    "k8s.io/client-go/kubernetes"
+    "log/slog"
+    "strings"
+)
+
+const (
+    KuredNodeWasUnschedulableBeforeDrainAnnotation string = "kured.dev/node-unschedulable-before-drain"
+)
+
+func AddNodeAnnotations(client *kubernetes.Clientset, nodeID string, annotations map[string]string) error {
+    node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
+    if err != nil {
+        return fmt.Errorf("error retrieving node object via k8s API: %v", err)
+    }
+    for k, v := range annotations {
+        node.Annotations[k] = v
+        slog.Debug(fmt.Sprintf("adding node annotation: %s=%s", k, v), "node", node.GetName())
+    }
+
+    bytes, err := json.Marshal(node)
+    if err != nil {
+        return fmt.Errorf("error marshalling node object into JSON: %v", err)
+    }
+
+    _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
+    if err != nil {
+        var annotationsErr string
+        for k, v := range annotations {
+            annotationsErr += fmt.Sprintf("%s=%s ", k, v)
+        }
+        return fmt.Errorf("error adding node annotations %s via k8s API: %v", annotationsErr, err)
+    }
+    return nil
+}
+
+func DeleteNodeAnnotation(client *kubernetes.Clientset, nodeID, key string) error {
+    // JSON Patch takes as path input a JSON Pointer, defined in RFC6901
+    // So we replace all instances of "/" with "~1" as per:
+    // https://tools.ietf.org/html/rfc6901#section-3
+    patch := []byte(fmt.Sprintf("[{\"op\":\"remove\",\"path\":\"/metadata/annotations/%s\"}]", strings.ReplaceAll(key, "/", "~1")))
+    _, err := client.CoreV1().Nodes().Patch(context.TODO(), nodeID, types.JSONPatchType, patch, metav1.PatchOptions{})
+    if err != nil {
+        return fmt.Errorf("error deleting node annotation %s via k8s API: %v", key, err)
+    }
+    return nil
+}
+
+func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []string) error {
+    labelsMap := make(map[string]string)
+    for _, label := range labels {
+        k := strings.Split(label, "=")[0]
+        v := strings.Split(label, "=")[1]
+        labelsMap[k] = v
+        slog.Debug(fmt.Sprintf("Updating node %s label: %s=%s", node.GetName(), k, v), "node", node.GetName())
+    }
+
+    bytes, err := json.Marshal(map[string]interface{}{
+        "metadata": map[string]interface{}{
+            "labels": labelsMap,
+        },
+    })
+    if err != nil {
+        return fmt.Errorf("error marshalling node object into JSON: %v", err)
+    }
+
+    _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.GetName(), types.StrategicMergePatchType, bytes, metav1.PatchOptions{})
+    if err != nil {
+        var labelsErr string
+        for _, label := range labels {
+            k := strings.Split(label, "=")[0]
+            v := strings.Split(label, "=")[1]
+            labelsErr += fmt.Sprintf("%s=%s ", k, v)
+        }
+        return fmt.Errorf("error updating node labels %s via k8s API: %v", labelsErr, err)
+    }
+    return nil
+}
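
A note on DeleteNodeAnnotation above: annotation keys contain "/", which must be escaped as "~1" to remain a valid JSON Pointer (RFC 6901, section 3) inside the JSON Patch path. This standalone snippet, using one of kured's own annotation keys from this diff, prints the patch body the function would send:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Same escaping as DeleteNodeAnnotation: "/" in the annotation key
    // becomes "~1" so the JSON Pointer stays valid (RFC 6901, section 3).
    key := "kured.dev/kured-reboot-in-progress"
    path := "/metadata/annotations/" + strings.ReplaceAll(key, "/", "~1")
    patch := fmt.Sprintf(`[{"op":"remove","path":"%s"}]`, path)
    fmt.Println(patch)
    // Prints:
    // [{"op":"remove","path":"/metadata/annotations/kured.dev~1kured-reboot-in-progress"}]
}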
