From b98123e94a9f35ce526aba9a9ad1ef3a51e04e63 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 5 Mar 2025 19:21:14 +0100 Subject: [PATCH] Review with Devin Part 2 --- .../iceberg/util/IcebergTableWriter.java | 29 ++-- .../iceberg/util/SortOrderProvider.java | 26 +++- .../util/SortOrderProviderInternal.java | 135 ++++++++++++++++-- 3 files changed, 166 insertions(+), 24 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index ac5f7bc75a7..388c9067d69 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -111,13 +111,12 @@ public class IcebergTableWriter { private final OutputFileFactory outputFileFactory; /** - * The sort order to use while writing data to the Iceberg table. + * The sort order to write down for new data files. */ - private final SortOrder sortOrder; + private final SortOrder sortOrderToWrite; /** - * The names of columns on which the tables will be sorted before writing to Iceberg. This is derived from - * {@link #sortOrder}. + * The names of columns on which the tables will be sorted before writing to Iceberg. 
*/ private final Collection sortColumnNames; @@ -152,9 +151,11 @@ public class IcebergTableWriter { .format(FileFormat.PARQUET) .build(); - sortOrder = ((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider()) - .getSortOrder(table); - sortColumnNames = computeSortColumns(sortOrder); + final SortOrderProviderInternal.SortOrderProviderImpl sortOrderProvider = + ((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider()); + sortColumnNames = + computeSortColumns(sortOrderProvider.getSortOrderToUse(table), sortOrderProvider.failOnUnmapped()); + sortOrderToWrite = sortOrderProvider.getSortOrderToWrite(table); } private static TableParquetWriterOptions verifyWriterOptions( @@ -284,7 +285,7 @@ private Map readNameMappingDefault() { return nameMappingDefault; } - private List computeSortColumns(@NotNull final SortOrder sortOrder) { + private List computeSortColumns(@NotNull final SortOrder sortOrder, final boolean failOnUnmapped) { if (sortOrder.isUnsorted()) { return List.of(); } @@ -297,13 +298,19 @@ private List computeSortColumns(@NotNull final SortOrder sortOrder) } else if (sortField.nullOrder() == NullOrder.NULLS_LAST && sortField.direction() == SortDirection.DESC) { ascending = false; } else { - // Cannot enforce this sort ordering using deephaven + if (failOnUnmapped) { + throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since Deephaven" + + " currently only supports sorting by {ASC, NULLS FIRST} or {DESC, NULLS LAST}"); + } return List.of(); } final int fieldId = sortField.sourceId(); final String columnName = fieldIdToColumnName.get(fieldId); if (columnName == null) { - // Could not find the column name in current schema, so we cannot sort by it + if (failOnUnmapped) { + throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since column " + + "corresponding to field ID " + fieldId + " not found in schema"); + } return List.of(); } final SortColumn 
sortColumn = @@ -601,7 +608,7 @@ private List dataFilesFromParquet( .withFormat(FileFormat.PARQUET) .withRecordCount(completedWrite.numRows()) .withFileSizeInBytes(completedWrite.numBytes()) - .withSortOrder(sortOrder); + .withSortOrder(sortOrderToWrite); if (partitionSpec.isPartitioned()) { dataFileBuilder.withPartition(partitionDataList.get(idx)); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java index cb0d817bbde..124050a8e05 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java @@ -3,6 +3,8 @@ // package io.deephaven.iceberg.util; +import org.apache.iceberg.SortOrder; + /** * A specification for providing {@link org.apache.iceberg.SortOrder} while writing to an iceberg table. */ @@ -22,7 +24,7 @@ static SortOrderProvider unsorted() { * order is set on the table, no sorting will be done. */ static SortOrderProvider useTableDefault() { - return SortOrderProviderInternal.TableDefaultSortOrderProvider.INSTANCE; + return new SortOrderProviderInternal.TableDefaultSortOrderProvider(); } /** @@ -31,4 +33,26 @@ static SortOrderProvider useTableDefault() { static SortOrderProvider fromSortId(final int id) { return new SortOrderProviderInternal.IdSortOrderProvider(id); } + + /** + * Return a sort order provider that delegates to this provider for computing the columns to sort on, but writes a + * different sort order ID to the iceberg table. Note that the sort order returned by this provider must + * {@link SortOrder#satisfies(SortOrder) satisfy} the sort order corresponding to the provided sort order ID. + * <p>
+ * For example, this provider might return fields {A, B, C} to sort on, but the ID written to iceberg corresponds to + * sort order with fields {A, B}. + * + * @param sortOrderId the sort order ID to write to the iceberg table + */ + default SortOrderProvider as(final int sortOrderId) { + return new SortOrderProviderInternal.DelegatingSortOrderProvider(this, sortOrderId); + } + + /** + * Returns a sort order provider which will fail if, for any reason, the sort order cannot be applied to the tables + * being written. By default, the provider will not fail if the sort order cannot be applied. + * + * @param failOnUnmapped whether to fail if the sort order cannot be applied to the tables being written + */ + SortOrderProvider withFailOnUnmapped(boolean failOnUnmapped); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java index a1192333a6b..ecb2d89ddae 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java @@ -14,49 +14,160 @@ class SortOrderProviderInternal { interface SortOrderProviderImpl { /** - * Returns the {@link SortOrder} to use when writing to the given table based on this {@link SortOrderProvider}. + * Returns the {@link SortOrder} to use for sorting the data when writing to the provided iceberg table. */ @NotNull - SortOrder getSortOrder(Table table); + SortOrder getSortOrderToUse(Table table); + + /** + * Returns the sort order to write down to the iceberg table when appending new data. This may be different from + * the {@code getSortOrderToUse(table)} as this sort order may be customized. But + * {@link #getSortOrderToUse(Table)} should always {@link SortOrder#satisfies(SortOrder) satisfy} this sort + * order. 
+ */ + @NotNull + default SortOrder getSortOrderToWrite(final Table table) { + return getSortOrderToUse(table); + } + + /** + * Whether to fail if the sort order cannot be applied to the tables being written. + */ + boolean failOnUnmapped(); } // Implementations of SortOrderProvider - enum DisableSorting implements SortOrderProvider, SortOrderProviderImpl { + enum DisableSorting implements SortOrderProviderImpl, SortOrderProvider { INSTANCE; @Override @NotNull - public SortOrder getSortOrder(final Table table) { + public SortOrder getSortOrderToUse(final Table table) { return SortOrder.unsorted(); } + + @Override + @NotNull + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + throw new UnsupportedOperationException("Cannot set failOnUnmapped for unsorted sort order provider"); + } + + @Override + public boolean failOnUnmapped() { + return true; // Should never fail as we are unsorted + } } - enum TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl { - INSTANCE; + static class TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl { + private boolean failOnUnmapped; + + TableDefaultSortOrderProvider() { + failOnUnmapped = false; + } @Override @NotNull - public SortOrder getSortOrder(final Table table) { + public SortOrder getSortOrderToUse(final Table table) { final SortOrder sortOrder = table.sortOrder(); return sortOrder != null ? 
sortOrder : SortOrder.unsorted(); } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + this.failOnUnmapped = failOnUnmapped; + return this; + } + + @Override + public boolean failOnUnmapped() { + return failOnUnmapped; + } } static class IdSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl { + private boolean failOnUnmapped; private final int sortOrderId; IdSortOrderProvider(final int sortOrderId) { + super(); + this.sortOrderId = sortOrderId; + } + + @Override + @NotNull + public SortOrder getSortOrderToUse(final Table table) { + return getSortOrderForId(table, sortOrderId); + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + this.failOnUnmapped = failOnUnmapped; + return this; + } + + @Override + public boolean failOnUnmapped() { + return failOnUnmapped; + } + } + + /** + * A {@link SortOrderProvider} that delegates to another {@link SortOrderProvider} for computing the sort order, + * while providing a custom sort order ID. 
+ */ + static class DelegatingSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl { + private SortOrderProvider sortOrderProvider; + private final int sortOrderId; + + DelegatingSortOrderProvider(final SortOrderProvider sortOrderProvider, final int sortOrderId) { + this.sortOrderProvider = sortOrderProvider; this.sortOrderId = sortOrderId; } @Override @NotNull - public SortOrder getSortOrder(final Table table) { - if (!table.sortOrders().containsKey(sortOrderId)) { - throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " + - table); + public SortOrder getSortOrderToUse(final Table table) { + return ((SortOrderProviderImpl) sortOrderProvider).getSortOrderToUse(table); + } + + @Override + @NotNull + public SortOrder getSortOrderToWrite(final Table table) { + final SortOrder sortOrderFromDelegate = getSortOrderToUse(table); + final SortOrder sortOrderForId = getSortOrderForId(table, sortOrderId); + if (!sortOrderFromDelegate.satisfies(sortOrderForId)) { + throw new IllegalArgumentException("Sort order " + sortOrderFromDelegate + " does not satisfy the " + + "sort order with ID " + sortOrderId + ": " + sortOrderForId); } - return table.sortOrders().get(sortOrderId); + return sortOrderForId; + } + + @Override + @NotNull + public SortOrderProvider as(final int sortOrderId) { + return new DelegatingSortOrderProvider(sortOrderProvider, sortOrderId); + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + sortOrderProvider = sortOrderProvider.withFailOnUnmapped(failOnUnmapped); + return this; + } + + @Override + public boolean failOnUnmapped() { + return ((SortOrderProviderImpl) sortOrderProvider).failOnUnmapped(); + } + } + + // -------------------------------------------------------------------------------------------------- + + // Methods for extracting the sort order from the table + private static SortOrder getSortOrderForId(final Table table, final int sortOrderId) { + if 
(!table.sortOrders().containsKey(sortOrderId)) { + throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " + + table); } + return table.sortOrders().get(sortOrderId); } }