Skip to content

Commit 95aa70a

Browse files
committed
Addressing some feedback - tc
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent c33b9d8 commit 95aa70a

File tree

5 files changed

+101
-129
lines changed

5 files changed

+101
-129
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ExecutorStateProcessor.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88

9-
import java.time.LocalDateTime;
10-
import java.time.OffsetDateTime;
9+
import java.time.Instant;
1110
import java.util.Arrays;
1211
import java.util.List;
1312
import java.util.regex.Matcher;
@@ -21,9 +20,9 @@ public class ExecutorStateProcessor {
2120
// Regular expression pattern to match the date-time in ISO 8601 format (UTC, rounded to the second).
2221
private static final Pattern ISO_8601_UTC_TIMESTAMP_PATTERN = Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z");
2322

24-
private static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
25-
private static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
26-
private static final String TRIGGERED_TASK_REASON_KEY = "triggeredTaskReason";
23+
/* test */ static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
24+
/* test */ static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
25+
/* test */ static final String TRIGGERED_TASK_REASON_KEY = "triggeredTaskReason";
2726

2827
/**
2928
* Represents the state that the Cruise Control Executor can be in at a moment in time.
@@ -133,13 +132,16 @@ public static Integer getFinishedDataMovement(JsonNode executorJson) {
133132
/**
134133
* Extracts the task start time from the provided `executorJson` JSON object.
135134
* The task start time is extracted from the "triggeredTaskReason" field, which contains a
136-
* timestamp in ISO 8601 format. The timestamp is then parsed into LocalDateTime object.
135+
* timestamp in ISO 8601 format. The timestamp is then parsed into Instant object.
136+
*
137+
* Update this method to extract the task start time from `StartMs` field once this issue is resolved:
138+
* https://github.com/linkedin/cruise-control/issues/2271
137139
*
138140
* @param executorJson The `JsonNode` object containing the state of the executor,
139141
* from which the task start time will be extracted.
140-
* @return The task start time as a LocalDateTime object.
142+
* @return The task start time as an Instant object.
141143
*/
142-
public static LocalDateTime getTaskStartTime(JsonNode executorJson) {
144+
public static Instant getTaskStartTime(JsonNode executorJson) {
143145
if (!executorJson.has(TRIGGERED_TASK_REASON_KEY)) {
144146
throw new IllegalArgumentException(String.format("Executor State does not contain required '%s' field.", TRIGGERED_TASK_REASON_KEY));
145147
}
@@ -151,9 +153,8 @@ public static LocalDateTime getTaskStartTime(JsonNode executorJson) {
151153
if (dateString == null || dateString.isEmpty()) {
152154
throw new IllegalArgumentException("Date string is null or empty.");
153155
}
154-
// Parse the date-time string in ISO 8601 format
155-
OffsetDateTime offsetDateTime = OffsetDateTime.parse(dateString);
156-
return offsetDateTime.toLocalDateTime();
156+
157+
return Instant.parse(dateString);
157158
}
158159

159160
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceConfigMapUtils.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,20 @@
1212
import io.strimzi.operator.cluster.operator.VertxUtil;
1313
import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi;
1414
import io.strimzi.operator.common.Reconciliation;
15-
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus;
1615
import io.vertx.core.Future;
1716

1817
import java.time.Instant;
1918
import java.util.Map;
2019

21-
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus.IN_EXECUTION;
2220

2321
/**
2422
* Utility class for updating data in KafkaRebalance ConfigMap
2523
*/
2624
public class KafkaRebalanceConfigMapUtils {
2725

2826
/* test */ static final String REBALANCE_PROGRESS_CONFIG_MAP_KEY = "rebalanceProgressConfigMap";
29-
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_KEY = "estimatedTimeToCompletionInMinutes";
30-
/* test */ static final String COMPLETED_BYTE_MOVEMENT_KEY = "completedByteMovementPercentage";
27+
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY = "estimatedTimeToCompletionInMinutes";
28+
/* test */ static final String COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY = "completedByteMovementPercentage";
3129
/* test */ static final String EXECUTOR_STATE_KEY = "executorState.json";
3230

3331
/* test */ static final String TIME_COMPLETED = "0";
@@ -38,24 +36,23 @@ public class KafkaRebalanceConfigMapUtils {
3836
* Updates the given KafkaRebalance ConfigMap with progress fields based on the progress of the Kafka rebalance operation.
3937
*
4038
* @param state The current state of the KafkaRebalance resource (e.g., ProposalReady, Rebalancing, Stopped, etc.).
41-
* @param taskStartTime The time the task started represented as an Instant object
4239
* @param executorState The executor state information in JSON format, which is used to calculate progress fields
4340
* in the Rebalancing state.
4441
* @param configMap The ConfigMap to be updated with progress information.
4542
*/
4643
/* test */ static void updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState state,
47-
Instant taskStartTime,
4844
JsonNode executorState,
4945
ConfigMap configMap) {
5046
Map<String, String> data = configMap != null ? configMap.getData() : null;
5147

5248
switch (state) {
5349
case ProposalReady:
54-
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
55-
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_ZERO);
50+
data.remove(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY);
51+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, BYTE_MOVEMENT_ZERO);
5652
data.remove(EXECUTOR_STATE_KEY);
5753
break;
5854
case Rebalancing:
55+
Instant taskStartTime = ExecutorStateProcessor.getTaskStartTime(executorState);
5956
int totalDataToMove = ExecutorStateProcessor.getTotalDataToMove(executorState);
6057
int finishedDataMovement = ExecutorStateProcessor.getFinishedDataMovement(executorState);
6158

@@ -69,19 +66,19 @@ public class KafkaRebalanceConfigMapUtils {
6966
totalDataToMove,
7067
finishedDataMovement);
7168

72-
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, String.valueOf(estimatedTimeToCompletion));
73-
data.put(COMPLETED_BYTE_MOVEMENT_KEY, String.valueOf(completedByteMovement));
69+
data.put(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY, String.valueOf(estimatedTimeToCompletion));
70+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, String.valueOf(completedByteMovement));
7471
data.put(EXECUTOR_STATE_KEY, executorState.toString());
7572
break;
7673
case Stopped:
7774
case NotReady:
78-
data.remove(ESTIMATED_TIME_TO_COMPLETION_KEY);
75+
data.remove(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY);
7976
// Use the value of completedByteMovementPercentage from previous update.
8077
// Use the value of executorState object from previous update.
8178
break;
8279
case Ready:
83-
data.put(ESTIMATED_TIME_TO_COMPLETION_KEY, TIME_COMPLETED);
84-
data.put(COMPLETED_BYTE_MOVEMENT_KEY, BYTE_MOVEMENT_COMPLETED);
80+
data.put(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY, TIME_COMPLETED);
81+
data.put(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY, BYTE_MOVEMENT_COMPLETED);
8582
data.remove(EXECUTOR_STATE_KEY);
8683
break;
8784
case New:
@@ -113,36 +110,20 @@ public static Future<ConfigMap> updateRebalanceConfigMap(Reconciliation reconcil
113110

114111
KafkaRebalanceState state = KafkaRebalanceUtils.rebalanceState(status);
115112
if (state == KafkaRebalanceState.Rebalancing) {
116-
return VertxUtil.completableFutureToVertxFuture(apiClient.getUserTaskStatus(reconciliation, host, port, status.getSessionId()))
117-
.compose(cruiseControlResponse -> {
118-
119-
if (cruiseControlResponse.isMaxActiveUserTasksReached()) {
120-
throw new IllegalStateException("The maximum number of active user tasks that Cruise Control can run concurrently has been reached, therefore will retry getting user tasks in the next reconciliation. " +
121-
"If this occurs often, consider increasing the value for max.active.user.tasks in the Cruise Control configuration.");
122-
}
123-
124-
CruiseControlUserTaskStatus taskStatus = cruiseControlResponse.getTaskStatus();
125-
Instant taskStartTime = cruiseControlResponse.getTaskStartTime();
126-
if (taskStatus == IN_EXECUTION) {
127-
return VertxUtil.completableFutureToVertxFuture(
128-
apiClient.getCruiseControlState(reconciliation,
129-
host,
130-
port,
131-
false))
132-
.compose(response -> {
133-
JsonNode executorState = response.getJson().get("ExecutorState");
134-
ExecutorStateProcessor.ExecutorState.verifyRebalancingState(executorState);
135-
updateRebalanceConfigMapWithProgressFields(state, taskStartTime, executorState, configMap);
136-
return Future.succeededFuture(configMap);
137-
});
138-
} else {
139-
throw new IllegalStateException(
140-
String.format("Cruise Control user task is in `%s` state and was started on %s, needs execution data before progress fields can be provided",
141-
taskStatus, taskStartTime));
142-
}
113+
return VertxUtil.completableFutureToVertxFuture(
114+
apiClient.getCruiseControlState(reconciliation,
115+
host,
116+
port,
117+
false))
118+
.compose(response -> {
119+
JsonNode executorState = response.getJson().get("ExecutorState");
120+
ExecutorStateProcessor.ExecutorState.verifyRebalancingState(executorState);
121+
updateRebalanceConfigMapWithProgressFields(state, executorState, configMap);
122+
return Future.succeededFuture(configMap);
143123
});
124+
144125
} else {
145-
updateRebalanceConfigMapWithProgressFields(state, null, null, configMap);
126+
updateRebalanceConfigMapWithProgressFields(state, null, configMap);
146127
return Future.succeededFuture(configMap);
147128
}
148129
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/ExecutorStateProcessorTest.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import java.util.Map;
1313

1414
import static io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor.ExecutorState;
15+
import static io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor.FINISHED_DATA_MOVEMENT_KEY;
16+
import static io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor.TOTAL_DATA_TO_MOVE_KEY;
17+
import static io.strimzi.operator.cluster.model.cruisecontrol.ExecutorStateProcessor.TRIGGERED_TASK_REASON_KEY;
1518
import static org.hamcrest.CoreMatchers.is;
1619
import static org.hamcrest.MatcherAssert.assertThat;
1720
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -22,9 +25,6 @@ public class ExecutorStateProcessorTest {
2225
private static final String DEFAULT_TRIGGERED_TASK_REASON = "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)";
2326

2427
private static final String STATE_KEY = "state";
25-
private static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
26-
private static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
27-
private static final String TRIGGERED_TASK_REASON = "triggeredTaskReason";
2828

2929
private static ObjectNode createExecutorState(Map<String, String> executorState) throws Exception {
3030
ObjectMapper objectMapper = new ObjectMapper();
@@ -37,16 +37,21 @@ private static ObjectNode createExecutorState(Map<String, String> executorState)
3737
return objectNode;
3838
}
3939

40-
private static ObjectNode createExecutorState(String finishedDataMovement, String totalDataToMove, String triggeredTaskReason) {
40+
public static ObjectNode createExecutorState(String finishedDataMovement, String totalDataToMove, String triggeredTaskReason) {
4141
ObjectMapper objectMapper = new ObjectMapper();
4242
ObjectNode objectNode = objectMapper.createObjectNode();
43-
objectNode.put(FINISHED_DATA_MOVEMENT_KEY, finishedDataMovement);
44-
objectNode.put(TOTAL_DATA_TO_MOVE_KEY, totalDataToMove);
45-
objectNode.put(TRIGGERED_TASK_REASON, triggeredTaskReason);
43+
if (finishedDataMovement != null) {
44+
objectNode.put(FINISHED_DATA_MOVEMENT_KEY, finishedDataMovement);
45+
}
46+
if (totalDataToMove != null) {
47+
objectNode.put(TOTAL_DATA_TO_MOVE_KEY, totalDataToMove);
48+
}
49+
if (triggeredTaskReason != null) {
50+
objectNode.put(TRIGGERED_TASK_REASON_KEY, triggeredTaskReason);
51+
}
4652
return objectNode;
4753
}
4854

49-
5055
@Test
5156
public void testVerifyRebalancingState() throws Exception {
5257
JsonNode es0 = createExecutorState(Map.of(STATE_KEY, ExecutorState.NO_TASK_IN_PROGRESS.toString()));
@@ -91,11 +96,11 @@ public void testGetTotalDataToMove() throws Exception {
9196
public void testGetTaskStartTime() throws Exception {
9297
JsonNode es0 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
9398
"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)");
94-
assertThat(ExecutorStateProcessor.getTaskStartTime(es0).toString(), is("2024-11-15T19:41:27"));
99+
assertThat(ExecutorStateProcessor.getTaskStartTime(es0).toString(), is("2024-11-15T19:41:27Z"));
95100

96101
JsonNode es1 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
97102
"(Client: 172.17.0.1, Date: 2024-11-10T23:25:27Z)");
98-
assertThat(ExecutorStateProcessor.getTaskStartTime(es1).toString(), is("2024-11-10T23:25:27"));
103+
assertThat(ExecutorStateProcessor.getTaskStartTime(es1).toString(), is("2024-11-10T23:25:27Z"));
99104

100105
// Test missing date-string fails
101106
JsonNode es2 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE, "");

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorProgressTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import static io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState.Ready;
3131
import static io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState.Rebalancing;
3232
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceAssemblyOperator.BROKER_LOAD_KEY;
33-
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.COMPLETED_BYTE_MOVEMENT_KEY;
34-
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.ESTIMATED_TIME_TO_COMPLETION_KEY;
33+
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY;
34+
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY;
3535
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.EXECUTOR_STATE_KEY;
3636
import static io.strimzi.operator.cluster.operator.assembly.KafkaRebalanceConfigMapUtils.REBALANCE_PROGRESS_CONFIG_MAP_KEY;
3737
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus.ACTIVE;
@@ -73,8 +73,8 @@ void assertConfigMapFields(ConfigMap configMap) {
7373
if (configMapExpected) {
7474
assertThat(configMap, notNullValue());
7575
Map<String, String> fields = configMap.getData();
76-
assertThat(fields.containsKey(ESTIMATED_TIME_TO_COMPLETION_KEY), is(containsEstimatedTimeToCompletion));
77-
assertThat(fields.containsKey(COMPLETED_BYTE_MOVEMENT_KEY), is(containsCompletedByteMovement));
76+
assertThat(fields.containsKey(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY), is(containsEstimatedTimeToCompletion));
77+
assertThat(fields.containsKey(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY), is(containsCompletedByteMovement));
7878
assertThat(fields.containsKey(EXECUTOR_STATE_KEY), is(containsExecutorState));
7979
assertThat(fields.containsKey(BROKER_LOAD_KEY), is(containsBrokerLoad));
8080
} else {

0 commit comments

Comments
 (0)