Skip to content

Commit

Permalink
Add more aggregation tests
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin committed Feb 26, 2025
1 parent db22f1c commit ab24d83
Show file tree
Hide file tree
Showing 10 changed files with 614 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.sql.calcite;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
Expand All @@ -24,10 +25,10 @@ public class CalcitePlanContext {

@Getter private boolean isResolvingJoinCondition = false;

private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
private CalcitePlanContext(FrameworkConfig config) {
this.config = config;
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -41,10 +42,6 @@ public RexNode resolveJoinCondition(
}

public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
return new CalcitePlanContext(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
groupByList.add(spanRex);
// add span's group alias field (most recent added expression)
}
// List<RexNode> aggList = node.getAggExprList().stream()
// .map(expr -> rexVisitor.analyze(expr, context))
// .collect(Collectors.toList());
// relBuilder.aggregate(relBuilder.groupKey(groupByList),
// aggList.stream().map(rex -> (MyAggregateCall) rex)
// .map(MyAggregateCall::getCall).collect(Collectors.toList()));
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggList);
return context.relBuilder.peek();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.parser.SqlParserUtil;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Not;
Expand All @@ -39,7 +41,6 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.calcite.utils.BuiltinFunctionUtils;
import org.opensearch.sql.calcite.utils.DataTypeTransformer;

public class CalciteRexNodeVisitor extends AbstractNodeVisitor<RexNode, CalcitePlanContext> {

Expand Down Expand Up @@ -198,13 +199,9 @@ public RexNode visitSpan(Span node, CalcitePlanContext context) {
RelDataTypeFactory typeFactory = context.rexBuilder.getTypeFactory();
SpanUnit unit = node.getUnit();
if (isTimeBased(unit)) {
String datetimeUnitString = DataTypeTransformer.translate(unit);
RexNode interval =
context.rexBuilder.makeIntervalLiteral(
new BigDecimal(value.toString()),
new SqlIntervalQualifier(datetimeUnitString, SqlParserPos.ZERO));
// TODO not supported yet
return interval;
SqlIntervalQualifier intervalQualifier = context.rexBuilder.createIntervalUntil(unit);
long millis = SqlParserUtil.intervalToMillis(value.toString(), intervalQualifier);
return context.rexBuilder.makeIntervalLiteral(new BigDecimal(millis), intervalQualifier);
} else {
// if the unit is not time base - create a math expression to bucket the span partitions
return context.rexBuilder.makeCall(
Expand Down Expand Up @@ -247,4 +244,12 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {
return context.rexBuilder.makeCall(
BuiltinFunctionUtils.translate(node.getFuncName()), arguments);
}

@Override
public RexNode visitInterval(Interval node, CalcitePlanContext context) {
RexNode field = analyze(node.getValue(), context);
return context.rexBuilder.makeIntervalLiteral(
new BigDecimal(field.toString()),
new SqlIntervalQualifier(node.getUnit().name(), SqlParserPos.ZERO));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@

package org.opensearch.sql.calcite;

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.opensearch.sql.ast.expression.SpanUnit;

public class ExtendedRexBuilder extends RexBuilder {

Expand All @@ -22,4 +26,49 @@ public RexNode coalesce(RexNode... nodes) {
public RexNode equals(RexNode n1, RexNode n2) {
return this.makeCall(SqlStdOperatorTable.EQUALS, n1, n2);
}

public SqlIntervalQualifier createIntervalUntil(SpanUnit unit) {
TimeUnit timeUnit;
switch (unit) {
case MILLISECOND:
case MS:
timeUnit = TimeUnit.MILLISECOND;
break;
case SECOND:
case S:
timeUnit = TimeUnit.SECOND;
break;
case MINUTE:
case m:
timeUnit = TimeUnit.MINUTE;
break;
case HOUR:
case H:
timeUnit = TimeUnit.HOUR;
break;
case DAY:
case D:
timeUnit = TimeUnit.DAY;
break;
case WEEK:
case W:
timeUnit = TimeUnit.WEEK;
break;
case MONTH:
case M:
timeUnit = TimeUnit.MONTH;
break;
case QUARTER:
case Q:
timeUnit = TimeUnit.QUARTER;
break;
case YEAR:
case Y:
timeUnit = TimeUnit.YEAR;
break;
default:
timeUnit = TimeUnit.EPOCH;
}
return new SqlIntervalQualifier(timeUnit, timeUnit, SqlParserPos.ZERO);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE;
import static org.opensearch.sql.data.type.ExprCoreType.FLOAT;
import static org.opensearch.sql.data.type.ExprCoreType.INTEGER;
import static org.opensearch.sql.data.type.ExprCoreType.INTERVAL;
import static org.opensearch.sql.data.type.ExprCoreType.IP;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.SHORT;
Expand Down Expand Up @@ -151,6 +152,13 @@ public static ExprType convertRelDataTypeToExprType(RelDataType type) {
return TIMESTAMP;
case GEOMETRY:
return IP;
case INTERVAL_YEAR:
case INTERVAL_MONTH:
case INTERVAL_DAY:
case INTERVAL_HOUR:
case INTERVAL_MINUTE:
case INTERVAL_SECOND:
return INTERVAL;
case ARRAY:
return ARRAY;
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.sql.executor;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
Expand Down Expand Up @@ -80,8 +78,7 @@ public void execute(
(PrivilegedAction<Void>)
() -> {
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context =
CalcitePlanContext.create(config, TYPE_FACTORY);
final CalcitePlanContext context = CalcitePlanContext.create(config);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
Expand Down
Loading

0 comments on commit ab24d83

Please sign in to comment.