From efc8cd8282b205c68262711045372c8d5aae9aa3 Mon Sep 17 00:00:00 2001
From: Heng Qian
Date: Wed, 26 Feb 2025 16:21:21 +0800
Subject: [PATCH] Lazily construct OpenSearchRequestBuilder and do push down

Signed-off-by: Heng Qian
---
Review notes (free-form area, not part of the commit message):
 * OpenSearchFilterIndexScanRule.apply now calls call.transformTo(newScan).
   The filter is queued on the copy returned by pushDownFilter, not on the
   original scan, so transforming to `scan` would lose the filter condition.
 * The comment in copyWithNewSchema now refers to the push-down context, which
   is what the code actually clones, instead of requestBuilder.
 * This text was rebuilt from a whitespace-collapsed copy. Line breaks,
   indentation and the type arguments List<String>, List<Integer>,
   Enumerator<Object> and ArrayDeque<PushDownAction> are reconstructed;
   List<String> and Enumerator<Object> sit on context lines - confirm them
   against the tree before applying.
 * Author / sign-off e-mail addresses are missing, and the blob ids on the
   `index` lines predate the two edits above.

 .../OpenSearchFilterIndexScanRule.java        |  5 +-
 .../scan/CalciteOpenSearchIndexScan.java      | 56 +++++++++++++------
 2 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java
index f8beb339fe..2b1e8e5312 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchFilterIndexScanRule.java
@@ -42,7 +42,8 @@ public void onMatch(RelOptRuleCall call) {
   }
 
   protected void apply(RelOptRuleCall call, Filter filter, CalciteOpenSearchIndexScan scan) {
-    if (scan.pushDownFilter(filter)) {
-      call.transformTo(scan);
+    CalciteOpenSearchIndexScan newScan = scan.pushDownFilter(filter);
+    if (newScan != null) {
+      call.transformTo(newScan);
     }
   }
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java
index 54a350c742..c32a13a230 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java
@@ -7,6 +7,7 @@
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.ArrayDeque;
 import java.util.List;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
 import org.apache.calcite.adapter.enumerable.PhysType;
@@ -36,8 +37,6 @@
 import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules;
 import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
 import org.opensearch.sql.opensearch.request.PredicateAnalyzer;
-import org.opensearch.sql.opensearch.request.PredicateAnalyzer.ExpressionNotAnalyzableException;
-import org.opensearch.sql.opensearch.request.PredicateAnalyzer.PredicateAnalyzerException;
 import org.opensearch.sql.opensearch.storage.OpenSearchIndex;
 
 /** Relational expression representing a scan of an OpenSearchIndex type. */
@@ -45,9 +44,12 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
   private static final Logger LOG = LogManager.getLogger(CalciteOpenSearchIndexScan.class);
 
   private final OpenSearchIndex osIndex;
-  private final OpenSearchRequestBuilder requestBuilder;
+  // The schema of this scan operator, it's initialized with the row type of the table, but may be
+  // changed by push down operations.
   private final RelDataType schema;
 
+  private final PushDownContext pushDownContext;
+
   /**
    * Creates an CalciteOpenSearchIndexScan.
    *
@@ -57,24 +59,31 @@ public class CalciteOpenSearchIndexScan extends OpenSearchTableScan {
    */
   public CalciteOpenSearchIndexScan(
       RelOptCluster cluster, RelOptTable table, OpenSearchIndex index) {
-    this(cluster, table, index, index.createRequestBuilder(), table.getRowType());
+    this(cluster, table, index, table.getRowType(), null);
   }
 
-  public CalciteOpenSearchIndexScan(
+  private CalciteOpenSearchIndexScan(
       RelOptCluster cluster,
       RelOptTable table,
       OpenSearchIndex index,
-      OpenSearchRequestBuilder requestBuilder,
-      RelDataType schema) {
+      RelDataType schema,
+      PushDownContext pushDownContext) {
     super(cluster, table);
     this.osIndex = requireNonNull(index, "OpenSearch index");
-    this.requestBuilder = requestBuilder;
     this.schema = schema;
+    this.pushDownContext = pushDownContext == null ? new PushDownContext() : pushDownContext;
+  }
+
+  public CalciteOpenSearchIndexScan copy() {
+    return new CalciteOpenSearchIndexScan(
+        getCluster(), table, osIndex, this.schema, pushDownContext.clone());
   }
 
   public CalciteOpenSearchIndexScan copyWithNewSchema(RelDataType schema) {
-    // TODO: need to do deep-copy on requestBuilder in case non-idempotent push down.
-    return new CalciteOpenSearchIndexScan(getCluster(), table, osIndex, requestBuilder, schema);
+    // Shallow-copy the push-down context so that the scans of different plans produced during
+    // optimization do not share, and thus cannot affect, each other's queued push-down actions.
+    return new CalciteOpenSearchIndexScan(
+        getCluster(), table, osIndex, schema, pushDownContext.clone());
   }
 
   @Override
@@ -115,6 +124,8 @@ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
   }
 
   public Enumerable<@Nullable Object> scan() {
+    OpenSearchRequestBuilder requestBuilder = osIndex.createRequestBuilder();
+    pushDownContext.forEach(action -> action.apply(requestBuilder));
     return new AbstractEnumerable<>() {
       @Override
       public Enumerator<Object> enumerator() {
@@ -127,17 +138,18 @@ public Enumerator<Object> enumerator() {
     };
   }
 
-  public boolean pushDownFilter(Filter filter) {
+  public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) {
     try {
+      CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType());
       List<String> schema = this.getRowType().getFieldNames();
       QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema);
-      requestBuilder.pushDownFilter(filterBuilder);
+      newScan.pushDownContext.add(requestBuilder -> requestBuilder.pushDownFilter(filterBuilder));
       // TODO: handle the case where condition contains a score function
-      return true;
-    } catch (ExpressionNotAnalyzableException | PredicateAnalyzerException e) {
+      return newScan;
+    } catch (Exception e) {
       LOG.warn("Cannot analyze the filter condition {}", filter.getCondition(), e);
     }
-    return false;
+    return null;
   }
 
   /**
@@ -152,7 +164,19 @@ public CalciteOpenSearchIndexScan pushDownProject(List<Integer> selectedColumns)
     }
     RelDataType newSchema = builder.build();
     CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(newSchema);
-    newScan.requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream());
+    newScan.pushDownContext.add(
+        requestBuilder -> requestBuilder.pushDownProjectStream(newSchema.getFieldNames().stream()));
     return newScan;
   }
+
+  static class PushDownContext extends ArrayDeque<PushDownAction> {
+    @Override
+    public PushDownContext clone() {
+      return (PushDownContext) super.clone();
+    }
+  }
+
+  private interface PushDownAction {
+    void apply(OpenSearchRequestBuilder requestBuilder);
+  }
 }