Skip to content

Commit

Permalink
Fix for dependent tasks that use multiple columns (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcamachor authored Mar 11, 2024
1 parent b3c1dc6 commit f1ede40
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.catalog_sales
WHERE cs_item_sk IN (${cs_item_sk}) AND cs_order_number IN (${cs_order_number});
WHERE (cs_item_sk, cs_order_number) IN (${multi_values_clause});
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.inventory
WHERE inv_date_sk IN (${inv_date_sk}) AND inv_item_sk IN (${inv_item_sk}) AND inv_warehouse_sk IN (${inv_warehouse_sk});
WHERE (inv_date_sk, inv_item_sk, inv_warehouse_sk) IN (${multi_values_clause});
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.store_returns
WHERE sr_item_sk IN (${sr_item_sk}) AND sr_ticket_number IN (${sr_ticket_number});
WHERE (sr_item_sk, sr_ticket_number) IN (${multi_values_clause});
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.store_sales
WHERE ss_item_sk IN (${ss_item_sk}) AND ss_ticket_number IN (${ss_ticket_number});
WHERE (ss_item_sk, ss_ticket_number) IN (${multi_values_clause});
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.web_returns
WHERE wr_item_sk IN (${wr_item_sk}) AND wr_order_number IN (${wr_order_number});
WHERE (wr_item_sk, wr_order_number) IN (${multi_values_clause});
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DELETE FROM ${catalog}.${database}.web_sales
WHERE ws_item_sk IN (${ws_item_sk}) AND ws_order_number IN (${ws_order_number});
WHERE (ws_item_sk, ws_order_number) IN (${multi_values_clause});
91 changes: 58 additions & 33 deletions src/main/java/com/microsoft/lst_bench/client/QueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,33 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

/**
* Represents the query result of a query issued against a source. Query result entries should be
* mapped to column name -> list of column values.
* Represents the result of a query issued against a data source.
* If the query result contains a single column, the entries will be mapped to the column name.
* Otherwise, the entries will be mapped to a special key "multi_values_clause" which is used to
* represent a multi-column result.
*/
public class QueryResult {

private final Map<String, Integer> columnTypes;
private final Map<String, List<Object>> valueList;
private final List<String> columnNames;
private final List<Integer> columnTypes;
private final List<List<Object>> valueList;

private static final String MULTI_VALUES_KEY = "multi_values_clause";
private static final String RESULT = "Result";

public QueryResult() {
this.columnTypes = new HashMap<>();
this.valueList = new HashMap<>();
this(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
}

QueryResult(List<String> columnNames, List<Integer> columnTypes, List<List<Object>> valueList) {
this.columnNames = columnNames;
this.columnTypes = columnTypes;
this.valueList = valueList;
}

// TODO: Determine whether this can be done lazily i.e., after the statement has finished
Expand All @@ -47,52 +54,70 @@ public void populate(ResultSet rs) throws SQLException {
ResultSetMetaData rsmd = rs.getMetaData();

for (int j = 1; j <= rsmd.getColumnCount(); j++) {
columnTypes.put(rsmd.getColumnName(j), rsmd.getColumnType(j));
valueList.put(rsmd.getColumnName(j), new ArrayList<>());
columnNames.add(rsmd.getColumnName(j));
columnTypes.add(rsmd.getColumnType(j));
valueList.add(new ArrayList<>());
}

while (rs.next()) {
for (int j = 1; j <= rsmd.getColumnCount(); j++) {
valueList.get(rsmd.getColumnName(j)).add(rs.getObject(j));
valueList.get(j - 1).add(rs.getObject(j));
}
}
}

public Integer getValueListSize() {
Integer size = null;
for (Entry<String, List<Object>> pair : valueList.entrySet()) {
size = pair.getValue().size();
for (List<Object> values : valueList) {
size = values.size();
break;
}
return size;
}

public boolean containsEmptyResultColumnOnly() {
return valueList.keySet().size() == 1
&& valueList.containsKey(RESULT)
&& valueList.get(RESULT).isEmpty();
return columnNames.size() == 1
&& columnNames.get(0).equals(RESULT)
&& valueList.get(0).isEmpty();
}

public Map<String, Object> getStringMappings(int listMin, int listMax) {
Map<String, Object> result = new HashMap<>();
for (String key : valueList.keySet()) {
public Pair<String, Object> getStringMappings(int listMin, int listMax) {
if (columnNames.size() == 1) {
List<String> localList =
valueList.get(key).subList(listMin, listMax).stream()
valueList.get(0).subList(listMin, listMax).stream()
.map(Object::toString)
.map(s -> wrapString(s, columnTypes.get(0)))
.collect(Collectors.toUnmodifiableList());
switch (columnTypes.get(key)) {
case java.sql.Types.BIGINT:
case java.sql.Types.INTEGER:
case java.sql.Types.SMALLINT:
case java.sql.Types.TINYINT:
result.put(key, String.join(",", localList));
break;
default:
// Currently assumes String for all other types.
// TODO: Better handling and testing of data types across engines.
result.put(key, "'" + String.join("','", localList) + "'");
return Pair.of(columnNames.get(0), String.join(",", localList));
}
StringBuilder multiValuesClause = new StringBuilder();
for (int i = listMin; i < listMax; i++) {
multiValuesClause.append("(");
for (int j = 0; j < valueList.size(); j++) {
multiValuesClause
.append(wrapString(valueList.get(j).get(i).toString(), columnTypes.get(j)))
.append(",");
}
// Remove trailing comma
multiValuesClause.setLength(multiValuesClause.length() - 1);
multiValuesClause.append("),");
}
// Remove trailing comma
multiValuesClause.setLength(multiValuesClause.length() - 1);
return Pair.of(MULTI_VALUES_KEY, multiValuesClause.toString());
}

private String wrapString(String value, int type) {
switch (type) {
case java.sql.Types.BIGINT:
case java.sql.Types.INTEGER:
case java.sql.Types.SMALLINT:
case java.sql.Types.TINYINT:
return value;
default:
// Currently assumes String for all other types.
// TODO: Better handling and testing of data types across engines.
return "'" + value + "'";
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -88,7 +89,8 @@ public void executeTask(Connection connection, TaskExec task, Map<String, Object
for (int j = 0; j < size; j += batchSize) {
int localMax = (j + batchSize) > size ? size : (j + batchSize);
Map<String, Object> localValues = new HashMap<>(values);
localValues.putAll(queryResult.getStringMappings(j, localMax));
Pair<String, Object> batch = queryResult.getStringMappings(j, localMax);
localValues.put(batch.getKey(), batch.getValue());
executeStatement(connection, statement, localValues, true);
}
// Reset query result.
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/com/microsoft/lst_bench/client/QueryResultTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) Microsoft Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.lst_bench.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;

public class QueryResultTest {

@Test
public void testGetStringMappingsSingleColumn() {
// Given
List<String> columnNames = Collections.singletonList("ColumnName");
List<Integer> columnTypes = Collections.singletonList(java.sql.Types.VARCHAR);
List<List<Object>> valueList =
List.of(
Arrays.asList(
"Value1", "Value2", "Value3", "Value4", "Value5", "Value6", "Value7", "Value8"));
QueryResult queryResult = new QueryResult(columnNames, columnTypes, valueList);

// When
Pair<String, Object> result = queryResult.getStringMappings(0, 4);

// Then
assertEquals("ColumnName", result.getKey());
assertEquals("'Value1','Value2','Value3','Value4'", result.getValue());
}

@Test
public void testGetStringMappingsMultiColumn() {
// Given
List<String> columnNames = Arrays.asList("Column1", "Column2");
List<Integer> columnTypes = Arrays.asList(java.sql.Types.VARCHAR, java.sql.Types.INTEGER);
List<List<Object>> valueList =
Arrays.asList(Arrays.asList("Value1", "Value2", "Value3"), Arrays.asList(1, 2, 3));
QueryResult queryResult = new QueryResult(columnNames, columnTypes, valueList);

// When
Pair<String, Object> result = queryResult.getStringMappings(0, 3);

// Then
assertEquals("multi_values_clause", result.getKey());
assertEquals("('Value1',1),('Value2',2),('Value3',3)", result.getValue());
}
}

0 comments on commit f1ede40

Please sign in to comment.