Skip to content

Commit

Permalink
fix: fix some error output && add some unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Mar 24, 2024
1 parent d4c2e21 commit 4f0e5a0
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 53 deletions.
15 changes: 8 additions & 7 deletions pkg/controller/daemonjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package controller
import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/go-logr/logr"
daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -17,9 +20,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sort"
"strings"
"time"

daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1"
)

type DaemonJobController struct {
Expand Down Expand Up @@ -54,7 +56,7 @@ func (r *DaemonJobController) Reconcile(ctx context.Context,
klog.Info(fmt.Sprintf("not found daemonJob : %v", req.Name))
return reconcile.Result{}, nil
}
klog.Error(err, err.Error())
klog.Error("get DaemonJob error: ", err.Error())
r.event.Eventf(daemonJob, v1.EventTypeWarning, "Created", err.Error())
return reconcile.Result{}, err
}
Expand All @@ -67,7 +69,6 @@ func (r *DaemonJobController) Reconcile(ctx context.Context,
if err = r.deployDaemonJob(ctx, daemonJob); err != nil {
klog.Error("deploy DaemonJob error: ", err)
r.event.Eventf(daemonJob, v1.EventTypeWarning, "Failed", err.Error())

return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 60}, err
}

Expand Down Expand Up @@ -99,7 +100,7 @@ func (r *DaemonJobController) deployDaemonJob(ctx context.Context,
}

// 其他节点:先 get 一下,如果不存在则创建,若存在就不处理

// job 对象
job := prepareJobFromDaemonJob(daemonJob, daemonJob.Name, v.Name)
namespacedNameJob := types.NamespacedName{
Namespace: daemonJob.Namespace,
Expand Down
21 changes: 12 additions & 9 deletions pkg/controller/daemonjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package controller

import (
"context"
daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
. "github.com/smartystreets/goconvey/convey"
"testing"
"time"

"github.com/smartystreets/goconvey/convey"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -17,8 +18,9 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
)

func createDaemonJobController(initObjs ...client.Object) *DaemonJobController {
Expand All @@ -36,7 +38,7 @@ func createDaemonJobController(initObjs ...client.Object) *DaemonJobController {
}

func TestDaemonJobController_Reconcile(t *testing.T) {
Convey("Test DaemonJob Reconcile", t, func() {
convey.Convey("Test DaemonJob Reconcile", t, func() {
node1 := createNode("node1")
node2 := createNode("node2")
node3 := createNode("node3")
Expand All @@ -49,7 +51,7 @@ func TestDaemonJobController_Reconcile(t *testing.T) {
},
}
_, err := reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)

job1 := newJobFromDaemonJob(daemonjob, "daemonjob-test-node1")
// 过两秒后更新 job1 状态
Expand All @@ -58,10 +60,11 @@ func TestDaemonJobController_Reconcile(t *testing.T) {
job1.Status = batchv1.JobStatus{
Succeeded: 1,
}
reconcileController.client.Status().Update(context.TODO(), job1)
err = reconcileController.client.Status().Update(context.TODO(), job1)
convey.So(err, convey.ShouldBeNil)
}
_, err = reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)
})
}

Expand Down
26 changes: 15 additions & 11 deletions pkg/controller/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package controller
import (
"context"
"fmt"
jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
"time"

"github.com/myoperator/jobflowoperator/pkg/common"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -15,7 +15,9 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
)

// deploy job by dependence order.
Expand All @@ -28,7 +30,10 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1
Name: jobName,
}
// job 对象
job := r.prepareJob(&jobFlow, &jobFlow.Spec.Flows[i], jobName)
job, err := r.prepareJob(&jobFlow, &jobFlow.Spec.Flows[i], jobName)
if err != nil {
return err
}

// 如果没拿到这个 job
if err := r.client.Get(ctx, namespacedNameJob, job); err != nil {
Expand Down Expand Up @@ -136,17 +141,16 @@ func (r *JobFlowController) findJobTemplateByNameNamespace(name, namespace strin
if err != nil {
// If no instance is found, it will be returned directly
if errors.IsNotFound(err) {
klog.Info(fmt.Sprintf("not found JobTemplate : %v", name))
return jobTemplate.Spec.JobTemplate, nil
klog.Error(fmt.Sprintf("not found JobTemplate : %v", name))
return jobTemplate.Spec.JobTemplate, errors.NewBadRequest(fmt.Sprintf("not found JobTemplate : %v", name))
}
klog.Error(err, err.Error())
r.event.Eventf(jobTemplate, v1.EventTypeWarning, "Created", err.Error())
klog.Error("get JobTemplate error: ", err.Error())
return jobTemplate.Spec.JobTemplate, err
}
return jobTemplate.Spec.JobTemplate, nil
}

func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *jobflowv1alpha1.Flow, jobName string) *batchv1.Job {
func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *jobflowv1alpha1.Flow, jobName string) (*batchv1.Job, error) {
// job 对象
job := &batchv1.Job{}

Expand All @@ -169,7 +173,7 @@ func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *j
jobTemplateRefSpec, err := r.findJobTemplateByNameNamespace(flow.JobTemplateRef, jobFlow.Namespace)
if err != nil {
klog.Error("find JobTemplate failed: ", err)
return nil
return nil, err
}
job.Spec = jobTemplateRefSpec
// 加上 annotation 注名 此 job 使用此 JobTemplate
Expand Down Expand Up @@ -212,7 +216,7 @@ func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *j
}
}

return job
return job, nil
}

// getAllJobStatus 记录 Job Status
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/jobflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
)

type JobFlowController struct {
Expand Down
39 changes: 26 additions & 13 deletions pkg/controller/jobflow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package controller

import (
"context"
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
. "github.com/smartystreets/goconvey/convey"
"testing"
"time"

"github.com/smartystreets/goconvey/convey"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,8 +18,8 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
)

func createJobFlowController(initObjs ...client.Object) *JobFlowController {
Expand All @@ -37,7 +39,7 @@ func createJobFlowController(initObjs ...client.Object) *JobFlowController {
}

func TestJobFlowController_Reconcile(t *testing.T) {
Convey("Test JobFlow Reconcile", t, func() {
convey.Convey("Test JobFlow Reconcile", t, func() {
jobflow := createJobFlow("jobflow-test")
reconcileController := createJobFlowController(jobflow)
request := reconcile.Request{
Expand All @@ -47,22 +49,19 @@ func TestJobFlowController_Reconcile(t *testing.T) {
},
}
_, err := reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)

job1 := newJob(jobflow, "jobflow-test-jobflow-test1")
// 过两秒后更新 job1 状态
select {
case <-time.After(time.Second * 2):
if <-time.After(time.Second * 2); true {
job1.Status = batchv1.JobStatus{
Succeeded: 1,
}
err := reconcileController.client.Status().Update(context.TODO(), job1)
if err != nil {
return
}
err = reconcileController.client.Status().Update(context.TODO(), job1)
convey.So(err, convey.ShouldBeNil)
}
_, err = reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)
})
}

Expand Down Expand Up @@ -109,6 +108,20 @@ func createJobFlow(jobName string) *jobflowv1alpha1.JobFlow {
UID: "12345",
},
Spec: jobflowv1alpha1.JobFlowSpec{
GlobalParams: jobflowv1alpha1.GlobalParams{
Labels: map[string]string{
"key": "value",
},
Annotations: map[string]string{
"key": "value",
},
Env: []v1.EnvVar{
{
Name: "test",
Value: "test",
},
},
},
Flows: flows,
},
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/jobflow_template_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package controller
import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
"github.com/myoperator/jobflowoperator/pkg/common"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -17,8 +19,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"

jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
)

type JobTemplateController struct {
Expand Down
19 changes: 11 additions & 8 deletions pkg/controller/jobflow_template_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package controller

import (
"context"
jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
. "github.com/smartystreets/goconvey/convey"
"testing"
"time"

"github.com/smartystreets/goconvey/convey"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,12 +18,12 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1"
)

func TestJobTemplateController_Reconcile(t *testing.T) {
Convey("Test JobFlow Reconcile", t, func() {
convey.Convey("Test JobFlow Reconcile", t, func() {
jobtemplate := createJobTemplate("jobtemplate-test")
reconcileController := createJobTemplateController(jobtemplate)
request := reconcile.Request{
Expand All @@ -31,15 +33,16 @@ func TestJobTemplateController_Reconcile(t *testing.T) {
},
}
_, err := reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)

// 过两秒后更新 job1 状态
if <-time.After(time.Second * 2); true {
jobtemplate.Status = jobtemplatev1alpha1.JobTemplateStatus{}
reconcileController.client.Status().Update(context.TODO(), jobtemplate)
err = reconcileController.client.Status().Update(context.TODO(), jobtemplate)
convey.So(err, convey.ShouldBeNil)
}
_, err = reconcileController.Reconcile(context.TODO(), request)
So(err, ShouldBeNil)
convey.So(err, convey.ShouldBeNil)
})
}

Expand Down

0 comments on commit 4f0e5a0

Please sign in to comment.