Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor upgrade logic to skip pre upgrade job in case of patch revisions #138

Merged
merged 9 commits into from
Feb 14, 2025
1 change: 1 addition & 0 deletions controllers/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
confTwillSecurityWorkerSecretDiskPath = "twill.security.worker.secret.disk.path"
confJMXServerPort = "jmx.metrics.collector.server.port"
confSecretMountDefaultMode = "secret.mount.default.mode"
confSkipPreUpgrade = "cdap-operator.preupgrade-job.skip"

// default values
defaultImage = "gcr.io/cdapio/cdap:latest"
Expand Down
165 changes: 90 additions & 75 deletions controllers/version_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ func init() {
/////////////////////////////////////////////////////////////

func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
versionComparison := compareVersion(curVersion, newVersion)
patchRevision := versionComparison == -4

// Let the current update complete if there is any
if isConditionTrue(master, updateStatus.Inprogress) {
log.Printf("Version update ingress. Continue... ")
return upgradeForBackend(master, labels, observed)
return upgradeForBackend(master, labels, observed, patchRevision)
}

if objs, versionUpdated, err := updateForUserInterface(master); err != nil {
Expand All @@ -45,23 +56,13 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
}

// Update backend service image version
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
if len(curVersion.rawString) == 0 {
setImageToUse(master)
return []reconciler.Object{}, nil
}

switch compareVersion(curVersion, newVersion) {
case -1:
// Upgrade case

if versionComparison < 0 {
// Upgrade case.
// Don't retry upgrade if it failed.
if isConditionTrue(master, updateStatus.UpgradeFailed) {
return []reconciler.Object{}, nil
Expand All @@ -73,15 +74,14 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
setCondition(master, updateStatus.Inprogress)
master.Status.UpgradeStartTimeMillis = getCurrentTimeMs()
log.Printf("Version update: start upgrading %s -> %s ", curVersion.rawString, newVersion.rawString)
return upgradeForBackend(master, labels, observed)
case 0:
return upgradeForBackend(master, labels, observed, patchRevision)
} else if versionComparison == 0 {
// No change.
// Reset all conditions so that a failed upgrade/downgrade can be retried later if needed.
// This is needed when last upgrade failed and user has reset the version in spec.
updateStatus.clearAllConditions(master)
break
case 1:
// Downgrade

} else {
// Downgrade case.
// At the moment, downgrade never fails, so no need to check if isConditionTrue(downgrade failed)
updateStatus.clearAllConditions(master)
setCondition(master, updateStatus.Inprogress)
Expand Down Expand Up @@ -120,7 +120,10 @@ func downgradeForBackend(master *v1alpha1.CDAPMaster) ([]reconciler.Object, erro
return []reconciler.Object{}, nil
}

func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object, patchRevision bool) ([]reconciler.Object, error) {
// Skip pre-upgrade and post-upgrade jobs for patch revisions, unless confSkipPreUpgrade is explicitly set to "false" in the spec config
skipPreUpgrade := patchRevision && !(master.Spec.Config[confSkipPreUpgrade] == "false")

// Find either pre- or post- upgrade job
findJob := func(jobName string) *batchv1.Job {
var job *batchv1.Job = nil
Expand Down Expand Up @@ -154,46 +157,59 @@ func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, ob
return jobObj
}

// First, run pre-upgrade job
//
// Note that the pre-upgrade job doesn't have "activeDeadlineSeconds" set on it, so it will
// try as many as imageVersionUpgradeJobMaxRetryCount times before giving up. If we ever
// need to set an overall deadline for the pre-upgrade job, the logic below needs to check
// the deadline-exceeded condition on the job's status
if !isConditionTrue(master, updateStatus.PreUpgradeSucceeded) {
log.Printf("Version update: pre-upgrade job not completed")
preJobName := getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels)
job := findJob(preJobName)
if job == nil {
obj, err := createJob(preJobSpec)
if err != nil {
return nil, err
if !skipPreUpgrade {
// First, run pre-upgrade job
//
// Note that the pre-upgrade job doesn't have "activeDeadlineSeconds" set on it, so it will
// try as many as imageVersionUpgradeJobMaxRetryCount times before giving up. If we ever
// need to set an overall deadline for the pre-upgrade job, the logic below needs to check
// the deadline-exceeded condition on the job's status
if !isConditionTrue(master, updateStatus.PreUpgradeSucceeded) {
log.Printf("Version update: pre-upgrade job not completed")
preJobName := getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels)
job := findJob(preJobName)
if job == nil {
obj, err := createJob(preJobSpec)
if err != nil {
return nil, err
}
log.Printf("Version update: creating pre-upgrade job")
return []reconciler.Object{*obj}, nil
} else if job.Status.Succeeded > 0 {
setCondition(master, updateStatus.PreUpgradeSucceeded)
log.Printf("Version update: pre-upgrade job succeeded")
// Return empty to delete preUpgrade jobObj
return []reconciler.Object{}, nil
} else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount {
setCondition(master, updateStatus.PreUpgradeFailed)
setCondition(master, updateStatus.UpgradeFailed)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: pre-upgrade job failed, exceeded max retries.")
return []reconciler.Object{}, nil
} else {
log.Printf("Version update: pre-upgrade job inprogress.")
return []reconciler.Object{*buildObject(job)}, nil
}
log.Printf("Version update: creating pre-upgrade job")
return []reconciler.Object{*obj}, nil
} else if job.Status.Succeeded > 0 {
setCondition(master, updateStatus.PreUpgradeSucceeded)
log.Printf("Version update: pre-upgrade job succeeded")
// Return empty to delete preUpgrade jobObj
return []reconciler.Object{}, nil
} else if job.Status.Failed > imageVersionUpgradeJobMaxRetryCount {
setCondition(master, updateStatus.PreUpgradeFailed)
setCondition(master, updateStatus.UpgradeFailed)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: pre-upgrade job failed, exceeded max retries.")
return []reconciler.Object{}, nil
} else {
log.Printf("Version update: pre-upgrade job inprogress.")
return []reconciler.Object{*buildObject(job)}, nil
}
}

// Then, actually update the image version
if !isConditionTrue(master, updateStatus.VersionUpdated) {
// If it's a patch revision, skip the pre and post upgrade jobs. Mark the update as succeeded.
if skipPreUpgrade {
log.Printf("Version update: patch revision detected, skipping pre-upgrade and post-upgrade jobs.")
}

setImageToUse(master)
setCondition(master, updateStatus.VersionUpdated)
log.Printf("Version update: set new version.")

if skipPreUpgrade {
setCondition(master, updateStatus.UpgradeSucceeded)
clearCondition(master, updateStatus.Inprogress)
log.Printf("Version update: upgrade succeeded.")
}
return []reconciler.Object{}, nil
}

Expand Down Expand Up @@ -406,9 +422,9 @@ func parseImageString(imageString string) (*Version, error) {
}

// compare two parsed versions
// -1: left < right
// n: left > right, nth component differs (1-indexed)
// 0: left = right
// 1: left > right
// -n: left < right, nth component differs (1-indexed)
func compareVersion(l, r *Version) int {
if l.latest && r.latest {
return 0
Expand All @@ -418,30 +434,29 @@ func compareVersion(l, r *Version) int {
return -1
}

i := 0
j := 0
for i < len(l.components) && j < len(r.components) {
if l.components[i] > r.components[j] {
return 1
} else if l.components[i] < r.components[j] {
return -1
}
i++
j++
lenL, lenR := len(l.components), len(r.components)
maxLen := lenL
if lenR > lenL {
maxLen = lenR
}
for i < len(l.components) {
if l.components[i] > 0 {
return 1

for i := 0; i < maxLen; i++ {
valL, valR := 0, 0
if i < lenL {
valL = l.components[i]
}
i++
}
for j < len(r.components) {
if r.components[j] > 0 {
return 1
if i < lenR {
valR = r.components[i]
}

if valL > valR {
return i + 1 // Return positive index (1-based) for left > right
} else if valL < valR {
return -(i + 1) // Return negative index (1-based) for left < right
}
j++
}
return 0

return 0 // Versions are equal
}

//////////////////////////////////
Expand Down Expand Up @@ -504,12 +519,12 @@ func getCurrentTimeMs() int64 {

// The returned name is just the suffix of actual k8s object name, as we prepend it with const string + CR name
func getPreUpgradeJobName(startTimeMs int64) string {
return fmt.Sprintf("pre-upgrade-job-%d", startTimeMs / 1000)
return fmt.Sprintf("pre-upgrade-job-%d", startTimeMs/1000)
}

// The returned name is just the suffix of actual k8s object name, as we prepend it with const string + CR name
func getPostUpgradeJobName(startTimeMs int64) string {
return fmt.Sprintf("post-upgrade-job-%d", startTimeMs / 1000)
return fmt.Sprintf("post-upgrade-job-%d", startTimeMs/1000)
}

// Return pre-upgrade job spec
Expand Down
67 changes: 56 additions & 11 deletions controllers/version_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,26 @@ var _ = Describe("Controller Suite", func() {
Expect(version.latest).To(BeFalse())
Expect(version.components).To(Equal([]int{6, 0, 0, 0}))
})
It("Compare image versions", func() {
It("Compare same image versions", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:latest", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6"},
}
for _, imagePair := range imagePairs {
first, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
second, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(first, second)).To(Equal(0))
}
})
It("Compare image versions for difference in 1st component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7.0.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7"},
}
for _, imagePair := range imagePairs {
Expand All @@ -55,19 +70,49 @@ var _ = Describe("Controller Suite", func() {
Expect(compareVersion(high, low)).To(Equal(1))
}
})
It("Compare same image versions", func() {
It("Compare image versions for difference in 2nd component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:latest", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.2"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1"},
}
for _, imagePair := range imagePairs {
first, err := parseImageString(imagePair.first.(string))
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
second, err := parseImageString(imagePair.second.(string))
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(first, second)).To(Equal(0))
Expect(compareVersion(low, high)).To(Equal(-2))
Expect(compareVersion(high, low)).To(Equal(2))
}
})
It("Compare image versions for difference in 3rd component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1.2"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1"},
}
for _, imagePair := range imagePairs {
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(low, high)).To(Equal(-3))
Expect(compareVersion(high, low)).To(Equal(3))
}
})
It("Compare image versions for difference in 4th component", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.2"},
}
for _, imagePair := range imagePairs {
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(low, high)).To(Equal(-4))
Expect(compareVersion(high, low)).To(Equal(4))
}
})
It("Fail to parse invalid image string", func() {
Expand Down
Loading