Skip to content

Commit 3b0a363

Browse files
committed
Addressing comments - pp, tc
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent 95aa70a commit 3b0a363

File tree

12 files changed

+136
-119
lines changed

12 files changed

+136
-119
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ExecutorStateProcessor.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88

99
import java.time.Instant;
10-
import java.util.Arrays;
1110
import java.util.List;
1211
import java.util.regex.Matcher;
1312
import java.util.regex.Pattern;
@@ -90,11 +89,15 @@ public static void verifyRebalancingState(JsonNode executorJson) {
9089
}
9190

9291
private static ExecutorState fromString(String state) {
93-
String normalized = state.trim();
94-
return Arrays.stream(ExecutorState.values())
95-
.filter(e -> e.name().equals(normalized))
96-
.findFirst()
97-
.orElseThrow(() -> new IllegalArgumentException("Unknown ExecutorState: " + state));
92+
if (state == null) {
93+
throw new IllegalArgumentException("ExecutorState cannot be null");
94+
}
95+
96+
try {
97+
return ExecutorState.valueOf(state);
98+
} catch (IllegalArgumentException e) {
99+
throw new IllegalArgumentException("Unknown ExecutorState: " + state, e);
100+
}
98101
}
99102
}
100103

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ private static Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> updateProgr
358358
desiredStatus.setProgress(Map.of(REBALANCE_PROGRESS_CONFIG_MAP_KEY, configMapName));
359359
return KafkaRebalanceConfigMapUtils.updateRebalanceConfigMap(reconciliation, desiredStatus, host, cruiseControlPort, apiClient, desiredConfigMap)
360360
.recover(exception -> {
361+
LOGGER.infoCr(reconciliation, "Progress information of rebalance not updated due to the following error: {}", exception);
361362
KafkaRebalanceUtils.addWarningCondition(desiredStatus, exception);
362363
return Future.succeededFuture();
363364
})

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceConfigMapUtils.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,21 @@
2424
public class KafkaRebalanceConfigMapUtils {
2525

2626
/* test */ static final String REBALANCE_PROGRESS_CONFIG_MAP_KEY = "rebalanceProgressConfigMap";
27+
/**
28+
* The estimated time it will take in minutes until the partition rebalance is complete rounded
29+
* to the nearest minute.
30+
*/
2731
/* test */ static final String ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY = "estimatedTimeToCompletionInMinutes";
32+
/**
33+
* The percentage of the byte movement of the partition rebalance that is completed as a rounded down integer
34+
* value in the range [0-100].
35+
*/
2836
/* test */ static final String COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY = "completedByteMovementPercentage";
37+
/**
38+
* The “non-verbose” JSON payload from the /kafkacruisecontrol/state?substates=executor endpoint,
39+
* providing details about the executor's current status, including partition movement progress,
40+
* concurrency limits, and total data to move.
41+
*/
2942
/* test */ static final String EXECUTOR_STATE_KEY = "executorState.json";
3043

3144
/* test */ static final String TIME_COMPLETED = "0";
@@ -58,7 +71,6 @@ public class KafkaRebalanceConfigMapUtils {
5871

5972
int estimatedTimeToCompletion = KafkaRebalanceProgressUtils.estimateTimeToCompletionInMinutes(
6073
taskStartTime,
61-
Instant.now(),
6274
totalDataToMove,
6375
finishedDataMovement);
6476

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceProgressUtils.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ public class KafkaRebalanceProgressUtils {
1616
* Estimates the number of minutes it will take an ongoing partition rebalance to complete.
1717
*
1818
* @param taskStartTime The time when the task started.
19-
* @param currentTime The time at the moment of the method call.
2019
* @param totalDataToMoveInMB The total amount of data that needs to be moved, in megabytes.
2120
* @param finishedDataMovementInMB The amount of data that has already been moved, in megabytes.
2221
* @return The estimated time to completion in minutes.
@@ -27,15 +26,27 @@ public class KafkaRebalanceProgressUtils {
2726
* - The elapsed time between `taskStartTime` and `currentTime` is zero, making rate calculation impossible.
2827
* - The data movement rate is zero, making the time to completion estimation impossible.
2928
*/
29+
protected static int estimateTimeToCompletionInMinutes(Instant taskStartTime,
30+
int totalDataToMoveInMB,
31+
int finishedDataMovementInMB)
32+
throws IllegalArgumentException, ArithmeticException {
33+
return estimateTimeToCompletionInMinutes(
34+
taskStartTime,
35+
Instant.now(),
36+
totalDataToMoveInMB,
37+
finishedDataMovementInMB
38+
);
39+
}
40+
3041
/* test */ static int estimateTimeToCompletionInMinutes(Instant taskStartTime,
3142
Instant currentTime,
3243
int totalDataToMoveInMB,
3344
int finishedDataMovementInMB)
3445
throws IllegalArgumentException, ArithmeticException {
35-
if (taskStartTime == null || currentTime == null || totalDataToMoveInMB < 0 || finishedDataMovementInMB < 0) {
46+
if (taskStartTime == null || totalDataToMoveInMB < 0 || finishedDataMovementInMB < 0) {
3647
throw new IllegalArgumentException(
37-
String.format("Invalid value(s) provided for one of the following arguments: taskStartTime %s, currentTime %s, totalDataToMoveInMB: %d, finishedDataMovementInMB: %d.",
38-
taskStartTime, currentTime, totalDataToMoveInMB, finishedDataMovementInMB)
48+
String.format("Invalid value(s) provided for one of the following arguments: taskStartTime %s, totalDataToMoveInMB: %d, finishedDataMovementInMB: %d.",
49+
taskStartTime, totalDataToMoveInMB, finishedDataMovementInMB)
3950
);
4051
}
4152

@@ -52,9 +63,9 @@ public class KafkaRebalanceProgressUtils {
5263
throw new ArithmeticException("finishedDataMovementInMB is zero, cannot estimate time to completion.");
5364
}
5465

55-
float rateMBperMinute = ((float) finishedDataMovementInMB / timeElapsed.getSeconds()) * 60;
66+
double rateMBperMinute = ((double) finishedDataMovementInMB / timeElapsed.getSeconds()) * 60;
5667
int dataLeftToMoveMB = totalDataToMoveInMB - finishedDataMovementInMB;
57-
return Math.round(dataLeftToMoveMB / rateMBperMinute);
68+
return (int) Math.round(dataLeftToMoveMB / rateMBperMinute);
5869
}
5970

6071
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import io.strimzi.api.kafka.model.kafka.cruisecontrol.KafkaAutoRebalanceStatusBuilder;
1515
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
1616
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
17-
import io.strimzi.operator.cluster.operator.resource.ConditionType;
1817
import io.strimzi.operator.common.model.StatusUtils;
1918

2019
import java.util.ArrayList;
@@ -179,7 +178,7 @@ public static void addWarningCondition(KafkaRebalanceStatus status, Throwable ex
179178

180179
/* test */ static Condition getWarningCondition(KafkaRebalanceStatus status) {
181180
return status.getConditions().stream()
182-
.filter(ConditionType::isWarning)
181+
.filter(condition -> condition.getType().equals("Warning"))
183182
.findFirst()
184183
.orElse(null);
185184
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/ConditionType.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public class CruiseControlApiImpl implements CruiseControlApi {
5050
*/
5151
public static final int HTTP_DEFAULT_IDLE_TIMEOUT_SECONDS = -1;
5252
private static final String STATUS_KEY = "Status";
53-
private static final String START_MS_KEY = "StartMs";
5453
private final long idleTimeout;
5554
private final boolean apiSslEnabled;
5655
private final HTTPHeader authHttpHeader;
@@ -364,7 +363,6 @@ public CompletableFuture<CruiseControlUserTasksResponse> getUserTaskStatus(Recon
364363
} else {
365364
JsonNode jsonUserTask = userTasks.get(0);
366365
String taskStatusStr = jsonUserTask.get(STATUS_KEY).asText();
367-
String taskStartTime = jsonUserTask.get(START_MS_KEY).asText();
368366
LOGGER.debugCr(reconciliation, "Got {} response to GET request to {} : userTaskID = {}, status = {}", response.statusCode(), path, userTaskID, taskStatusStr);
369367
// This should not be an error with a 200 status but we play it safe
370368
if (jsonUserTask.has(CC_REST_API_ERROR_KEY)) {
@@ -374,7 +372,6 @@ public CompletableFuture<CruiseControlUserTasksResponse> getUserTaskStatus(Recon
374372
}
375373

376374
statusJson.put(STATUS_KEY, taskStatusStr);
377-
statusJson.put(START_MS_KEY, taskStartTime);
378375
CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusStr);
379376
switch (taskStatus) {
380377
case ACTIVE:

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,12 @@
55
package io.strimzi.operator.cluster.operator.resource.cruisecontrol;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8-
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus;
9-
10-
import java.time.Instant;
118

129
/**
1310
* Response to user tasks request
1411
*/
1512
public class CruiseControlUserTasksResponse extends CruiseControlResponse {
1613

17-
private static final String STATUS_KEY = "Status";
18-
private static final String START_MS_KEY = "StartMs";
19-
20-
private CruiseControlUserTaskStatus status;
21-
private Instant taskStartTime;
2214
private boolean isMaxActiveUserTasksReached;
2315

2416
/**
@@ -32,8 +24,6 @@ public class CruiseControlUserTasksResponse extends CruiseControlResponse {
3224
// The maximum number of active user tasks that can run concurrently has reached
3325
// Sourced from the error message that contains "reached the servlet capacity" from the Cruise Control response
3426
this.isMaxActiveUserTasksReached = false;
35-
setTaskStatus(json);
36-
setTaskStartTime(json);
3727
}
3828

3929
/**
@@ -47,41 +37,4 @@ protected void setMaxActiveUserTasksReached(boolean maxActiveUserTasksReached) {
4737
this.isMaxActiveUserTasksReached = maxActiveUserTasksReached;
4838
}
4939

50-
private void setTaskStatus(JsonNode userTaskJson) {
51-
if (userTaskJson != null && userTaskJson.has(STATUS_KEY)) {
52-
this.status = CruiseControlUserTaskStatus.lookup(userTaskJson.get("Status").asText());
53-
}
54-
}
55-
56-
/**
57-
* @return status of user task.
58-
*/
59-
public CruiseControlUserTaskStatus getTaskStatus() {
60-
return this.status;
61-
}
62-
63-
/**
64-
* Extracts the task start time from the provided `userTaskJson` JSON object.
65-
* The task start time is extracted from the "StartMs" field, which contains a
66-
* timestamp in of Unix epoch in milliseconds. The timestamp is then parsed into Instant object.
67-
*
68-
* @param userTaskJson The `JsonNode` object containing the user task json,
69-
* from which the task start time will be extracted.
70-
*/
71-
private void setTaskStartTime(JsonNode userTaskJson) {
72-
if (userTaskJson != null && userTaskJson.has(START_MS_KEY)) {
73-
// Extract the task start time as Unix epoch in milliseconds
74-
long taskStartTimeInMilliseconds = userTaskJson.get(START_MS_KEY).asLong();
75-
76-
this.taskStartTime = Instant.ofEpochMilli(taskStartTimeInMilliseconds);
77-
}
78-
}
79-
80-
/**
81-
* @return Start time of user task as an Instant object
82-
*/
83-
public Instant getTaskStartTime() {
84-
return this.taskStartTime;
85-
}
86-
8740
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceConfigMapUtilsTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public void testProgressFieldsForProposalRebalancingState() throws Exception {
5959
ConfigMap cm = createKafkaRebalanceConfigMap(BROKER_LOAD_MAP);
6060
updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState.Rebalancing, es0, cm);
6161

62+
/**
63+
* Total Data to Move: 10,000 MB
64+
* Data Moved: 250 MB
65+
* Time Since Task Start: 2 seconds
66+
* Data Rate: 125 MB/s
67+
* Remaining Data: 9,750 MB
68+
* Estimated Time to Complete: 78 seconds
69+
*/
6270
Map<String, String> m = cm.getData();
6371
assertThat(m.get(ESTIMATED_TIME_TO_COMPLETION_IN_MINUTES_KEY), is("1"));
6472
assertThat(m.get(COMPLETED_BYTE_MOVEMENT_PERCENTAGE_KEY), is("2"));

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceProgressUtilsTest.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,86 @@ public class KafkaRebalanceProgressUtilsTest {
2020
public void testEstimateTimeToCompletionInMinutes() {
2121
Instant currentTime = Instant.now();
2222

23+
/**
24+
* Total Data to Move: 1000 MB
25+
* Data Moved: 10 MB
26+
* Time Since Task Start: 1 second
27+
* Data Rate: 600 MB/s
28+
* Remaining Data: 990 MB
29+
* Estimated Time to Complete: 1.5 seconds -> 2 minutes
30+
*/
2331
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(1), currentTime, 1000, 10), is(2));
32+
/**
33+
* Total Data to Move: 1000 MB
34+
* Data Moved: 10 MB
35+
* Time Since Task Start: 60 seconds
36+
* Data Rate: 0.167 MB/s
37+
* Remaining Data: 990 MB
38+
* Estimated Time to Complete: 5940 seconds -> 99 minutes
39+
*/
2440
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(60), currentTime, 1000, 10), is(99));
41+
/**
42+
* Total Data to Move: 1000 MB
43+
* Data Moved: 500 MB
44+
* Time Since Task Start: 60 seconds
45+
* Data Rate: 8.33 MB/s
46+
* Remaining Data: 500 MB
47+
* Estimated Time to Complete: 60 seconds -> 1 minute
48+
*/
2549
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(60), currentTime, 1000, 500), is(1));
26-
50+
/**
51+
* Total Data to Move: 1,000,000 MB
52+
* Data Moved: 100 MB
53+
* Time Since Task Start: 60 seconds
54+
* Data Rate: 1.6666666666666667 MB/s
55+
* Remaining Data: 999,900 MB
56+
* Estimated Time to Complete: 599,940 seconds -> 9999 minutes
57+
*/
2758
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(60), currentTime, 1000000, 100), is(9999));
59+
/**
60+
* Total Data to Move: 1000 MB
61+
* Data Moved: 990 MB
62+
* Time Since Task Start: 60 seconds
63+
* Data Rate: 16.5 MB/s
64+
* Remaining Data: 10 MB
65+
* Estimated Time to Complete: ~0.6 seconds -> 0 minutes
66+
*/
2867
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(60), currentTime, 1000, 990), is(0));
68+
/**
69+
* Total Data to Move: 2,147,483,647 MB (Integer.MAX_VALUE)
70+
* Data Moved: 1,073,741,823.5 MB
71+
* Time Since Task Start: 1000 seconds
72+
* Data Rate: 1073741.823 MB/s
73+
* Remaining Data: 1,073,741,824.5 MB
74+
* Estimated Time to Complete: ~1000 seconds -> 17 minutes
75+
*/
2976
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(1000), currentTime, Integer.MAX_VALUE, Integer.MAX_VALUE / 2), is(17));
77+
/**
78+
* Total Data to Move: 1000 MB
79+
* Data Moved: 10 MB
80+
* Time Since Task Start: 1000 seconds
81+
* Data Rate: 0.01 MB/s
82+
* Remaining Data: 990 MB
83+
* Estimated Time to Complete: 99000 seconds -> 1650 minutes
84+
*/
3085
assertThat(estimateTimeToCompletionInMinutes(currentTime.minusSeconds(1000), currentTime, 1000, 10), is(1650));
86+
/**
87+
* Total Data to Move: 1 MB
88+
* Data Moved: 1 MB
89+
* Time Since Task Start: 1 hour (3600 seconds)
90+
* Data Rate: 0.000278 MB/s
91+
* Remaining Data: 0 MB
92+
* Estimated Time to Complete: 0 seconds -> 0 minutes
93+
*/
3194
assertThat(estimateTimeToCompletionInMinutes(currentTime.minus(1, ChronoUnit.HOURS), currentTime, 1, 1), is(0));
95+
/**
96+
* Total Data to Move: 1000 MB
97+
* Data Moved: 500 MB
98+
* Time Since Task Start: 30 days (2,592,000 seconds)
99+
* Data Rate: ~0.000193 MB/s
100+
* Remaining Data: 500 MB
101+
* Estimated Time to Complete: 2,592,000 seconds -> 43,200 minutes
102+
*/
32103
assertThat(estimateTimeToCompletionInMinutes(currentTime.minus(30, ChronoUnit.DAYS), currentTime, 1000, 500), is(43200));
33104

34105
assertThrows(IllegalArgumentException.class, () -> estimateTimeToCompletionInMinutes(currentTime.plusSeconds(1), currentTime, 1000, 10));

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceUtilsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,16 @@ public void testUpdateStatusCondition() {
175175
// Test Warning condition can be added when there are no existing conditions
176176
KafkaRebalanceUtils.addWarningCondition(status, exception("Example error 0"));
177177
assertThat(status.getConditions().size(), is(1));
178-
assertThat(KafkaRebalanceUtils.getWarningCondition(status).getType(), is("Warning"));
178+
assertThat(KafkaRebalanceUtils.getWarningCondition(status), notNullValue());
179179

180180
// Test Warning condition addition doesn't remove existing conditions.
181181
Condition c0 = new ConditionBuilder()
182-
.withType("Rebalancing")
182+
.withType(KafkaRebalanceState.Rebalancing.toString())
183183
.build();
184184
status.setConditions(List.of(c0));
185185
KafkaRebalanceUtils.addWarningCondition(status, exception("Example error 0"));
186186
assertThat(status.getConditions().size(), is(2));
187-
assertThat(KafkaRebalanceUtils.getWarningCondition(status).getType(), is("Warning"));
187+
assertThat(KafkaRebalanceUtils.getWarningCondition(status), notNullValue());
188188

189189
// Test original Warning is kept when a new error with same reason and message is thrown.
190190
String timestamp = "2024-11-05T15:28:23.995129903Z";

0 commit comments

Comments
 (0)