Skip to content

Commit

Permalink
Make basic aggregation working (part 2) : adding tests (#3355)
Browse files Browse the repository at this point in the history
* Add more aggregation tests

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* delete unrelavant code

Signed-off-by: Lantao Jin <ltjin@amazon.com>

---------

Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin authored Feb 27, 2025
1 parent db22f1c commit c2fe0ad
Show file tree
Hide file tree
Showing 10 changed files with 605 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package org.opensearch.sql.calcite;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
Expand All @@ -24,10 +25,10 @@ public class CalcitePlanContext {

@Getter private boolean isResolvingJoinCondition = false;

private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
private CalcitePlanContext(FrameworkConfig config) {
this.config = config;
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -41,10 +42,6 @@ public RexNode resolveJoinCondition(
}

public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
return new CalcitePlanContext(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,6 @@ public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
groupByList.add(spanRex);
// add span's group alias field (most recent added expression)
}
// List<RexNode> aggList = node.getAggExprList().stream()
// .map(expr -> rexVisitor.analyze(expr, context))
// .collect(Collectors.toList());
// relBuilder.aggregate(relBuilder.groupKey(groupByList),
// aggList.stream().map(rex -> (MyAggregateCall) rex)
// .map(MyAggregateCall::getCall).collect(Collectors.toList()));
context.relBuilder.aggregate(context.relBuilder.groupKey(groupByList), aggList);
return context.relBuilder.peek();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.parser.SqlParserUtil;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.TimeString;
Expand All @@ -39,7 +39,6 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.calcite.utils.BuiltinFunctionUtils;
import org.opensearch.sql.calcite.utils.DataTypeTransformer;

public class CalciteRexNodeVisitor extends AbstractNodeVisitor<RexNode, CalcitePlanContext> {

Expand Down Expand Up @@ -198,13 +197,9 @@ public RexNode visitSpan(Span node, CalcitePlanContext context) {
RelDataTypeFactory typeFactory = context.rexBuilder.getTypeFactory();
SpanUnit unit = node.getUnit();
if (isTimeBased(unit)) {
String datetimeUnitString = DataTypeTransformer.translate(unit);
RexNode interval =
context.rexBuilder.makeIntervalLiteral(
new BigDecimal(value.toString()),
new SqlIntervalQualifier(datetimeUnitString, SqlParserPos.ZERO));
// TODO not supported yet
return interval;
SqlIntervalQualifier intervalQualifier = context.rexBuilder.createIntervalUntil(unit);
long millis = SqlParserUtil.intervalToMillis(value.toString(), intervalQualifier);
return context.rexBuilder.makeIntervalLiteral(new BigDecimal(millis), intervalQualifier);
} else {
// if the unit is not time base - create a math expression to bucket the span partitions
return context.rexBuilder.makeCall(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@

package org.opensearch.sql.calcite;

import org.apache.calcite.avatica.util.TimeUnit;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.opensearch.sql.ast.expression.SpanUnit;

public class ExtendedRexBuilder extends RexBuilder {

Expand All @@ -22,4 +26,49 @@ public RexNode coalesce(RexNode... nodes) {
public RexNode equals(RexNode n1, RexNode n2) {
return this.makeCall(SqlStdOperatorTable.EQUALS, n1, n2);
}

public SqlIntervalQualifier createIntervalUntil(SpanUnit unit) {
TimeUnit timeUnit;
switch (unit) {
case MILLISECOND:
case MS:
timeUnit = TimeUnit.MILLISECOND;
break;
case SECOND:
case S:
timeUnit = TimeUnit.SECOND;
break;
case MINUTE:
case m:
timeUnit = TimeUnit.MINUTE;
break;
case HOUR:
case H:
timeUnit = TimeUnit.HOUR;
break;
case DAY:
case D:
timeUnit = TimeUnit.DAY;
break;
case WEEK:
case W:
timeUnit = TimeUnit.WEEK;
break;
case MONTH:
case M:
timeUnit = TimeUnit.MONTH;
break;
case QUARTER:
case Q:
timeUnit = TimeUnit.QUARTER;
break;
case YEAR:
case Y:
timeUnit = TimeUnit.YEAR;
break;
default:
timeUnit = TimeUnit.EPOCH;
}
return new SqlIntervalQualifier(timeUnit, timeUnit, SqlParserPos.ZERO);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE;
import static org.opensearch.sql.data.type.ExprCoreType.FLOAT;
import static org.opensearch.sql.data.type.ExprCoreType.INTEGER;
import static org.opensearch.sql.data.type.ExprCoreType.INTERVAL;
import static org.opensearch.sql.data.type.ExprCoreType.IP;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.SHORT;
Expand Down Expand Up @@ -151,6 +152,13 @@ public static ExprType convertRelDataTypeToExprType(RelDataType type) {
return TIMESTAMP;
case GEOMETRY:
return IP;
case INTERVAL_YEAR:
case INTERVAL_MONTH:
case INTERVAL_DAY:
case INTERVAL_HOUR:
case INTERVAL_MINUTE:
case INTERVAL_SECOND:
return INTERVAL;
case ARRAY:
return ARRAY;
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.sql.executor;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
Expand Down Expand Up @@ -80,8 +78,7 @@ public void execute(
(PrivilegedAction<Void>)
() -> {
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context =
CalcitePlanContext.create(config, TYPE_FACTORY);
final CalcitePlanContext context = CalcitePlanContext.create(config);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
Expand Down
Loading

0 comments on commit c2fe0ad

Please sign in to comment.