Skip to content

Commit

Permalink
Review with Devin Part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 5, 2025
1 parent e029f67 commit b98123e
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@ public class IcebergTableWriter {
private final OutputFileFactory outputFileFactory;

/**
* The sort order to use while writing data to the Iceberg table.
* The sort order to write down for new data files.
*/
private final SortOrder sortOrder;
private final SortOrder sortOrderToWrite;

/**
* The names of columns on which the tables will be sorted before writing to Iceberg. This is derived from
* {@link #sortOrder}.
* The names of columns on which the tables will be sorted before writing to Iceberg.
*/
private final Collection<SortColumn> sortColumnNames;

Expand Down Expand Up @@ -152,9 +151,11 @@ public class IcebergTableWriter {
.format(FileFormat.PARQUET)
.build();

sortOrder = ((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider())
.getSortOrder(table);
sortColumnNames = computeSortColumns(sortOrder);
final SortOrderProviderInternal.SortOrderProviderImpl sortOrderProvider =
((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider());
sortColumnNames =
computeSortColumns(sortOrderProvider.getSortOrderToUse(table), sortOrderProvider.failOnUnmapped());
sortOrderToWrite = sortOrderProvider.getSortOrderToWrite(table);
}

private static TableParquetWriterOptions verifyWriterOptions(
Expand Down Expand Up @@ -284,7 +285,7 @@ private Map<String, Integer> readNameMappingDefault() {
return nameMappingDefault;
}

private List<SortColumn> computeSortColumns(@NotNull final SortOrder sortOrder) {
private List<SortColumn> computeSortColumns(@NotNull final SortOrder sortOrder, final boolean failOnUnmapped) {
if (sortOrder.isUnsorted()) {
return List.of();
}
Expand All @@ -297,13 +298,19 @@ private List<SortColumn> computeSortColumns(@NotNull final SortOrder sortOrder)
} else if (sortField.nullOrder() == NullOrder.NULLS_LAST && sortField.direction() == SortDirection.DESC) {
ascending = false;
} else {
// Cannot enforce this sort ordering using deephaven
if (failOnUnmapped) {
throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since Deephaven" +
" currently only supports sorting by {ASC, NULLS FIRST} or {DESC, NULLS LAST}");
}
return List.of();
}
final int fieldId = sortField.sourceId();
final String columnName = fieldIdToColumnName.get(fieldId);
if (columnName == null) {
// Could not find the column name in current schema, so we cannot sort by it
if (failOnUnmapped) {
throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since column " +
"corresponding to field ID " + fieldId + " not found in schema");
}
return List.of();
}
final SortColumn sortColumn =
Expand Down Expand Up @@ -601,7 +608,7 @@ private List<DataFile> dataFilesFromParquet(
.withFormat(FileFormat.PARQUET)
.withRecordCount(completedWrite.numRows())
.withFileSizeInBytes(completedWrite.numBytes())
.withSortOrder(sortOrder);
.withSortOrder(sortOrderToWrite);
if (partitionSpec.isPartitioned()) {
dataFileBuilder.withPartition(partitionDataList.get(idx));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//
package io.deephaven.iceberg.util;

import org.apache.iceberg.SortOrder;

/**
* A specification for providing {@link org.apache.iceberg.SortOrder} while writing to an iceberg table.
*/
Expand All @@ -22,7 +24,7 @@ static SortOrderProvider unsorted() {
* order is set on the table, no sorting will be done.
*/
static SortOrderProvider useTableDefault() {
return SortOrderProviderInternal.TableDefaultSortOrderProvider.INSTANCE;
return new SortOrderProviderInternal.TableDefaultSortOrderProvider();
}

/**
Expand All @@ -31,4 +33,26 @@ static SortOrderProvider useTableDefault() {
static SortOrderProvider fromSortId(final int id) {
return new SortOrderProviderInternal.IdSortOrderProvider(id);
}

/**
* Return a sort order provider that delegates to this provider for computing the columns to sort on, but writes a
* different sort order ID to the iceberg table. Note that the sort order returned by the caller must
* {@link SortOrder#satisfies(SortOrder) satisfy} the sort order corresponding to the provided sort order ID.
* <p>
* For example, this provider might return fields {A, B, C} to sort on, but the ID written to iceberg corresponds to
* sort order with fields {A, B}.
*
* @param sortOrderId the sort order ID to write to the iceberg table
*/
default SortOrderProvider as(final int sortOrderId) {
return new SortOrderProviderInternal.DelegatingSortOrderProvider(this, sortOrderId);
}

/**
* Returns a sort order provider which will fail, if for any reason, the sort order cannot be applied to the tables
* being written. By default, the provider will not fail if the sort order cannot be applied.
*
* @param failOnUnmapped whether to fail if the sort order cannot be applied to the tables being written
*/
SortOrderProvider withFailOnUnmapped(boolean failOnUnmapped);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,160 @@ class SortOrderProviderInternal {

interface SortOrderProviderImpl {
/**
* Returns the {@link SortOrder} to use when writing to the given table based on this {@link SortOrderProvider}.
* Returns the {@link SortOrder} to use for sorting the data when writing to the provided iceberg table.
*/
@NotNull
SortOrder getSortOrder(Table table);
SortOrder getSortOrderToUse(Table table);

/**
* Returns the sort order to write down to the iceberg table when appending new data. This may be different from
* the {@code getSortOrderToUse(table)} as this sort order may be customized. But
* {@link #getSortOrderToUse(Table)} should always {@link SortOrder#satisfies(SortOrder) satisfy} this sort
* order.
*/
@NotNull
default SortOrder getSortOrderToWrite(final Table table) {
return getSortOrderToUse(table);
}

/**
* Whether to fail if the sort order cannot be applied to the tables being written.
*/
boolean failOnUnmapped();
}

// Implementations of SortOrderProvider
enum DisableSorting implements SortOrderProvider, SortOrderProviderImpl {
enum DisableSorting implements SortOrderProviderImpl, SortOrderProvider {
INSTANCE;

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
public SortOrder getSortOrderToUse(final Table table) {
return SortOrder.unsorted();
}

@Override
@NotNull
public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) {
throw new UnsupportedOperationException("Cannot set failOnUnmapped for unsorted sort order provider");
}

@Override
public boolean failOnUnmapped() {
return true; // Should never fail as we are unsorted
}
}

enum TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {
INSTANCE;
static class TableDefaultSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {
private boolean failOnUnmapped;

TableDefaultSortOrderProvider() {
failOnUnmapped = false;
}

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
public SortOrder getSortOrderToUse(final Table table) {
final SortOrder sortOrder = table.sortOrder();
return sortOrder != null ? sortOrder : SortOrder.unsorted();
}

@Override
public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) {
this.failOnUnmapped = failOnUnmapped;
return this;
}

@Override
public boolean failOnUnmapped() {
return failOnUnmapped;
}
}

static class IdSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {
private boolean failOnUnmapped;
private final int sortOrderId;

IdSortOrderProvider(final int sortOrderId) {
super();
this.sortOrderId = sortOrderId;
}

@Override
@NotNull
public SortOrder getSortOrderToUse(final Table table) {
return getSortOrderForId(table, sortOrderId);
}

@Override
public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) {
this.failOnUnmapped = failOnUnmapped;
return this;
}

@Override
public boolean failOnUnmapped() {
return failOnUnmapped;
}
}

/**
* A {@link SortOrderProvider} that delegates to another {@link SortOrderProvider} for computing the sort order,
* while providing a custom sort order ID.
*/
static class DelegatingSortOrderProvider implements SortOrderProvider, SortOrderProviderImpl {
private SortOrderProvider sortOrderProvider;
private final int sortOrderId;

DelegatingSortOrderProvider(final SortOrderProvider sortOrderProvider, final int sortOrderId) {
this.sortOrderProvider = sortOrderProvider;
this.sortOrderId = sortOrderId;
}

@Override
@NotNull
public SortOrder getSortOrder(final Table table) {
if (!table.sortOrders().containsKey(sortOrderId)) {
throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " +
table);
public SortOrder getSortOrderToUse(final Table table) {
return ((SortOrderProviderImpl) sortOrderProvider).getSortOrderToUse(table);
}

@Override
@NotNull
public SortOrder getSortOrderToWrite(final Table table) {
final SortOrder sortOrderFromDelegate = getSortOrderToUse(table);
final SortOrder sortOrderForId = getSortOrderForId(table, sortOrderId);
if (!sortOrderFromDelegate.satisfies(sortOrderForId)) {
throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " does not satisfy the " +
"table's sort order: " + sortOrderFromDelegate);
}
return table.sortOrders().get(sortOrderId);
return sortOrderForId;
}

@Override
@NotNull
public SortOrderProvider as(final int sortOrderId) {
return new DelegatingSortOrderProvider(sortOrderProvider, sortOrderId);
}

@Override
public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) {
sortOrderProvider = sortOrderProvider.withFailOnUnmapped(failOnUnmapped);
return this;
}

@Override
public boolean failOnUnmapped() {
return ((SortOrderProviderImpl) sortOrderProvider).failOnUnmapped();
}
}

// --------------------------------------------------------------------------------------------------

// Methods for extracting the sort order from the table
private static SortOrder getSortOrderForId(final Table table, final int sortOrderId) {
if (!table.sortOrders().containsKey(sortOrderId)) {
throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " +
table);
}
return table.sortOrders().get(sortOrderId);
}
}

0 comments on commit b98123e

Please sign in to comment.