Skip to content

Commit 50954e0

Browse files
committed
WIP: KafkaRebalance progress tracking
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent ddea7f6 commit 50954e0

File tree

9 files changed

+777
-1
lines changed

9 files changed

+777
-1
lines changed

api/src/main/java/io/strimzi/api/kafka/model/rebalance/KafkaRebalanceStatus.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
builderPackage = Constants.FABRIC8_KUBERNETES_API
2626
)
2727
@JsonInclude(JsonInclude.Include.NON_NULL)
28-
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "optimizationResult" })
28+
@JsonPropertyOrder({ "conditions", "observedGeneration", "sessionId", "progress", "optimizationResult" })
2929
@EqualsAndHashCode(callSuper = true)
3030
@ToString(callSuper = true)
3131
public class KafkaRebalanceStatus extends Status {
3232
private String sessionId;
33+
private Map<String, Object> progress = new HashMap<>(0);
3334
private Map<String, Object> optimizationResult = new HashMap<>(0);
3435

3536
@Description("A JSON object describing the optimization result")
@@ -42,6 +43,16 @@ public void setOptimizationResult(Map<String, Object> optimizationResult) {
4243
this.optimizationResult = optimizationResult;
4344
}
4445

46+
/**
 * Returns the progress map for the rebalance.
 *
 * @return  A map holding a JSON-style description of the progress of the rebalance.
 *          Omitted from serialization when empty (NON_EMPTY include rule).
 */
@Description("A JSON object describing the progress of the rebalance")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public Map<String, Object> getProgress() {
    return progress;
}
51+
52+
/**
 * Sets the progress map for the rebalance.
 *
 * @param progress  A map holding a JSON-style description of the progress of the rebalance
 */
public void setProgress(Map<String, Object> progress) {
    this.progress = progress;
}
55+
4556
@Description("The session identifier for requests to Cruise Control pertaining to this KafkaRebalance resource. " +
4657
"This is used by the Kafka Rebalance operator to track the status of ongoing rebalancing operations.")
4758
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.model.cruisecontrol;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
9+
import java.time.Instant;
10+
import java.util.regex.Matcher;
11+
import java.util.regex.Pattern;
12+
13+
/**
14+
* Utility class for extracting fields from ExecutorState JSON from Cruise Control REST API.
15+
*/
16+
public class ExecutorStateProcessor {
17+
18+
private static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
19+
private static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
20+
private static final String TRIGGERED_TASK_REASON_KEY = "triggeredTaskReason";
21+
22+
/**
23+
* Retrieves the total amount of data to move from the provided `executorState` JSON object.
24+
* The value is obtained from the "totalDataToMove" field.
25+
*
26+
* @param executorState The `JsonNode` object containing the state of the executor,
27+
* from which the total data to move is extracted.
28+
* @return The total data to move, in megabytes.
29+
* @throws NullPointerException if the "totalDataToMove" field is missing or null.
30+
*/
31+
public static Integer getTotalDataToMove(JsonNode executorState) {
32+
if (!executorState.has(TOTAL_DATA_TO_MOVE_KEY)) {
33+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", TOTAL_DATA_TO_MOVE_KEY));
34+
}
35+
return executorState.get(TOTAL_DATA_TO_MOVE_KEY).asInt();
36+
}
37+
38+
/**
39+
* Retrieves the amount of data that has already been moved from the provided `executorState` JSON object.
40+
* The value is obtained from the "finishedDataMovement" field.
41+
*
42+
* @param executorState The `JsonNode` object containing the state of the executor,
43+
* from which the finished data movement is extracted.
44+
* @return The amount of data that has already been moved, in megabytes.
45+
*/
46+
public static Integer getFinishedDataMovement(JsonNode executorState) {
47+
if (!executorState.has(FINISHED_DATA_MOVEMENT_KEY)) {
48+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", FINISHED_DATA_MOVEMENT_KEY));
49+
}
50+
return executorState.get(FINISHED_DATA_MOVEMENT_KEY).asInt();
51+
}
52+
53+
/**
54+
* Extracts the task start time from the provided `executorState` JSON object.
55+
* The task start time is extracted from the "triggeredTaskReason" field, which contains a
56+
* timestamp in ISO 8601 format. The timestamp is then parsed into seconds since the Unix epoch.
57+
*
58+
* @param executorState The `JsonNode` object containing the state of the executor,
59+
* from which the task start time will be extracted.
60+
* @return The task start time in seconds since the Unix epoch.
61+
*/
62+
public static Integer getTaskStartTime(JsonNode executorState) {
63+
if (!executorState.has(TRIGGERED_TASK_REASON_KEY)) {
64+
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", TRIGGERED_TASK_REASON_KEY));
65+
}
66+
String triggeredTaskReason = executorState.get(TRIGGERED_TASK_REASON_KEY).asText();
67+
// Extract the timestamp from the string, assuming it's in ISO 8601 format
68+
String dateString = extractDateFromTriggeredTaskReason(triggeredTaskReason);
69+
return (int) parseDateToSeconds(dateString);
70+
}
71+
72+
/**
73+
* Method to parse date-time string in ISO 8601 into time in seconds since Unix epoch.
74+
*
75+
* @param dateString Date-time string in ISO 8601 format.
76+
*
77+
* @return Seconds since Unix epoch.
78+
*/
79+
private static long parseDateToSeconds(String dateString) {
80+
// Validate the date format
81+
if (dateString == null || dateString.isEmpty()) {
82+
throw new IllegalArgumentException("Invalid date string.");
83+
}
84+
// Use Instant to parse the date-time string in ISO 8601 format
85+
Instant instant = Instant.parse(dateString);
86+
return instant.getEpochSecond(); // Convert to seconds
87+
}
88+
89+
/**
90+
* Extracts the ISO 8601 date-time string from a Cruise Control task's reason string.
91+
* The date-time is expected to be in the format "%s (Client: %s, Date: %s)", where
92+
* the date follows "Date:" in UTC and is rounded to the second
93+
* (see: https://github.com/linkedin/cruise-control/blob/main/cruise-control-core/src/main/java/com/linkedin/cruisecontrol/CruiseControlUtils.java#L39-L41).
94+
*
95+
* @param triggeredTaskReason Cruise Control task's triggeredTaskReason string.
96+
*
97+
* @return Date-time string in ISO 8601 format.
98+
*/
99+
private static String extractDateFromTriggeredTaskReason(String triggeredTaskReason) {
100+
if (triggeredTaskReason == null || triggeredTaskReason.isEmpty()) {
101+
throw new IllegalArgumentException("Triggered task reason is missing.");
102+
}
103+
104+
// Regular expression pattern to match the date-time in ISO 8601 format (UTC, rounded to the second).
105+
String regex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z";
106+
Pattern pattern = Pattern.compile(regex);
107+
Matcher matcher = pattern.matcher(triggeredTaskReason);
108+
109+
if (matcher.find()) {
110+
// Extract the date string
111+
return matcher.group();
112+
}
113+
throw new IllegalArgumentException("Date not found in triggered task reason.");
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.assembly;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.fabric8.kubernetes.api.model.ConfigMap;
9+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
10+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalance;
11+
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
12+
import io.strimzi.operator.cluster.model.ModelUtils;
13+
import io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor;
14+
import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator;
15+
import io.strimzi.operator.common.Reconciliation;
16+
import io.strimzi.operator.common.operator.resource.ReconcileResult;
17+
import io.vertx.core.Future;
18+
19+
import java.time.Instant;
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
/**
25+
* Utility class for updating data in KafkaRebalance ConfigMap
26+
*/
27+
public class KafkaRebalanceConfigMapUtils {
28+
29+
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_KEY = "estimatedTimeToCompletionInMinutes";
30+
/* test */ static final String COMPLETED_BYTE_MOVEMENT_KEY = "completedByteMovementPercentage";
31+
/* test */ static final String EXECUTOR_STATE_KEY = "executorState.json";
32+
/* test */ static final String BROKER_LOAD_KEY = "brokerLoad.json";
33+
34+
/* test */ static final String TIME_COMPLETED = "0";
35+
private static final String BYTE_MOVEMENT_ZERO = "0";
36+
/* test */ static final String BYTE_MOVEMENT_COMPLETED = "100";
37+
38+
private static ConfigMap createConfigMap(KafkaRebalance kafkaRebalance) {
39+
return new ConfigMapBuilder()
40+
.withNewMetadata()
41+
.withNamespace(kafkaRebalance.getMetadata().getNamespace())
42+
.withName(kafkaRebalance.getMetadata().getName())
43+
.withLabels(Collections.singletonMap("app", "strimzi"))
44+
.withOwnerReferences(ModelUtils.createOwnerReference(kafkaRebalance, false))
45+
.endMetadata()
46+
.build();
47+
}
48+
49+
/**
50+
* Updates the given KafkaRebalance ConfigMap with the broker load field.
51+
*
52+
* @param beforeAndAfterBrokerLoad The broker load information in JSON format, which is used to populate the broker load field.
53+
* @param configMap The ConfigMap to be updated with progress information.
54+
*/
55+
public static void updateRebalanceConfigMapWithLoadField(JsonNode beforeAndAfterBrokerLoad, ConfigMap configMap) {
56+
Map<String, String> map = configMap.getData();
57+
map.put(BROKER_LOAD_KEY, beforeAndAfterBrokerLoad.toString());
58+
}
59+
60+
/**
61+
* Updates the given KafkaRebalance ConfigMap with progress fields based on the progress of the Kafka rebalance operation.
62+
*
63+
* @param state The current state of the Kafka rebalance operation (e.g., ProposalReady, Rebalancing, Stopped, etc.).
64+
* @param executorState The executor state information in JSON format, which is used to calculate progress fields.
65+
* @param configMap The ConfigMap to be updated with progress information. If null, a new ConfigMap will be created.
66+
*/
67+
public static void updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState state, JsonNode executorState, ConfigMap configMap) {
68+
if (configMap == null) {
69+
configMap = new ConfigMap();
70+
}
71+
72+
Map<String, String> data = configMap.getData();
73+
if (data == null) {
74+
data = new HashMap<>();
75+
configMap.setData(data);
76+
}
77+
78+
switch (state) {
79+
case ProposalReady:
80+
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
81+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_ZERO);
82+
data.remove(EXECUTOR_STATE_KEY);
83+
break;
84+
case Rebalancing:
85+
try {
86+
int taskStartTime = ExecutorStateProcessor.getTaskStartTime(executorState);
87+
int totalDataToMove = ExecutorStateProcessor.getTotalDataToMove(executorState);
88+
int finishedDataMovement = ExecutorStateProcessor.getFinishedDataMovement(executorState);
89+
90+
int estimatedTimeToCompletion = KafkaRebalanceProgressUtils.estimateTimeToCompletionInMinutes(
91+
taskStartTime,
92+
Instant.now().getEpochSecond(),
93+
totalDataToMove,
94+
finishedDataMovement);
95+
96+
int completedByteMovement = KafkaRebalanceProgressUtils.estimateCompletedByteMovementPercentage(
97+
totalDataToMove,
98+
finishedDataMovement);
99+
100+
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, String.valueOf(estimatedTimeToCompletion));
101+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, String.valueOf(completedByteMovement));
102+
data.put(EXECUTOR_STATE_KEY, executorState.toString());
103+
} catch (IllegalArgumentException | ArithmeticException e) {
104+
// TODO: Have this caught by caller, logged and put into the `KafkaRebalance` status
105+
throw new RuntimeException(e);
106+
}
107+
break;
108+
case Stopped:
109+
case NotReady:
110+
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
111+
// Use the value of completedByteMovementPercentage from previous update.
112+
// Use the value of executorState object from previous update.
113+
break;
114+
case Ready:
115+
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, TIME_COMPLETED);
116+
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_COMPLETED);
117+
data.remove(EXECUTOR_STATE_KEY);
118+
break;
119+
default:
120+
break;
121+
}
122+
}
123+
124+
static Future<ConfigMap> updateRebalanceConfigMap(Reconciliation reconciliation,
125+
ConfigMapOperator configMapOperator,
126+
KafkaRebalance kafkaRebalance,
127+
KafkaRebalanceState state,
128+
JsonNode executorState,
129+
JsonNode beforeAndAfterBrokerLoad) {
130+
131+
Future<ConfigMap> oldKafkaRebalanceConfigMap;
132+
Future<ConfigMap> newKafkaRebalanceConfigMap;
133+
134+
ConfigMap kafkaRebalanceConfigMap = createConfigMap(kafkaRebalance);
135+
String namespace = kafkaRebalance.getMetadata().getNamespace();
136+
String name = kafkaRebalance.getMetadata().getName();
137+
138+
switch (state) {
139+
case New:
140+
case PendingProposal:
141+
newKafkaRebalanceConfigMap = Future.succeededFuture(null);
142+
break;
143+
case ProposalReady:
144+
updateRebalanceConfigMapWithLoadField(beforeAndAfterBrokerLoad, kafkaRebalanceConfigMap);
145+
updateRebalanceConfigMapWithProgressFields(state, null, kafkaRebalanceConfigMap);
146+
newKafkaRebalanceConfigMap = Future.succeededFuture(kafkaRebalanceConfigMap);
147+
break;
148+
case Rebalancing:
149+
oldKafkaRebalanceConfigMap = configMapOperator.getAsync(namespace, name);
150+
newKafkaRebalanceConfigMap = oldKafkaRebalanceConfigMap.flatMap(configMap -> {
151+
updateRebalanceConfigMapWithProgressFields(state, executorState, configMap);
152+
return Future.succeededFuture(configMap);
153+
});
154+
break;
155+
case Stopped:
156+
case Ready:
157+
case NotReady:
158+
oldKafkaRebalanceConfigMap = configMapOperator.getAsync(namespace, name);
159+
newKafkaRebalanceConfigMap = oldKafkaRebalanceConfigMap.flatMap(configMap -> {
160+
updateRebalanceConfigMapWithProgressFields(state, null, configMap);
161+
return Future.succeededFuture(configMap);
162+
});
163+
break;
164+
case ReconciliationPaused:
165+
newKafkaRebalanceConfigMap = Future.succeededFuture();
166+
break;
167+
168+
default:
169+
newKafkaRebalanceConfigMap = Future.succeededFuture(null);
170+
break;
171+
}
172+
173+
return newKafkaRebalanceConfigMap;
174+
}
175+
176+
static Future<ReconcileResult<ConfigMap>> reconcileKafkaRebalanceConfigMap(Reconciliation reconciliation,
177+
ConfigMapOperator configMapOperator,
178+
KafkaRebalance kafkaRebalance,
179+
KafkaRebalanceState state,
180+
JsonNode executorState,
181+
JsonNode beforeAndAfterBrokerLoad) {
182+
String namespace = kafkaRebalance.getMetadata().getNamespace();
183+
String name = kafkaRebalance.getMetadata().getName();
184+
Future<ConfigMap> configMapFuture = updateRebalanceConfigMap(reconciliation, configMapOperator, kafkaRebalance, state, executorState, beforeAndAfterBrokerLoad);
185+
return configMapFuture.flatMap(configMap -> {
186+
return configMapOperator.reconcile(reconciliation, namespace, name, configMap);
187+
});
188+
}
189+
190+
}

0 commit comments

Comments
 (0)