Skip to content

Commit f4f6f99

Browse files
Support pause rollout (#780)
* Support pause rollout * Observer StatefulSet first * Update status * Expose pendingChange * Update diff * Fix livenessrobe compare
1 parent 45d304c commit f4f6f99

File tree

21 files changed

+126
-12
lines changed

21 files changed

+126
-12
lines changed

.ci/tests/integration/cases/java-download-function/manifests.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
apiVersion: compute.functionmesh.io/v1alpha1
22
kind: Function
33
metadata:
4+
annotations:
5+
compute.functionmesh.io/pause-rollout: "false"
46
name: function-download-sample
57
namespace: default
68
spec:

.ci/tests/integration/cases/java-function-vpa/manifests.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
apiVersion: compute.functionmesh.io/v1alpha1
22
kind: Function
33
metadata:
4+
annotations:
5+
compute.functionmesh.io/pause-rollout: "true"
46
name: function-sample-vpa
57
namespace: default
68
spec:

.ci/tests/integration/cases/java-function/manifests.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
apiVersion: compute.functionmesh.io/v1alpha1
22
kind: Function
33
metadata:
4+
annotations:
5+
compute.functionmesh.io/pause-rollout: "true"
46
name: function-sample
57
namespace: default
68
spec:

.ci/tests/integration/cases/java-log-config/manifests.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
apiVersion: compute.functionmesh.io/v1alpha1
22
kind: Function
33
metadata:
4+
annotations:
5+
compute.functionmesh.io/pause-rollout: "true"
46
name: java-log-config
57
namespace: default
68
spec:

.ci/tests/integration/cases/java-log-format-json/manifests.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
apiVersion: compute.functionmesh.io/v1alpha1
22
kind: Function
33
metadata:
4+
annotations:
5+
compute.functionmesh.io/pause-rollout: "false"
46
name: java-log-format-json
57
namespace: default
68
spec:

api/compute/v1alpha1/function_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ type FunctionStatus struct {
125125
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
126126
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
127127
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
128+
PendingChange string `json:"pendingChange,omitempty"`
128129
}
129130

130131
// +genclient

api/compute/v1alpha1/sink_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ type SinkStatus struct {
115115
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
116116
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
117117
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
118+
PendingChange string `json:"pendingChange,omitempty"`
118119
}
119120

120121
// +genclient

api/compute/v1alpha1/source_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ type SourceStatus struct {
120120
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
121121
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
122122
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
123+
PendingChange string `json:"pendingChange,omitempty"`
123124
}
124125

125126
// +genclient

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3846,6 +3846,8 @@ spec:
38463846
observedGeneration:
38473847
format: int64
38483848
type: integer
3849+
pendingChange:
3850+
type: string
38493851
replicas:
38503852
format: int32
38513853
type: integer

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3565,6 +3565,8 @@ spec:
35653565
observedGeneration:
35663566
format: int64
35673567
type: integer
3568+
pendingChange:
3569+
type: string
35683570
replicas:
35693571
format: int32
35703572
type: integer

charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3546,6 +3546,8 @@ spec:
35463546
observedGeneration:
35473547
format: int64
35483548
type: integer
3549+
pendingChange:
3550+
type: string
35493551
replicas:
35503552
format: int32
35513553
type: integer

config/crd/bases/compute.functionmesh.io_functions.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,6 +3824,8 @@ spec:
38243824
observedGeneration:
38253825
format: int64
38263826
type: integer
3827+
pendingChange:
3828+
type: string
38273829
replicas:
38283830
format: int32
38293831
type: integer

config/crd/bases/compute.functionmesh.io_sinks.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3543,6 +3543,8 @@ spec:
35433543
observedGeneration:
35443544
format: int64
35453545
type: integer
3546+
pendingChange:
3547+
type: string
35463548
replicas:
35473549
format: int32
35483550
type: integer

config/crd/bases/compute.functionmesh.io_sources.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3524,6 +3524,8 @@ spec:
35243524
observedGeneration:
35253525
format: int64
35263526
type: integer
3527+
pendingChange:
3528+
type: string
35273529
replicas:
35283530
format: int32
35293531
type: integer

controllers/function.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,15 @@ func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, s
429429
if err != nil {
430430
return false, err
431431
}
432-
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
432+
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
433+
if needUpdate {
434+
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
435+
if err != nil {
436+
return needUpdate, err
437+
}
438+
function.Status.PendingChange = diff
439+
}
440+
return needUpdate, nil
433441
}
434442

435443
func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler,

controllers/function_controller.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,24 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
9999
function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
100100
}
101101

102+
isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)
103+
102104
err = r.ObserveFunctionStatefulSet(ctx, function)
103105
if err != nil {
104106
return reconcile.Result{}, err
105107
}
108+
// skip reconcile if pauseRollout is set to true and the generation is not increased
109+
if spec.IsPauseRollout(function) && !isNewGeneration {
110+
err = r.Status().Update(ctx, function)
111+
if err != nil {
112+
r.Log.Error(err, "failed to update function status after observing statefulset")
113+
return ctrl.Result{}, err
114+
}
115+
return ctrl.Result{}, nil
116+
} else {
117+
function.Status.PendingChange = ""
118+
}
119+
106120
err = r.ObserveFunctionService(ctx, function)
107121
if err != nil {
108122
return reconcile.Result{}, err
@@ -130,8 +144,6 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
130144
return ctrl.Result{}, err
131145
}
132146

133-
isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)
134-
135147
err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
136148
if err != nil {
137149
return reconcile.Result{}, err

controllers/sink.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,15 @@ func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, state
425425
if err != nil {
426426
return false, err
427427
}
428-
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
428+
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
429+
if needUpdate {
430+
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
431+
if err != nil {
432+
return needUpdate, err
433+
}
434+
sink.Status.PendingChange = diff
435+
}
436+
return needUpdate, nil
429437
}
430438

431439
func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool {

controllers/sink_controller.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,24 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
9898
sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
9999
}
100100

101+
isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)
102+
101103
err = r.ObserveSinkStatefulSet(ctx, sink)
102104
if err != nil {
103105
return reconcile.Result{}, err
104106
}
107+
// skip reconcile if pauseRollout is set to true and the generation is not increased
108+
if spec.IsPauseRollout(sink) && !isNewGeneration {
109+
err = r.Status().Update(ctx, sink)
110+
if err != nil {
111+
r.Log.Error(err, "failed to update sink status after observing statefulset")
112+
return ctrl.Result{}, err
113+
}
114+
return ctrl.Result{}, nil
115+
} else {
116+
sink.Status.PendingChange = ""
117+
}
118+
105119
err = r.ObserveSinkService(ctx, sink)
106120
if err != nil {
107121
return reconcile.Result{}, err
@@ -129,8 +143,6 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
129143
return ctrl.Result{}, err
130144
}
131145

132-
isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)
133-
134146
err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
135147
if err != nil {
136148
return reconcile.Result{}, err

controllers/source.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,15 @@ func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, sta
427427
if err != nil {
428428
return false, err
429429
}
430-
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
430+
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
431+
if needUpdate {
432+
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
433+
if err != nil {
434+
return needUpdate, err
435+
}
436+
source.Status.PendingChange = diff
437+
}
438+
return needUpdate, nil
431439
}
432440

433441
func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool {

controllers/source_controller.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,24 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
9898
source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
9999
}
100100

101+
isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source)
102+
101103
err = r.ObserveSourceStatefulSet(ctx, source)
102104
if err != nil {
103105
return reconcile.Result{}, err
104106
}
107+
// skip reconcile if pauseRollout is set to true and the generation is not increased
108+
if spec.IsPauseRollout(source) && !isNewGeneration {
109+
err = r.Status().Update(ctx, source)
110+
if err != nil {
111+
r.Log.Error(err, "failed to update source status after observing statefulset")
112+
return ctrl.Result{}, err
113+
}
114+
return ctrl.Result{}, nil
115+
} else {
116+
source.Status.PendingChange = ""
117+
}
118+
105119
err = r.ObserveSourceService(ctx, source)
106120
if err != nil {
107121
return reconcile.Result{}, err
@@ -129,8 +143,6 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
129143
return ctrl.Result{}, err
130144
}
131145

132-
isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source)
133-
134146
err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration)
135147
if err != nil {
136148
return reconcile.Result{}, err

controllers/spec/common.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4343
"k8s.io/apimachinery/pkg/types"
4444
"k8s.io/apimachinery/pkg/util/intstr"
45+
"k8s.io/apimachinery/pkg/util/strategicpatch"
4546
"k8s.io/client-go/kubernetes/scheme"
4647
"k8s.io/client-go/rest"
4748
"k8s.io/client-go/tools/remotecommand"
@@ -99,6 +100,7 @@ const (
99100
AnnotationPrometheusScrape = "prometheus.io/scrape"
100101
AnnotationPrometheusPort = "prometheus.io/port"
101102
AnnotationManaged = "compute.functionmesh.io/managed"
103+
AnnotationPauseRollout = "compute.functionmesh.io/pause-rollout"
102104
AnnotationNeedCleanup = "compute.functionmesh.io/need-cleanup"
103105

104106
// if labels contains below, we think it comes from function-mesh-worker-service
@@ -171,6 +173,11 @@ func IsManaged(object metav1.Object) bool {
171173
return !exists || managed != "false"
172174
}
173175

176+
func IsPauseRollout(object metav1.Object) bool {
177+
pauseRollout, exists := object.GetAnnotations()[AnnotationPauseRollout]
178+
return exists && pauseRollout == "true"
179+
}
180+
174181
func NeedCleanup(object metav1.Object) bool {
175182
// don't cleanup if it's managed by function-mesh-worker-service
176183
_, exists := object.GetLabels()[LabelPulsarCluster]
@@ -509,8 +516,9 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe {
509516
return &corev1.Probe{
510517
ProbeHandler: corev1.ProbeHandler{
511518
HTTPGet: &corev1.HTTPGetAction{
512-
Path: "/",
513-
Port: intstr.FromInt32(MetricsPort.ContainerPort),
519+
Path: "/",
520+
Port: intstr.FromInt32(MetricsPort.ContainerPort),
521+
Scheme: corev1.URISchemeHTTP,
514522
},
515523
},
516524
InitialDelaySeconds: initialDelay,
@@ -2080,7 +2088,7 @@ func CheckIfStatefulSetSpecIsEqual(spec *appsv1.StatefulSetSpec, desiredSpec *ap
20802088
if !reflect.DeepEqual(container.Command, desiredContainer.Command) ||
20812089
container.Image != desiredContainer.Image ||
20822090
container.ImagePullPolicy != desiredContainer.ImagePullPolicy ||
2083-
container.LivenessProbe != desiredContainer.LivenessProbe ||
2091+
!reflect.DeepEqual(container.LivenessProbe, desiredContainer.LivenessProbe) ||
20842092
!reflect.DeepEqual(ports, desiredPorts) ||
20852093
!reflect.DeepEqual(containerEnvFrom, desiredContainerEnvFrom) ||
20862094
!reflect.DeepEqual(container.Resources, desiredContainer.Resources) {
@@ -2368,3 +2376,24 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
23682376
},
23692377
}
23702378
}
2379+
2380+
func CreateDiff(orj, modified *appsv1.StatefulSet) (string, error) {
2381+
orjCopy := orj.DeepCopyObject().(*appsv1.StatefulSet)
2382+
modifiedCopy := modified.DeepCopyObject().(*appsv1.StatefulSet)
2383+
modifiedCopy.Status = orjCopy.Status
2384+
modifiedCopy.ObjectMeta = orjCopy.ObjectMeta
2385+
2386+
orjData, err := json.Marshal(orjCopy)
2387+
if err != nil {
2388+
return "", fmt.Errorf("marshal origin %w", err)
2389+
}
2390+
modifiedData, err := json.Marshal(modifiedCopy)
2391+
if err != nil {
2392+
return "", fmt.Errorf("marshal modified %w", err)
2393+
}
2394+
patch, err := strategicpatch.CreateTwoWayMergePatch(orjData, modifiedData, orjCopy)
2395+
if err != nil {
2396+
return "", fmt.Errorf("create diff %w", err)
2397+
}
2398+
return string(patch), nil
2399+
}

0 commit comments

Comments
 (0)