Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial udf:String/condition/math built-in functions #3363

Open
wants to merge 18 commits into
base: feature/calcite-engine
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.calcite;

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder.AggCall;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand Down Expand Up @@ -33,6 +35,10 @@ public AggCall visitAlias(Alias node, CalcitePlanContext context) {
@Override
public AggCall visitAggregateFunction(AggregateFunction node, CalcitePlanContext context) {
RexNode field = rexNodeVisitor.analyze(node.getField(), context);
return AggregateUtils.translate(node, field, context);
List<RexNode> argList = new ArrayList<>();
for (UnresolvedExpression arg : node.getArgList()) {
argList.add(rexNodeVisitor.analyze(arg, context));
}
return AggregateUtils.translate(node, field, context, argList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.ast.expression.SpanUnit.NONE;
import static org.opensearch.sql.ast.expression.SpanUnit.UNKNOWN;
import static org.opensearch.sql.calcite.utils.BuiltinFunctionUtils.translateArgument;

import java.math.BigDecimal;
import java.util.List;
Expand Down Expand Up @@ -240,6 +241,7 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
List<RexNode> arguments =
node.getFuncArgs().stream().map(arg -> analyze(arg, context)).collect(Collectors.toList());
return context.rexBuilder.makeCall(
BuiltinFunctionUtils.translate(node.getFuncName()), arguments);
BuiltinFunctionUtils.translate(node.getFuncName()),
translateArgument(node.getFuncName(), arguments, context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf;

public interface Accumulator {
Object result();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf;

public interface UserDefinedAggFunction<S extends Accumulator> {
S init();

Object result(S accumulator);

// Add values to the accumulator
S add(S acc, Object... values);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf;

public interface UserDefinedFunction {
Object eval(Object... args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.conditionUDF;

import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class IfFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
Object condition = args[0];
Object trueValue = args[1];
Object falseValue = args[2];
if (condition instanceof Boolean) {
return (Boolean) condition ? trueValue : falseValue;
}
return trueValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.conditionUDF;

import java.util.Objects;
import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class IfNullFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
Object conditionValue = args[0];
Object defaultValue = args[1];
if (Objects.isNull(conditionValue)) {
return defaultValue;
}
return conditionValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.conditionUDF;

import java.util.Objects;
import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class NullIfFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
Object firstValue = args[0];
Object secondValue = args[1];
if (Objects.equals(firstValue, secondValue)) {
return null;
}
return firstValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.mathUDF;

import java.util.zip.CRC32;
import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class CRC32Function implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
Object value = args[0];
CRC32 crc = new CRC32();
crc.update(value.toString().getBytes());
return crc.getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.mathUDF;

import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class EulerFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
return Math.E;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.mathUDF;

import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class ModFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
if (args.length < 2) {
throw new IllegalArgumentException("At least two arguments are required");
}

// Get the two values
Object mod0 = args[0];
Object mod1 = args[1];

// Handle numbers dynamically
if (mod0 instanceof Integer && mod1 instanceof Integer) {
return (Integer) mod0 % (Integer) mod1;
} else if (mod0 instanceof Number && mod1 instanceof Number) {
double num0 = ((Number) mod0).doubleValue();
double num1 = ((Number) mod1).doubleValue();

if (num1 == 0) {
throw new ArithmeticException("Modulo by zero is not allowed");
}

return num0 % num1; // Handles both float and double cases
} else {
throw new IllegalArgumentException("Invalid argument types: Expected numeric values");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.mathUDF;

import static java.lang.Math.sqrt;

import org.opensearch.sql.calcite.udf.UserDefinedFunction;

public class SqrtFunction implements UserDefinedFunction {
@Override
public Object eval(Object... args) {
if (args.length < 1) {
throw new IllegalArgumentException("At least one argument is required");
}

// Get the input value
Object input = args[0];

// Handle numbers dynamically
if (input instanceof Number) {
double num = ((Number) input).doubleValue();

if (num < 0) {
throw new ArithmeticException("Cannot compute square root of a negative number");
}

return sqrt(num); // Computes sqrt using Math.sqrt()
} else {
throw new IllegalArgumentException("Invalid argument type: Expected a numeric value");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.sql.calcite.udf.udaf;

import com.tdunning.math.stats.AVLTreeDigest;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.opensearch.sql.calcite.udf.Accumulator;
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;

public class PercentileApproxFunction
implements UserDefinedAggFunction<PercentileApproxFunction.PencentileApproAccumulator> {
@Override
public PencentileApproAccumulator init() {
return new PencentileApproAccumulator();
}

// Add values to the accumulator
@Override
public PencentileApproAccumulator add(PencentileApproAccumulator acc, Object... values) {
List<Object> allValues = Arrays.asList(values);
Object targetValue = allValues.get(0);
if (Objects.isNull(targetValue)) {
return acc;
}
Number percentileValue = (Number) allValues.get(1);
acc.evaluate(((Number) targetValue).doubleValue(), percentileValue.intValue());
return acc;
}

// Calculate the percentile
@Override
public Object result(PencentileApproAccumulator acc) {
if (acc.size() == 0) {
return null;
}
return acc.result();
}

public static class PencentileApproAccumulator extends AVLTreeDigest implements Accumulator {
public static final double DEFAULT_COMPRESSION = 100.0;
private double percent;

public PencentileApproAccumulator() {
super(DEFAULT_COMPRESSION);
this.percent = 1.0;
}

public void evaluate(double value, int percent) {
this.percent = percent / 100.0;
this.add(value);
}

public Object result() {
return this.quantile(this.percent);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.opensearch.sql.calcite.udf.udaf;

import java.util.ArrayList;
import java.util.List;
import org.opensearch.sql.calcite.udf.Accumulator;
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;

public class TakeAggFunction implements UserDefinedAggFunction<TakeAggFunction.TakeAccumulator> {

@Override
public TakeAccumulator init() {
return new TakeAccumulator();
}

@Override
public Object result(TakeAccumulator accumulator) {
return accumulator.result();
}

@Override
public TakeAccumulator add(TakeAccumulator acc, Object... values) {
Object candidateValue = values[0];
int size = 0;
if (values.length > 1) {
size = (int) values[1];
} else {
size = 10;
}
if (size > acc.size()) {
acc.add(candidateValue);
}
return acc;
}

public static class TakeAccumulator implements Accumulator {
private List<Object> hits;

public TakeAccumulator() {
hits = new ArrayList<>();
}

@Override
public Object result() {
return hits;
}

public void add(Object value) {
hits.add(value);
}

public int size() {
return hits.size();
}
}
}
Loading