ESQL: Add documents_found and values_loaded #125631


Merged Apr 16, 2025 · 25 commits

Commits
90de284
ESQL: Add `found_documents` to task and profile
nik9000 Mar 25, 2025
4875c5e
More composite tests
nik9000 Mar 25, 2025
fb36e7e
Plumbing
nik9000 Mar 25, 2025
b5bf239
Add to response
nik9000 Mar 25, 2025
7d306c0
Update docs/changelog/125631.yaml
nik9000 Mar 25, 2025
ce44024
Merge branch 'main' into esql_found_docs
nik9000 Mar 26, 2025
aebd712
Merge remote-tracking branch 'nik9000/esql_found_docs' into esql_foun…
nik9000 Mar 26, 2025
75cef17
Tets
nik9000 Mar 26, 2025
12aafc4
fix more
nik9000 Mar 26, 2025
62b5008
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 26, 2025
add4368
Merge branch 'main' into esql_found_docs
nik9000 Mar 26, 2025
9335c3e
Fixup
nik9000 Mar 26, 2025
5067e28
Fixup
nik9000 Mar 26, 2025
4d6e8db
Merge remote-tracking branch 'nik9000/esql_found_docs' into esql_foun…
nik9000 Mar 26, 2025
fa3487e
Merge branch 'main' into esql_found_docs
nik9000 Apr 1, 2025
28e9fa4
Rename
nik9000 Apr 1, 2025
62a6d3e
Merge branch 'main' into esql_found_docs
nik9000 Apr 2, 2025
4201ae5
Merge branch 'main' into esql_found_docs
nik9000 Apr 11, 2025
3011344
Merge branch 'main' into esql_found_docs
nik9000 Apr 15, 2025
79f7c39
Merge branch 'main' into esql_found_docs
nik9000 Apr 15, 2025
fbbe29c
Update
nik9000 Apr 15, 2025
0953790
Merge branch 'main' into esql_found_docs
nik9000 Apr 15, 2025
25ec61d
Merge branch 'main' into esql_found_docs
nik9000 Apr 16, 2025
a2a9a9e
Merge remote-tracking branch 'nik9000/esql_found_docs' into esql_foun…
nik9000 Apr 16, 2025
4e3136a
Shift assertion
nik9000 Apr 16, 2025
5 changes: 5 additions & 0 deletions docs/changelog/125631.yaml
@@ -0,0 +1,5 @@
pr: 125631
summary: Add `documents_found` and `values_loaded`
area: ES|QL
type: enhancement
issues: []
@@ -222,6 +222,7 @@ static TransportVersion def(int id) {
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_0_00);
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_0_00);
public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00);

/*
* STOP! READ THIS FIRST! No, really,
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/common/Strings.java
@@ -822,7 +822,7 @@ public static String toString(ChunkedToXContent chunkedToXContent, boolean prett
* Allows to configure the params.
* Allows to control whether the outputted json needs to be pretty printed and human readable.
*/
private static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) {
public static String toString(ToXContent toXContent, ToXContent.Params params, boolean pretty, boolean human) {
try {
XContentBuilder builder = createBuilder(pretty, human);
if (toXContent.isFragment()) {
@@ -2672,8 +2672,13 @@ protected static MapMatcher getProfileMatcher() {
.entry("drivers", instanceOf(List.class));
}

protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) {
protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {
MapMatcher mapMatcher = matchesMap();
if (includeDocumentsFound) {
// Older versions may not return documents_found and values_loaded.
mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0));
mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0));
}
if (includeMetadata) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
@@ -2688,7 +2693,7 @@ protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean in
* Create empty result matcher from result, taking into account all metadata items.
*/
protected static MapMatcher getResultMatcher(Map<String, Object> result) {
return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"));
return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"), result.containsKey("documents_found"));
}

/**
@@ -83,7 +83,11 @@ public int getPositionCount() {

@Override
public int getTotalValueCount() {
Contributor: Should this be changed to long, or are we not worried about overflows here?

Member (author): Overflow would be a bug in planning. That'd be us using billions of values in a single block....

throw new UnsupportedOperationException("Composite block");
int totalValueCount = 0;
for (Block b : blocks) {
totalValueCount += b.getTotalValueCount();
}
return totalValueCount;
}

@Override
@@ -434,6 +434,11 @@ public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
return partitioningStrategies;
}

@Override
public long documentsFound() {
return rowsEmitted;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -47,6 +47,8 @@
import java.util.function.IntFunction;
import java.util.function.Supplier;

import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;

/**
* Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
* and outputs them to a new column.
@@ -113,6 +115,7 @@ public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceL
private final BlockFactory blockFactory;

private final Map<String, Integer> readersBuilt = new TreeMap<>();
private long valuesLoaded;

int lastShard = -1;
int lastSegment = -1;
@@ -165,6 +168,9 @@ public int get(int i) {
}
}
success = true;
for (Block b : blocks) {
valuesLoaded += b.getTotalValueCount();
}
return page.appendBlocks(blocks);
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -547,7 +553,7 @@ public String toString() {

@Override
protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted);
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
}

public static class Status extends AbstractPageMappingOperator.Status {
@@ -558,21 +564,34 @@ public static class Status extends AbstractPageMappingOperator.Status {
);

private final Map<String, Integer> readersBuilt;

Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
private final long valuesLoaded;

Status(
Map<String, Integer> readersBuilt,
long processNanos,
int pagesProcessed,
long rowsReceived,
long rowsEmitted,
long valuesLoaded
) {
super(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
this.readersBuilt = readersBuilt;
this.valuesLoaded = valuesLoaded;
}

Status(StreamInput in) throws IOException {
super(in);
readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(readersBuilt, StreamOutput::writeVInt);
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
out.writeVLong(valuesLoaded);
}
}

@Override
@@ -584,6 +603,11 @@ public Map<String, Integer> readersBuilt() {
return readersBuilt;
}

@Override
public long valuesLoaded() {
return valuesLoaded;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -592,6 +616,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(e.getKey(), e.getValue());
}
builder.endObject();
builder.field("values_loaded", valuesLoaded);
innerToXContent(builder);
return builder.endObject();
}
@@ -600,12 +625,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public boolean equals(Object o) {
if (super.equals(o) == false) return false;
Status status = (Status) o;
return readersBuilt.equals(status.readersBuilt);
return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), readersBuilt);
return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded);
}

@Override
@@ -710,6 +735,4 @@ public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int
return factory.newAggregateMetricDoubleBlockBuilder(count);
}
}

// TODO tests that mix source loaded fields and doc values in the same block
}
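The Status serialization above gates the new `valuesLoaded` counter on a transport version so mixed-version clusters keep working: the writer emits the extra vlong only when the peer is new enough, and the reader defaults to 0 when talking to an older node. Here is a minimal standalone sketch of that pattern, using plain `java.io` streams and a boolean flag in place of Elasticsearch's `TransportVersion.onOrAfter(...)` check (both substitutions are illustrative assumptions, not the PR's actual API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of version-gated serialization, modeled on the Status read/write above.
// "peerSupportsValuesLoaded" stands in for getTransportVersion().onOrAfter(...).
record Status(long rowsEmitted, long valuesLoaded) {

    void writeTo(DataOutputStream out, boolean peerSupportsValuesLoaded) throws IOException {
        out.writeLong(rowsEmitted);
        if (peerSupportsValuesLoaded) {
            out.writeLong(valuesLoaded); // only newer peers know how to read this field
        }
    }

    static Status readFrom(DataInputStream in, boolean peerSupportsValuesLoaded) throws IOException {
        long rowsEmitted = in.readLong();
        // Older peers never wrote the field, so default to 0 instead of over-reading.
        long valuesLoaded = peerSupportsValuesLoaded ? in.readLong() : 0;
        return new Status(rowsEmitted, valuesLoaded);
    }
}

public class VersionGateDemo {
    public static void main(String[] args) throws IOException {
        Status s = new Status(10, 42);

        // New writer -> new reader: the field round-trips.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        s.writeTo(new DataOutputStream(buf), true);
        Status back = Status.readFrom(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), true);
        System.out.println(back.valuesLoaded()); // 42

        // New writer -> old reader: the field is skipped and defaults to 0.
        buf = new ByteArrayOutputStream();
        s.writeTo(new DataOutputStream(buf), false);
        back = Status.readFrom(
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), false);
        System.out.println(back.valuesLoaded()); // 0
    }
}
```

Both sides must agree on the gate, which is why the PR introduces a single named constant (`ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED`) checked in both `writeTo` and the stream constructor.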
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Information returned when one or more {@link Driver}s are completed.
* @param documentsFound The number of documents found by all lucene queries performed by these drivers.
* @param valuesLoaded The number of values loaded from lucene for all drivers. This is
* <strong>roughly</strong> the number of documents times the number of
* fields per document. Except {@code null} values don't count.
* And multivalued fields count as many times as there are values.
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* not free so this will be empty if the {@code profile} option was not set in
* the request.
*/
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {

/**
* Completion info we use when we didn't properly complete any drivers.
* Usually this is returned with an error, but it's also used when receiving
* responses from very old nodes.
*/
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());

/**
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
*/
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
long documentsFound = 0;
long valuesLoaded = 0;
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
for (Driver d : drivers) {
DriverProfile p = d.profile();
for (OperatorStatus o : p.operators()) {
documentsFound += o.documentsFound();
valuesLoaded += o.valuesLoaded();
}
collectedProfiles.add(p);
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
}

/**
* Build a {@link DriverCompletionInfo} for many drivers excluding their profile output.
*/
public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) {
long documentsFound = 0;
long valuesLoaded = 0;
for (Driver d : drivers) {
DriverStatus s = d.status();
assert s.status() == DriverStatus.Status.DONE;
for (OperatorStatus o : s.completedOperators()) {
documentsFound += o.documentsFound();
valuesLoaded += o.valuesLoaded();
}
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
}

public DriverCompletionInfo(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(documentsFound);
out.writeVLong(valuesLoaded);
out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
}

public static class Accumulator {
private long documentsFound;
private long valuesLoaded;
private final List<DriverProfile> collectedProfiles = new ArrayList<>();

public void accumulate(DriverCompletionInfo info) {
this.documentsFound += info.documentsFound;
this.valuesLoaded += info.valuesLoaded;
this.collectedProfiles.addAll(info.collectedProfiles);
}

public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
}
}

public static class AtomicAccumulator {
private final AtomicLong documentsFound = new AtomicLong();
private final AtomicLong valuesLoaded = new AtomicLong();
private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>());

public void accumulate(DriverCompletionInfo info) {
this.documentsFound.addAndGet(info.documentsFound);
this.valuesLoaded.addAndGet(info.valuesLoaded);
this.collectedProfiles.addAll(info.collectedProfiles);
}

public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
}
}
}
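`DriverCompletionInfo` is merged up the execution tree: each node sums the counters across its drivers, and the coordinator accumulates the per-node infos into the response totals. A simplified, self-contained sketch of that accumulation (the names `CompletionInfo`/`Accumulator` are stand-ins, and the profile list is dropped for brevity):

```java
import java.util.List;

// Simplified stand-in for DriverCompletionInfo: just the two counters.
record CompletionInfo(long documentsFound, long valuesLoaded) {
    static final CompletionInfo EMPTY = new CompletionInfo(0, 0);
}

// Mirrors DriverCompletionInfo.Accumulator: sums infos as responses arrive.
class Accumulator {
    private long documentsFound;
    private long valuesLoaded;

    void accumulate(CompletionInfo info) {
        documentsFound += info.documentsFound();
        valuesLoaded += info.valuesLoaded();
    }

    CompletionInfo finish() {
        return new CompletionInfo(documentsFound, valuesLoaded);
    }
}

public class AccumulatorDemo {
    public static void main(String[] args) {
        Accumulator acc = new Accumulator();
        // Pretend three nodes responded; a node that errored contributes EMPTY,
        // matching the "didn't properly complete any drivers" case above.
        for (CompletionInfo info : List.of(
                new CompletionInfo(100, 300), // 100 docs, ~3 values loaded per doc
                new CompletionInfo(50, 120),
                CompletionInfo.EMPTY)) {
            acc.accumulate(info);
        }
        CompletionInfo total = acc.finish();
        System.out.println(total.documentsFound() + " " + total.valuesLoaded()); // 150 420
    }
}
```

The real class also keeps an `AtomicAccumulator` variant for the same merge when responses arrive concurrently.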
@@ -104,6 +104,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
if (b.humanReadable()) {
b.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
}
b.field("documents_found", operators.stream().mapToLong(OperatorStatus::documentsFound).sum());
b.field("values_loaded", operators.stream().mapToLong(OperatorStatus::valuesLoaded).sum());
b.field("iterations", iterations);
return b;
}),
@@ -124,6 +124,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (builder.humanReadable()) {
builder.field("cpu_time", TimeValue.timeValueNanos(cpuNanos));
}
builder.field("documents_found", documentsFound());
builder.field("values_loaded", valuesLoaded());
builder.field("iterations", iterations);
builder.field("status", status, params);
builder.startArray("completed_operators");
@@ -145,6 +147,34 @@ public String toString() {
return Strings.toString(this);
}

/**
* The number of documents found by this driver.
*/
public long documentsFound() {
long documentsFound = 0;
for (OperatorStatus s : completedOperators) {
documentsFound += s.documentsFound();
}
for (OperatorStatus s : activeOperators) {
documentsFound += s.documentsFound();
}
return documentsFound;
}

/**
* The number of values loaded by this operator.
*/
public long valuesLoaded() {
long valuesLoaded = 0;
for (OperatorStatus s : completedOperators) {
valuesLoaded += s.valuesLoaded();
}
for (OperatorStatus s : activeOperators) {
valuesLoaded += s.valuesLoaded();
}
return valuesLoaded;
}

public enum Status implements Writeable, ToXContentFragment {
QUEUED,
STARTING,
@@ -105,5 +105,21 @@ interface OperatorFactory extends Describable {
/**
* Status of an {@link Operator} to be returned by the tasks API.
*/
interface Status extends ToXContentObject, VersionedNamedWriteable {}
interface Status extends ToXContentObject, VersionedNamedWriteable {
/**
* The number of documents found by this operator. Most operators
* don't find documents and will return {@code 0} here.
*/
default long documentsFound() {
return 0;
}

/**
* The number of values loaded by this operator. Most operators
* don't load values and will return {@code 0} here.
*/
default long valuesLoaded() {
return 0;
}
}
Contributor @idegtiarenko, Apr 2, 2025:
Not related to this change, but I think it is worth revisiting operator status. Currently this is an interface with an implementation per operator, used to expose diagnostic attributes externally.

I believe we should replace it with a simple map (or maybe a wrapper on top of a map). A common schema could be achieved with some sort of status builder (something like status().withPagesProcessed(..).withRowsReceived(..).withRowsEmitted(..).build()).
This way we would not need a custom implementation per operator and could share common equals/hashCode and toString logic. With a map structure we would also not need a new transport version when adding, renaming, or removing fields.

}
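As a rough illustration of that reviewer suggestion (hypothetical names throughout; nothing like this exists in the PR), a map-backed operator status with a small builder could look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical map-backed operator status, sketching the reviewer's idea:
// one shared representation instead of a Status implementation per operator.
final class MapStatus {
    private final Map<String, Object> fields;

    private MapStatus(Map<String, Object> fields) {
        this.fields = Map.copyOf(fields); // immutable snapshot
    }

    Object get(String name) {
        return fields.get(name);
    }

    Map<String, Object> asMap() {
        return fields;
    }

    static Builder builder() {
        return new Builder();
    }

    // Common schema via named builder methods; operator-specific
    // extras go through the generic withField.
    static final class Builder {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        Builder withPagesProcessed(long v) { fields.put("pages_processed", v); return this; }
        Builder withRowsReceived(long v)   { fields.put("rows_received", v);   return this; }
        Builder withRowsEmitted(long v)    { fields.put("rows_emitted", v);    return this; }
        Builder withField(String name, Object v) { fields.put(name, v); return this; }

        MapStatus build() { return new MapStatus(fields); }
    }
}

public class MapStatusDemo {
    public static void main(String[] args) {
        MapStatus status = MapStatus.builder()
            .withPagesProcessed(12)
            .withRowsReceived(1000)
            .withRowsEmitted(1000)
            .withField("values_loaded", 5000L) // new counters need no new transport version
            .build();
        System.out.println(status.get("values_loaded")); // 5000
    }
}
```

With this shape, equals/hashCode/toString come from the underlying map for free, and serialization can be a generic string-keyed map read/write, which is the reviewer's point about avoiding a transport version bump per new field.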