Skip to content

Commit

Permalink
Lazily construct OpenSearchRequestBuilder and do push down
Browse files Browse the repository at this point in the history
Signed-off-by: Heng Qian <qianheng@amazon.com>
  • Loading branch information
qianheng-aws committed Feb 26, 2025
1 parent 4698600 commit efc8cd8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void onMatch(RelOptRuleCall call) {
}

protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
if (scan.pushDownFilter(filter)) {
CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
if (newScan != null) {
call.transformTo(scan);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static java.util.Objects.requireNonNull;

import java.util.ArrayDeque;
import java.util.List;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
Expand Down Expand Up @@ -36,18 +37,19 @@
import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException;
import org.opensearch.sql.opensearch.storage.OpenSearchIndex;

/** Relational expression representing a scan of an OpenSearchIndex type. */
public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class);

private final OpenSearchIndex osIndex;
private final OpenSearchRequestBuilder requestBuilder;
// The schema of this scan operator, it's initialized with the row type of the table, but may be
// changed by push down operations.
private final RelDataType schema;

private final PushDownContext pushDownContext;

/**
* Creates an CalciteOpenSearchIndexScan.
*
Expand All @@ -57,24 +59,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
*/
public CalciteOpenSearchIndexScan(
RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) {
this(cluster, table, index, index.createRequestBuilder(), table.getRowType());
this(cluster, table, index, table.getRowType(), null);
}

public CalciteOpenSearchIndexScan(
private CalciteOpenSearchIndexScan(
RelOptCluster cluster,
RelOptTable table,
OpenSearchIndex index,
OpenSearchRequestBuilder requestBuilder,
RelDataType schema) {
RelDataType schema,
PushDownContext pushDownContext) {
super(cluster, table);
this.osIndex = requireNonNull(index, "OpenSearch index");
this.requestBuilder = requestBuilder;
this.schema = schema;
this.pushDownContext = pushDownContext == null ? new PushDownContext() : pushDownContext;
}

public CalciteOpenSearchIndexScan copy() {
return new CalciteOpenSearchIndexScan(
getCluster(), table, osIndex, this.schema, pushDownContext.clone());
}

public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) {
// TODO: need to do deep-copy on requestBuilder in case non-idempotent push down.
return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema);
// Do shallow copy for requestBuilder, thus requestBuilder among different plans produced in the
// optimization process won't affect each other.
return new CalciteOpenSearchIndexScan(
getCluster(), table, osIndex, schema, pushDownContext.clone());
}

@Override
Expand Down Expand Up @@ -115,6 +124,8 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
}

public Enumerable<@Nullable Object> scan() {
OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
pushDownContext.forEach(action -> action.apply(requestBuilder));
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
Expand All @@ -127,17 +138,18 @@ public Enumerator<Object> enumerator() {
};
}

public boolean pushDownFilter(Filter filter) {
public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
try {
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
List<String> schema = this.getRowType().getFieldNames();
QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
requestBuilder.pushDownFilter(filterBuilder);
newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
// TODO: handle the case where condition contains a score function
return true;
} catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) {
return newScan;
} catch (Exception e) {
LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e);
}
return false;
return null;
}

/**
Expand All @@ -152,7 +164,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns)
}
RelDataType newSchema = builder.build();
CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
newScan.pushDownContext.add(
requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
return newScan;
}

static class PushDownContext extends ArrayDeque<PushDownAction> {
@Override
public PushDownContext clone() {
return (PushDownContext) super.clone();
}
}

private interface PushDownAction {
void apply(OpenSearchRequestBuilder requestBuilder);
}
}

0 comments on commit efc8cd8

Please sign in to comment.