Skip to content
This repository was archived by the owner on Sep 2, 2022. It is now read-only.

exposing a scale resource and way to scale jobs #425

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ clusters and jobs.
* Batch scheduling for JobManager and TaskManager Pods
* GCP integration (service account, GCS connector, networking)
* Support for Beam Python jobs
* Support for scaling Jobs (by attaching an HPA)

## Installation

Expand Down
10 changes: 9 additions & 1 deletion api/v1beta1/flinkcluster_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,18 @@ func _SetJobDefault(jobSpec *JobSpec) {
jobSpec.AllowNonRestoredState = new(bool)
*jobSpec.AllowNonRestoredState = false
}
if jobSpec.Parallelism == nil {
// Parallelism and ParallelismPerTaskManager are mutually exclusive: only fill the Parallelism default when neither
// is set, otherwise a spec that sets only ParallelismPerTaskManager would be rejected by the cluster validator.
if jobSpec.Parallelism == nil && jobSpec.ParallelismPerTaskManager == nil {
jobSpec.Parallelism = new(int32)
*jobSpec.Parallelism = 1
}
// Note that this setting may not be ideal for some jobs: It should be set explicitly if there are concerns on
// performance of many key partitions.
if jobSpec.MaxParallelism == nil {
jobSpec.MaxParallelism = new(int32)
*jobSpec.MaxParallelism = 32768
}
if jobSpec.NoLoggingToStdout == nil {
jobSpec.NoLoggingToStdout = new(bool)
*jobSpec.NoLoggingToStdout = false
Expand Down
14 changes: 10 additions & 4 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestSetDefault(t *testing.T) {
var defaultTmQueryPort = int32(6125)
var defaultJobAllowNonRestoredState = false
var defaultJobParallelism = int32(1)
var defaultMaxParallelism = int32(32768)
var defaultJobNoLoggingToStdout = false
var defaultJobRestartPolicy = JobRestartPolicyNever
var defatulJobManagerIngressTLSUse = false
Expand Down Expand Up @@ -99,10 +100,12 @@ func TestSetDefault(t *testing.T) {
SecurityContext: nil,
},
Job: &JobSpec{
AllowNonRestoredState: &defaultJobAllowNonRestoredState,
Parallelism: &defaultJobParallelism,
NoLoggingToStdout: &defaultJobNoLoggingToStdout,
RestartPolicy: &defaultJobRestartPolicy,
AllowNonRestoredState: &defaultJobAllowNonRestoredState,
Parallelism: &defaultJobParallelism,
MaxParallelism: &defaultMaxParallelism,
ParallelismPerTaskManager: nil,
NoLoggingToStdout: &defaultJobNoLoggingToStdout,
RestartPolicy: &defaultJobRestartPolicy,
CleanupPolicy: &CleanupPolicy{
AfterJobSucceeds: "DeleteCluster",
AfterJobFails: "KeepCluster",
Expand Down Expand Up @@ -139,6 +142,7 @@ func TestSetNonDefault(t *testing.T) {
var tmQueryPort = int32(8125)
var jobAllowNonRestoredState = true
var jobParallelism = int32(2)
var maxParallelism = int32(2)
var jobNoLoggingToStdout = true
var jobRestartPolicy = JobRestartPolicyFromSavepointOnFailure
var jobManagerIngressTLSUse = true
Expand Down Expand Up @@ -195,6 +199,7 @@ func TestSetNonDefault(t *testing.T) {
Job: &JobSpec{
AllowNonRestoredState: &jobAllowNonRestoredState,
Parallelism: &jobParallelism,
MaxParallelism: &maxParallelism,
NoLoggingToStdout: &jobNoLoggingToStdout,
RestartPolicy: &jobRestartPolicy,
SecurityContext: &securityContext,
Expand Down Expand Up @@ -259,6 +264,7 @@ func TestSetNonDefault(t *testing.T) {
Job: &JobSpec{
AllowNonRestoredState: &jobAllowNonRestoredState,
Parallelism: &jobParallelism,
MaxParallelism: &maxParallelism,
NoLoggingToStdout: &jobNoLoggingToStdout,
RestartPolicy: &jobRestartPolicy,
SecurityContext: &securityContext,
Expand Down
26 changes: 25 additions & 1 deletion api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ type JobSpec struct {
// Job parallelism, default: 1.
Parallelism *int32 `json:"parallelism,omitempty"`

// Parallelism per TaskManager, mutually exclusive with Parallelism. Default: unset, in which case parallelism will not scale.
ParallelismPerTaskManager *int32 `json:"parallelismPerTaskManager,omitempty"`

// MaxParallelism, default: 2^15.
MaxParallelism *int32 `json:"maxParallelism,omitempty"`

// No logging output to STDOUT, default: false.
NoLoggingToStdout *bool `json:"noLoggingToStdout,omitempty"`

Expand Down Expand Up @@ -523,12 +529,27 @@ type FlinkClusterComponentsStatus struct {
JobManagerIngress *JobManagerIngressStatus `json:"jobManagerIngress,omitempty"`

// The state of TaskManager StatefulSet.
TaskManagerStatefulSet FlinkClusterComponentState `json:"taskManagerStatefulSet"`
TaskManagerStatefulSet TaskManagerStatefulSetStatus `json:"taskManagerStatefulSet"`

// The status of the job, available only when JobSpec is provided.
Job *JobStatus `json:"job,omitempty"`
}

// TaskManagerStatefulSetStatus describes the status of the TaskManager
// StatefulSet. Its Replicas and Selector fields are the status and selector
// paths referenced by the scale subresource marker on FlinkCluster.
type TaskManagerStatefulSetStatus struct {
// The name of the TaskManager StatefulSet.
Name string `json:"name"`

// The state of the component.
State string `json:"state"`

// The number of TaskManager replicas; exposed as the statuspath of the scale subresource.
Replicas int32 `json:"replicas"`

// The label selector for the TaskManager pods; exposed as the selectorpath of the scale subresource.
// NOTE(review): presumably a label selector serialized in string form, as the scale subresource expects — confirm where it is populated.
Selector string `json:"selector"`
}

// Control state
type FlinkClusterControlStatus struct {
// Control name
Expand Down Expand Up @@ -673,8 +694,11 @@ type FlinkClusterStatus struct {
LastUpdateTime string `json:"lastUpdateTime,omitempty"`
}

// The scale subresource declared below scales spec.taskManager.replicas. See https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.taskManager.replicas,statuspath=.status.components.taskManagerStatefulSet.replicas,selectorpath=.status.components.taskManagerStatefulSet.selector

// FlinkCluster is the Schema for the flinkclusters API
type FlinkCluster struct {
Expand Down
17 changes: 13 additions & 4 deletions api/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (v *Validator) validateTaskManager(tmSpec *TaskManagerSpec) error {
}

func (v *Validator) validateJob(jobSpec *JobSpec) error {

if jobSpec == nil {
return nil
}
Expand All @@ -400,11 +401,19 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error {
return fmt.Errorf("job jarFile is unspecified")
}

if jobSpec.Parallelism == nil {
return fmt.Errorf("job parallelism is unspecified")
}
if *jobSpec.Parallelism < 1 {
if jobSpec.Parallelism != nil && jobSpec.ParallelismPerTaskManager != nil {
return fmt.Errorf("job parallelism and parallelismPerTaskmanager are both specified: Only one must be set")
} else if jobSpec.Parallelism == nil && jobSpec.ParallelismPerTaskManager == nil {
return fmt.Errorf("job parallelism and parallelismPerTaskmanager are unspecified: One must be set")
} else if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 {
return fmt.Errorf("job parallelism must be >= 1")
} else if jobSpec.ParallelismPerTaskManager != nil && *jobSpec.ParallelismPerTaskManager < 1 {
return fmt.Errorf("job parallelismPerTaskmanager must be >= 1")
}
if jobSpec.MaxParallelism == nil {
return fmt.Errorf("job maxParallelism is unspecified")
} else if *jobSpec.MaxParallelism > 32768 || *jobSpec.MaxParallelism < 1 {
return fmt.Errorf("job maxParallelism must be in range [1,2^15]")
}

if jobSpec.RestartPolicy == nil {
Expand Down
161 changes: 149 additions & 12 deletions api/v1beta1/flinkcluster_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func TestValidateCreate(t *testing.T) {
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
RestartPolicy: &restartPolicy,
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
MaxParallelism: &parallelism,
RestartPolicy: &restartPolicy,
CleanupPolicy: &CleanupPolicy{
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
Expand Down Expand Up @@ -357,6 +358,8 @@ func TestInvalidJobSpec(t *testing.T) {
var invalidRestartPolicy JobRestartPolicy = "XXX"
var validator = &Validator{}
var parallelism int32 = 2
var defaultMaxParallelism int32 = 32768
var invalidMaxParallelism int32 = 0
var memoryOffHeapRatio int32 = 25
var memoryOffHeapMin = resource.MustParse("600M")

Expand Down Expand Up @@ -402,6 +405,50 @@ func TestInvalidJobSpec(t *testing.T) {
var expectedErr = "job jarFile is unspecified"
assert.Equal(t, err.Error(), expectedErr)

cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Namespace: "default",
},
Spec: FlinkClusterSpec{
Image: ImageSpec{
Name: "flink:1.8.1",
PullPolicy: corev1.PullPolicy("Always"),
},
JobManager: JobManagerSpec{
Replicas: &jmReplicas,
AccessScope: AccessScopeVPC,
Ports: JobManagerPorts{
RPC: &rpcPort,
Blob: &blobPort,
Query: &queryPort,
UI: &uiPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
TaskManager: TaskManagerSpec{
Replicas: 3,
Ports: TaskManagerPorts{
RPC: &rpcPort,
Data: &dataPort,
Query: &queryPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
RestartPolicy: &restartPolicy,
Parallelism: &parallelism,
ParallelismPerTaskManager: &parallelism,
},
},
}
err = validator.ValidateCreate(&cluster)
expectedErr = "job parallelism and parallelismPerTaskmanager are both specified: Only one must be set"
assert.Equal(t, err.Error(), expectedErr)

cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Expand Down Expand Up @@ -441,7 +488,7 @@ func TestInvalidJobSpec(t *testing.T) {
},
}
err = validator.ValidateCreate(&cluster)
expectedErr = "job parallelism is unspecified"
expectedErr = "job parallelism and parallelismPerTaskmanager are unspecified: One must be set"
assert.Equal(t, err.Error(), expectedErr)

cluster = FlinkCluster{
Expand Down Expand Up @@ -478,8 +525,96 @@ func TestInvalidJobSpec(t *testing.T) {
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
RestartPolicy: &restartPolicy,
Parallelism: &parallelism,
RestartPolicy: &invalidRestartPolicy,
},
},
}
err = validator.ValidateCreate(&cluster)
expectedErr = "job maxParallelism is unspecified"
assert.Equal(t, err.Error(), expectedErr)

cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Namespace: "default",
},
Spec: FlinkClusterSpec{
Image: ImageSpec{
Name: "flink:1.8.1",
PullPolicy: corev1.PullPolicy("Always"),
},
JobManager: JobManagerSpec{
Replicas: &jmReplicas,
AccessScope: AccessScopeVPC,
Ports: JobManagerPorts{
RPC: &rpcPort,
Blob: &blobPort,
Query: &queryPort,
UI: &uiPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
TaskManager: TaskManagerSpec{
Replicas: 3,
Ports: TaskManagerPorts{
RPC: &rpcPort,
Data: &dataPort,
Query: &queryPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
RestartPolicy: &restartPolicy,
Parallelism: &parallelism,
MaxParallelism: &invalidMaxParallelism,
},
},
}
err = validator.ValidateCreate(&cluster)
expectedErr = "job maxParallelism must be in range [1,2^15]"
assert.Equal(t, err.Error(), expectedErr)

cluster = FlinkCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "mycluster",
Namespace: "default",
},
Spec: FlinkClusterSpec{
Image: ImageSpec{
Name: "flink:1.8.1",
PullPolicy: corev1.PullPolicy("Always"),
},
JobManager: JobManagerSpec{
Replicas: &jmReplicas,
AccessScope: AccessScopeVPC,
Ports: JobManagerPorts{
RPC: &rpcPort,
Blob: &blobPort,
Query: &queryPort,
UI: &uiPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
TaskManager: TaskManagerSpec{
Replicas: 3,
Ports: TaskManagerPorts{
RPC: &rpcPort,
Data: &dataPort,
Query: &queryPort,
},
MemoryOffHeapRatio: &memoryOffHeapRatio,
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
MaxParallelism: &defaultMaxParallelism,
RestartPolicy: &invalidRestartPolicy,
},
},
}
Expand Down Expand Up @@ -520,9 +655,10 @@ func TestInvalidJobSpec(t *testing.T) {
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
RestartPolicy: &restartPolicy,
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
MaxParallelism: &defaultMaxParallelism,
RestartPolicy: &restartPolicy,
CleanupPolicy: &CleanupPolicy{
AfterJobSucceeds: "XXX",
AfterJobFails: CleanupActionDeleteCluster,
Expand Down Expand Up @@ -1000,10 +1136,11 @@ func getSimpleFlinkCluster() FlinkCluster {
MemoryOffHeapMin: memoryOffHeapMin,
},
Job: &JobSpec{
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
RestartPolicy: &restartPolicy,
SavepointsDir: &savepointDir,
JarFile: "gs://my-bucket/myjob.jar",
Parallelism: &parallelism,
MaxParallelism: &parallelism,
RestartPolicy: &restartPolicy,
SavepointsDir: &savepointDir,
CleanupPolicy: &CleanupPolicy{
AfterJobSucceeds: CleanupActionKeepCluster,
AfterJobFails: CleanupActionDeleteTaskManager,
Expand Down
Loading