-
Notifications
You must be signed in to change notification settings - Fork 25.2k
ESQL: Add documents_found
and values_loaded
#125631
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
90de284
4875c5e
fb36e7e
b5bf239
7d306c0
ce44024
aebd712
75cef17
12aafc4
62b5008
add4368
9335c3e
5067e28
4d6e8db
fa3487e
28e9fa4
62a6d3e
4201ae5
3011344
79f7c39
fbbe29c
0953790
25ec61d
a2a9a9e
4e3136a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 125631 | ||
summary: Add `documents_found` and `values_loaded` | ||
area: ES|QL | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.compute.operator; | ||
|
||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
/** | ||
* Information returned when one or more {@link Driver}s are completed. | ||
* @param documentsFound The number of documents found by all lucene queries performed by these drivers. | ||
* @param valuesLoaded The number of values loaded from lucene for all drivers. This is | ||
* <strong>roughly</strong> the number of documents times the number of | ||
* fields per document. Except {@code null} values don't count. | ||
* And multivalued fields count as many times as there are values. | ||
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but | ||
* not free so this will be empty if the {@code profile} option was not set in | ||
* the request. | ||
*/ | ||
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable { | ||
|
||
/** | ||
* Completion info we use when we didn't properly complete any drivers. | ||
* Usually this is returned with an error, but it's also used when receiving | ||
* responses from very old nodes. | ||
*/ | ||
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of()); | ||
|
||
/** | ||
* Build a {@link DriverCompletionInfo} for many drivers including their profile output. | ||
*/ | ||
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) { | ||
long documentsFound = 0; | ||
long valuesLoaded = 0; | ||
// Sized up front: exactly one profile is added per driver. | ||
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size()); | ||
for (Driver d : drivers) { | ||
DriverProfile p = d.profile(); | ||
// Total the counters reported by every operator listed in this driver's profile. | ||
for (OperatorStatus o : p.operators()) { | ||
documentsFound += o.documentsFound(); | ||
valuesLoaded += o.valuesLoaded(); | ||
} | ||
// Keep the profile itself so it is returned alongside the totals. | ||
collectedProfiles.add(p); | ||
} | ||
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); | ||
} | ||
|
||
/** | ||
* Build a {@link DriverCompletionInfo} for many drivers excluding their profile output. | ||
*/ | ||
public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) { | ||
long documentsFound = 0; | ||
GalLalouche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
long valuesLoaded = 0; | ||
for (Driver d : drivers) { | ||
// Counters are read from the driver's status here, not from a profile. | ||
DriverStatus s = d.status(); | ||
// Checked only when assertions are enabled: every driver passed in is expected to be DONE. | ||
assert s.status() == DriverStatus.Status.DONE; | ||
// Total the counters reported by the operators the status lists as completed. | ||
for (OperatorStatus o : s.completedOperators()) { | ||
documentsFound += o.documentsFound(); | ||
valuesLoaded += o.valuesLoaded(); | ||
} | ||
} | ||
// Profiles are deliberately left out: the result carries an empty list. | ||
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of()); | ||
} | ||
|
||
/** | ||
* Read a {@link DriverCompletionInfo} from the wire. Fields are read in the order | ||
* {@code documentsFound}, {@code valuesLoaded}, then the collected profiles, | ||
* which are read into an immutable list. | ||
*/ | ||
public DriverCompletionInfo(StreamInput in) throws IOException { | ||
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom)); | ||
} | ||
|
||
/** | ||
* Write this info to the wire: {@code documentsFound} and {@code valuesLoaded} | ||
* as variable-length longs, followed by the collected profiles. | ||
*/ | ||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeVLong(documentsFound); | ||
out.writeVLong(valuesLoaded); | ||
// Each profile serializes itself through its own writeTo. | ||
out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o)); | ||
} | ||
|
||
/** | ||
* Combines many {@link DriverCompletionInfo}s into one by summing the counters | ||
* and concatenating the profiles. State is held in plain {@code long} fields and | ||
* an {@link ArrayList} with no synchronization in this class. | ||
* NOTE(review): presumably intended for use from a single thread - confirm against callers. | ||
*/ | ||
public static class Accumulator { | ||
private long documentsFound; | ||
private long valuesLoaded; | ||
private final List<DriverProfile> collectedProfiles = new ArrayList<>(); | ||
|
||
/** | ||
* Add the counters and profiles of {@code info} to the running totals. | ||
*/ | ||
public void accumulate(DriverCompletionInfo info) { | ||
this.documentsFound += info.documentsFound; | ||
this.valuesLoaded += info.valuesLoaded; | ||
this.collectedProfiles.addAll(info.collectedProfiles); | ||
} | ||
|
||
/** | ||
* Build a {@link DriverCompletionInfo} from the current totals. The internal | ||
* profile list is passed as-is, not copied, so the result shares it with this accumulator. | ||
*/ | ||
public DriverCompletionInfo finish() { | ||
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); | ||
} | ||
} | ||
|
||
/** | ||
* Variant of the accumulator that keeps its counters in {@link AtomicLong}s and | ||
* its profiles in a {@link Collections#synchronizedList synchronized list}. | ||
*/ | ||
public static class AtomicAccumulator { | ||
private final AtomicLong documentsFound = new AtomicLong(); | ||
private final AtomicLong valuesLoaded = new AtomicLong(); | ||
private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>()); | ||
|
||
/** | ||
* Add the counters and profiles of {@code info} to the running totals. | ||
* The three updates below are separate operations; they are not applied as one unit. | ||
*/ | ||
public void accumulate(DriverCompletionInfo info) { | ||
this.documentsFound.addAndGet(info.documentsFound); | ||
this.valuesLoaded.addAndGet(info.valuesLoaded); | ||
this.collectedProfiles.addAll(info.collectedProfiles); | ||
} | ||
|
||
/** | ||
* Build a {@link DriverCompletionInfo} from the current counter values. The | ||
* synchronized profile list is passed as-is, not copied. | ||
*/ | ||
public DriverCompletionInfo finish() { | ||
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,5 +105,21 @@ interface OperatorFactory extends Describable { | |
/** | ||
* Status of an {@link Operator} to be returned by the tasks API. | ||
*/ | ||
interface Status extends ToXContentObject, VersionedNamedWriteable {} | ||
// Both counters are default methods returning 0, so an implementation only | ||
// needs to override the ones it actually tracks. | ||
interface Status extends ToXContentObject, VersionedNamedWriteable { | ||
/** | ||
* The number of documents found by this operator. Most operators | ||
* don't find documents and will return {@code 0} here. | ||
*/ | ||
default long documentsFound() { | ||
return 0; | ||
} | ||
|
||
/** | ||
* The number of values loaded by this operator. Most operators | ||
* don't load values and will return {@code 0} here. | ||
*/ | ||
default long valuesLoaded() { | ||
return 0; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not related to this change, but I think it is worth revisiting operator status. I believe we should replace it with a simple map (or maybe a wrapper on top of the map). |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be changed to
long
, or are we not worried about overflows here? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overflow would be a bug in planning. That'd be us using billions of values in a single block....