diff --git a/pkg/controller/daemonjob_controller.go b/pkg/controller/daemonjob_controller.go index 535a7b7..eafe5a7 100644 --- a/pkg/controller/daemonjob_controller.go +++ b/pkg/controller/daemonjob_controller.go @@ -3,8 +3,11 @@ package controller import ( "context" "fmt" + "sort" + "strings" + "time" + "github.com/go-logr/logr" - daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -17,9 +20,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sort" - "strings" - "time" + + daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1" ) type DaemonJobController struct { @@ -54,7 +56,7 @@ func (r *DaemonJobController) Reconcile(ctx context.Context, klog.Info(fmt.Sprintf("not found daemonJob : %v", req.Name)) return reconcile.Result{}, nil } - klog.Error(err, err.Error()) + klog.Error("get DaemonJob error: ", err.Error()) r.event.Eventf(daemonJob, v1.EventTypeWarning, "Created", err.Error()) return reconcile.Result{}, err } @@ -67,7 +69,6 @@ func (r *DaemonJobController) Reconcile(ctx context.Context, if err = r.deployDaemonJob(ctx, daemonJob); err != nil { klog.Error("deploy DaemonJob error: ", err) r.event.Eventf(daemonJob, v1.EventTypeWarning, "Failed", err.Error()) - return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 60}, err } @@ -99,7 +100,7 @@ func (r *DaemonJobController) deployDaemonJob(ctx context.Context, } // 其他节点:先 get 一下,如果不存在则创建,若存在就不处理 - + // job 对象 job := prepareJobFromDaemonJob(daemonJob, daemonJob.Name, v.Name) namespacedNameJob := types.NamespacedName{ Namespace: daemonJob.Namespace, diff --git a/pkg/controller/daemonjob_controller_test.go b/pkg/controller/daemonjob_controller_test.go index b7247f7..2d6dd8b 100644 --- a/pkg/controller/daemonjob_controller_test.go +++ b/pkg/controller/daemonjob_controller_test.go @@ -2,9 +2,10 @@ package controller import ( "context" - daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1" - jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" - . "github.com/smartystreets/goconvey/convey" + "testing" + "time" + + "github.com/smartystreets/goconvey/convey" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -17,8 +18,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "testing" - "time" + + daemonjobv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/daemonjob/v1alpha1" + jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" ) func createDaemonJobController(initObjs ...client.Object) *DaemonJobController { @@ -36,7 +38,7 @@ func createDaemonJobController(initObjs ...client.Object) *DaemonJobController { } func TestDaemonJobController_Reconcile(t *testing.T) { - Convey("Test DaemonJob Reconcile", t, func() { + convey.Convey("Test DaemonJob Reconcile", t, func() { node1 := createNode("node1") node2 := createNode("node2") node3 := createNode("node3") @@ -49,7 +51,7 @@ func TestDaemonJobController_Reconcile(t *testing.T) { }, } _, err := reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) job1 := newJobFromDaemonJob(daemonjob, "daemonjob-test-node1") // 过两秒后更新 job1 状态 @@ -58,10 +60,11 @@ func TestDaemonJobController_Reconcile(t *testing.T) { job1.Status = batchv1.JobStatus{ Succeeded: 1, } - reconcileController.client.Status().Update(context.TODO(), job1) + err = reconcileController.client.Status().Update(context.TODO(), job1) + convey.So(err, convey.ShouldBeNil) } _, err = reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) }) } diff --git a/pkg/controller/helper.go b/pkg/controller/helper.go index 465a923..03a76bd 100644 --- a/pkg/controller/helper.go +++ b/pkg/controller/helper.go @@ -3,8 +3,8 @@ package controller import ( "context" "fmt" - jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" - jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" + "time" + "github.com/myoperator/jobflowoperator/pkg/common" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -15,7 +15,9 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "time" + + jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" + jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" ) // deploy job by dependence order. @@ -28,7 +30,10 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1 Name: jobName, } // job 对象 - job := r.prepareJob(&jobFlow, &jobFlow.Spec.Flows[i], jobName) + job, err := r.prepareJob(&jobFlow, &jobFlow.Spec.Flows[i], jobName) + if err != nil { + return err + } // 如果没拿到这个 job if err := r.client.Get(ctx, namespacedNameJob, job); err != nil { @@ -136,17 +141,16 @@ func (r *JobFlowController) findJobTemplateByNameNamespace(name, namespace strin if err != nil { // If no instance is found, it will be returned directly if errors.IsNotFound(err) { - klog.Info(fmt.Sprintf("not found JobTemplate : %v", name)) - return jobTemplate.Spec.JobTemplate, nil + klog.Error(fmt.Sprintf("not found JobTemplate : %v", name)) + return jobTemplate.Spec.JobTemplate, errors.NewBadRequest(fmt.Sprintf("not found JobTemplate : %v", name)) } - klog.Error(err, err.Error()) - r.event.Eventf(jobTemplate, v1.EventTypeWarning, "Created", err.Error()) + klog.Error("get JobTemplate error: ", err.Error()) return jobTemplate.Spec.JobTemplate, err } return jobTemplate.Spec.JobTemplate, nil } -func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *jobflowv1alpha1.Flow, jobName string) *batchv1.Job { +func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *jobflowv1alpha1.Flow, jobName string) (*batchv1.Job, error) { // job 对象 job := &batchv1.Job{} @@ -169,7 +173,7 @@ func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *j jobTemplateRefSpec, err := r.findJobTemplateByNameNamespace(flow.JobTemplateRef, jobFlow.Namespace) if err != nil { klog.Error("find JobTemplate failed: ", err) - return nil + return nil, err } job.Spec = jobTemplateRefSpec // 加上 annotation 注名 此 job 使用此 JobTemplate @@ -212,7 +216,7 @@ func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *j } } - return job + return job, nil } // getAllJobStatus 记录 Job Status diff --git a/pkg/controller/jobflow_controller.go b/pkg/controller/jobflow_controller.go index 0fd4eab..c833907 100644 --- a/pkg/controller/jobflow_controller.go +++ b/pkg/controller/jobflow_controller.go @@ -3,8 +3,9 @@ package controller import ( "context" "fmt" + "time" + "github.com/go-logr/logr" - jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -12,7 +13,8 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "time" + + jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" ) type JobFlowController struct { diff --git a/pkg/controller/jobflow_controller_test.go b/pkg/controller/jobflow_controller_test.go index e608452..bbe420f 100644 --- a/pkg/controller/jobflow_controller_test.go +++ b/pkg/controller/jobflow_controller_test.go @@ -2,8 +2,10 @@ package controller import ( "context" - jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" - . "github.com/smartystreets/goconvey/convey" + "testing" + "time" + + "github.com/smartystreets/goconvey/convey" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,8 +18,8 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "testing" - "time" + + jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1" ) func createJobFlowController(initObjs ...client.Object) *JobFlowController { @@ -37,7 +39,7 @@ func createJobFlowController(initObjs ...client.Object) *JobFlowController { } func TestJobFlowController_Reconcile(t *testing.T) { - Convey("Test JobFlow Reconcile", t, func() { + convey.Convey("Test JobFlow Reconcile", t, func() { jobflow := createJobFlow("jobflow-test") reconcileController := createJobFlowController(jobflow) request := reconcile.Request{ @@ -47,22 +49,19 @@ func TestJobFlowController_Reconcile(t *testing.T) { }, } _, err := reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) job1 := newJob(jobflow, "jobflow-test-jobflow-test1") // 过两秒后更新 job1 状态 - select { - case <-time.After(time.Second * 2): + if <-time.After(time.Second * 2); true { job1.Status = batchv1.JobStatus{ Succeeded: 1, } - err := reconcileController.client.Status().Update(context.TODO(), job1) - if err != nil { - return - } + err = reconcileController.client.Status().Update(context.TODO(), job1) + convey.So(err, convey.ShouldBeNil) } _, err = reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) }) } @@ -109,6 +108,20 @@ func createJobFlow(jobName string) *jobflowv1alpha1.JobFlow { UID: "12345", }, Spec: jobflowv1alpha1.JobFlowSpec{ + GlobalParams: jobflowv1alpha1.GlobalParams{ + Labels: map[string]string{ + "key": "value", + }, + Annotations: map[string]string{ + "key": "value", + }, + Env: []v1.EnvVar{ + { + Name: "test", + Value: "test", + }, + }, + }, Flows: flows, }, } diff --git a/pkg/controller/jobflow_template_controller.go b/pkg/controller/jobflow_template_controller.go index ccdb0e1..d182ade 100644 --- a/pkg/controller/jobflow_template_controller.go +++ b/pkg/controller/jobflow_template_controller.go @@ -3,8 +3,10 @@ package controller import ( "context" "fmt" + "strings" + "time" + "github.com/go-logr/logr" - jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" "github.com/myoperator/jobflowoperator/pkg/common" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" @@ -17,8 +19,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "strings" - "time" + + jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" ) type JobTemplateController struct { diff --git a/pkg/controller/jobflow_template_controller_test.go b/pkg/controller/jobflow_template_controller_test.go index 421a9a5..cffd28e 100644 --- a/pkg/controller/jobflow_template_controller_test.go +++ b/pkg/controller/jobflow_template_controller_test.go @@ -2,8 +2,10 @@ package controller import ( "context" - jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" - . "github.com/smartystreets/goconvey/convey" + "testing" + "time" + + "github.com/smartystreets/goconvey/convey" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,12 +18,12 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "testing" - "time" + + jobtemplatev1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobTemplate/v1alpha1" ) func TestJobTemplateController_Reconcile(t *testing.T) { - Convey("Test JobFlow Reconcile", t, func() { + convey.Convey("Test JobFlow Reconcile", t, func() { jobtemplate := createJobTemplate("jobtemplate-test") reconcileController := createJobTemplateController(jobtemplate) request := reconcile.Request{ @@ -31,15 +33,16 @@ func TestJobTemplateController_Reconcile(t *testing.T) { }, } _, err := reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) // 过两秒后更新 job1 状态 if <-time.After(time.Second * 2); true { jobtemplate.Status = jobtemplatev1alpha1.JobTemplateStatus{} - reconcileController.client.Status().Update(context.TODO(), jobtemplate) + err = reconcileController.client.Status().Update(context.TODO(), jobtemplate) + convey.So(err, convey.ShouldBeNil) } _, err = reconcileController.Reconcile(context.TODO(), request) - So(err, ShouldBeNil) + convey.So(err, convey.ShouldBeNil) }) }