feat: DH-18143: Improve handling of sort order for Iceberg tables #6646
This PR has three commits as follows:
I am not a fan of Commit 2 for two reasons (please review the PR so the following makes more sense):
final Schema schema = sortOrder.schema();
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
for (final SortField field : sortOrder.fields()) {
    final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
This might throw an InvalidNameException; we might need to wait for some Resolver work I'm doing in https://deephaven.atlassian.net/browse/DH-18365 to land so we can properly map field ids.
I would not mark this as resolved yet
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
    sortColumn = SortColumn.asc(columnName);
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
    sortColumn = SortColumn.desc(columnName);
} else {
We should raise the issue of nulls-first vs. nulls-last with the engine team. Arguably, this is something we should want to support.
Additionally, we may need to hold off on handling any floating-point columns. The Iceberg spec (https://iceberg.apache.org/spec/#sorting) orders floating-point values as -NaN < -Infinity < -value < -0 < 0 < value < Infinity < NaN.
The -NaN vs. NaN distinction is something I have not seen before, but it is another issue to raise with the engine team.
In the meantime, I think the strategy of breaking and returning what we have so far should be OK.
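The "break and return what we have so far" strategy can be sketched roughly as follows. This is a self-contained illustration, not the PR's code: `Direction`, `NullOrder`, `Field`, and `translatePrefix` are hypothetical stand-ins for the Iceberg and Deephaven types, and it assumes (as the diff above does) that the engine's ascending sort places nulls first and its descending sort places nulls last. Returning a prefix is safe because data sorted on [A, B, C] is also sorted on [A] and on [A, B].

```java
import java.util.ArrayList;
import java.util.List;

final class SortPrefixSketch {
    enum Direction { ASC, DESC }
    enum NullOrder { NULLS_FIRST, NULLS_LAST }

    // Hypothetical stand-in for an Iceberg sort field already resolved to a column name.
    static final class Field {
        final String column;
        final Direction direction;
        final NullOrder nullOrder;

        Field(String column, Direction direction, NullOrder nullOrder) {
            this.column = column;
            this.direction = direction;
            this.nullOrder = nullOrder;
        }
    }

    // Translates the longest leading run of fields the engine can represent,
    // stopping at the first unsupported direction/null-order combination.
    static List<String> translatePrefix(List<Field> fields) {
        final List<String> out = new ArrayList<>(fields.size());
        for (final Field f : fields) {
            if (f.nullOrder == NullOrder.NULLS_FIRST && f.direction == Direction.ASC) {
                out.add("asc(" + f.column + ")");
            } else if (f.nullOrder == NullOrder.NULLS_LAST && f.direction == Direction.DESC) {
                out.add("desc(" + f.column + ")");
            } else {
                // Unsupported combination: keep what has been collected so far.
                break;
            }
        }
        return out;
    }
}
```

With this shape, an order such as (A asc nulls-first, B asc nulls-last, C desc nulls-last) yields only the column A, because later fields are meaningless once an earlier one cannot be honored.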
 * @return A stream of {@link DataFile} objects.
 */
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull ManifestFile manifestFile) {
    return toStream(ManifestFiles.read(manifestFile, table.io()));
I recently learned that the files themselves may have metadata, i.e. org.apache.iceberg.ManifestReader#spec. It makes me cautious about extending these helper methods too far. While we aren't passing along ManifestReader#spec today, we may need to in the future and might need to model it as appropriate.
private List<SortColumn> computeSortedColumns() {
    final Integer sortOrderId = dataFile.sortOrderId();
    // If sort order ID is missing, unknown or unsorted, we fall back to reading sort columns from the parquet file
    if (sortOrderId == null) {
        return super.getSortedColumns();
    }
    final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId);
    if (sortOrder == null || sortOrder.isUnsorted()) {
        return super.getSortedColumns();
    }
It's an interesting question: if the metadata exists on the file itself, should we prefer it? I can imagine a case where we are setting more specific sort column information in the file itself.
For example, maybe Iceberg knows this table is sorted on columns [A, B], but the parquet metadata gives us more information: that it is sorted on columns [A, B, C].
There's also an argument to be made that we should completely ignore the metadata from the file itself, and only rely on Iceberg. This saves us from needing to materialize the parquet file metadata (at least from this code path). In particular, if Iceberg explicitly gives us back sortOrder.isUnsorted(), maybe we should be okay just returning an empty list?
It's also possible that we want this to be configurable... it's not obvious to me what the best course of action is.
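One way to picture the "configurable" option is a small policy switch, sketched below. Everything here is hypothetical and for discussion only: the `Policy` enum, its three values, and `resolve` are not part of the PR or of any Deephaven API. Column lists are plain strings, a `null` Iceberg list means the data file has no usable sort order ID, and an empty list means Iceberg explicitly reports unsorted. The parquet side is a `Supplier` so the file metadata is only materialized when the policy actually needs it.

```java
import java.util.List;
import java.util.function.Supplier;

final class SortSourcePolicySketch {
    // Hypothetical policy names; not an existing API.
    enum Policy { ICEBERG_ONLY, PREFER_ICEBERG, PREFER_PARQUET }

    static List<String> resolve(
            Policy policy,
            List<String> icebergColumns,             // null: no usable sort order ID; empty: explicitly unsorted
            Supplier<List<String>> parquetColumns) { // invoked lazily, only when needed
        switch (policy) {
            case ICEBERG_ONLY:
                // Never touches the parquet footer.
                return icebergColumns == null ? List.of() : icebergColumns;
            case PREFER_ICEBERG:
                // Trust Iceberg whenever it says anything, including "unsorted".
                return icebergColumns != null ? icebergColumns : parquetColumns.get();
            case PREFER_PARQUET: {
                // The file may carry more specific information, e.g. [A, B, C] vs [A, B].
                final List<String> fromFile = parquetColumns.get();
                return (!fromFile.isEmpty() || icebergColumns == null) ? fromFile : icebergColumns;
            }
            default:
                throw new IllegalStateException("Unexpected policy: " + policy);
        }
    }
}
```

Under `PREFER_ICEBERG`, an explicit unsorted answer from Iceberg returns an empty list without reading the file, which corresponds to the second argument made above; `PREFER_PARQUET` corresponds to the first.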
public List<SortColumn> getSortedColumns() {
    return sortedColumns == null ? super.getSortedColumns() : sortedColumns;
}

@Nullable
private static List<SortColumn> computeSortedColumns(
        @NotNull final IcebergTableAdapter tableAdapter,
        @NotNull final DataFile dataFile) {
    final Integer sortOrderId = dataFile.sortOrderId();
Need to think about behavior when unsorted (either b/c null or explicitly set to unsorted)...
@classmethod
def from_sort_id(cls, sort_order_id: int) -> 'SortOrderProvider':
    """
    Use the sort order with the given ID to sort new data while writing to the iceberg table.
I have no idea what any of this means. It needs more description. e.g. why would a sort order have an ID? How would I know what the IDs are? etc.
Sort order is actually an Iceberg concept (https://iceberg.apache.org/spec/#sorting), and I don't want to add too much detail about it in our documentation.
So I have added a link to the spec in two places. Let me know if this makes it any better.
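For readers unfamiliar with the concept, here is a toy model of how sort order IDs work according to the linked Iceberg spec: table metadata stores a list of sort orders, each with an order ID, plus a default sort order ID; order ID 0 is reserved for the unsorted order; and each data file may record the ID of the order it was written with. The class and method names below are hypothetical and deliberately avoid the real Iceberg API, and ID assignment is simplified.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SortOrderIdSketch {
    // Order ID 0 is reserved by the Iceberg spec for the unsorted order.
    static final int UNSORTED_ID = 0;

    // Toy version of the table metadata: order ID -> ordered list of sort fields.
    final Map<Integer, List<String>> sortOrders = new LinkedHashMap<>();
    int defaultSortOrderId = UNSORTED_ID;

    SortOrderIdSketch() {
        sortOrders.put(UNSORTED_ID, List.of());
    }

    // Registers a new sort order and returns its ID (simplified: next free integer).
    int addSortOrder(List<String> fields) {
        final int id = sortOrders.size();
        sortOrders.put(id, List.copyOf(fields));
        return id;
    }

    // Looks up the fields for the ID recorded on a data file; a missing or
    // unknown ID is treated as unsorted in this sketch.
    List<String> fieldsFor(Integer dataFileSortOrderId) {
        if (dataFileSortOrderId == null) {
            return List.of();
        }
        return sortOrders.getOrDefault(dataFileSortOrderId, List.of());
    }
}
```

In this picture, a writer configured with a given sort order ID sorts new data by the fields registered under that ID and stamps the same ID on the data files it produces, so readers can later look the order up again.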
Returns a sort order provider that delegates to this provider for computing the columns to sort on, but writes a
different sort order ID to the iceberg table.
For example, this provider might return fields {A, B, C} to sort on, but the ID written to iceberg corresponds
to sort order with fields {A, B}.
Not clear enough. See earlier comments.
to sort order with fields {A, B}.

Args:
    sort_order_id (int): the sort order ID to write to the iceberg table.
Tables have sort order IDs stored with them? This is the kind of thing that isn't documented well enough. No part of this documentation would have suggested such a thing to me. There is a lot of assumed knowledge that needs to be made explicit in the docs for the new code.
sort_order_provider: Optional[SortOrderProvider]: Used to provide SortOrder to be used for sorting new data
    while writing to an iceberg table using this writer. Note that we select the sort order of the Table at
    the time the writer is constructed, and it does not change if the table's sort order changes. Defaults
    to `None`, which means use the table's default sort order.
See earlier comments on needing a more readable, clear, and expanded docstring.
class SortOrderProvider(JObjectWrapper):
    """
    :class:`.SortOrderProvider` is used for providing SortOrder to be used for sorting new data while writing to an
- I don't see any test for the new class.
- The name SortOrderProvider seems to suggest that it might be used to provide a SortOrder object to be used when working with Iceberg data, but in fact all the factory methods return an instance of itself. I am not opposed to this pattern per se, but the docstrings on the methods could probably be more explicit.
We still don't have unit testing for Iceberg. We have an open ticket for that (DH-18261), but it hasn't been prioritised yet.
Related to DH-18143