Skip to content

Commit 042a4b6

Browse files
committed
WIP: KafkaRebalance progress tracking
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent ddea7f6 commit 042a4b6

File tree

15 files changed

+898
-1
lines changed

15 files changed

+898
-1
lines changed

api/src/main/java/io/strimzi/api/kafka/model/rebalance/KafkaRebalanceStatus.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
builderPackage = Constants.FABRIC8_KUBERNETES_API
2626
)
2727
@JsonInclude(JsonInclude.Include.NON_NULL)
28-
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "optimizationResult" })
28+
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "progress", "optimizationResult" })
2929
@EqualsAndHashCode(callSuper = true)
3030
@ToString(callSuper = true)
3131
public class KafkaRebalanceStatus extends Status {
3232
private String sessionId;
33+
private Map<String, Object> progress = new HashMap<>(0);
3334
private Map<String, Object> optimizationResult = new HashMap<>(0);
3435

3536
@Description("A JSON object describing the optimization result")
@@ -42,6 +43,16 @@ public void setOptimizationResult(Map<String, Object> optimizationResult) {
4243
this.optimizationResult = optimizationResult;
4344
}
4445

46+
/**
 * Returns the progress map held by this status.
 * The property is annotated with {@code JsonInclude.Include.NON_EMPTY}, so an empty map is
 * left out of the serialized status.
 *
 * @return The map stored in the {@code progress} field.
 */
@Description("A JSON object describing the progress of the rebalance")
47+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
48+
public Map<String, Object> getProgress() {
49+
return progress;
50+
}
51+
52+
/**
 * Replaces the progress map held by this status.
 * The given reference is stored as-is (no defensive copy is made).
 *
 * @param progress The map to store in the {@code progress} field.
 */
public void setProgress(Map<String, Object> progress) {
53+
this.progress = progress;
54+
}
55+
4556
@Description("The session identifier for requests to Cruise Control pertaining to this KafkaRebalance resource. " +
4657
"This is used by the Kafka Rebalance operator to track the status of ongoing rebalancing operations.")
4758
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.model.cruisecontrol;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
9+
import java.time.Instant;
10+
import java.util.regex.Matcher;
11+
import java.util.regex.Pattern;
12+
13+
/**
14+
* Utility class for extracting fields from ExecutorState JSON from Cruise Control REST API.
15+
*/
16+
public class ExecutorStateProcessor {
17+
18+
private static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
19+
private static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
20+
private static final String TRIGGERED_TASK_REASON_KEY = "triggeredTaskReason";
21+
22+
/**
23+
* Retrieves the total amount of data to move from the provided `executorState` JSON object.
24+
* The value is obtained from the "totalDataToMove" field.
25+
*
26+
* @param executorState The `JsonNode` object containing the state of the executor,
27+
* from which the total data to move is extracted.
28+
* @return The total data to move, in megabytes.
29+
* @throws NullPointerException if the "totalDataToMove" field is missing or null.
30+
*/
31+
public static Integer getTotalDataToMove(JsonNode executorState) {
32+
if (!executorState.has(TOTAL_DATA_TO_MOVE_KEY)) {
33+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", TOTAL_DATA_TO_MOVE_KEY));
34+
}
35+
return executorState.get(TOTAL_DATA_TO_MOVE_KEY).asInt();
36+
}
37+
38+
/**
39+
* Retrieves the amount of data that has already been moved from the provided `executorState` JSON object.
40+
* The value is obtained from the "finishedDataMovement" field.
41+
*
42+
* @param executorState The `JsonNode` object containing the state of the executor,
43+
* from which the finished data movement is extracted.
44+
* @return The amount of data that has already been moved, in megabytes.
45+
*/
46+
public static Integer getFinishedDataMovement(JsonNode executorState) {
47+
if (!executorState.has(FINISHED_DATA_MOVEMENT_KEY)) {
48+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", FINISHED_DATA_MOVEMENT_KEY));
49+
}
50+
return executorState.get(FINISHED_DATA_MOVEMENT_KEY).asInt();
51+
}
52+
53+
/**
54+
* Extracts the task start time from the provided `executorState` JSON object.
55+
* The task start time is extracted from the "triggeredTaskReason" field, which contains a
56+
* timestamp in ISO 8601 format. The timestamp is then parsed into seconds since the Unix epoch.
57+
*
58+
* @param executorState The `JsonNode` object containing the state of the executor,
59+
* from which the task start time will be extracted.
60+
* @return The task start time in seconds since the Unix epoch.
61+
*/
62+
public static Integer getTaskStartTime(JsonNode executorState) {
63+
if (!executorState.has(TRIGGERED_TASK_REASON_KEY)) {
64+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", TRIGGERED_TASK_REASON_KEY));
65+
}
66+
String triggeredTaskReason = executorState.get(TRIGGERED_TASK_REASON_KEY).asText();
67+
// Extract the timestamp from the string, assuming it's in ISO 8601 format
68+
String dateString = extractDateFromTriggeredTaskReason(triggeredTaskReason);
69+
return (int) parseDateToSeconds(dateString);
70+
}
71+
72+
/**
73+
* Method to parse date-time string in ISO 8601 into time in seconds since Unix epoch.
74+
*
75+
* @param dateString Date-time string in ISO 8601 format.
76+
*
77+
* @return Seconds since Unix epoch.
78+
*/
79+
private static long parseDateToSeconds(String dateString) {
80+
// Validate the date format
81+
if (dateString == null || dateString.isEmpty()) {
82+
throw new IllegalArgumentException("Invalid date string.");
83+
}
84+
// Use Instant to parse the date-time string in ISO 8601 format
85+
Instant instant = Instant.parse(dateString);
86+
return instant.getEpochSecond(); // Convert to seconds
87+
}
88+
89+
/**
90+
* Extracts the ISO 8601 date-time string from a Cruise Control task's reason string.
91+
* The date-time is expected to be in the format "%s (Client: %s, Date: %s)", where
92+
* the date follows "Date:" in UTC and is rounded to the second
93+
* (see: https://github.com/linkedin/cruise-control/blob/main/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/CruiseControlUtils.java#L39-L41).
94+
*
95+
* @param triggeredTaskReason Cruise Control task's triggeredTaskReason string.
96+
*
97+
* @return Date-time string in ISO 8601 format.
98+
*/
99+
private static String extractDateFromTriggeredTaskReason(String triggeredTaskReason) {
100+
if (triggeredTaskReason == null || triggeredTaskReason.isEmpty()) {
101+
throw new IllegalArgumentException("Triggered task reason is missing.");
102+
}
103+
104+
// Regular expression pattern to match the date-time in ISO 8601 format (UTC, rounded to the second).
105+
String regex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z";
106+
Pattern pattern = Pattern.compile(regex);
107+
Matcher matcher = pattern.matcher(triggeredTaskReason);
108+
109+
if (matcher.find()) {
110+
// Extract the date string
111+
return matcher.group();
112+
}
113+
throw new IllegalArgumentException("Date not found in triggered task reason.");
114+
}
115+
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.stream.Stream;
7676

7777
import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_REBALANCE_TEMPLATE;
78+
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.REBALANCE_PROGRESS_CONFIG_MAP_KEY;
7879
import static io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl.HTTP_DEFAULT_IDLE_TIMEOUT_SECONDS;
7980
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_REBALANCE;
8081
import static io.strimzi.operator.common.Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL;
@@ -353,6 +354,22 @@ && rawRebalanceAnnotation(kafkaRebalance) == null) {
353354
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder = convertRebalanceSpecToRebalanceOptions(kafkaRebalance.getSpec());
354355

355356
computeNextStatus(reconciliation, host, apiClient, kafkaRebalance, currentState, rebalanceOptionsBuilder)
357+
.compose(statusAndMap -> {
358+
ConfigMap configMap = statusAndMap.getLoadMap();
359+
KafkaRebalanceStatus status = statusAndMap.getStatus();
360+
KafkaRebalanceState state = KafkaRebalanceUtils.rebalanceState(status);
361+
362+
if (configMap != null) {
363+
status.setProgress(Map.of(REBALANCE_PROGRESS_CONFIG_MAP_KEY, configMap.getMetadata().getName()));
364+
}
365+
366+
KafkaRebalanceConfigMapUtils.updateRebalanceConfigMap(reconciliation, state, host, apiClient, configMap)
367+
.onFailure(exception -> {
368+
KafkaRebalanceConfigMapUtils.updateStatusCondition(status, exception);
369+
});
370+
371+
return Future.succeededFuture(statusAndMap);
372+
})
356373
.compose(desiredStatusAndMap -> {
357374
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
358375
return configMapOperator.reconcile(reconciliation, kafkaRebalance.getMetadata().getNamespace(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.assembly;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.fabric8.kubernetes.api.model.ConfigMap;
9+
import io.strimzi.api.kafka.model.common.Condition;
10+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
11+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
12+
import io.strimzi.operator.cluster.model.CruiseControl;
13+
import io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor;
14+
import io.strimzi.operator.cluster.operator.VertxUtil;
15+
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
16+
import io.strimzi.operator.common.Reconciliation;
17+
import io.strimzi.operator.common.model.StatusUtils;
18+
import io.vertx.core.Future;
19+
20+
import java.time.Instant;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
/**
26+
* Utility class for updating data in KafkaRebalance ConfigMap
27+
*/
28+
public class KafkaRebalanceConfigMapUtils {
29+
30+
/* test */ static final String REBALANCE_PROGRESS_CONFIG_MAP_KEY = "rebalanceProgressConfigMap";
31+
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_KEY = "estimatedTimeToCompletionInMinutes";
32+
/* test */ static final String COMPLETED_BYTE_MOVEMENT_KEY = "completedByteMovementPercentage";
33+
/* test */ static final String EXECUTOR_STATE_KEY = "executorState.json";
34+
35+
/* test */ static final String TIME_COMPLETED = "0";
36+
private static final String BYTE_MOVEMENT_ZERO = "0";
37+
/* test */ static final String BYTE_MOVEMENT_COMPLETED = "100";
38+
39+
/**
40+
* Updates the given KafkaRebalance ConfigMap with progress fields based on the progress of the Kafka rebalance operation.
41+
*
42+
* @param state The current state of the Kafka rebalance operation (e.g., ProposalReady, Rebalancing, Stopped, etc.).
43+
* @param executorState The executor state information in JSON format, which is used to calculate progress fields.
44+
* @param configMap The ConfigMap to be updated with progress information. If null, a new ConfigMap will be created.
45+
*/
46+
public static void updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState state, JsonNode executorState, ConfigMap configMap) {
47+
if (configMap == null) {
48+
return;
49+
}
50+
51+
Map<String, String> data = configMap.getData();
52+
if (data == null) {
53+
data = new HashMap<>();
54+
configMap.setData(data);
55+
}
56+
57+
switch (state) {
58+
case ProposalReady:
59+
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
60+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_ZERO);
61+
data.remove(EXECUTOR_STATE_KEY);
62+
break;
63+
case Rebalancing:
64+
int taskStartTime = ExecutorStateProcessor.getTaskStartTime(executorState);
65+
int totalDataToMove = ExecutorStateProcessor.getTotalDataToMove(executorState);
66+
int finishedDataMovement = ExecutorStateProcessor.getFinishedDataMovement(executorState);
67+
68+
int estimatedTimeToCompletion = KafkaRebalanceProgressUtils.estimateTimeToCompletionInMinutes(
69+
taskStartTime,
70+
Instant.now().getEpochSecond(),
71+
totalDataToMove,
72+
finishedDataMovement);
73+
74+
int completedByteMovement = KafkaRebalanceProgressUtils.estimateCompletedByteMovementPercentage(
75+
totalDataToMove,
76+
finishedDataMovement);
77+
78+
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, String.valueOf(estimatedTimeToCompletion));
79+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, String.valueOf(completedByteMovement));
80+
data.put(EXECUTOR_STATE_KEY, executorState.toString());
81+
break;
82+
case Stopped:
83+
case NotReady:
84+
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
85+
// Use the value of completedByteMovementPercentage from previous update.
86+
// Use the value of executorState object from previous update.
87+
break;
88+
case Ready:
89+
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, TIME_COMPLETED);
90+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_COMPLETED);
91+
data.remove(EXECUTOR_STATE_KEY);
92+
break;
93+
default:
94+
break;
95+
}
96+
}
97+
98+
/**
99+
* Updates the Kafka Rebalance {@link ConfigMap} with relevant state information depending on the current
100+
* {@link KafkaRebalanceState}.
101+
*
102+
* @param reconciliation The reconciliation context
103+
* @param state The KafkaRebalance state
104+
* @param host The host address of the Cruise Control instance
105+
* @param apiClient The API client to communicate with Cruise Control
106+
* @param configMap The desired ConfigMap
107+
*
108+
* @return A {@link Future} representing the updated ConfigMap (or null if no update was required)
109+
*/
110+
public static Future<ConfigMap> updateRebalanceConfigMap(Reconciliation reconciliation,
111+
KafkaRebalanceState state,
112+
String host,
113+
CruiseControlApi apiClient,
114+
ConfigMap configMap) {
115+
if (state == KafkaRebalanceState.Rebalancing) {
116+
return VertxUtil.completableFutureToVertxFuture(
117+
apiClient.getCruiseControlState(reconciliation,
118+
host,
119+
CruiseControl.REST_API_PORT,
120+
false))
121+
.compose(response -> {
122+
JsonNode executorState = response.getJson().get("ExecutorState");
123+
KafkaRebalanceConfigMapUtils.updateRebalanceConfigMapWithProgressFields(
124+
KafkaRebalanceState.Rebalancing, executorState, configMap);
125+
return Future.succeededFuture(configMap);
126+
});
127+
} else {
128+
updateRebalanceConfigMapWithProgressFields(state, null, configMap);
129+
return Future.succeededFuture(configMap);
130+
}
131+
}
132+
133+
/**
134+
* Updates the {@code KafkaRebalanceStatus} conditions by adding a new Warning condition
135+
* based on the provided {@link Throwable}. If a Warning condition with the same reason
136+
* and message already exists, no update is performed.
137+
*
138+
* @param status The {@link KafkaRebalanceStatus} object whose conditions will be updated.
139+
* @param exception The {@link Throwable} containing the reason and message for the Warning condition.
140+
*/
141+
public static void updateStatusCondition(KafkaRebalanceStatus status, Throwable exception) {
142+
List<Condition> conditions = status.getConditions();
143+
144+
Condition warningCondition = StatusUtils.buildWarningCondition(exception.getCause().toString(), exception.getMessage());
145+
146+
// Do not update Warning condition if it already exists with same reason and message
147+
for (Condition condition : conditions) {
148+
if (condition.getType().equals("Warning")) {
149+
if (condition.getReason().equals(warningCondition.getReason()) &&
150+
condition.getMessage().equals(warningCondition.getMessage())) {
151+
return;
152+
}
153+
}
154+
155+
}
156+
conditions.add(warningCondition);
157+
}
158+
}

0 commit comments

Comments
 (0)