Skip to content

Commit f5bb766

Browse files
committed
Adding and fixing bugs + mock tests
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent bb3a229 commit f5bb766

File tree

5 files changed

+295
-116
lines changed

5 files changed

+295
-116
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,43 @@ private KafkaRebalanceStatus updateStatus(KafkaRebalance kafkaRebalance,
325325
return rebalanceOptionsBuilder;
326326
}
327327

328+
private static Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> updateProgressFields(Reconciliation reconciliation,
329+
String host,
330+
int cruiseControlPort,
331+
CruiseControlApi apiClient,
332+
KafkaRebalance kafkaRebalance,
333+
ConfigMapOperator configMapOperator,
334+
MapAndStatus<ConfigMap, KafkaRebalanceStatus> desiredStatusAndMap) {
335+
String configMapNamespace = kafkaRebalance.getMetadata().getNamespace();
336+
String configMapName = kafkaRebalance.getMetadata().getName();
337+
338+
return configMapOperator.getAsync(configMapNamespace, configMapName)
339+
.compose(existingConfigMap -> {
340+
ConfigMap desiredConfigMap = desiredStatusAndMap.getLoadMap();
341+
KafkaRebalanceStatus desiredStatus = desiredStatusAndMap.getStatus();
342+
KafkaRebalanceState desiredState = KafkaRebalanceUtils.rebalanceState(desiredStatus);
343+
344+
if (existingConfigMap == null && desiredConfigMap == null) {
345+
return Future.succeededFuture(desiredStatusAndMap);
346+
} else if (existingConfigMap != null && desiredStatusAndMap.getLoadMap() != null) {
347+
// Ensure desiredConfigMap retains broker load information if it exists.
348+
desiredConfigMap.getData().put(BROKER_LOAD_KEY, existingConfigMap.getData().get(BROKER_LOAD_KEY));
349+
} else if (desiredConfigMap == null) {
350+
desiredStatusAndMap.setLoadMap(existingConfigMap);
351+
desiredConfigMap = existingConfigMap;
352+
}
353+
354+
// Add progress information to KafkaRebalance resource status and ConfigMap.
355+
desiredStatus.setProgress(Map.of(REBALANCE_PROGRESS_CONFIG_MAP_KEY, configMapName));
356+
KafkaRebalanceConfigMapUtils.updateRebalanceConfigMap(reconciliation, desiredState, host, cruiseControlPort, apiClient, desiredConfigMap)
357+
.onFailure(exception -> {
358+
KafkaRebalanceUtils.addWarningCondition(desiredStatus, exception);
359+
});
360+
361+
return Future.succeededFuture(desiredStatusAndMap);
362+
});
363+
}
364+
328365
private Future<KafkaRebalanceStatus> handleRebalance(Reconciliation reconciliation, String host,
329366
CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
330367
KafkaRebalanceState currentState) {
@@ -354,24 +391,7 @@ && rawRebalanceAnnotation(kafkaRebalance) == null) {
354391
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder = convertRebalanceSpecToRebalanceOptions(kafkaRebalance.getSpec());
355392

356393
computeNextStatus(reconciliation, host, apiClient, kafkaRebalance, currentState, rebalanceOptionsBuilder)
357-
.compose(statusAndMap -> {
358-
// Update desired `KafkaRebalance` resource status and ConfigMap with progress fields
359-
ConfigMap configMap = statusAndMap.getLoadMap();
360-
KafkaRebalanceStatus status = statusAndMap.getStatus();
361-
KafkaRebalanceState state = KafkaRebalanceUtils.rebalanceState(status);
362-
363-
if (configMap != null) {
364-
status.setProgress(Map.of(REBALANCE_PROGRESS_CONFIG_MAP_KEY, configMap.getMetadata().getName()));
365-
366-
KafkaRebalanceConfigMapUtils.updateRebalanceConfigMap(reconciliation, state, host,
367-
cruiseControlPort, apiClient, configMap)
368-
.onFailure(exception -> {
369-
KafkaRebalanceUtils.addWarningCondition(status, exception);
370-
});
371-
}
372-
373-
return Future.succeededFuture(statusAndMap);
374-
})
394+
.compose(statusAndMap -> updateProgressFields(reconciliation, host, cruiseControlPort, apiClient, kafkaRebalance, configMapOperator, statusAndMap))
375395
.compose(desiredStatusAndMap -> {
376396
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
377397
return configMapOperator.reconcile(reconciliation, kafkaRebalance.getMetadata().getNamespace(),
@@ -630,6 +650,10 @@ public K getStatus() {
630650
return status;
631651
}
632652

653+
public void setLoadMap(T loadMap) {
654+
this.loadMap = loadMap;
655+
}
656+
633657
public void setStatus(K status) {
634658
this.status = status;
635659
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceConfigMapUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public class KafkaRebalanceConfigMapUtils {
7777
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_COMPLETED);
7878
data.remove(EXECUTOR_STATE_KEY);
7979
break;
80+
case New:
81+
case PendingProposal:
8082
default:
8183
break;
8284
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorProgressTest.java

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaRebalanceAssemblyOperatorProgressTest extends AbstractKafkaReb
4444
@Test
4545
public void testProgressFieldsDuringRebalanceLifecycle(VertxTestContext context) throws IOException, URISyntaxException {
4646
cruiseControlServer.setupCCRebalanceResponse(1, CruiseControlEndpoints.REBALANCE, "true");
47-
cruiseControlServer.setupCCStateResponse(0, 1);
47+
cruiseControlServer.setupCCStateResponse(0, 1, null, 0);
4848
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0);
4949

5050
KafkaRebalance kr = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, EMPTY_KAFKA_REBALANCE_SPEC, true);
@@ -60,8 +60,8 @@ public void testProgressFieldsDuringRebalanceLifecycle(VertxTestContext context)
6060
// Check resource is in Pending state.
6161
assertState(context, client, namespace, RESOURCE_NAME, KafkaRebalanceState.PendingProposal);
6262
configMapOperator.getAsync(namespace, RESOURCE_NAME)
63-
.onComplete(config -> {
64-
assertThat(config, nullValue());
63+
.onComplete(configMap -> {
64+
assertThat(configMap, nullValue());
6565
});
6666
}))
6767
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
@@ -119,7 +119,7 @@ public void testProgressFieldsDuringRebalanceLifecycle(VertxTestContext context)
119119
public void testWarningConditionPropagation(VertxTestContext context) throws IOException, URISyntaxException {
120120
cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REBALANCE, "true");
121121
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 1);
122-
cruiseControlServer.setupCCStateResponse(3, 0);
122+
cruiseControlServer.setupCCStateResponse(0, 0, 1, 2);
123123

124124
KafkaRebalance kr = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, EMPTY_KAFKA_REBALANCE_SPEC, true);
125125
Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
@@ -179,6 +179,65 @@ public void testWarningConditionPropagation(VertxTestContext context) throws IOE
179179
}));
180180
}
181181

182-
// TODO: Add test to check progress fields after rebalance failure.
182+
/**
183+
* Test to check progress fields after rebalance failure.
184+
*/
185+
@Test
186+
public void testProgressFieldsOnRebalanceFailure(VertxTestContext context) throws IOException, URISyntaxException {
187+
cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REBALANCE, "true");
188+
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0, 1);
189+
cruiseControlServer.setupCCStateResponse(0, 1, null, 0);
190+
191+
KafkaRebalance kr = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, EMPTY_KAFKA_REBALANCE_SPEC, true);
192+
Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
193+
crdCreateKafka();
194+
crdCreateCruiseControlSecrets();
195+
196+
ConfigMapOperator configMapOperator = this.supplier.configMapOperations;
197+
198+
Checkpoint checkpoint = context.checkpoint();
199+
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
200+
.onComplete(context.succeeding(v -> {
201+
// Check resource moved from New to ProposalReady state.
202+
assertState(context, client, namespace, RESOURCE_NAME, KafkaRebalanceState.ProposalReady);
203+
}))
204+
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, RESOURCE_NAME)))
205+
.onComplete(context.succeeding(v -> {
206+
// Check resource moved from ProposalReady to Rebalancing state.
207+
assertState(context, client, namespace, RESOURCE_NAME, KafkaRebalanceState.Rebalancing);
208+
209+
KafkaRebalance kafkaRebalance = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(RESOURCE_NAME).get();
210+
KafkaRebalanceStatus status = kafkaRebalance.getStatus();
211+
assertThat(status.getProgress().containsKey(REBALANCE_PROGRESS_CONFIG_MAP_KEY), is(Boolean.TRUE));
212+
213+
configMapOperator.getAsync(namespace, RESOURCE_NAME)
214+
.onSuccess(configMap -> {
215+
Map<String, String> fields = configMap.getData();
216+
assertThat(fields.containsKey(ESTIMATED_TIME_TO_COMPLETION_KEY), is(Boolean.TRUE));
217+
assertThat(fields.containsKey(COMPLETED_BYTE_MOVEMENT_KEY), is(Boolean.TRUE));
218+
assertThat(fields.containsKey(EXECUTOR_STATE_KEY), is(Boolean.TRUE));
219+
assertThat(fields.containsKey(BROKER_LOAD_KEY), is(Boolean.TRUE));
220+
});
221+
222+
}))
223+
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, RESOURCE_NAME)))
224+
.onComplete(context.succeeding(v -> {
225+
// Check resource moved from Rebalancing to NotReady state.
226+
assertState(context, client, namespace, RESOURCE_NAME, KafkaRebalanceState.NotReady);
227+
228+
configMapOperator.getAsync(namespace, RESOURCE_NAME)
229+
.onSuccess(configMap -> {
230+
Map<String, String> fields = configMap.getData();
231+
System.out.println(fields);
232+
assertThat(fields.containsKey(ESTIMATED_TIME_TO_COMPLETION_KEY), is(Boolean.FALSE));
233+
assertThat(fields.containsKey(COMPLETED_BYTE_MOVEMENT_KEY), is(Boolean.TRUE));
234+
assertThat(fields.containsKey(EXECUTOR_STATE_KEY), is(Boolean.TRUE));
235+
assertThat(fields.containsKey(BROKER_LOAD_KEY), is(Boolean.TRUE));
236+
});
237+
238+
checkpoint.flag();
239+
}));
240+
}
241+
183242
// TODO: Add test for when CC provides malformed executor state data
184243
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private CruiseControlApi cruiseControlClientProvider() {
7777

7878
@Test
7979
public void testGetCCState() {
80-
cruiseControlServer.setupCCStateResponse(0, 0);
80+
cruiseControlServer.setupCCStateResponse(0, 0, null, 0);
8181

8282
CruiseControlApi client = cruiseControlClientProvider();
8383
client.getCruiseControlState(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, false)

0 commit comments

Comments
 (0)