Skip to content

Commit

Permalink
DRILL-8501: Json Conversion UDF Not Respecting System JSON Options
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre committed Jul 4, 2024
1 parent 40500ec commit 7ddde7b
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@


import io.netty.buffer.DrillBuf;

import javax.inject.Inject;

import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
Expand All @@ -32,28 +29,43 @@
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

public class JsonConvertFrom {
import javax.inject.Inject;

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertFrom.class);
public class JsonConvertFrom {

private JsonConvertFrom() {
}

@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJson implements DrillSimpleFunc {

@Param VarBinaryHolder in;
@Inject DrillBuf buffer;
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
@Param
VarBinaryHolder in;

@Output ComplexWriter writer;
@Inject
DrillBuf buffer;

@Inject
OptionManager options;

@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Output
ComplexWriter writer;

@Override
public void setup() {
boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE);
boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);

jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.allTextMode(allTextMode)
.readNumbersAsDouble(readNumbersAsDouble)
.build();
}

Expand All @@ -72,16 +84,30 @@ public void eval() {
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJsonVarchar implements DrillSimpleFunc {

@Param VarCharHolder in;
@Inject DrillBuf buffer;
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
@Param
VarCharHolder in;

@Output ComplexWriter writer;
@Inject
DrillBuf buffer;

@Inject
OptionManager options;

@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Output
ComplexWriter writer;

@Override
public void setup() {
boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE);
boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);

jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.allTextMode(allTextMode)
.readNumbersAsDouble(readNumbersAsDouble)
.build();
}

Expand All @@ -100,16 +126,30 @@ public void eval() {
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {

@Param NullableVarBinaryHolder in;
@Inject DrillBuf buffer;
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
@Param
NullableVarBinaryHolder in;

@Output ComplexWriter writer;
@Inject
DrillBuf buffer;

@Inject
OptionManager options;

@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Output
ComplexWriter writer;

@Override
public void setup() {
boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE);
boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);

jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.allTextMode(allTextMode)
.readNumbersAsDouble(readNumbersAsDouble)
.build();
}

Expand All @@ -136,16 +176,29 @@ public void eval() {
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFunc {

@Param NullableVarCharHolder in;
@Inject DrillBuf buffer;
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
@Param
NullableVarCharHolder in;

@Inject
DrillBuf buffer;

@Inject
OptionManager options;

@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;

@Output ComplexWriter writer;

@Override
public void setup() {
boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE);
boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);

jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
.defaultSchemaPathColumns()
.allTextMode(allTextMode)
.readNumbersAsDouble(readNumbersAsDouble)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.json;

import org.apache.commons.io.FileUtils;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.test.BaseTestQuery;
import org.junit.Test;

import java.io.File;
import java.nio.charset.Charset;
import java.util.List;

import static org.apache.drill.test.TestBuilder.mapOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

public class TestJsonConversionUDF extends BaseTestQuery {

@Test
public void doTestConvertFromJsonFunction() throws Exception {
String table = "nan_test.csv";
File file = new File(dirTestWatcher.getRootDir(), table);
String csv = "col_0, {\"nan_col\":NaN}";
try {
FileUtils.writeStringToFile(file, csv, Charset.defaultCharset());
testBuilder()
.sqlQuery(String.format("select convert_fromJSON(columns[1]) as col from dfs.`%s`", table))
.unOrdered()
.baselineColumns("col")
.baselineValues(mapOf("nan_col", Double.NaN))
.go();
} finally {
FileUtils.deleteQuietly(file);
}
}

@Test
public void doTestConvertToJsonFunction() throws Exception {
String table = "nan_test.csv";
File file = new File(dirTestWatcher.getRootDir(), table);
String csv = "col_0, {\"nan_col\":NaN}";
String query = String.format("select string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col " +
"from dfs.`%s` where columns[0]='col_0'", table);
try {
FileUtils.writeStringToFile(file, csv, Charset.defaultCharset());
List<QueryDataBatch> results = testSqlWithResults(query);
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
assertEquals("Query result must contain 1 row", 1, results.size());
QueryDataBatch batch = results.get(0);

batchLoader.load(batch.getHeader().getDef(), batch.getData());
VectorWrapper<?> vw = batchLoader.getValueAccessorById(VarCharVector.class, batchLoader.getValueVectorId(SchemaPath.getCompoundPath("col")).getFieldIds());
// ensuring that `NaN` token ARE NOT enclosed with double quotes
String resultJson = vw.getValueVector().getAccessor().getObject(0).toString();
int nanIndex = resultJson.indexOf("NaN");
assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1));
assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length()));
batch.release();
batchLoader.clear();
} finally {
FileUtils.deleteQuietly(file);
}
}

@Test
public void testAllTextMode() throws Exception {
alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
alterSession(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE, false);

String sql = "SELECT \n" +
"typeof(jsonMap['bi']) AS bi, \n" +
"typeof(jsonMap['fl']) AS fl, \n" +
"typeof(jsonMap['st']) AS st, \n" +
"typeof(jsonMap['mp']) AS mp, \n" +
"typeof(jsonMap['ar']) AS ar, \n" +
"typeof(jsonMap['nu']) AS nu\n" +
"FROM(\n" +
"SELECT convert_fromJSON(col1) AS jsonMap, \n" +
"col2 \n" +
"FROM cp.`jsoninput/allTypes.csvh`\n" +
")";

testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("bi", "fl", "st", "mp", "ar", "nu")
.baselineValues("VARCHAR", "VARCHAR", "VARCHAR", "MAP", "VARCHAR", "NULL")
.go();

alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false);

testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("bi", "fl", "st", "mp", "ar", "nu")
.baselineValues("BIGINT", "FLOAT8", "VARCHAR", "MAP", "BIGINT", "NULL")
.go();

resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
resetSessionOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
}

@Test
public void testReadNumbersAsDouble() throws Exception {
alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false);
alterSession(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE, true);

String sql = "SELECT \n" +
"typeof(jsonMap['bi']) AS bi, \n" +
"typeof(jsonMap['fl']) AS fl, \n" +
"typeof(jsonMap['st']) AS st, \n" +
"typeof(jsonMap['mp']) AS mp, \n" +
"typeof(jsonMap['ar']) AS ar, \n" +
"typeof(jsonMap['nu']) AS nu\n" +
"FROM(\n" +
"SELECT convert_fromJSON(col1) AS jsonMap, \n" +
"col2 \n" +
"FROM cp.`jsoninput/allTypes.csvh`\n" +
")";

testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("bi", "fl", "st", "mp", "ar", "nu")
.baselineValues("FLOAT8", "FLOAT8", "VARCHAR", "MAP", "FLOAT8", "NULL")
.go();

alterSession(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE, true);
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("bi", "fl", "st", "mp", "ar", "nu")
.baselineValues("FLOAT8", "FLOAT8", "VARCHAR", "MAP", "FLOAT8", "NULL")
.go();

resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
resetSessionOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE);
}
}
Loading

0 comments on commit 7ddde7b

Please sign in to comment.