Skip to content

Commit 2ec3dc2

Browse files
committed
Adding progress tracking for Cruise Control rebalances
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent 1017db7 commit 2ec3dc2

File tree

32 files changed

+1934
-371
lines changed

32 files changed

+1934
-371
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 0.47.0
44

5+
* Adding progress tracking for Cruise Control rebalances
56
* Add support for Kafka 3.9.1
67
* Fixed MirrorMaker 2 client rack init container override being ignored.
78

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.api.kafka.model.rebalance;
6+
7+
import com.fasterxml.jackson.annotation.JsonInclude;
8+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
9+
import io.strimzi.api.kafka.model.common.Constants;
10+
import io.strimzi.api.kafka.model.common.UnknownPropertyPreserving;
11+
import io.strimzi.crdgenerator.annotations.Description;
12+
import io.sundr.builder.annotations.Buildable;
13+
import lombok.EqualsAndHashCode;
14+
import lombok.ToString;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
/**
20+
* Configures the progress section of the status of a `KafkaRebalance` resource with progress information
21+
* related to a partition rebalance.
22+
*/
23+
@Buildable(
24+
editableEnabled = false,
25+
builderPackage = Constants.FABRIC8_KUBERNETES_API
26+
)
27+
@JsonPropertyOrder({"rebalanceProgressConfigMap"})
28+
@JsonInclude(JsonInclude.Include.NON_NULL)
29+
@EqualsAndHashCode
30+
@ToString
31+
public class KafkaRebalanceProgress implements UnknownPropertyPreserving {
32+
33+
private String rebalanceProgressConfigMap;
34+
private Map<String, Object> additionalProperties;
35+
36+
@Description("The name of the `ConfigMap` containing information related to the progress of a partition rebalance.")
37+
@JsonInclude(JsonInclude.Include.NON_NULL)
38+
public String getRebalanceProgressConfigMap() {
39+
return rebalanceProgressConfigMap;
40+
}
41+
42+
public void setRebalanceProgressConfigMap(String rebalanceProgressConfigMap) {
43+
this.rebalanceProgressConfigMap = rebalanceProgressConfigMap;
44+
}
45+
46+
@Override
47+
public Map<String, Object> getAdditionalProperties() {
48+
return this.additionalProperties != null ? this.additionalProperties : Map.of();
49+
}
50+
51+
@Override
52+
public void setAdditionalProperty(String name, Object value) {
53+
if (this.additionalProperties == null) {
54+
this.additionalProperties = new HashMap<>(2);
55+
}
56+
this.additionalProperties.put(name, value);
57+
}
58+
}
59+

api/src/main/java/io/strimzi/api/kafka/model/rebalance/KafkaRebalanceStatus.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,20 @@
1717
import java.util.Map;
1818

1919
/**
20-
* Represents a status of the Kafka Rebalance resource
20+
* Represents a status of the Kafka Rebalance resource.
2121
*/
2222
@Buildable(
2323
editableEnabled = false,
2424
generateBuilderPackage = false,
2525
builderPackage = Constants.FABRIC8_KUBERNETES_API
2626
)
2727
@JsonInclude(JsonInclude.Include.NON_NULL)
28-
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "optimizationResult" })
28+
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "progress", "optimizationResult" })
2929
@EqualsAndHashCode(callSuper = true)
3030
@ToString(callSuper = true)
3131
public class KafkaRebalanceStatus extends Status {
3232
private String sessionId;
33+
private KafkaRebalanceProgress progress;
3334
private Map<String, Object> optimizationResult = new HashMap<>(0);
3435

3536
@Description("A JSON object describing the optimization result")
@@ -42,6 +43,16 @@ public void setOptimizationResult(Map<String, Object> optimizationResult) {
4243
this.optimizationResult = optimizationResult;
4344
}
4445

46+
@Description("An object with information related to the progress of the rebalance.")
47+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
48+
public KafkaRebalanceProgress getProgress() {
49+
return progress;
50+
}
51+
52+
public void setProgress(KafkaRebalanceProgress progress) {
53+
this.progress = progress;
54+
}
55+
4556
@Description("The session identifier for requests to Cruise Control pertaining to this KafkaRebalance resource. " +
4657
"This is used by the Kafka Rebalance operator to track the status of ongoing rebalancing operations.")
4758
@JsonInclude(JsonInclude.Include.NON_EMPTY)

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceBuilder;
2626
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceList;
2727
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode;
28+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceProgress;
2829
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceSpec;
2930
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
3031
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
@@ -324,6 +325,59 @@ private KafkaRebalanceStatus updateStatus(KafkaRebalance kafkaRebalance,
324325
return rebalanceOptionsBuilder;
325326
}
326327

328+
private static Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> updateProgressFields(
329+
Reconciliation reconciliation,
330+
String host,
331+
int cruiseControlPort,
332+
CruiseControlApi apiClient,
333+
KafkaRebalance kafkaRebalance,
334+
ConfigMapOperator configMapOperator,
335+
MapAndStatus<ConfigMap, KafkaRebalanceStatus> desiredStatusAndMap) {
336+
String configMapNamespace = kafkaRebalance.getMetadata().getNamespace();
337+
String configMapName = kafkaRebalance.getMetadata().getName();
338+
339+
return configMapOperator.getAsync(configMapNamespace, configMapName)
340+
.compose(existingConfigMap -> {
341+
ConfigMap desiredConfigMap = desiredStatusAndMap.getRebalanceConfigMap();
342+
KafkaRebalanceStatus desiredStatus = desiredStatusAndMap.getStatus();
343+
344+
if (existingConfigMap == null && desiredConfigMap == null) {
345+
return Future.succeededFuture(desiredStatusAndMap);
346+
}
347+
348+
if (existingConfigMap != null) {
349+
if (desiredConfigMap == null) {
350+
desiredStatusAndMap.setRebalanceConfigMap(existingConfigMap);
351+
desiredConfigMap = existingConfigMap;
352+
} else {
353+
// Ensure desiredConfigMap retains broker load information if it exists.
354+
desiredConfigMap.getData().put(BROKER_LOAD_KEY, existingConfigMap.getData().get(BROKER_LOAD_KEY));
355+
}
356+
}
357+
358+
// Add progress information to `KafkaRebalance` resource status and ConfigMap.
359+
KafkaRebalanceProgress progress = new KafkaRebalanceProgress();
360+
progress.setRebalanceProgressConfigMap(configMapName);
361+
desiredStatus.setProgress(progress);
362+
363+
Promise<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> promise = Promise.promise();
364+
KafkaRebalanceConfigMapUtils.updateRebalanceConfigMap(
365+
reconciliation, desiredStatus, host, cruiseControlPort, apiClient, desiredConfigMap)
366+
.onFailure(exception -> {
367+
exception = new Exception(
368+
String.format("Progress update of rebalance skipped due to the following reason: %s",
369+
exception.getMessage()), exception);
370+
LOGGER.infoCr(reconciliation, exception.getMessage());
371+
KafkaRebalanceUtils.addWarningCondition(desiredStatus, exception);
372+
promise.complete(desiredStatusAndMap);
373+
})
374+
.onSuccess(ignored -> {
375+
promise.complete(desiredStatusAndMap);
376+
});
377+
return promise.future();
378+
});
379+
}
380+
327381
private Future<KafkaRebalanceStatus> handleRebalance(Reconciliation reconciliation, String host,
328382
CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
329383
KafkaRebalanceState currentState) {
@@ -353,10 +407,11 @@ && rawRebalanceAnnotation(kafkaRebalance) == null) {
353407
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder = convertRebalanceSpecToRebalanceOptions(kafkaRebalance.getSpec());
354408

355409
computeNextStatus(reconciliation, host, apiClient, kafkaRebalance, currentState, rebalanceOptionsBuilder)
410+
.compose(statusAndMap -> updateProgressFields(reconciliation, host, cruiseControlPort, apiClient, kafkaRebalance, configMapOperator, statusAndMap))
356411
.compose(desiredStatusAndMap -> {
357412
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
358413
return configMapOperator.reconcile(reconciliation, kafkaRebalance.getMetadata().getNamespace(),
359-
kafkaRebalance.getMetadata().getName(), desiredStatusAndMap.getLoadMap())
414+
kafkaRebalance.getMetadata().getName(), desiredStatusAndMap.getRebalanceConfigMap())
360415
.onComplete(ignoredConfigMapResult -> {
361416
KafkaRebalanceStatus kafkaRebalanceStatus = updateStatus(kafkaRebalance, desiredStatusAndMap.getStatus(), null);
362417
if (kafkaRebalance.getStatus() != null
@@ -600,23 +655,27 @@ protected static JsonNode parseLoadStats(ArrayNode brokerLoadBeforeJson, ArrayNo
600655
*/
601656
static class MapAndStatus<T, K> {
602657

603-
T loadMap;
658+
T rebalanceConfigMap;
604659
K status;
605660

606-
public T getLoadMap() {
607-
return loadMap;
661+
public T getRebalanceConfigMap() {
662+
return rebalanceConfigMap;
608663
}
609664

610665
public K getStatus() {
611666
return status;
612667
}
613668

669+
public void setRebalanceConfigMap(T rebalanceConfigMap) {
670+
this.rebalanceConfigMap = rebalanceConfigMap;
671+
}
672+
614673
public void setStatus(K status) {
615674
this.status = status;
616675
}
617676

618-
public MapAndStatus(T loadMap, K status) {
619-
this.loadMap = loadMap;
677+
public MapAndStatus(T rebalanceConfigMap, K status) {
678+
this.rebalanceConfigMap = rebalanceConfigMap;
620679
this.status = status;
621680
}
622681
}
@@ -667,7 +726,7 @@ private MapAndStatus<ConfigMap, KafkaRebalanceStatus> buildRebalanceStatus(Kafka
667726
conditions.add(StatusUtils.buildRebalanceCondition(cruiseControlState.toString()));
668727
conditions.addAll(validation);
669728
MapAndStatus<ConfigMap, Map<String, Object>> optimizationProposalMapAndStatus = processOptimizationProposal(kafkaRebalance, proposalJson);
670-
return new MapAndStatus<>(optimizationProposalMapAndStatus.getLoadMap(), new KafkaRebalanceStatusBuilder()
729+
return new MapAndStatus<>(optimizationProposalMapAndStatus.getRebalanceConfigMap(), new KafkaRebalanceStatusBuilder()
671730
.withSessionId(sessionID)
672731
.withConditions(conditions)
673732
.withOptimizationResult(optimizationProposalMapAndStatus.getStatus())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.assembly;
6+
7+
import io.fabric8.kubernetes.api.model.ConfigMap;
8+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
9+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
10+
import io.strimzi.operator.cluster.operator.VertxUtil;
11+
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
12+
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.ExecutorStatus;
13+
import io.strimzi.operator.common.Reconciliation;
14+
import io.vertx.core.Future;
15+
16+
import java.time.Instant;
17+
import java.util.Map;
18+
19+
20+
/**
21+
* Utility class for updating data in KafkaRebalance ConfigMap.
22+
*/
23+
public class KafkaRebalanceConfigMapUtils {
24+
/**
25+
* The estimated time it will take in minutes until the partition rebalance is complete rounded
26+
* to the nearest minute.
27+
*/
28+
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY = "estimatedTimeToCompletionInMinutes";
29+
/**
30+
* The percentage of the byte movement of the partition rebalance that is completed as a rounded down integer
31+
* value in the range [0-100].
32+
*/
33+
/* test */ static final String COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY = "completedByteMovementPercentage";
34+
/**
35+
* The “non-verbose” JSON payload from the "/kafkacruisecontrol/state?substates=executor" endpoint,
36+
* providing details about the executor's current status, including partition movement progress,
37+
* concurrency limits, and total data to move.
38+
*/
39+
/* test */ static final String EXECUTOR_STATE_KEY = "executorState.json";
40+
41+
/* test */ static final String TIME_COMPLETED = "0";
42+
/* test */ static final String BYTE_MOVEMENT_ZERO = "0";
43+
/* test */ static final String BYTE_MOVEMENT_COMPLETED = "100";
44+
45+
/**
46+
* Updates the given KafkaRebalance ConfigMap with progress fields based on the progress of the Kafka rebalance operation.
47+
*
48+
* @param state The current state of the KafkaRebalance resource (e.g., ProposalReady, Rebalancing, Stopped, etc.).
49+
* @param executorStatus The executor status information of executing task, which is used to calculate progress fields.
50+
* in the Rebalancing state.
51+
* @param configMap The ConfigMap to be updated with progress information.
52+
*/
53+
/* test */ static void updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState state,
54+
ExecutorStatus executorStatus,
55+
ConfigMap configMap) {
56+
if (configMap == null || configMap.getData() == null) {
57+
return;
58+
}
59+
Map<String, String> data = configMap.getData();
60+
61+
switch (state) {
62+
case ProposalReady:
63+
data.remove(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY);
64+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, BYTE_MOVEMENT_ZERO);
65+
data.remove(EXECUTOR_STATE_KEY);
66+
break;
67+
case Rebalancing:
68+
Instant taskStartTime = executorStatus.getTaskStartTime();
69+
int totalDataToMove = executorStatus.getTotalDataToMove();
70+
int finishedDataMovement = executorStatus.getFinishedDataMovement();
71+
72+
int estimatedTimeToCompletion = KafkaRebalanceProgressUtils.estimateTimeToCompletionInMinutes(
73+
taskStartTime,
74+
totalDataToMove,
75+
finishedDataMovement);
76+
77+
int completedByteMovement = KafkaRebalanceProgressUtils.estimateCompletedByteMovementPercentage(
78+
totalDataToMove,
79+
finishedDataMovement);
80+
81+
data.put(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY, String.valueOf(estimatedTimeToCompletion));
82+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, String.valueOf(completedByteMovement));
83+
data.put(EXECUTOR_STATE_KEY, executorStatus.getJson().toString());
84+
break;
85+
case Stopped:
86+
case NotReady:
87+
data.remove(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY);
88+
// Use the value of completedByteMovementPercentage from previous update.
89+
// Use the value of executorStateJson object from previous update.
90+
break;
91+
case Ready:
92+
data.put(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY, TIME_COMPLETED);
93+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, BYTE_MOVEMENT_COMPLETED);
94+
data.remove(EXECUTOR_STATE_KEY);
95+
break;
96+
case New:
97+
case PendingProposal:
98+
default:
99+
data.remove(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY);
100+
data.remove(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY);
101+
data.remove(EXECUTOR_STATE_KEY);
102+
break;
103+
}
104+
}
105+
106+
/**
107+
* Updates the KafkaRebalance {@link ConfigMap} with relevant state information depending on the current
108+
* {@link KafkaRebalanceState}.
109+
*
110+
* @param reconciliation The reconciliation context.
111+
* @param status The KafkaRebalance status.
112+
* @param host The host address of the Cruise Control instance.
113+
* @param port The port of the Cruise Control instance.
114+
* @param apiClient The API client to communicate with Cruise Control.
115+
* @param configMap The desired ConfigMap.
116+
* @return A {@link Future} representing the updated ConfigMap (or null if no update was required).
117+
*/
118+
public static Future<ConfigMap> updateRebalanceConfigMap(Reconciliation reconciliation,
119+
KafkaRebalanceStatus status,
120+
String host,
121+
int port,
122+
CruiseControlApi apiClient,
123+
ConfigMap configMap) {
124+
KafkaRebalanceState state = KafkaRebalanceUtils.rebalanceState(status);
125+
if (state == KafkaRebalanceState.Rebalancing) {
126+
return VertxUtil.completableFutureToVertxFuture(
127+
apiClient.getCruiseControlState(reconciliation, host, port, false))
128+
.compose(response -> {
129+
ExecutorStatus executorStatus = response.getExecutorStatus();
130+
if (executorStatus.isInProgressState()) {
131+
// We can only estimate the rate of partition movement once some data has been moved.
132+
if (executorStatus.getFinishedDataMovement() > 0) {
133+
updateRebalanceConfigMapWithProgressFields(state, executorStatus, configMap);
134+
return Future.succeededFuture(configMap);
135+
}
136+
}
137+
throw new IllegalStateException(
138+
String.format("Partition movement information unavailable; executor is in '%s' state, " +
139+
"progress estimation will be updated on next reconciliation.", executorStatus.getState()));
140+
});
141+
} else {
142+
updateRebalanceConfigMapWithProgressFields(state, null, configMap);
143+
return Future.succeededFuture(configMap);
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)