Skip to content

Commit

Permalink
[Calcite Engine] Push down project and filter operator into index scan (
Browse files Browse the repository at this point in the history
#3327)

* Support Filter and Project pushdown

Signed-off-by: Heng Qian <qianheng@amazon.com>

* Support Filter and Project pushdown v2

Signed-off-by: Heng Qian <qianheng@amazon.com>

* Address comments

Signed-off-by: Heng Qian <qianheng@amazon.com>

* Add original license for PredicateAnalyzer

Signed-off-by: Heng Qian <qianheng@amazon.com>

---------

Signed-off-by: Heng Qian <qianheng@amazon.com>
  • Loading branch information
qianheng-aws authored Feb 21, 2025
1 parent e7188da commit cb103d7
Show file tree
Hide file tree
Showing 14 changed files with 1,501 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RelBuilder;
Expand All @@ -16,15 +15,13 @@
public class CalcitePlanContext {

public FrameworkConfig config;
public CalciteConnection connection;
public final RelBuilder relBuilder;
public final ExtendedRexBuilder rexBuilder;

@Getter private boolean isResolvingJoinCondition = false;

public CalcitePlanContext(FrameworkConfig config, CalciteConnection connection) {
public CalcitePlanContext(FrameworkConfig config) {
this.config = config;
this.connection = connection;
this.relBuilder = RelBuilder.create(config);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
}
Expand All @@ -40,6 +37,6 @@ public RexNode resolveJoinCondition(

// for testing only
public static CalcitePlanContext create(FrameworkConfig config) {
return new CalcitePlanContext(config, null);
return new CalcitePlanContext(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
Expand All @@ -33,12 +30,6 @@ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
return OpenSearchRelDataTypes.convertSchema(this);
}

@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
return new OpenSearchTableScan(cluster, relOptTable, this);
}

@Override
public <T> Queryable<T> asQueryable(
QueryProvider queryProvider, SchemaPlus schema, String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,26 @@

package org.opensearch.sql.calcite.plan;

import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.CoreRules;

/** Relational expression representing a scan of an OpenSearch type. */
public class OpenSearchTableScan extends TableScan implements EnumerableRel {
private final OpenSearchTable osTable;

public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel {
/**
* Creates an OpenSearchTableScan.
*
* @param cluster Cluster
* @param table Table
* @param osTable OpenSearch table
*/
OpenSearchTableScan(RelOptCluster cluster, RelOptTable table, OpenSearchTable osTable) {
protected OpenSearchTableScan(RelOptCluster cluster, RelOptTable table) {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table);
this.osTable = requireNonNull(osTable, "OpenSearch table");
}

@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return new OpenSearchTableScan(getCluster(), table, osTable);
}

@Override
Expand All @@ -57,16 +37,4 @@ public void register(RelOptPlanner planner) {
// it is converted to cardinality aggregation in OpenSearch
planner.removeRule(CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES);
}

@Override
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
PhysType physType =
PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray());

return implementor.result(
physType,
Blocks.toBlock(
Expressions.call(
requireNonNull(table.getExpression(OpenSearchTable.class)), "search")));
}
}
37 changes: 9 additions & 28 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteJdbc41Factory;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
Expand Down Expand Up @@ -80,29 +77,8 @@ public void execute(
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
// Use simple calcite schema since we don't compute tables in advance of the
// query.
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
CalciteConnection connection =
factory.newConnection(
new Driver(),
factory,
"",
new java.util.Properties(),
rootSchema,
null);
final SchemaPlus defaultSchema =
connection
.getRootSchema()
.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME,
new OpenSearchSchema(dataSourceService));
// Set opensearch schema as the default schema in config, otherwise we need to
// explicitly
// add schema path 'OpenSearch' before the opensearch table name
final FrameworkConfig config = buildFrameworkConfig(defaultSchema);
final CalcitePlanContext context = new CalcitePlanContext(config, connection);
final FrameworkConfig config = buildFrameworkConfig();
final CalcitePlanContext context = new CalcitePlanContext(config);
executePlanByCalcite(analyze(plan, context), context, listener);
return null;
});
Expand Down Expand Up @@ -174,10 +150,15 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
return relNodeVisitor.analyze(plan, context);
}

private FrameworkConfig buildFrameworkConfig(SchemaPlus defaultSchema) {
private FrameworkConfig buildFrameworkConfig() {
// Use simple calcite schema since we don't compute tables in advance of the query.
final SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
final SchemaPlus opensearchSchema =
rootSchema.add(
OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME, new OpenSearchSchema(dataSourceService));
return Frameworks.newConfigBuilder()
.parserConfig(SqlParser.Config.DEFAULT) // TODO check
.defaultSchema(defaultSchema)
.defaultSchema(opensearchSchema)
.traitDefs((List<RelTraitDef>) null)
.programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE))
.typeSystem(OpenSearchTypeSystem.INSTANCE)
Expand Down
3 changes: 3 additions & 0 deletions opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ dependencies {
compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

annotationProcessor 'org.immutables:value:2.8.8'
compileOnly 'org.immutables:value-annotations:2.8.8'

testImplementation('org.junit.jupiter:junit-jupiter-api:5.9.3')
testImplementation('org.junit.jupiter:junit-jupiter-params:5.9.3')
testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.9.3')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -18,7 +17,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.RelRunner;
import org.apache.calcite.tools.RelRunners;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
Expand Down Expand Up @@ -111,13 +110,9 @@ public void execute(
AccessController.doPrivileged(
(PrivilegedAction<Void>)
() -> {
Connection connection = context.connection;
try {
RelRunner relRunner = connection.unwrap(RelRunner.class);
try (PreparedStatement statement = relRunner.prepareStatement(rel)) {
ResultSet resultSet = statement.executeQuery();
buildResultSet(resultSet, listener);
}
try (PreparedStatement statement = RelRunners.run(rel)) {
ResultSet result = statement.executeQuery();
buildResultSet(result, listener);
return null;
} catch (SQLException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.sql.opensearch.planner.physical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Filter;
import org.immutables.value.Value;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
import org.opensearch.sql.opensearch.storage.scan.CalciteOpenSearchIndexScan;

/** Planner rule that push a {@link Filter} down to {@link CalciteOpenSearchIndexScan} */
@Value.Enclosing
public class OpenSearchFilterIndexScanRule extends RelRule<OpenSearchFilterIndexScanRule.Config> {

/** Creates a OpenSearchFilterIndexScanRule. */
protected OpenSearchFilterIndexScanRule(Config config) {
super(config);
}

protected static boolean test(CalciteOpenSearchIndexScan scan) {
final RelOptTable table = scan.getTable();
return table.unwrap(OpenSearchIndex.class) != null;
}

@Override
public void onMatch(RelOptRuleCall call) {
if (call.rels.length == 2) {
// the ordinary variant
final Filter filter = call.rel(0);
final CalciteOpenSearchIndexScan scan = call.rel(1);
apply(call, filter, scan);
} else {
throw new AssertionError(
String.format(
"The length of rels should be %s but got %s",
this.operands.size(), call.rels.length));
}
}

protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
if (scan.pushDownFilter(filter)) {
call.transformTo(scan);
}
}

/** Rule configuration. */
@Value.Immutable
public interface Config extends RelRule.Config {
/** Config that matches Filter on CalciteOpenSearchIndexScan. */
Config DEFAULT =
ImmutableOpenSearchFilterIndexScanRule.Config.builder()
.build()
.withOperandSupplier(
b0 ->
b0.operand(Filter.class)
.oneInput(
b1 ->
b1.operand(CalciteOpenSearchIndexScan.class)
.predicate(OpenSearchFilterIndexScanRule::test)
.noInputs()));

@Override
default OpenSearchFilterIndexScanRule toRule() {
return new OpenSearchFilterIndexScanRule(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.planner.physical;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;

public class OpenSearchIndexRules {
private static final OpenSearchProjectIndexScanRule PROJECT_INDEX_SCAN =
OpenSearchProjectIndexScanRule.Config.DEFAULT.toRule();
private static final OpenSearchFilterIndexScanRule FILTER_INDEX_SCAN =
OpenSearchFilterIndexScanRule.Config.DEFAULT.toRule();

public static final List<RelOptRule> OPEN_SEARCH_INDEX_SCAN_RULES =
ImmutableList.of(PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN);

// prevent instantiation
private OpenSearchIndexRules() {}
}
Loading

0 comments on commit cb103d7

Please sign in to comment.