Skip to content

Commit

Permalink
Fix the timestamp issue
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin committed Feb 25, 2025
1 parent 64177ae commit 2469c26
Show file tree
Hide file tree
Showing 21 changed files with 343 additions and 125 deletions.
5 changes: 2 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,14 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.datasource.model.DataSource',
'org.opensearch.sql.datasource.model.DataSourceStatus',
'org.opensearch.sql.datasource.model.DataSourceType',
'org.opensearch.sql.executor.ExecutionEngine'
]
limit {
counter = 'LINE'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
limit {
counter = 'BRANCH'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,29 @@

package org.opensearch.sql.calcite;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;

public class CalcitePlanContext {

public FrameworkConfig config;
public final Connection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config) {
private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
this.config = config;
this.relBuilder = RelBuilder.create(config);
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -35,8 +40,11 @@ public RexNode resolveJoinCondition(
return result;
}

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config);
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ public RelNode visitProject(Project node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> projectList;
if (node.getProjectList().stream().anyMatch(e -> e instanceof AllFields)) {
// if (context.relBuilder.peek() instanceof TableScan) {
// // force add project all to stack to avoid the TableScan in plan only.
// // if the stack only contains OpenSearchTableScan.ENUMERABLE, it will fail
// // e.g. search source=table should return project(all_fields, child=tableScan)
// context.relBuilder.project(context.relBuilder.fields(), ImmutableList.of(), true);
// }
return context.relBuilder.peek();
} else {
projectList =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteFactory;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.Util;
import org.opensearch.sql.calcite.CalcitePlanContext;

/**
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
* 3. RelBuilder 4. RelRunner TODO delete it in future if possible.
*/
public class CalciteToolsHelper {

/** Create a RelBuilder with testing */
public static RelBuilder create(FrameworkConfig config) {
return RelBuilder.create(config);
}

/** Create a RelBuilder with typeFactory */
public static RelBuilder create(
FrameworkConfig config, JavaTypeFactory typeFactory, Connection connection) {
return withPrepare(
config,
typeFactory,
connection,
(cluster, relOptSchema, rootSchema, statement) ->
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
}

public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
try {
return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
* Frameworks.BasePrepareAction)}. The purpose is the method {@link
* CalciteFactory#newConnection(UnregisteredDriver, AvaticaFactory, String, Properties)} create
* connection with null instance of JavaTypeFactory. So we add a parameter JavaTypeFactory.
*/
private static <R> R withPrepare(
FrameworkConfig config,
JavaTypeFactory typeFactory,
Connection connection,
Frameworks.BasePrepareAction<R> action) {
try {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
final CalciteServerStatement statement =
connection.createStatement().unwrap(CalciteServerStatement.class);
return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static class OpenSearchDriver extends Driver {

public Connection connect(
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
throws SQLException {
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
AvaticaConnection connection =
factory.newConnection((Driver) this, factory, url, info, rootSchema, typeFactory);
this.handler.onConnectionInit(connection);
return connection;
}
}

/** do nothing, just extend for a public construct for new */
public static class OpenSearchRelBuilder extends RelBuilder {
public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
/**
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
*/
public <R> R perform(
CalciteServerStatement statement,
FrameworkConfig config,
JavaTypeFactory typeFactory,
Frameworks.BasePrepareAction<R> action) {
final CalcitePrepare.Context prepareContext = statement.createPrepareContext();
SchemaPlus defaultSchema = config.getDefaultSchema();
final CalciteSchema schema =
defaultSchema != null
? CalciteSchema.from(defaultSchema)
: prepareContext.getRootSchema();
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
schema.root(), schema.path(null), typeFactory, prepareContext.config());
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptPlanner planner =
createPlanner(prepareContext, config.getContext(), config.getCostFactory());
final RelOptCluster cluster = createCluster(planner, rexBuilder);
return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement);
}
}

public static class OpenSearchRelRunners {
/**
* Runs a relational expression by existing connection. This class copied from {@link
* org.apache.calcite.tools.RelRunners#run(RelNode)}
*/
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
final RelShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
// Always replace the LogicalTableScan with BindableTableScan
// because it's implementation does not require a "schema" as context.
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
rel = rel.accept(shuttle);
// the line we changed here
try (Connection connection = context.connection) {
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(rel);
} catch (SQLException e) {
throw Util.throwAsRuntime(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.calcite.utils;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,29 +114,10 @@ public static RelDataType convertSchema(Table table) {
}
return TYPE_FACTORY.createStructType(typeList, fieldNameList, true);
}
//
// @Override
// public Type getJavaClass(RelDataType type) {
// if (type instanceof JavaType) {
// JavaType javaType = (JavaType) type;
// return javaType.getJavaClass();
// }
// if (type instanceof BasicSqlType || type instanceof IntervalSqlType) {
// switch (type.getSqlTypeName()) {
// case DATE:
// return Date.class;
// case TIME:
// case TIME_WITH_LOCAL_TIME_ZONE:
// case TIME_TZ:
// return Time.class;
// case TIMESTAMP:
// case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// case TIMESTAMP_TZ:
// return Timestamp.class;
// default:
// break;
// }
// }
// return super.getJavaClass(type);
// }

/** not in use for now, but let's keep this code for future reference. */
@Override
public Type getJavaClass(RelDataType type) {
return super.getJavaClass(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ public boolean equals(Object o) {

/** The expression value equal. */
public abstract boolean equal(ExprValue other);

@Override
public Object valueForCalcite() {
return value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -41,6 +42,11 @@ public String value() {
return DateTimeFormatter.ISO_LOCAL_DATE.format(date);
}

@Override
public Long valueForCalcite() {
return date.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

@Override
public ExprType type() {
return ExprCoreType.DATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public String value() {
return ISO_LOCAL_TIME.format(time);
}

@Override
public Long valueForCalcite() {
return time.toNanoOfDay() / 1000;
}

@Override
public ExprType type() {
return ExprCoreType.TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public String value() {
: DATE_TIME_FORMATTER_VARIABLE_NANOS.withZone(ZoneOffset.UTC).format(timestamp);
}

@Override
public Long valueForCalcite() {
return timestamp.toEpochMilli();
}

@Override
public ExprType type() {
return ExprCoreType.TIMESTAMP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public interface ExprValue extends Serializable, Comparable<ExprValue> {
/** Get the Object value of the Expression Value. */
Object value();

/** Get the Object for Calcite engine compatibility */
Object valueForCalcite();

/** Get the {@link ExprCoreType} of the Expression Value. */
ExprType type();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.sql.executor;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.List;
Expand Down Expand Up @@ -78,7 +80,8 @@ public void execute(
(PrivilegedAction<Void>)
() -> {
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context = new CalcitePlanContext(config);
final CalcitePlanContext context =
CalcitePlanContext.create(config, TYPE_FACTORY);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
Expand Down
Loading

0 comments on commit 2469c26

Please sign in to comment.