Commit d4bc358
ESQL: Compute engine support for tagged queries (elastic#128521)
Begins adding support for running "tagged queries" to the compute engine. Here, it's just the `LuceneSourceOperator` because that's useful and contained.

Example time! Say you are running:

```
FROM foo | STATS MAX(v) BY ROUND_TO(g, 0, 100, 1000, 100000)
```

It's *often* faster to run this as four queries:

* The docs that round to `0`
* The docs that round to `100`
* The docs that round to `1000`
* The docs that round to `100000`

This creates an ESQL operator that can run these queries one after the other, attaching each query's rounding point as a tag on the rows it produces. Aggs uses this trick, and it's *way* faster when it can push down count queries, but it's still faster even when it only pushes the doc loading. This implementation in `LuceneSourceOperator` is quite similar to the doc-loading version in _search.

I don't have performance measurements yet because I haven't plugged this into the language. In _search we call this `filter-by-filter` and enable it when each group averages more than 5,000 documents and there isn't a `_doc_count` field; outside those cases it's faster not to push. I expect we'll end up pretty similar.
1 parent b0beb54 commit d4bc358

25 files changed: +502 −118 lines
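Before the per-file changes, a rough illustration of the idea: the four tagged queries for the `ROUND_TO` example could be expressed with the `LuceneSliceQueue.QueryAndTags` record this commit introduces. This is a minimal sketch, not the planner's actual output: the field name `g`, the plain `LongPoint` range queries, and the bucket bounds are all assumptions, and the real planner would also wrap each query in a single-value check (see the `LuceneSliceQueue` Javadoc below).

```java
import java.util.List;

import org.apache.lucene.document.LongPoint;
import org.elasticsearch.compute.lucene.LuceneSliceQueue.QueryAndTags;

class RoundToExample {
    // Sketch: one mutually exclusive range query per ROUND_TO point, each
    // tagged with the value it rounds to. Together the four queries cover
    // every document at most once, so no deduplication is needed.
    static List<QueryAndTags> roundToQueries() {
        return List.of(
            new QueryAndTags(LongPoint.newRangeQuery("g", Long.MIN_VALUE, 99), List.of(0L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 100, 999), List.of(100L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 1_000, 99_999), List.of(1_000L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 100_000, Long.MAX_VALUE), List.of(100_000L))
        );
    }
}
```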

test/framework/src/main/java/org/elasticsearch/indices/CrankyCircuitBreakerService.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -29,7 +29,7 @@ public class CrankyCircuitBreakerService extends CircuitBreakerService {
      */
     public static final String ERROR_MESSAGE = "cranky breaker";
 
-    private final CircuitBreaker breaker = new CircuitBreaker() {
+    public static final class CrankyCircuitBreaker implements CircuitBreaker {
         private final AtomicLong used = new AtomicLong();
 
         @Override
@@ -82,7 +82,9 @@ public Durability getDurability() {
         public void setLimitAndOverhead(long limit, double overhead) {
 
         }
-    };
+    }
+
+    private final CrankyCircuitBreaker breaker = new CrankyCircuitBreaker();
 
     @Override
     public CircuitBreaker getBreaker(String name) {
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -9,7 +9,6 @@
 
 import org.apache.lucene.search.DocIdStream;
 import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory {
 
         public Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             int taskConcurrency,
             int limit
@@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException {
             if (scorer == null) {
                 remainingDocs = 0;
             } else {
+                if (scorer.tags().isEmpty() == false) {
+                    throw new UnsupportedOperationException("tags not supported by " + getClass());
+                }
                 Weight weight = scorer.weight();
                 var leafReaderContext = scorer.leafReaderContext();
                 // see org.apache.lucene.search.TotalHitCountCollector
```
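Only `LuceneSourceOperator` emits tags in this commit; the count, min/max, and top-n operators fail fast when they see them, as the guard above shows. For callers, the factory change is mechanical: an existing per-shard `Query` function can be adapted with a one-element, tag-free list. A minimal sketch, where `oldQueryFunction` stands in for whatever the caller passed before:

```java
// Hypothetical adapter: wrap an existing per-shard Query function in a
// one-element QueryAndTags list with no tags, preserving the old behavior.
Function<ShardContext, Query> oldQueryFunction = ctx -> new MatchAllDocsQuery();
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction =
    ctx -> List.of(new LuceneSliceQueue.QueryAndTags(oldQueryFunction.apply(ctx), List.of()));
```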

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 1 addition & 2 deletions
```diff
@@ -10,7 +10,6 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
 
     public LuceneMaxFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 1 addition & 2 deletions
```diff
@@ -10,7 +10,6 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
 
     public LuceneMinFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException {
             if (scorer == null) {
                 remainingDocs = 0;
             } else {
+                if (scorer.tags().isEmpty() == false) {
+                    throw new UnsupportedOperationException("tags not supported by " + getClass());
+                }
                 final LeafReader reader = scorer.leafReaderContext().reader();
                 final Query query = scorer.weight().getQuery();
                 if (query == null || query instanceof MatchAllDocsQuery) {
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 16 additions & 4 deletions
```diff
@@ -95,7 +95,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac
         */
        protected Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
            int taskConcurrency,
@@ -153,10 +153,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
            final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
            logger.trace("Starting {}", partialLeaf);
            final LeafReaderContext leaf = partialLeaf.leafReaderContext();
-            if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
+            if (currentScorer == null // First time
+                || currentScorer.leafReaderContext() != leaf // Moved to a new leaf
+                || currentScorer.weight != currentSlice.weight() // Moved to a new query
+            ) {
                final Weight weight = currentSlice.weight();
                processedQueries.add(weight.getQuery());
-                currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
+                currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
            }
            assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
            currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -175,15 +178,17 @@ static final class LuceneScorer {
        private final ShardContext shardContext;
        private final Weight weight;
        private final LeafReaderContext leafReaderContext;
+        private final List<Object> tags;
 
        private BulkScorer bulkScorer;
        private int position;
        private int maxPosition;
        private Thread executingThread;
 
-        LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
+        LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
            this.shardContext = shardContext;
            this.weight = weight;
+            this.tags = tags;
            this.leafReaderContext = leafReaderContext;
            reinitialize();
        }
@@ -228,6 +233,13 @@ Weight weight() {
        int position() {
            return position;
        }
+
+        /**
+         * Tags to add to the data returned by this query.
+         */
+        List<Object> tags() {
+            return tags;
+        }
    }
 
    @Override
```
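One subtlety the new condition captures: with multiple tagged queries per shard, consecutive slices can point at the same leaf while carrying different queries, so the old leaf-only comparison would have reused the previous query's scorer and its tags. Comparing the slice's `Weight` as well forces a fresh `LuceneScorer` whenever the query changes.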

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -14,7 +14,7 @@
 /**
  * Holds a list of multiple partial Lucene segments
  */
-public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
+public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
    int numLeaves() {
        return leaves.size();
    }
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 63 additions & 22 deletions
```diff
@@ -31,8 +31,47 @@
 
 /**
  * Shared Lucene slices between Lucene operators.
+ * <p>
+ * Each shard is {@link #create built} with a list of queries to run and
+ * tags to add to the queries ({@code List<QueryAndTags>}). Some examples:
+ * </p>
+ * <ul>
+ * <li>
+ * For queries like {@code FROM foo} we'll use a one element list
+ * containing {@code match_all, []}. It loads all documents in the
+ * index and appends no extra fields to the loaded documents.
+ * </li>
+ * <li>
+ * For queries like {@code FROM foo | WHERE a > 10} we'll use a one
+ * element list containing {@code +single_value(a) +(a > 10), []}.
+ * It loads all documents where {@code a} is single valued and
+ * greater than 10.
+ * </li>
+ * <li>
+ * For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+ * we'll use a two element list containing
+ * <ul>
+ * <li>{@code +single_value(a) +(a < 100), [0]}</li>
+ * <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+ * </ul>
+ * It loads all documents in the index where {@code a} is single
+ * valued and adds a constant {@code 0} to the documents where
+ * {@code a < 100} and the constant {@code 100} to the documents
+ * where {@code a >= 100}.
+ * </li>
+ * </ul>
+ * <p>
+ * IMPORTANT: Runners make no effort to deduplicate the results from multiple
+ * queries. If you need to only see each document one time then make sure the
+ * queries are mutually exclusive.
+ * </p>
 */
 public final class LuceneSliceQueue {
+    /**
+     * Query to run and tags to add to the results.
+     */
+    public record QueryAndTags(Query query, List<Object> tags) {}
+
     public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
     public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
 
@@ -64,7 +103,7 @@ public Map<String, PartitioningStrategy> partitioningStrategies() {
 
     public static LuceneSliceQueue create(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         Function<Query, PartitioningStrategy> autoStrategy,
         int taskConcurrency,
@@ -73,27 +112,29 @@ public static LuceneSliceQueue create(
         List<LuceneSlice> slices = new ArrayList<>();
         Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
         for (ShardContext ctx : contexts) {
-            Query query = queryFunction.apply(ctx);
-            query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-            /*
-             * Rewrite the query on the local index so things like fully
-             * overlapping range queries become match all. It's important
-             * to do this before picking the partitioning strategy so we
-             * can pick more aggressive strategies when the query rewrites
-             * into MatchAll.
-             */
-            try {
-                query = ctx.searcher().rewrite(query);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-            PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
-            partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
-            List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
-            Weight weight = weight(ctx, query, scoreMode);
-            for (List<PartialLeafReaderContext> group : groups) {
-                if (group.isEmpty() == false) {
-                    slices.add(new LuceneSlice(ctx, group, weight));
+            for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
+                Query query = queryAndExtra.query;
+                query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+                /*
+                 * Rewrite the query on the local index so things like fully
+                 * overlapping range queries become match all. It's important
+                 * to do this before picking the partitioning strategy so we
+                 * can pick more aggressive strategies when the query rewrites
+                 * into MatchAll.
+                 */
+                try {
+                    query = ctx.searcher().rewrite(query);
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+                PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+                partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+                List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+                Weight weight = weight(ctx, query, scoreMode);
+                for (List<PartialLeafReaderContext> group : groups) {
+                    if (group.isEmpty() == false) {
+                        slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+                    }
                 }
             }
         }
```
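Note that the rewrite, partitioning choice, and `Weight` construction all moved inside the new per-query loop, so each tagged query is rewritten and partitioned independently. A shard given several tagged queries can therefore contribute one batch of slices per query, each slice carrying its query's tags through to the scorer.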

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 13 additions & 11 deletions
```diff
@@ -17,8 +17,9 @@
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
+import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.IntVector;
@@ -64,7 +65,7 @@ public static class Factory extends LuceneOperator.Factory {
 
        public Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            int taskConcurrency,
            int maxPageSize,
@@ -321,28 +322,29 @@ public Page getCheckedOutput() throws IOException {
            IntVector shard = null;
            IntVector leaf = null;
            IntVector docs = null;
-            DoubleVector scores = null;
-            DocBlock docBlock = null;
+            Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
            currentPagePos -= discardedDocs;
            try {
                shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
                leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
                docs = buildDocsVector(currentPagePos);
                docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                docBlock = new DocVector(shard, leaf, docs, true).asBlock();
+                int b = 0;
+                blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
                shard = null;
                leaf = null;
                docs = null;
-                if (scoreBuilder == null) {
-                    page = new Page(currentPagePos, docBlock);
-                } else {
-                    scores = buildScoresVector(currentPagePos);
+                if (scoreBuilder != null) {
+                    blocks[b++] = buildScoresVector(currentPagePos).asBlock();
                    scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                    page = new Page(currentPagePos, docBlock, scores.asBlock());
                }
+                for (Object e : scorer.tags()) {
+                    blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
+                }
+                page = new Page(currentPagePos, blocks);
            } finally {
                if (page == null) {
-                    Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores);
+                    Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
                }
            }
            currentPagePos = 0;
```
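So the output page is now laid out as: the doc block first, then the score block when scoring is enabled, then one constant column per tag. A minimal sketch of what the tag loop produces for a hypothetical slice tagged `[0L]` with `currentPagePos == 3`, assuming `BlockUtils.constantBlock` yields a constant long block for a `Long` value:

```java
// One constant column per tag, the tag value repeated once per emitted doc.
// Mirrors the loop in the diff: blocks[b++] = BlockUtils.constantBlock(...).
Block tagColumn = BlockUtils.constantBlock(blockFactory, 0L, 3); // [0, 0, 0]
```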

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 4 additions & 1 deletion
```diff
@@ -57,7 +57,7 @@ public static class Factory extends LuceneOperator.Factory {
 
        public Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            int taskConcurrency,
            int maxPageSize,
@@ -170,6 +170,9 @@ private Page collect() throws IOException {
            return emit(true);
        }
        try {
+            if (scorer.tags().isEmpty() == false) {
+                throw new UnsupportedOperationException("tags not supported by " + getClass());
+            }
            if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
                // TODO: share the bottom between shardCollectors
                perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);
```

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

Lines changed: 2 additions & 3 deletions
```diff
@@ -11,7 +11,6 @@
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.DocIdSetIterator;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
@@ -51,7 +50,7 @@ public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factor
 
    private TimeSeriesSortedSourceOperatorFactory(
        List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
        int taskConcurrency,
        int maxPageSize,
        int limit
@@ -84,7 +83,7 @@ public static TimeSeriesSortedSourceOperatorFactory create(
        int maxPageSize,
        int taskConcurrency,
        List<? extends ShardContext> searchContexts,
-        Function<ShardContext, Query> queryFunction
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
    ) {
        return new TimeSeriesSortedSourceOperatorFactory(searchContexts, queryFunction, taskConcurrency, maxPageSize, limit);
    }
```
