Skip to content

Commit 110bbe8

Browse files
authored
ESQL: Simplify agg construction when reducing on node-level (#128980)
1 parent ba6987f commit 110bbe8

File tree

2 files changed

+1
-10
lines changed

2 files changed

+1
-10
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
3434
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
3535
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
36-
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
3736
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
3837
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
3938
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
@@ -71,14 +70,6 @@ public final PhysicalOperation groupingPhysicalOperation(
7170

7271
var sourceLayout = source.layout;
7372

74-
if (aggregatorMode != AggregatorMode.INITIAL && aggregatorMode != AggregatorMode.FINAL) {
75-
assert false : "Invalid aggregator mode [" + aggregatorMode + "]";
76-
}
77-
if (aggregatorMode == AggregatorMode.INITIAL && aggregateExec.child() instanceof ExchangeSourceExec) {
78-
// the reducer step at data node (local) level
79-
aggregatorMode = AggregatorMode.INTERMEDIATE;
80-
}
81-
8273
if (aggregateExec.groupings().isEmpty()) {
8374
// not grouping
8475
List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public static PhysicalPlan reductionPlan(PhysicalPlan plan) {
124124
final LocalMapper mapper = new LocalMapper();
125125
PhysicalPlan reducePlan = mapper.map(pipelineBreaker);
126126
if (reducePlan instanceof AggregateExec agg) {
127-
reducePlan = agg.withMode(AggregatorMode.INITIAL); // force to emit intermediate outputs
127+
reducePlan = agg.withMode(AggregatorMode.INTERMEDIATE);
128128
}
129129
return EstimatesRowSize.estimateRowSize(fragment.estimatedRowSize(), reducePlan);
130130
}

0 commit comments

Comments
 (0)