Commit 17c1eac

ESQL: Support running tagged queries
Begins adding support for running "tagged queries" to the compute engine. Here, it's just the `LuceneSourceOperator` because that's useful and contained.

Example time! Say you are running:

```
FROM foo | STATS MAX(v) BY ROUND_TO(g, 0, 100, 1000, 100000)
```

It's *often* faster to run this as four queries:

* The docs that round to `0`
* The docs that round to `100`
* The docs that round to `1000`
* The docs that round to `100000`

This creates an ESQL operator that can run these queries, one after the other, and attach those tags. Aggs uses this trick: it's *way* faster when it can push down count queries, but it's still faster when it pushes doc loading. This implementation in `LuceneSourceOperator` is quite similar to the doc loading version in `_search`.

I don't have performance measurements yet because I haven't plugged this into the language. In `_search` we call this `filter-by-filter` and enable it when each group averages more than 5000 documents and when there isn't a `_doc_count` field; outside those cases it's faster not to push. I expect we'll end up pretty similar.
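To make that concrete: the commit doesn't wire this into the language yet, but a planner-side decomposition of the `ROUND_TO` example could look something like the sketch below. The field name `g`, the use of `LongPoint` range queries, and the exact bucket bounds are illustrative assumptions; only the `QueryAndTags` record comes from this commit.

```
import java.util.List;

import org.apache.lucene.document.LongPoint;
import org.elasticsearch.compute.lucene.LuceneSliceQueue.QueryAndTags;

class RoundToQueries {
    /**
     * Hypothetical sketch: split ROUND_TO(g, 0, 100, 1000, 100000) into one
     * tagged query per bucket. Each query matches the docs whose value of g
     * rounds to that bucket; the tag is the rounding point, which the source
     * operator appends to every emitted page as a constant column.
     */
    static List<QueryAndTags> forRoundTo() {
        return List.of(
            new QueryAndTags(LongPoint.newRangeQuery("g", Long.MIN_VALUE, 99), List.<Object>of(0L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 100, 999), List.<Object>of(100L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 1_000, 99_999), List.<Object>of(1_000L)),
            new QueryAndTags(LongPoint.newRangeQuery("g", 100_000, Long.MAX_VALUE), List.<Object>of(100_000L))
        );
    }
}
```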
1 parent 7207692 commit 17c1eac

27 files changed: +400 −117 lines

docs/reference/query-languages/esql/_snippets/functions/appendix/values.md

Lines changed: 3 additions & 2 deletions (generated file; diff not rendered)

docs/reference/query-languages/esql/kibana/definition/functions/values.json

Lines changed: 1 addition & 1 deletion (generated file; diff not rendered)

docs/reference/query-languages/esql/kibana/docs/functions/values.md

Lines changed: 2 additions & 1 deletion (generated file; diff not rendered)

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 4 additions & 2 deletions

@@ -9,7 +9,6 @@
 
 import org.apache.lucene.search.DocIdStream;
 import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory {
 
         public Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             int taskConcurrency,
             int limit
@@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException {
         if (scorer == null) {
             remainingDocs = 0;
         } else {
+            if (scorer.tags().isEmpty() == false) {
+                throw new UnsupportedOperationException("extra not supported by " + getClass());
+            }
             Weight weight = scorer.weight();
             var leafReaderContext = scorer.leafReaderContext();
             // see org.apache.lucene.search.TotalHitCountCollector

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

Lines changed: 1 addition & 2 deletions

@@ -10,7 +10,6 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
 
     public LuceneMaxFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

Lines changed: 1 addition & 2 deletions

@@ -10,7 +10,6 @@
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {
 
     public LuceneMinFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
        DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 3 additions & 0 deletions

@@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException {
         if (scorer == null) {
             remainingDocs = 0;
         } else {
+            if (scorer.tags().isEmpty() == false) {
+                throw new UnsupportedOperationException("extra not supported by " + getClass());
+            }
             final LeafReader reader = scorer.leafReaderContext().reader();
             final Query query = scorer.weight().getQuery();
             if (query == null || query instanceof MatchAllDocsQuery) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 12 additions & 3 deletions

@@ -97,7 +97,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFac
          */
        protected Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
            int taskConcurrency,
@@ -158,7 +158,7 @@ LuceneScorer getCurrentOrLoadNextScorer() {
        if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
            final Weight weight = currentSlice.weight();
            processedQueries.add(weight.getQuery());
-            currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
+            currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.extra(), leaf);
        }
        assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
        currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -177,15 +177,17 @@ static final class LuceneScorer {
        private final ShardContext shardContext;
        private final Weight weight;
        private final LeafReaderContext leafReaderContext;
+        private final List<Object> tags;
 
        private BulkScorer bulkScorer;
        private int position;
        private int maxPosition;
        private Thread executingThread;
 
-        LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
+        LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
            this.shardContext = shardContext;
            this.weight = weight;
+            this.tags = tags;
            this.leafReaderContext = leafReaderContext;
            reinitialize();
        }
@@ -230,6 +232,13 @@ Weight weight() {
        int position() {
            return position;
        }
+
+        /**
+         * Tags to add to the data returned by this query.
+         */
+        List<Object> tags() {
+            return tags;
+        }
    }
 
    @Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@
 /**
  * Holds a list of multiple partial Lucene segments
  */
-public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
+public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> extra) {
    int numLeaves() {
        return leaves.size();
    }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

Lines changed: 58 additions & 22 deletions

@@ -32,8 +32,42 @@
 
 /**
  * Shared Lucene slices between Lucene operators.
+ * <p>
+ * Each shard is {@link #create built} with a list of queries to run and
+ * tags to add to the queries ({@code List<QueryAndTags>}). Some examples:
+ * </p>
+ * <ul>
+ * <li>
+ *     For queries like {@code FROM foo} we'll use a one element list
+ *     containing {@code match_all, []}. It loads all documents in the
+ *     index and appends no extra fields to the loaded documents.
+ * </li>
+ * <li>
+ *     For queries like {@code FROM foo | WHERE a > 10} we'll use a one
+ *     element list containing {@code +single_value(a) +(a > 10), []}.
+ *     It loads all documents where {@code a} is single valued and
+ *     greater than 10.
+ * </li>
+ * <li>
+ *     For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+ *     we'll use a two element list containing
+ *     <ul>
+ *         <li>{@code +single_value(a) +(a < 100), [0]}</li>
+ *         <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+ *     </ul>
+ *     It loads all documents in the index where {@code a} is single
+ *     valued and adds a constant {@code 0} to the documents where
+ *     {@code a < 100} and the constant {@code 100} to the documents
+ *     where {@code a >= 100}.
+ * </li>
+ * </ul>
  */
 public final class LuceneSliceQueue {
+    /**
+     * Query to run and tags to add to the results.
+     */
+    public record QueryAndTags(Query query, List<Object> tags) {}
+
    public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
    public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
 
@@ -69,7 +103,7 @@ public Collection<String> remainingShardsIdentifiers() {
 
    public static LuceneSliceQueue create(
        List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<QueryAndTags>> queryFunction,
        DataPartitioning dataPartitioning,
        Function<Query, PartitioningStrategy> autoStrategy,
        int taskConcurrency,
@@ -78,27 +112,29 @@ public static LuceneSliceQueue create(
        List<LuceneSlice> slices = new ArrayList<>();
        Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
        for (ShardContext ctx : contexts) {
-            Query query = queryFunction.apply(ctx);
-            query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-            /*
-             * Rewrite the query on the local index so things like fully
-             * overlapping range queries become match all. It's important
-             * to do this before picking the partitioning strategy so we
-             * can pick more aggressive strategies when the query rewrites
-             * into MatchAll.
-             */
-            try {
-                query = ctx.searcher().rewrite(query);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-            PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
-            partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
-            List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
-            Weight weight = weight(ctx, query, scoreMode);
-            for (List<PartialLeafReaderContext> group : groups) {
-                if (group.isEmpty() == false) {
-                    slices.add(new LuceneSlice(ctx, group, weight));
+            for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
+                Query query = queryAndExtra.query;
+                query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+                /*
+                 * Rewrite the query on the local index so things like fully
+                 * overlapping range queries become match all. It's important
+                 * to do this before picking the partitioning strategy so we
+                 * can pick more aggressive strategies when the query rewrites
+                 * into MatchAll.
+                 */
+                try {
+                    query = ctx.searcher().rewrite(query);
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+                PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+                partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+                List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+                Weight weight = weight(ctx, query, scoreMode);
+                for (List<PartialLeafReaderContext> group : groups) {
+                    if (group.isEmpty() == false) {
+                        slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+                    }
                }
            }
        }

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 13 additions & 11 deletions

@@ -17,8 +17,9 @@
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
+import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.IntVector;
@@ -64,7 +65,7 @@ public static class Factory extends LuceneOperator.Factory {
 
        public Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            int taskConcurrency,
            int maxPageSize,
@@ -320,28 +321,29 @@ public Page getCheckedOutput() throws IOException {
        IntVector shard = null;
        IntVector leaf = null;
        IntVector docs = null;
-        DoubleVector scores = null;
-        DocBlock docBlock = null;
+        Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
        currentPagePos -= discardedDocs;
        try {
            shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
            leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
            docs = buildDocsVector(currentPagePos);
            docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-            docBlock = new DocVector(shard, leaf, docs, true).asBlock();
+            int b = 0;
+            blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
            shard = null;
            leaf = null;
            docs = null;
-            if (scoreBuilder == null) {
-                page = new Page(currentPagePos, docBlock);
-            } else {
-                scores = buildScoresVector(currentPagePos);
+            if (scoreBuilder != null) {
+                blocks[b++] = buildScoresVector(currentPagePos).asBlock();
                scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                page = new Page(currentPagePos, docBlock, scores.asBlock());
            }
+            for (Object e : scorer.tags()) {
+                blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
+            }
+            page = new Page(currentPagePos, blocks);
        } finally {
            if (page == null) {
-                Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores);
+                Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
            }
        }
        currentPagePos = 0;
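To read that page-assembly change at a glance: a page is now the doc vector, then the optional score column, then one constant column per tag. A condensed sketch of just the layout logic, assuming `docBlock`, `scoreBlock`, `tags`, `blockFactory`, and `positionCount` are in scope (the real method above also manages vector ownership and builder resets):

```
// Column layout: [docs][scores?][tag0][tag1]...
Block[] blocks = new Block[1 + (scoreBlock == null ? 0 : 1) + tags.size()];
int b = 0;
blocks[b++] = docBlock;            // always first: the doc vector
if (scoreBlock != null) {
    blocks[b++] = scoreBlock;      // scores, only when requested
}
for (Object tag : tags) {
    // each tag becomes a constant column spanning the whole page
    blocks[b++] = BlockUtils.constantBlock(blockFactory, tag, positionCount);
}
Page page = new Page(positionCount, blocks);
```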

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 4 additions & 1 deletion

@@ -58,7 +58,7 @@ public static class Factory extends LuceneOperator.Factory {
 
        public Factory(
            List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
            DataPartitioning dataPartitioning,
            int taskConcurrency,
            int maxPageSize,
@@ -171,6 +171,9 @@ private Page collect() throws IOException {
            return emit(true);
        }
        try {
+            if (scorer.tags().isEmpty() == false) {
+                throw new UnsupportedOperationException("extra not supported by " + getClass());
+            }
            if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
                // TODO: share the bottom between shardCollectors
                perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

Lines changed: 3 additions & 0 deletions

@@ -116,6 +116,9 @@ public Page getCheckedOutput() throws IOException {
            doneCollecting = true;
            return null;
        }
+        if (slice.extra().isEmpty() == false) {
+            throw new UnsupportedOperationException("extra not supported by " + getClass());
+        }
        Releasables.close(fieldsReader);
        fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
        iterator = new SegmentsIterator(slice);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java

Lines changed: 2 additions & 3 deletions

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.lucene;
 
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.SourceOperator;
@@ -37,7 +36,7 @@ private TimeSeriesSourceOperatorFactory(
        List<? extends ShardContext> contexts,
        boolean emitDocIds,
        List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
        int taskConcurrency,
        int maxPageSize,
        int limit
@@ -74,7 +73,7 @@ public static TimeSeriesSourceOperatorFactory create(
        boolean emitDocIds,
        List<? extends ShardContext> contexts,
        List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-        Function<ShardContext, Query> queryFunction
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
    ) {
        return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit);
    }
