Skip to content

Commit a64d3b5

Browse files
committed
Addressing some comments - tc
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
1 parent 3d625ce commit a64d3b5

File tree

11 files changed

+195
-215
lines changed

11 files changed

+195
-215
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/ExecutorStateProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public static LocalDateTime getTaskStartTime(JsonNode executorJson) {
149149

150150
// Validate the date format
151151
if (dateString == null || dateString.isEmpty()) {
152-
throw new IllegalArgumentException("Invalid date string.");
152+
throw new IllegalArgumentException("Date string is null or empty.");
153153
}
154154
// Parse the date-time string in ISO 8601 format
155155
OffsetDateTime offsetDateTime = OffsetDateTime.parse(dateString);

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ private static Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> updateProgr
337337

338338
return configMapOperator.getAsync(configMapNamespace, configMapName)
339339
.compose(existingConfigMap -> {
340-
ConfigMap desiredConfigMap = desiredStatusAndMap.getLoadMap();
340+
ConfigMap desiredConfigMap = desiredStatusAndMap.getRebalanceConfigMap();
341341
KafkaRebalanceStatus desiredStatus = desiredStatusAndMap.getStatus();
342342

343343
if (existingConfigMap == null && desiredConfigMap == null) {
@@ -346,7 +346,7 @@ private static Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> updateProgr
346346

347347
if (existingConfigMap != null) {
348348
if (desiredConfigMap == null) {
349-
desiredStatusAndMap.setLoadMap(existingConfigMap);
349+
desiredStatusAndMap.setRebalanceConfigMap(existingConfigMap);
350350
desiredConfigMap = existingConfigMap;
351351
} else {
352352
// Ensure desiredConfigMap retains broker load information if it exists.
@@ -398,7 +398,7 @@ && rawRebalanceAnnotation(kafkaRebalance) == null) {
398398
.compose(desiredStatusAndMap -> {
399399
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
400400
return configMapOperator.reconcile(reconciliation, kafkaRebalance.getMetadata().getNamespace(),
401-
kafkaRebalance.getMetadata().getName(), desiredStatusAndMap.getLoadMap())
401+
kafkaRebalance.getMetadata().getName(), desiredStatusAndMap.getRebalanceConfigMap())
402402
.onComplete(ignoredConfigMapResult -> {
403403
KafkaRebalanceStatus kafkaRebalanceStatus = updateStatus(kafkaRebalance, desiredStatusAndMap.getStatus(), null);
404404
if (kafkaRebalance.getStatus() != null
@@ -642,27 +642,27 @@ protected static JsonNode parseLoadStats(ArrayNode brokerLoadBeforeJson, ArrayNo
642642
*/
643643
static class MapAndStatus<T, K> {
644644

645-
T loadMap;
645+
T rebalanceConfigMap;
646646
K status;
647647

648-
public T getLoadMap() {
649-
return loadMap;
648+
public T getRebalanceConfigMap() {
649+
return rebalanceConfigMap;
650650
}
651651

652652
public K getStatus() {
653653
return status;
654654
}
655655

656-
public void setLoadMap(T loadMap) {
657-
this.loadMap = loadMap;
656+
public void setRebalanceConfigMap(T rebalanceConfigMap) {
657+
this.rebalanceConfigMap = rebalanceConfigMap;
658658
}
659659

660660
public void setStatus(K status) {
661661
this.status = status;
662662
}
663663

664-
public MapAndStatus(T loadMap, K status) {
665-
this.loadMap = loadMap;
664+
public MapAndStatus(T rebalanceConfigMap, K status) {
665+
this.rebalanceConfigMap = rebalanceConfigMap;
666666
this.status = status;
667667
}
668668
}
@@ -713,7 +713,7 @@ private MapAndStatus<ConfigMap, KafkaRebalanceStatus> buildRebalanceStatus(Kafka
713713
conditions.add(StatusUtils.buildRebalanceCondition(cruiseControlState.toString()));
714714
conditions.addAll(validation);
715715
MapAndStatus<ConfigMap, Map<String, Object>> optimizationProposalMapAndStatus = processOptimizationProposal(kafkaRebalance, proposalJson);
716-
return new MapAndStatus<>(optimizationProposalMapAndStatus.getLoadMap(), new KafkaRebalanceStatusBuilder()
716+
return new MapAndStatus<>(optimizationProposalMapAndStatus.getRebalanceConfigMap(), new KafkaRebalanceStatusBuilder()
717717
.withSessionId(sessionID)
718718
.withConditions(conditions)
719719
.withOptimizationResult(optimizationProposalMapAndStatus.getStatus())

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceConfigMapUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus;
1616
import io.vertx.core.Future;
1717

18-
import java.time.ZonedDateTime;
18+
import java.time.Instant;
1919
import java.util.Map;
2020

2121
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus.IN_EXECUTION;
@@ -38,13 +38,13 @@ public class KafkaRebalanceConfigMapUtils {
3838
* Updates the given KafkaRebalance ConfigMap with progress fields based on the progress of the Kafka rebalance operation.
3939
*
4040
* @param state The current state of the KafkaRebalance resource (e.g., ProposalReady, Rebalancing, Stopped, etc.).
41-
* @param taskStartDate The date time the task started represented as a ZonedDateTime object
41+
* @param taskStartTime The time the task started represented as an Instant object
4242
* @param executorState The executor state information in JSON format, which is used to calculate progress fields
4343
* in the Rebalancing state.
4444
* @param configMap The ConfigMap to be updated with progress information.
4545
*/
4646
/* test */ static void updateRebalanceConfigMapWithProgressFields(KafkaRebalanceState state,
47-
ZonedDateTime taskStartDate,
47+
Instant taskStartTime,
4848
JsonNode executorState,
4949
ConfigMap configMap) {
5050
Map<String, String> data = configMap != null ? configMap.getData() : null;
@@ -60,8 +60,8 @@ public class KafkaRebalanceConfigMapUtils {
6060
int finishedDataMovement = ExecutorStateProcessor.getFinishedDataMovement(executorState);
6161

6262
int estimatedTimeToCompletion = KafkaRebalanceProgressUtils.estimateTimeToCompletionInMinutes(
63-
taskStartDate,
64-
ZonedDateTime.now(),
63+
taskStartTime,
64+
Instant.now(),
6565
totalDataToMove,
6666
finishedDataMovement);
6767

@@ -122,7 +122,7 @@ public static Future<ConfigMap> updateRebalanceConfigMap(Reconciliation reconcil
122122
}
123123

124124
CruiseControlUserTaskStatus taskStatus = cruiseControlResponse.getTaskStatus();
125-
ZonedDateTime taskStartTime = cruiseControlResponse.getTaskStartTime();
125+
Instant taskStartTime = cruiseControlResponse.getTaskStartTime();
126126
if (taskStatus == IN_EXECUTION) {
127127
return VertxUtil.completableFutureToVertxFuture(
128128
apiClient.getCruiseControlState(reconciliation,

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceProgressUtils.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.strimzi.operator.cluster.operator.assembly;
66

77
import java.time.Duration;
8-
import java.time.ZonedDateTime;
8+
import java.time.Instant;
99

1010
/**
1111
* Utility class for handling progress fields of KafkaRebalance custom resource
@@ -15,8 +15,8 @@ public class KafkaRebalanceProgressUtils {
1515
/**
1616
* Estimates the number of minutes it will take an ongoing partition rebalance to complete.
1717
*
18-
* @param taskStartTime The date time when the task started.
19-
* @param currentTime The date time at the moment of the method call.
18+
* @param taskStartTime The time when the task started.
19+
* @param currentTime The time at the moment of the method call.
2020
* @param totalDataToMoveInMB The total amount of data that needs to be moved, in megabytes.
2121
* @param finishedDataMovementInMB The amount of data that has already been moved, in megabytes.
2222
* @return The estimated time to completion in minutes.
@@ -27,8 +27,8 @@ public class KafkaRebalanceProgressUtils {
2727
* - The elapsed time between `taskStartTime` and `currentTime` is zero, making rate calculation impossible.
2828
* - The data movement rate is zero, making the time to completion estimation impossible.
2929
*/
30-
/* test */ static int estimateTimeToCompletionInMinutes(ZonedDateTime taskStartTime,
31-
ZonedDateTime currentTime,
30+
/* test */ static int estimateTimeToCompletionInMinutes(Instant taskStartTime,
31+
Instant currentTime,
3232
int totalDataToMoveInMB,
3333
int finishedDataMovementInMB)
3434
throws IllegalArgumentException, ArithmeticException {
@@ -48,13 +48,13 @@ public class KafkaRebalanceProgressUtils {
4848
taskStartTime, currentTime));
4949
}
5050

51-
double rateMBperMinute = ((double) finishedDataMovementInMB / timeElapsed.getSeconds()) * 60;
5251
if (finishedDataMovementInMB == 0) {
5352
throw new ArithmeticException("finishedDataMovementInMB is zero, cannot estimate time to completion.");
5453
}
5554

55+
float rateMBperMinute = ((float) finishedDataMovementInMB / timeElapsed.getSeconds()) * 60;
5656
int dataLeftToMoveMB = totalDataToMoveInMB - finishedDataMovementInMB;
57-
return (int) (dataLeftToMoveMB / (rateMBperMinute));
57+
return Math.round(dataLeftToMoveMB / rateMBperMinute);
5858
}
5959

6060
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlUserTaskStatus;
99

1010
import java.time.Instant;
11-
import java.time.ZoneId;
12-
import java.time.ZonedDateTime;
1311

1412
/**
1513
* Response to user tasks request
@@ -20,7 +18,7 @@ public class CruiseControlUserTasksResponse extends CruiseControlResponse {
2018
private static final String START_MS_KEY = "StartMs";
2119

2220
private CruiseControlUserTaskStatus status;
23-
private ZonedDateTime taskStartTime;
21+
private Instant taskStartTime;
2422
private boolean isMaxActiveUserTasksReached;
2523

2624
/**
@@ -65,7 +63,7 @@ public CruiseControlUserTaskStatus getTaskStatus() {
6563
/**
6664
* Extracts the task start time from the provided `userTaskJson` JSON object.
6765
* The task start time is extracted from the "StartMs" field, which contains a
68-
* timestamp in of Unix epoch in milliseconds. The timestamp is then parsed into ZonedDateTime object.
66+
* timestamp in of Unix epoch in milliseconds. The timestamp is then parsed into Instant object.
6967
*
7068
* @param userTaskJson The `JsonNode` object containing the user task json,
7169
* from which the task start time will be extracted.
@@ -75,14 +73,14 @@ private void setTaskStartTime(JsonNode userTaskJson) {
7573
// Extract the task start time as Unix epoch in milliseconds
7674
long taskStartTimeInMilliseconds = userTaskJson.get(START_MS_KEY).asLong();
7775

78-
this.taskStartTime = Instant.ofEpochMilli(taskStartTimeInMilliseconds).atZone(ZoneId.systemDefault());
76+
this.taskStartTime = Instant.ofEpochMilli(taskStartTimeInMilliseconds);
7977
}
8078
}
8179

8280
/**
83-
* @return start time of user task as date-time string in ISO 8601 format.
81+
* @return Start time of user task as an Instant object
8482
*/
85-
public ZonedDateTime getTaskStartTime() {
83+
public Instant getTaskStartTime() {
8684
return this.taskStartTime;
8785
}
8886

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/cruisecontrol/ExecutorStateProcessorTest.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ public class ExecutorStateProcessorTest {
2121
private static final String DEFAULT_TOTAL_DATA_TO_MOVE = "1000";
2222
private static final String DEFAULT_TRIGGERED_TASK_REASON = "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)";
2323

24+
private static final String STATE_KEY = "state";
25+
private static final String FINISHED_DATA_MOVEMENT_KEY = "finishedDataMovement";
26+
private static final String TOTAL_DATA_TO_MOVE_KEY = "totalDataToMove";
27+
private static final String TRIGGERED_TASK_REASON = "triggeredTaskReason";
28+
2429
private static ObjectNode createExecutorState(Map<String, String> executorState) throws Exception {
2530
ObjectMapper objectMapper = new ObjectMapper();
2631
ObjectNode objectNode = objectMapper.createObjectNode();
@@ -32,12 +37,22 @@ private static ObjectNode createExecutorState(Map<String, String> executorState)
3237
return objectNode;
3338
}
3439

40+
private static ObjectNode createExecutorState(String finishedDataMovement, String totalDataToMove, String triggeredTaskReason) {
41+
ObjectMapper objectMapper = new ObjectMapper();
42+
ObjectNode objectNode = objectMapper.createObjectNode();
43+
objectNode.put(FINISHED_DATA_MOVEMENT_KEY, finishedDataMovement);
44+
objectNode.put(TOTAL_DATA_TO_MOVE_KEY, totalDataToMove);
45+
objectNode.put(TRIGGERED_TASK_REASON, triggeredTaskReason);
46+
return objectNode;
47+
}
48+
49+
3550
@Test
3651
public void testVerifyRebalancingState() throws Exception {
37-
JsonNode es0 = createExecutorState(Map.of("state", ExecutorState.NO_TASK_IN_PROGRESS.toString()));
52+
JsonNode es0 = createExecutorState(Map.of(STATE_KEY, ExecutorState.NO_TASK_IN_PROGRESS.toString()));
3853
assertThrows(IllegalStateException.class, () -> ExecutorState.verifyRebalancingState(es0));
3954

40-
JsonNode es1 = createExecutorState(Map.of("state", ExecutorState.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS.toString()));
55+
JsonNode es1 = createExecutorState(Map.of(STATE_KEY, ExecutorState.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS.toString()));
4156
ExecutorState.verifyRebalancingState(es1);
4257

4358
JsonNode es2 = createExecutorState(Map.of("", ""));
@@ -46,15 +61,11 @@ public void testVerifyRebalancingState() throws Exception {
4661

4762
@Test
4863
public void testGetFinishedDataMovement() throws Exception {
49-
JsonNode es0 = createExecutorState(Map.of("finishedDataMovement", "50",
50-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
51-
"triggeredTaskReason", DEFAULT_TRIGGERED_TASK_REASON));
64+
JsonNode es0 = createExecutorState("50", DEFAULT_TOTAL_DATA_TO_MOVE, DEFAULT_TRIGGERED_TASK_REASON);
5265
assertThat(ExecutorStateProcessor.getFinishedDataMovement(es0), is(50));
5366

5467
// Test missing field value is zero
55-
JsonNode es1 = createExecutorState(Map.of("finishedDataMovement", "",
56-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
57-
"triggeredTaskReason", DEFAULT_TRIGGERED_TASK_REASON));
68+
JsonNode es1 = createExecutorState("", DEFAULT_TOTAL_DATA_TO_MOVE, DEFAULT_TRIGGERED_TASK_REASON);
5869
assertThat(ExecutorStateProcessor.getFinishedDataMovement(es1), is(0));
5970

6071
// Test missing field throws NoSuchFieldException
@@ -64,15 +75,11 @@ public void testGetFinishedDataMovement() throws Exception {
6475

6576
@Test
6677
public void testGetTotalDataToMove() throws Exception {
67-
JsonNode es0 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
68-
"totalDataToMove", "10000",
69-
"triggeredTaskReason", DEFAULT_TRIGGERED_TASK_REASON));
78+
JsonNode es0 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, "10000", DEFAULT_TRIGGERED_TASK_REASON);
7079
assertThat(ExecutorStateProcessor.getTotalDataToMove(es0), is(10000));
7180

7281
// Test missing field value is zero
73-
JsonNode es1 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
74-
"totalDataToMove", "",
75-
"triggeredTaskReason", DEFAULT_TRIGGERED_TASK_REASON));
82+
JsonNode es1 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, "", DEFAULT_TRIGGERED_TASK_REASON);
7683
assertThat(ExecutorStateProcessor.getTotalDataToMove(es1), is(0));
7784

7885
// Test missing field throws NoSuchFieldException
@@ -82,38 +89,31 @@ public void testGetTotalDataToMove() throws Exception {
8289

8390
@Test
8491
public void testGetTaskStartTime() throws Exception {
85-
JsonNode es0 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
86-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
87-
"triggeredTaskReason", "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)"));
92+
JsonNode es0 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
93+
"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T19:41:27Z)");
8894
assertThat(ExecutorStateProcessor.getTaskStartTime(es0).toString(), is("2024-11-15T19:41:27"));
8995

90-
JsonNode es1 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
91-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
92-
"triggeredTaskReason", "(Client: 172.17.0.1, Date: 2024-11-10T23:25:27Z)"));
96+
JsonNode es1 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
97+
"(Client: 172.17.0.1, Date: 2024-11-10T23:25:27Z)");
9398
assertThat(ExecutorStateProcessor.getTaskStartTime(es1).toString(), is("2024-11-10T23:25:27"));
9499

95100
// Test missing date-string fails
96-
JsonNode es2 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
97-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
98-
"triggeredTaskReason", ""));
101+
JsonNode es2 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE, "");
99102
assertThrows(IllegalArgumentException.class, () -> ExecutorStateProcessor.getTaskStartTime(es2));
100103

101104
// Test date-string in a non-UTC timezone fails
102-
JsonNode es3 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
103-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
104-
"triggeredTaskReason", "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T20:41:27+01:00)"));
105+
JsonNode es3 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
106+
"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T20:41:27+01:00)");
105107
assertThrows(IllegalArgumentException.class, () -> ExecutorStateProcessor.getTaskStartTime(es3));
106108

107109
// Test malformed date-string fails
108-
JsonNode es4 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
109-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
110-
"triggeredTaskReason", "No reason provided (Client: 172.17.0.1, Date: 2024-11-15T20:41:2"));
110+
JsonNode es4 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
111+
"No reason provided (Client: 172.17.0.1, Date: 2024-11-15T20:41:2");
111112
assertThrows(IllegalArgumentException.class, () -> ExecutorStateProcessor.getTaskStartTime(es4));
112113

113114
// Test missing date-string fails
114-
JsonNode es5 = createExecutorState(Map.of("finishedDataMovement", DEFAULT_FINISHED_DATA_MOVEMENT,
115-
"totalDataToMove", DEFAULT_TOTAL_DATA_TO_MOVE,
116-
"triggeredTaskReason", "No reason provided (Client: 172.17.0.1)"));
115+
JsonNode es5 = createExecutorState(DEFAULT_FINISHED_DATA_MOVEMENT, DEFAULT_TOTAL_DATA_TO_MOVE,
116+
"No reason provided (Client: 172.17.0.1)");
117117
assertThrows(IllegalArgumentException.class, () -> ExecutorStateProcessor.getTaskStartTime(es5));
118118

119119
// Test missing field throws NoSuchFieldException

0 commit comments

Comments
 (0)