4
4
*/
5
5
package io .strimzi .operator .cluster .operator .assembly ;
6
6
7
+ import io .fabric8 .kubernetes .api .model .ConfigMap ;
7
8
import io .strimzi .api .kafka .Crds ;
8
9
import io .strimzi .api .kafka .model .common .Condition ;
9
10
import io .strimzi .api .kafka .model .rebalance .KafkaRebalance ;
10
11
import io .strimzi .api .kafka .model .rebalance .KafkaRebalanceState ;
11
12
import io .strimzi .api .kafka .model .rebalance .KafkaRebalanceStatus ;
12
- import io .strimzi .operator .cluster .operator .resource .kubernetes .ConfigMapOperator ;
13
13
import io .strimzi .operator .common .Reconciliation ;
14
14
import io .strimzi .operator .common .model .cruisecontrol .CruiseControlUserTaskStatus ;
15
15
import io .vertx .core .Future ;
16
16
import io .vertx .junit5 .Checkpoint ;
17
17
import io .vertx .junit5 .VertxExtension ;
18
18
import io .vertx .junit5 .VertxTestContext ;
19
+ import org .junit .jupiter .api .BeforeEach ;
19
20
import org .junit .jupiter .api .Test ;
21
+ import org .junit .jupiter .api .TestInfo ;
20
22
import org .junit .jupiter .api .extension .ExtendWith ;
21
23
22
24
import java .util .Map ;
44
46
@ ExtendWith (VertxExtension .class )
45
47
public class KafkaRebalanceAssemblyOperatorProgressTest extends AbstractKafkaRebalanceAssemblyOperatorTest {
46
48
47
- private KafkaRebalanceStatus getKafkaRebalanceStatus () {
48
- KafkaRebalance kafkaRebalance = Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).withName (RESOURCE_NAME ).get ();
49
- return kafkaRebalance .getStatus ();
49
+ @ BeforeEach
50
+ @ Override
51
+ public void beforeEach (TestInfo testInfo ) {
52
+ super .beforeEach (testInfo );
53
+
54
+ // Create Kafka custom resource
55
+ crdCreateKafka ();
56
+
57
+ // Create Cruise Control secrets
58
+ crdCreateCruiseControlSecrets ();
59
+
60
+ // Create KafkaRebalance custom resource
61
+ KafkaRebalance kr = createKafkaRebalance (namespace , CLUSTER_NAME , RESOURCE_NAME , EMPTY_KAFKA_REBALANCE_SPEC , true );
62
+ Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).resource (kr ).create ();
50
63
}
51
64
52
- private record RebalanceConfigMapFields (
65
+ private record RebalanceConfigMap (
66
+ boolean configMapExpected ,
53
67
boolean containsEstimatedTimeToCompletion ,
54
68
boolean containsCompletedByteMovement ,
55
69
boolean containsExecutorState ,
56
- boolean containsBrokerLoadKey
57
- ) { }
58
-
59
- private static final Map <KafkaRebalanceState , Boolean > CONFIG_MAP_EXISTS_DURING_STATE = Map .of (
60
- New , false ,
61
- PendingProposal , false ,
62
- ProposalReady , true ,
63
- Rebalancing , true ,
64
- Ready , true ,
65
- NotReady , true
66
- );
70
+ boolean containsBrokerLoad
71
+ ) {
72
+ void assertConfigMapFields (ConfigMap configMap ) {
73
+ if (configMapExpected ) {
74
+ assertThat (configMap , notNullValue ());
75
+ Map <String , String > fields = configMap .getData ();
76
+ assertThat (fields .containsKey (ESTIMATED_TIME_TO_COMPLETION_KEY ), is (containsEstimatedTimeToCompletion ));
77
+ assertThat (fields .containsKey (COMPLETED_BYTE_MOVEMENT_KEY ), is (containsCompletedByteMovement ));
78
+ assertThat (fields .containsKey (EXECUTOR_STATE_KEY ), is (containsExecutorState ));
79
+ assertThat (fields .containsKey (BROKER_LOAD_KEY ), is (containsBrokerLoad ));
80
+ } else {
81
+ assertThat (configMap , nullValue ());
82
+ }
83
+ }
84
+
85
+ void assertConfigMapKeyInStatus (KafkaRebalanceStatus status ) {
86
+ assertThat (status .getProgress ().containsKey (REBALANCE_PROGRESS_CONFIG_MAP_KEY ), is (configMapExpected ));
87
+ }
88
+ }
67
89
68
- private static final Map <KafkaRebalanceState , RebalanceConfigMapFields > STATE_TO_EXPECTED_CONFIG_MAP_FIELDS = Map .of (
69
- New , new RebalanceConfigMapFields ( false , false , false , false ),
70
- PendingProposal , new RebalanceConfigMapFields ( false , false , false , false ),
71
- ProposalReady , new RebalanceConfigMapFields ( false , true , false , true ),
72
- Rebalancing , new RebalanceConfigMapFields ( true , true , true , true ),
73
- Ready , new RebalanceConfigMapFields ( true , true , false , true ),
74
- NotReady , new RebalanceConfigMapFields ( false , true , true , true )
90
+ private static final Map <KafkaRebalanceState , RebalanceConfigMap > STATE_TO_EXPECTED_CONFIG_MAP_FIELDS = Map .of (
91
+ New , new RebalanceConfigMap ( false , false , false , false , false ),
92
+ PendingProposal , new RebalanceConfigMap ( false , false , false , false , false ),
93
+ ProposalReady , new RebalanceConfigMap ( true , false , true , false , true ),
94
+ Rebalancing , new RebalanceConfigMap ( true , true , true , true , true ),
95
+ Ready , new RebalanceConfigMap ( true , true , true , false , true ),
96
+ NotReady , new RebalanceConfigMap ( true , false , true , true , true )
75
97
);
76
98
77
- private Future <Void > verifyKafkaRebalanceStateAndMap (VertxTestContext context , KafkaRebalanceState kafkaRebalanceState , ConfigMapOperator configMapOperator ) {
78
- assertState (context , client , namespace , RESOURCE_NAME , kafkaRebalanceState );
79
-
80
- // Checks `KafkaRebalance` ConfigMap contains expected fields
81
- RebalanceConfigMapFields expectedFields = STATE_TO_EXPECTED_CONFIG_MAP_FIELDS .get (kafkaRebalanceState );
82
- return configMapOperator .getAsync (namespace , RESOURCE_NAME )
83
- .compose (configMap -> {
84
- if (configMap != null ) {
85
- assertThat (CONFIG_MAP_EXISTS_DURING_STATE .get (kafkaRebalanceState ), is (true ));
86
- assertThat (getKafkaRebalanceStatus ().getProgress ().containsKey (REBALANCE_PROGRESS_CONFIG_MAP_KEY ), is (true ));
87
-
88
- Map <String , String > fields = configMap .getData ();
89
- assertThat (fields .containsKey (ESTIMATED_TIME_TO_COMPLETION_KEY ), is (expectedFields .containsEstimatedTimeToCompletion ));
90
- assertThat (fields .containsKey (COMPLETED_BYTE_MOVEMENT_KEY ), is (expectedFields .containsCompletedByteMovement ));
91
- assertThat (fields .containsKey (EXECUTOR_STATE_KEY ), is (expectedFields .containsExecutorState ));
92
- assertThat (fields .containsKey (BROKER_LOAD_KEY ), is (expectedFields .containsBrokerLoadKey ));
93
- } else {
94
- assertThat (CONFIG_MAP_EXISTS_DURING_STATE .get (kafkaRebalanceState ), is (false ));
95
- assertThat (getKafkaRebalanceStatus ().getProgress ().containsKey (REBALANCE_PROGRESS_CONFIG_MAP_KEY ), is (false ));
96
- }
97
- return Future .succeededFuture ();
98
- });
99
+ private KafkaRebalanceStatus getKafkaRebalanceStatus () {
100
+ return Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).withName (RESOURCE_NAME ).get ().getStatus ();
99
101
}
100
102
101
- private Future <Void > mockTaskAndReconcile (CruiseControlUserTaskStatus taskStatus , boolean stateEndpointFetchError , Reconciliation reconciliation ) {
103
+ private Future <Void > verifyKafkaRebalanceStateAndConfigMap (VertxTestContext context , KafkaRebalanceState state , ConfigMap configMap ) {
104
+ assertState (context , client , namespace , RESOURCE_NAME , state );
105
+
106
+ RebalanceConfigMap expectations = STATE_TO_EXPECTED_CONFIG_MAP_FIELDS .get (state );
107
+ expectations .assertConfigMapFields (configMap );
108
+ expectations .assertConfigMapKeyInStatus (getKafkaRebalanceStatus ());
109
+
110
+ return Future .succeededFuture ();
111
+ }
112
+
113
+ private Future <ConfigMap > reconcile (Reconciliation reconciliation ) {
114
+ return krao .reconcile (reconciliation )
115
+ .compose (res -> this .supplier .configMapOperations .getAsync (namespace , RESOURCE_NAME ));
116
+ }
117
+
118
+ private Future <Void > mockCruiseControlTask (CruiseControlUserTaskStatus taskStatus , boolean stateEndpointFetchError ) {
102
119
cruiseControlServer .mockTask (taskStatus , stateEndpointFetchError );
103
- return krao . reconcile ( reconciliation );
120
+ return Future . succeededFuture ( );
104
121
}
105
122
106
123
/**
107
124
* Test progress fields of `KafkaRebalance` resource and ConfigMap during KafkaRebalance lifecycle.
108
125
*/
109
126
@ Test
110
127
public void testProgressFieldsDuringRebalanceLifecycle (VertxTestContext context ) {
111
- KafkaRebalance kr = createKafkaRebalance (namespace , CLUSTER_NAME , RESOURCE_NAME , EMPTY_KAFKA_REBALANCE_SPEC , true );
112
- Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).resource (kr ).create ();
113
- crdCreateKafka ();
114
- crdCreateCruiseControlSecrets ();
115
-
128
+ Checkpoint checkpoint = context .checkpoint ();
116
129
Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
117
- ConfigMapOperator configMapOperator = this .supplier .configMapOperations ;
130
+ mockCruiseControlTask (ACTIVE , false )
131
+ .compose (res -> reconcile (reconciliation ))
132
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , PendingProposal , res ))
118
133
119
- Checkpoint checkpoint = context .checkpoint ();
120
- cruiseControlServer .mockTask (ACTIVE , false );
121
- krao .reconcile (reconciliation )
122
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , PendingProposal , configMapOperator ))
123
- .compose (v -> mockTaskAndReconcile (COMPLETED , false , reconciliation ))
124
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , ProposalReady , configMapOperator ))
125
- .compose (v -> mockTaskAndReconcile (IN_EXECUTION , false , reconciliation ))
126
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , Rebalancing , configMapOperator ))
127
- .compose (v -> mockTaskAndReconcile (COMPLETED , false , reconciliation ))
128
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , Ready , configMapOperator ))
129
- .onSuccess (v -> checkpoint .flag ())
134
+ .compose (res -> mockCruiseControlTask (COMPLETED , false ))
135
+ .compose (res -> reconcile (reconciliation ))
136
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , ProposalReady , res ))
137
+
138
+ .compose (res -> mockCruiseControlTask (IN_EXECUTION , false ))
139
+ .compose (res -> reconcile (reconciliation ))
140
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Rebalancing , res ))
141
+
142
+ .compose (res -> mockCruiseControlTask (COMPLETED , false ))
143
+ .compose (res -> reconcile (reconciliation ))
144
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Ready , res ))
145
+
146
+ .onSuccess (res -> checkpoint .flag ())
130
147
.onFailure (context ::failNow );
131
148
}
132
149
@@ -135,43 +152,42 @@ public void testProgressFieldsDuringRebalanceLifecycle(VertxTestContext context)
135
152
*/
136
153
@ Test
137
154
public void testWarningConditionPropagationForUnreachableApi (VertxTestContext context ) {
138
- KafkaRebalance kr = createKafkaRebalance (namespace , CLUSTER_NAME , RESOURCE_NAME , EMPTY_KAFKA_REBALANCE_SPEC , true );
139
- Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).resource (kr ).create ();
140
- crdCreateKafka ();
141
- crdCreateCruiseControlSecrets ();
142
-
143
- Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
144
- ConfigMapOperator configMapOperator = this .supplier .configMapOperations ;
145
-
146
155
Checkpoint checkpoint = context .checkpoint ();
147
- cruiseControlServer .mockTask (COMPLETED , false );
148
- krao .reconcile (reconciliation )
149
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , ProposalReady , configMapOperator ))
150
- .compose (v -> mockTaskAndReconcile (IN_EXECUTION , false , reconciliation ))
151
- .compose (v -> mockTaskAndReconcile (IN_EXECUTION , true , reconciliation ))
152
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , Rebalancing , configMapOperator ))
153
- .map (v -> {
156
+ Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
157
+ mockCruiseControlTask (COMPLETED , false )
158
+ .compose (res -> reconcile (reconciliation ))
159
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , ProposalReady , res ))
160
+
161
+ .compose (res -> mockCruiseControlTask (IN_EXECUTION , false ))
162
+ .compose (res -> reconcile (reconciliation ))
163
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Rebalancing , res ))
164
+
165
+ .compose (res -> mockCruiseControlTask (IN_EXECUTION , true ))
166
+ .compose (res -> reconcile (reconciliation ))
167
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Rebalancing , res ))
168
+ .map (res -> {
154
169
// Test that warning condition was added to resource when Cruise Control API is unreachable.
155
170
Condition warningCondition1 = KafkaRebalanceUtils .getWarningCondition (getKafkaRebalanceStatus ());
156
171
assertThat (warningCondition1 , notNullValue ());
157
172
return warningCondition1 ;
158
173
})
159
- .compose (warningCondition1 -> mockTaskAndReconcile (IN_EXECUTION , true , reconciliation )
160
- .map (v -> {
161
- verifyKafkaRebalanceStateAndMap (context , Rebalancing , configMapOperator );
174
+ .compose (warningCondition1 ->
175
+ mockCruiseControlTask (IN_EXECUTION , true )
176
+ .compose (res -> reconcile (reconciliation ))
177
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Rebalancing , res ))
178
+ .onSuccess (v -> {
162
179
163
180
// Test that the warning condition was not updated.
164
181
Condition warningCondition2 = KafkaRebalanceUtils .getWarningCondition (getKafkaRebalanceStatus ());
165
182
assertThat (warningCondition1 .getReason (), is (warningCondition2 .getReason ()));
166
183
assertThat (warningCondition1 .getMessage (), is (warningCondition2 .getMessage ()));
167
184
assertThat (warningCondition1 .getLastTransitionTime (), is (warningCondition2 .getLastTransitionTime ()));
168
185
169
- cruiseControlServer .mockTask (COMPLETED , false );
170
- return null ;
171
186
}))
172
- .compose (v -> mockTaskAndReconcile (COMPLETED , true , reconciliation ))
187
+ .compose (v -> mockCruiseControlTask (COMPLETED , false ))
188
+ .compose (res -> reconcile (reconciliation ))
189
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Ready , res ))
173
190
.onSuccess (v -> {
174
- verifyKafkaRebalanceStateAndMap (context , Ready , configMapOperator );
175
191
176
192
// Test that warning condition is removed
177
193
Condition warningCondition2 = KafkaRebalanceUtils .getWarningCondition (getKafkaRebalanceStatus ());
@@ -187,23 +203,19 @@ public void testWarningConditionPropagationForUnreachableApi(VertxTestContext co
187
203
*/
188
204
@ Test
189
205
public void testWarningConditionPropagationForNonExecutingState (VertxTestContext context ) {
190
- cruiseControlServer .mockTask (ACTIVE , false );
191
-
192
- KafkaRebalance kr = createKafkaRebalance (namespace , CLUSTER_NAME , RESOURCE_NAME , EMPTY_KAFKA_REBALANCE_SPEC , true );
193
- Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).resource (kr ).create ();
194
- crdCreateKafka ();
195
- crdCreateCruiseControlSecrets ();
196
-
206
+ Checkpoint checkpoint = context .checkpoint ();
197
207
Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
198
- ConfigMapOperator configMapOperator = this .supplier .configMapOperations ;
208
+ mockCruiseControlTask (ACTIVE , false )
209
+ .compose (res -> reconcile (reconciliation ))
210
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , PendingProposal , res ))
199
211
200
- Checkpoint checkpoint = context . checkpoint ();
201
- krao . reconcile (reconciliation )
202
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , PendingProposal , configMapOperator ))
203
- . compose ( v -> mockTaskAndReconcile ( COMPLETED , true , reconciliation ))
204
- .compose (v -> verifyKafkaRebalanceStateAndMap ( context , ProposalReady , configMapOperator ))
205
- .compose (v -> mockTaskAndReconcile ( COMPLETED_WITH_ERROR , true , reconciliation ))
206
- .onSuccess (v -> {
212
+ . compose ( res -> mockCruiseControlTask ( COMPLETED , true ))
213
+ . compose ( res -> reconcile (reconciliation ) )
214
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , ProposalReady , res ))
215
+
216
+ .compose (res -> mockCruiseControlTask ( COMPLETED_WITH_ERROR , true ))
217
+ .compose (res -> reconcile ( reconciliation ))
218
+ .onSuccess (res -> {
207
219
assertState (context , client , namespace , RESOURCE_NAME , Rebalancing );
208
220
Condition warningCondition = KafkaRebalanceUtils .getWarningCondition (getKafkaRebalanceStatus ());
209
221
assertThat (warningCondition , notNullValue ());
@@ -217,24 +229,21 @@ public void testWarningConditionPropagationForNonExecutingState(VertxTestContext
217
229
*/
218
230
@ Test
219
231
public void testProgressFieldsOnRebalanceFailure (VertxTestContext context ) {
220
- cruiseControlServer .mockTask (COMPLETED , false );
232
+ Checkpoint checkpoint = context .checkpoint ();
233
+ Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
234
+ mockCruiseControlTask (COMPLETED , false )
235
+ .compose (res -> reconcile (reconciliation ))
236
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , ProposalReady , res ))
221
237
222
- KafkaRebalance kr = createKafkaRebalance (namespace , CLUSTER_NAME , RESOURCE_NAME , EMPTY_KAFKA_REBALANCE_SPEC , true );
223
- Crds .kafkaRebalanceOperation (client ).inNamespace (namespace ).resource (kr ).create ();
224
- crdCreateKafka ();
225
- crdCreateCruiseControlSecrets ();
238
+ .compose (res -> mockCruiseControlTask (IN_EXECUTION , false ))
239
+ .compose (res -> reconcile (reconciliation ))
240
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , Rebalancing , res ))
226
241
227
- Reconciliation reconciliation = new Reconciliation ("test-trigger" , KafkaRebalance .RESOURCE_KIND , namespace , RESOURCE_NAME );
228
- ConfigMapOperator configMapOperator = this .supplier .configMapOperations ;
242
+ .compose (res -> mockCruiseControlTask (COMPLETED_WITH_ERROR , false ))
243
+ .compose (res -> reconcile (reconciliation ))
244
+ .compose (res -> verifyKafkaRebalanceStateAndConfigMap (context , NotReady , res ))
229
245
230
- Checkpoint checkpoint = context .checkpoint ();
231
- krao .reconcile (reconciliation )
232
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , ProposalReady , configMapOperator ))
233
- .compose (v -> mockTaskAndReconcile (IN_EXECUTION , false , reconciliation ))
234
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , Rebalancing , configMapOperator ))
235
- .compose (v -> mockTaskAndReconcile (COMPLETED_WITH_ERROR , false , reconciliation ))
236
- .compose (v -> verifyKafkaRebalanceStateAndMap (context , NotReady , configMapOperator ))
237
- .onSuccess (v -> checkpoint .flag ())
246
+ .onSuccess (res -> checkpoint .flag ())
238
247
.onFailure (context ::failNow );
239
248
}
240
249
}
0 commit comments