Skip to content

Commit

Permalink
[Calcite] Build integration test framework (#3342)
Browse files Browse the repository at this point in the history
* Build integration test framework

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* make local work

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* Fix the timestamp issue

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* address comments

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* fix java style and rename CalcitePPLTestCase back to CalcitePPLIntegTestCase

Signed-off-by: Lantao Jin <ltjin@amazon.com>

---------

Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin authored Feb 26, 2025
1 parent cb103d7 commit 4ba78d3
Show file tree
Hide file tree
Showing 43 changed files with 1,550 additions and 231 deletions.
14 changes: 9 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ allprojects {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.9.10"
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10"
resolutionStrategy.force "net.bytebuddy:byte-buddy:1.14.9"
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:5.3.1"
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.5'
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.5'
resolutionStrategy.force 'com.fasterxml.jackson.core:jackson-annotations:2.17.2'
resolutionStrategy.force 'com.fasterxml.jackson:jackson-bom:2.17.2'
resolutionStrategy.force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
resolutionStrategy.force "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
resolutionStrategy.force 'com.google.protobuf:protobuf-java:3.25.5'
resolutionStrategy.force 'org.locationtech.jts:jts-core:1.19.0'
resolutionStrategy.force 'com.google.errorprone:error_prone_annotations:2.28.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public enum Key {

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),

/** Query Settings. */
FIELD_TYPE_TOLERANCE("plugins.query.field_type_tolerance"),
Expand Down
5 changes: 2 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,14 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.datasource.model.DataSource',
'org.opensearch.sql.datasource.model.DataSourceStatus',
'org.opensearch.sql.datasource.model.DataSourceType',
'org.opensearch.sql.executor.ExecutionEngine'
]
limit {
counter = 'LINE'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
limit {
counter = 'BRANCH'
minimum = 0.5 // calcite dev only
minimum = 0.0 // calcite dev only
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,29 @@

package org.opensearch.sql.calcite;

import java.sql.Connection;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;

public class CalcitePlanContext {

public FrameworkConfig config;
public final Connection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config) {
private CalcitePlanContext(FrameworkConfig config, JavaTypeFactory typeFactory) {
this.config = config;
this.relBuilder = RelBuilder.create(config);
this.connection = CalciteToolsHelper.connect(config, typeFactory);
this.relBuilder = CalciteToolsHelper.create(config, typeFactory, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}

Expand All @@ -35,8 +40,11 @@ public RexNode resolveJoinCondition(
return result;
}

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config);
return new CalcitePlanContext(config, null);
}

public static CalcitePlanContext create(FrameworkConfig config, JavaTypeFactory typeFactory) {
return new CalcitePlanContext(config, typeFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
@Override
public RelNode visitProject(Project node, CalcitePlanContext context) {
visitChildren(node, context);
List<RexNode> projectList =
node.getProjectList().stream()
.filter(expr -> !(expr instanceof AllFields))
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
if (projectList.isEmpty()) {
List<RexNode> projectList;
if (node.getProjectList().stream().anyMatch(e -> e instanceof AllFields)) {
return context.relBuilder.peek();
} else {
projectList =
node.getProjectList().stream()
.map(expr -> rexVisitor.analyze(expr, context))
.collect(Collectors.toList());
}
if (node.isExcluded()) {
context.relBuilder.projectExcept(projectList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.opensearch.sql.calcite.utils.OpenSearchRelDataTypes;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;

public abstract class OpenSearchTable extends AbstractQueryableTable
implements TranslatableTable, org.opensearch.sql.storage.Table {
Expand All @@ -27,7 +27,7 @@ protected OpenSearchTable(Type elementType) {

@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
return OpenSearchRelDataTypes.convertSchema(this);
return OpenSearchTypeFactory.convertSchema(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Calcite project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opensearch.sql.calcite.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.avatica.UnregisteredDriver;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteFactory;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.util.Util;
import org.opensearch.sql.calcite.CalcitePlanContext;

/**
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
* 3. RelBuilder 4. RelRunner TODO delete it in future if possible.
*/
public class CalciteToolsHelper {

/** Create a RelBuilder with testing */
public static RelBuilder create(FrameworkConfig config) {
return RelBuilder.create(config);
}

/** Create a RelBuilder with typeFactory */
public static RelBuilder create(
FrameworkConfig config, JavaTypeFactory typeFactory, Connection connection) {
return withPrepare(
config,
typeFactory,
connection,
(cluster, relOptSchema, rootSchema, statement) ->
new OpenSearchRelBuilder(config.getContext(), cluster, relOptSchema));
}

public static Connection connect(FrameworkConfig config, JavaTypeFactory typeFactory) {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
try {
return new OpenSearchDriver().connect("jdbc:calcite:", info, null, typeFactory);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* This method copied from {@link Frameworks#withPrepare(FrameworkConfig,
* Frameworks.BasePrepareAction)}. The purpose is the method {@link
* CalciteFactory#newConnection(UnregisteredDriver, AvaticaFactory, String, Properties)} create
* connection with null instance of JavaTypeFactory. So we add a parameter JavaTypeFactory.
*/
private static <R> R withPrepare(
FrameworkConfig config,
JavaTypeFactory typeFactory,
Connection connection,
Frameworks.BasePrepareAction<R> action) {
try {
final Properties info = new Properties();
if (config.getTypeSystem() != RelDataTypeSystem.DEFAULT) {
info.setProperty(
CalciteConnectionProperty.TYPE_SYSTEM.camelName(),
config.getTypeSystem().getClass().getName());
}
final CalciteServerStatement statement =
connection.createStatement().unwrap(CalciteServerStatement.class);
return new OpenSearchPrepareImpl().perform(statement, config, typeFactory, action);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static class OpenSearchDriver extends Driver {

public Connection connect(
String url, Properties info, CalciteSchema rootSchema, JavaTypeFactory typeFactory)
throws SQLException {
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
AvaticaConnection connection =
factory.newConnection((Driver) this, factory, url, info, rootSchema, typeFactory);
this.handler.onConnectionInit(connection);
return connection;
}
}

/** do nothing, just extend for a public construct for new */
public static class OpenSearchRelBuilder extends RelBuilder {
public OpenSearchRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
super(context, cluster, relOptSchema);
}
}

public static class OpenSearchPrepareImpl extends CalcitePrepareImpl {
/**
* Similar to {@link CalcitePrepareImpl#perform(CalciteServerStatement, FrameworkConfig,
* Frameworks.BasePrepareAction)}, but with a custom typeFactory.
*/
public <R> R perform(
CalciteServerStatement statement,
FrameworkConfig config,
JavaTypeFactory typeFactory,
Frameworks.BasePrepareAction<R> action) {
final CalcitePrepare.Context prepareContext = statement.createPrepareContext();
SchemaPlus defaultSchema = config.getDefaultSchema();
final CalciteSchema schema =
defaultSchema != null
? CalciteSchema.from(defaultSchema)
: prepareContext.getRootSchema();
CalciteCatalogReader catalogReader =
new CalciteCatalogReader(
schema.root(), schema.path(null), typeFactory, prepareContext.config());
final RexBuilder rexBuilder = new RexBuilder(typeFactory);
final RelOptPlanner planner =
createPlanner(prepareContext, config.getContext(), config.getCostFactory());
final RelOptCluster cluster = createCluster(planner, rexBuilder);
return action.apply(cluster, catalogReader, prepareContext.getRootSchema().plus(), statement);
}
}

public static class OpenSearchRelRunners {
/**
* Runs a relational expression by existing connection. This class copied from {@link
* org.apache.calcite.tools.RelRunners#run(RelNode)}
*/
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
final RelShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
final RelOptTable table = scan.getTable();
if (scan instanceof LogicalTableScan
&& Bindables.BindableTableScan.canHandle(table)) {
// Always replace the LogicalTableScan with BindableTableScan
// because it's implementation does not require a "schema" as context.
return Bindables.BindableTableScan.create(scan.getCluster(), table);
}
return super.visit(scan);
}
};
rel = rel.accept(shuttle);
// the line we changed here
try (Connection connection = context.connection) {
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(rel);
} catch (SQLException e) {
throw Util.throwAsRuntime(e);
}
}
}
}
Loading

0 comments on commit 4ba78d3

Please sign in to comment.