diff --git a/adapters/cab-converter/sql/spark-3.3.1/run/query_23.sql b/adapters/cab-converter/sql/spark-3.3.1/run/query_23_1.sql similarity index 78% rename from adapters/cab-converter/sql/spark-3.3.1/run/query_23.sql rename to adapters/cab-converter/sql/spark-3.3.1/run/query_23_1.sql index f278555b..eee99328 100644 --- a/adapters/cab-converter/sql/spark-3.3.1/run/query_23.sql +++ b/adapters/cab-converter/sql/spark-3.3.1/run/query_23_1.sql @@ -15,9 +15,3 @@ INSERT ${catalog}.${database}${stream_num}.orders WHERE ${param1} <= o_orderkey and o_orderkey < ${param2}); - -DELETE -FROM - ${catalog}.${database}${stream_num}.orders -WHERE - ${param1} <= o_orderkey and o_orderkey < ${param2} and mod(o_orderkey, 32) between ${param3} and ${param4}; diff --git a/adapters/cab-converter/sql/spark-3.3.1/run/query_23_2.sql b/adapters/cab-converter/sql/spark-3.3.1/run/query_23_2.sql new file mode 100644 index 00000000..8326e27c --- /dev/null +++ b/adapters/cab-converter/sql/spark-3.3.1/run/query_23_2.sql @@ -0,0 +1,5 @@ +DELETE +FROM + ${catalog}.${database}${stream_num}.orders +WHERE + ${param1} <= o_orderkey and o_orderkey < ${param2} and mod(o_orderkey, 32) between ${param3} and ${param4}; diff --git a/adapters/cab-converter/src/main/java/com/microsoft/lst_bench/cab_converter/Converter.java b/adapters/cab-converter/src/main/java/com/microsoft/lst_bench/cab_converter/Converter.java index 7cd42ed3..b3e3b153 100644 --- a/adapters/cab-converter/src/main/java/com/microsoft/lst_bench/cab_converter/Converter.java +++ b/adapters/cab-converter/src/main/java/com/microsoft/lst_bench/cab_converter/Converter.java @@ -33,7 +33,6 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +43,8 @@ public class Converter { private static final Logger LOGGER = LoggerFactory.getLogger(Converter.class); + private static final String QUERY_23_1 = "query_23_1"; + private static final String QUERY_23_2 = "query_23_2"; private final File streamsDir; private final File outputDir; @@ -127,12 +128,21 @@ private ImmutableLibrary getLibrary() { "auxiliary" + File.separator + "parameter_values.dat"); taskTemplateList.add(buildTaskTemplateBuilder.build()); // Create a task template for each query in run - for (int i = 1; i <= 23; i++) { + for (int i = 1; i <= 22; i++) { ImmutableTaskTemplate.Builder queryTaskTemplateBuilder = ImmutableTaskTemplate.builder(); queryTaskTemplateBuilder.id("query_" + i); queryTaskTemplateBuilder.addFiles("run" + File.separator + "query_" + i + ".sql"); taskTemplateList.add(queryTaskTemplateBuilder.build()); } + // Special handling for query_23, which consists of two write queries + ImmutableTaskTemplate.Builder queryTaskTemplateBuilder = ImmutableTaskTemplate.builder(); + queryTaskTemplateBuilder.id(QUERY_23_1); + queryTaskTemplateBuilder.addFiles("run" + File.separator + QUERY_23_1 + ".sql"); + taskTemplateList.add(queryTaskTemplateBuilder.build()); + queryTaskTemplateBuilder = ImmutableTaskTemplate.builder(); + queryTaskTemplateBuilder.id(QUERY_23_2); + queryTaskTemplateBuilder.addFiles("run" + File.separator + QUERY_23_2 + ".sql"); + taskTemplateList.add(queryTaskTemplateBuilder.build()); libraryBuilder.taskTemplates(taskTemplateList); return libraryBuilder.build(); } @@ -237,13 +247,13 @@ private void createSplitReadWriteSessionsForStream( boolean hasWriteQuery = false; boolean hasReadQuery = false; for (JsonNode queryNode : queriesNode) { - ImmutableTask task = createTaskForQuery(queryNode, streamIndex); - - if (Objects.requireNonNull(task.getTemplateId()).equals("query_" + 23)) { - writeSessionBuilder.addTasks(task); // query_23 is the only write query + ImmutableTask[] tasks = createTasksForQuery(queryNode, streamIndex); + if (tasks.length == 2) { + // Only conversion producing two tasks is for query 23 + writeSessionBuilder.addTasks(tasks); // query_23 is the only write query hasWriteQuery = true; } else { - readSessionBuilder.addTasks(task); + readSessionBuilder.addTasks(tasks); hasReadQuery = true; } } @@ -268,8 +278,8 @@ private void createMixedSessionsForStream( // Create tasks for each query in the stream boolean hasQuery = false; for (JsonNode queryNode : queriesNode) { - ImmutableTask task = createTaskForQuery(queryNode, streamIndex); - sessionBuilder.addTasks(task); + ImmutableTask[] tasks = createTasksForQuery(queryNode, streamIndex); + sessionBuilder.addTasks(tasks); hasQuery = true; } @@ -281,14 +291,26 @@ private void createMixedSessionsForStream( } } - private ImmutableTask createTaskForQuery(JsonNode queryNode, int streamIndex) { + private static ImmutableTask[] createTasksForQuery(JsonNode queryNode, int streamIndex) { // Extract necessary fields from each query int queryId = queryNode.get("query_id").asInt(); long startTime = queryNode.get("start").asLong(); JsonNode argsNode = queryNode.get("arguments"); + if (queryId == 23) { + // Special handling for query 23, which consists of two write queries + ImmutableTask[] tasks = new ImmutableTask[2]; + tasks[0] = createTask(QUERY_23_1, startTime, argsNode, streamIndex); + tasks[1] = createTask(QUERY_23_2, startTime, argsNode, streamIndex); + return tasks; + } // Create a task for this query + return new ImmutableTask[] {createTask("query_" + queryId, startTime, argsNode, streamIndex)}; + } + + private static ImmutableTask createTask( + String queryIdText, long startTime, JsonNode argsNode, int streamIndex) { ImmutableTask.Builder taskBuilder = ImmutableTask.builder(); - taskBuilder.templateId("query_" + queryId); + taskBuilder.templateId(queryIdText); taskBuilder.start(startTime); // Ensure 'arguments' is an array and handle its values if (argsNode != null && argsNode.isArray()) { @@ -304,13 +326,16 @@ private ImmutableTask createTaskForQuery(JsonNode queryNode, int streamIndex) { taskBuilder.putTaskExecutorArguments("param" + (j + 1), arg.asDouble()); } else { throw new IllegalArgumentException( - "Unsupported argument type for query ID: " + queryId + " in stream: " + streamIndex); + "Unsupported argument type for query ID: " + + queryIdText + + " in stream: " + + streamIndex); } } } else { throw new IllegalArgumentException( "Arguments field missing or not an array for query ID: " - + queryId + + queryIdText + " in stream: " + streamIndex); } diff --git a/adapters/cab-converter/src/test/java/com/microsoft/lst_bench/cab_converter/DriverConverterTest.java b/adapters/cab-converter/src/test/java/com/microsoft/lst_bench/cab_converter/DriverConverterTest.java index efacbc2c..ec479ff3 100644 --- a/adapters/cab-converter/src/test/java/com/microsoft/lst_bench/cab_converter/DriverConverterTest.java +++ b/adapters/cab-converter/src/test/java/com/microsoft/lst_bench/cab_converter/DriverConverterTest.java @@ -209,7 +209,7 @@ public void testSplitPerStreamTypeConnection(@TempDir Path tempDir) throws Excep private void validateCABLibrary(Library library) { Assertions.assertNotNull(library); Assertions.assertEquals( - 25, library.getTaskTemplates().size(), "Library task templates size mismatch"); + 26, library.getTaskTemplates().size(), "Library task templates size mismatch"); Assertions.assertTrue( ObjectUtils.isEmpty(library.getPreparedTasks()), "No prepared tasks expected"); Assertions.assertTrue(