Skip to content

Commit

Permalink
fix when we specify task executor arguments from experiment config
Browse files Browse the repository at this point in the history
  • Loading branch information
jcamachor committed Sep 25, 2024
1 parent f458a21 commit 5d6ba5e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private static TaskExec createTaskExec(
// tasks.
TaskExecutorArguments taskExecutorArguments =
new TaskExecutorArguments(experimentConfig.getTaskExecutorArguments());
taskExecutorArguments.addArguments(task.getTaskExecutorArguments());
taskExecutorArguments = taskExecutorArguments.addArguments(task.getTaskExecutorArguments());

return ImmutableTaskExec.of(taskId, files)
.withTimeTravelPhaseId(task.getTimeTravelPhaseId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.microsoft.lst_bench.util.StringUtils;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,7 +137,7 @@ protected final QueryResult executeStatement(
return queryResult;
}

private boolean containsException(String message, String[] exceptionStrings) {
private boolean containsException(String message, List<String> exceptionStrings) {
for (String exception : exceptionStrings) {
if (message.contains(exception)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,33 @@
package com.microsoft.lst_bench.task.util;

import com.microsoft.lst_bench.util.TaskExecutorArgumentsParser;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TaskExecutorArguments {

private String[] retryExceptionStrings;
private String[] skipExceptionStrings;
private Map<String, Object> arguments;
private final List<String> retryExceptionStrings;
private final List<String> skipExceptionStrings;
private final Map<String, Object> arguments;

public TaskExecutorArguments(Map<String, Object> arguments) {
this.retryExceptionStrings = TaskExecutorArgumentsParser.parseRetryExceptionStrings(arguments);
this.skipExceptionStrings = TaskExecutorArgumentsParser.parseSkipExceptionStrings(arguments);
this.arguments = arguments;
this.retryExceptionStrings =
Collections.unmodifiableList(
TaskExecutorArgumentsParser.parseRetryExceptionStrings(arguments));
this.skipExceptionStrings =
Collections.unmodifiableList(
TaskExecutorArgumentsParser.parseSkipExceptionStrings(arguments));
this.arguments = arguments != null ? Collections.unmodifiableMap(arguments) : Map.of();
}

public String[] getRetryExceptionStrings() {
public List<String> getRetryExceptionStrings() {
return this.retryExceptionStrings;
}

public String[] getSkipExceptionStrings() {
public List<String> getSkipExceptionStrings() {
return this.skipExceptionStrings;
}

Expand All @@ -44,27 +51,19 @@ public Map<String, Object> getArguments() {
}

// Added arguments are automatically appended if possible.
public void addArguments(Map<String, Object> arguments) {
public TaskExecutorArguments addArguments(Map<String, Object> arguments) {
if (arguments == null) {
return;
} else if (this.arguments == null) {
this.arguments = arguments;
} else {
this.arguments.putAll(arguments);
return this; // If no new arguments, return the same instance
}

this.retryExceptionStrings =
Stream.of(
this.getRetryExceptionStrings(),
TaskExecutorArgumentsParser.parseRetryExceptionStrings(arguments))
.flatMap(Stream::of)
.toArray(String[]::new);

this.skipExceptionStrings =
Stream.of(
this.getSkipExceptionStrings(),
TaskExecutorArgumentsParser.parseSkipExceptionStrings(arguments))
.flatMap(Stream::of)
.toArray(String[]::new);
// Create a new map with combined arguments (existing + new)
Map<String, Object> combinedArguments =
Stream.concat(this.arguments.entrySet().stream(), arguments.entrySet().stream())
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey,
Map.Entry::getValue,
(existing, replacement) -> replacement));
// Return a new instance with the updated state
return new TaskExecutorArguments(combinedArguments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.microsoft.lst_bench.util;

import java.util.List;
import java.util.Map;

public class TaskExecutorArgumentsParser {
Expand All @@ -26,41 +27,34 @@ public class TaskExecutorArgumentsParser {
private static final int DEFAULT_BATCH_SIZE = 1;
private static final String DEPENDENT_TASK_BATCH_SIZE = "dependent_task_batch_size";

public static String[] parseSkipExceptionStrings(Map<String, Object> arguments) {
public static List<String> parseSkipExceptionStrings(Map<String, Object> arguments) {
// Check whether there are any strings that errors are allowed to contain. In that case, we skip
// the erroneous query and log a warning.
String[] exceptionStrings;
if (arguments == null || arguments.get(SKIP_ERRONEOUS_QUERY_STRINGS_KEY) == null) {
exceptionStrings = new String[] {};
} else {
exceptionStrings =
arguments
.get(SKIP_ERRONEOUS_QUERY_STRINGS_KEY)
.toString()
.split(SKIP_ERRONEOUS_QUERY_DELIMITER);
}
return exceptionStrings;
return parseExceptionStrings(
arguments, SKIP_ERRONEOUS_QUERY_STRINGS_KEY, SKIP_ERRONEOUS_QUERY_DELIMITER);
}

public static String[] parseRetryExceptionStrings(Map<String, Object> arguments) {
public static List<String> parseRetryExceptionStrings(Map<String, Object> arguments) {
// Check whether there are any strings that tell us that we should continue to retry this query
// until successful.
String[] exceptionStrings;
if (arguments == null || arguments.get(RETRY_ERRONEOUS_QUERY_STRINGS_KEY) == null) {
exceptionStrings = new String[] {};
return parseExceptionStrings(
arguments, RETRY_ERRONEOUS_QUERY_STRINGS_KEY, RETRY_ERRONEOUS_QUERY_DELIMITER);
}

private static List<String> parseExceptionStrings(
Map<String, Object> arguments, String key, String delimiter) {
List<String> exceptionStrings;
if (arguments == null || arguments.get(key) == null) {
exceptionStrings = List.of();
} else {
exceptionStrings =
arguments
.get(RETRY_ERRONEOUS_QUERY_STRINGS_KEY)
.toString()
.split(RETRY_ERRONEOUS_QUERY_DELIMITER);
exceptionStrings = List.of(arguments.get(key).toString().split(delimiter));
}
return exceptionStrings;
}

public static Integer parseBatchSize(Map<String, Object> arguments) {
// Parses the batch size, currently used for dependent task execution.
Integer batchSize = null;
Integer batchSize;
if (arguments == null || arguments.get(DEPENDENT_TASK_BATCH_SIZE) == null) {
batchSize = DEFAULT_BATCH_SIZE;
} else {
Expand Down

0 comments on commit 5d6ba5e

Please sign in to comment.