Skip to content

Commit

Permalink
feat: jobflow 支持多jobs间共享存储卷
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Mar 29, 2024
1 parent a387195 commit fc04b84
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 15 deletions.
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/source"
"time"
)

/*
Expand Down Expand Up @@ -59,11 +58,9 @@ func init() {

func main() {

var d time.Duration = 0
// 1. 管理器初始化
mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{
Logger: logf.Log.WithName("JobFlow operator"),
SyncPeriod: &d, // resync不设置触发
mgr, err := manager.New(k8sconfig.K8sRestConfigOrDie(), manager.Options{
Logger: logf.Log.WithName("JobFlow operator"),
})
if err != nil {
mgr.GetLogger().Error(err, "unable to set up manager")
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/jobflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type JobFlowSpec struct {
GlobalParams GlobalParams `json:"globalParams"`
// Flows 多个 flow 步骤流程
Flows []Flow `json:"flows"`
// ShareVolumes 多个 flow 共享数据卷
ShareVolumes []corev1.Volume `json:"shareVolumes"`
// TODO: ErrorHandler 错误处理逻辑
ErrorHandler ErrorHandler
}
Expand Down Expand Up @@ -66,6 +68,8 @@ type Flow struct {
// Dependencies 依赖项,其中可以填写多个 依赖的 job name
// ex: 如果 job3 依赖 job1 and job2, 就在列表中放入 ["job1", "job2"]
Dependencies []string `json:"dependencies"`
// ShareVolumeMounts 单个 job 共享数据卷的挂载目录
ShareVolumeMounts []corev1.VolumeMount `json:"shareVolumeMounts"`
}

type JobFlowStatus struct {
Expand Down
30 changes: 22 additions & 8 deletions pkg/apis/jobflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions pkg/controller/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ func (r *JobFlowController) prepareJob(jobFlow *jobflowv1alpha1.JobFlow, flow *j
job.Spec = flow.JobTemplate
}

// jobflow 处理共享挂载卷
// 如果不为0 代表需要为每个job都共享挂载卷 Volumes
// 如果不为0 代表需要为每个job中的每个container都共享挂载卷 VolumeMounts
if len(jobFlow.Spec.ShareVolumes) != 0 && len(flow.ShareVolumeMounts) != 0 {
// 挂载 Volumes
if job.Spec.Template.Spec.Volumes != nil {
// 如果原本的就有,则合并
job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, jobFlow.Spec.ShareVolumes...)
} else {
// 如果原本提交的 spec 没有定义 volumes 则直接赋值
job.Spec.Template.Spec.Volumes = jobFlow.Spec.ShareVolumes
}
// 挂载 VolumeMounts
for i := range job.Spec.Template.Spec.Containers {
if job.Spec.Template.Spec.Containers[i].VolumeMounts != nil {
// 如果原本的就有,则合并
job.Spec.Template.Spec.Containers[i].VolumeMounts =
append(job.Spec.Template.Spec.Containers[i].VolumeMounts, flow.ShareVolumeMounts...)
} else {
// 如果原本提交的 spec 没有定义 VolumeMounts 则直接赋值
job.Spec.Template.Spec.Containers[i].VolumeMounts = flow.ShareVolumeMounts
}
}
}

// 强制设置 job 不重启与重试次数
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
var cc int32
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8sconfig/init_k8s_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"os"
)

// K8sRestConfig 集群外部使用
func K8sRestConfig() *rest.Config {
// K8sRestConfigOrDie 集群外部使用
func K8sRestConfigOrDie() *rest.Config {
// 读取配置
if os.Getenv("Release") == "1" {
klog.V(2).Info("run in the cluster")
Expand Down
112 changes: 112 additions & 0 deletions yaml/jobflow/example-shareVolume.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
apiVersion: api.practice.com/v1alpha1
kind: JobFlow
metadata:
name: jobflow-example-localpv
spec:
# 可配置任务流中的全局参数,当设置后会在每个 job 与 pod 中都生效
globalParams:
# 可决定所有 job 都运行在同一节点上
# nodeName: minikube
# 可加入 container 所需的参数
env:
- name: "FOO"
value: "bar"
- name: "QUE"
value: "pasa"
# job pod 的 annotations
annotations:
key1: value1
key2: value2
# job pod 的 labels
labels:
key1: value1
key2: value2

# 可填写多个 flow 流程
# 每个 flow 中重要字段 分别为:
# name: flow 名称,多个 flow 名称不能重复
# dependencies: 定义依赖项,如果有多个依赖可以填写多个
# jobTemplate: job 模版,支持 k8s 原生 job spec 全部字段
flows:
- name: job1
dependencies: []
shareVolumeMounts:
- mountPath: /data
name: mypvc
jobTemplate:
template:
spec:
containers:
- image: busybox:1.28
command:
- sh
- -c
- sleep 10s
imagePullPolicy: IfNotPresent
name: nginx
- name: job2
shareVolumeMounts:
- mountPath: /data
name: mypvc
jobTemplate:
template:
spec:
containers:
- image: busybox:1.28
command:
- sh
- -c
- sleep 100s
imagePullPolicy: IfNotPresent
name: nginx
dependencies:
- job1 # 代表 job2 依赖 job1 完成后才开始启动
- name: job3
jobTemplate:
template:
spec:
containers:
- image: busybox:1.28
command:
- sh
- -c
- sleep 100s
imagePullPolicy: IfNotPresent
name: nginx
dependencies:
# 代表 job3 依赖 job1 job2 完成后才开始启动
- job1
- job2
- name: job4
jobTemplate:
template:
spec:
containers:
- image: busybox:1.28
command:
- sh
- -c
- sleep 10s
imagePullPolicy: IfNotPresent
name: nginx
- name: job5
dependencies:
# 代表依赖 job2 job4 后才执行
- job4
- job2
jobTemplate:
template:
spec:
containers:
- image: busybox:1.28
command:
- sh
- -c
- sleep 10s
imagePullPolicy: IfNotPresent
name: nginx
shareVolumes:
- name: mypvc
persistentVolumeClaim:
claimName: mylocalpvc

45 changes: 45 additions & 0 deletions yaml/jobflow/local-pv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: v1
kind: PersistentVolume
metadata:
name: mylocalpv
spec:
capacity:
storage: 2Gi
volumeMode: Filesystem
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Retain
storageClassName: my-local-storage # 和 StorageClass 一致
local:
path: /data # 如果使用 kind 部署,这个目录是 docker 中的映设目录
# 节点亲和性
nodeAffinity:
required:
nodeSelectorTerms:
# 指定集群中的节点
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- minikube
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: my-local-storage
provisioner: kubernetes.io/no-provisioner
# 延迟绑定
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Retain
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: mylocalpvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
storageClassName: my-local-storage

0 comments on commit fc04b84

Please sign in to comment.