Skip to content

Commit

Permalink
Added write instruction to apply sort order
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Feb 14, 2025
1 parent 58adc11 commit 54e1f08
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private List<SortColumn> computeSortedColumns() {
sortColumn = SortColumn.desc(columnName);
} else {
// TODO Check with Devin if this is okay. The assumption here is that Deephaven sorts nulls first for
// ascending order and nulls last for descending, so if we don't have the correct nulls order, we
// cannot use the column as a sort column
// ascending order and nulls last for descending, so if we don't have the correct nulls order, we
// cannot use the column as a sort column
break;
}
sortColumns.add(sortColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.iceberg.util;

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Require;
Expand All @@ -29,6 +30,8 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -75,7 +78,7 @@ public class IcebergTableWriter {

/**
* Store the partition spec of the Iceberg table at the time of creation of this writer instance and use it for all
* writes, so that even if the table spec, the writer will still work.
* writes, so that even if the table spec changes, the writer will still work.
*/
private final PartitionSpec tableSpec;

Expand All @@ -98,6 +101,9 @@ public class IcebergTableWriter {

/**
* Mapping from Iceberg field IDs to Deephaven column names, populated inside the parquet file.
* <p>
* Use this map instead of the {@link TableWriterOptions#fieldIdToColumnName()} map after initialization to ensure
* that all columns in the table definition are accounted for.
*/
private final Map<Integer, String> fieldIdToColumnName;

Expand Down Expand Up @@ -293,26 +299,28 @@ public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions wri
final List<CompletedParquetWrite> parquetFileInfo;
// Start a new query scope to avoid polluting the existing query scope with new parameters added for
// partitioning columns
List<SortOrder> sortOrders = new ArrayList<>();
try (final SafeCloseable _ignore =
ExecutionContext.getContext().withQueryScope(new StandaloneQueryScope()).open()) {
final Pair<List<PartitionData>, List<String[]>> ret = partitionDataFromPaths(tableSpec, partitionPaths);
partitionData = ret.getFirst();
final List<String[]> dhTableUpdateStrings = ret.getSecond();
parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions);
parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions, sortOrders);
}

final List<SortOrder> sortOrders = addSortOrdersToTableMetadata(writeInstructions);
if (sortOrders.isEmpty()) {
sortOrders = computeSortOrder(writeInstructions.tables());
}
return dataFilesFromParquet(parquetFileInfo, partitionData, sortOrders);
}

/**
* Iterate over all the tables being written and add the corresponding sort orders to the iceberg table metadata.
*
* @return A list of sort orders, one for each table being written, or an empty list if no sort orders were added.
* Iterate over all the tables being written and return a list of sort orders, one for each table. If none of the
* tables are sorted, return an empty list. Additionally, add the corresponding sort orders to the Iceberg table
* metadata.
*/
private List<SortOrder> addSortOrdersToTableMetadata(
@NotNull final IcebergWriteInstructions writeInstructions) {
final List<Table> dhTables = writeInstructions.tables();
private List<SortOrder> computeSortOrder(
@NotNull final List<Table> dhTables) {
final TableMetadata newMetadata;
final List<SortOrder> icebergSortOrders = new ArrayList<>();
if (table instanceof HasTableOperations) {
Expand Down Expand Up @@ -531,11 +539,24 @@ private static String generateRandomAlphabetString(final int length) {
return stringBuilder.toString();
}

/**
* Write the provided Deephaven tables to parquet files and return a list of {@link CompletedParquetWrite} objects
* for each table written.
*
* @param partitionDataList The list of {@link PartitionData} objects for each table, empty if the table is not
* partitioned.
* @param dhTableUpdateStrings The list of update strings to be applied using {@link Table#updateView}, empty if the
* table is not partitioned.
* @param writeInstructions The instructions for customizations while writing.
* @param sortOrders The list of sort orders to be populated by this method, with one entry for each table
*        that this method sorts before writing.
*/
@NotNull
private List<CompletedParquetWrite> writeParquet(
@NotNull final List<PartitionData> partitionDataList,
@NotNull final List<String[]> dhTableUpdateStrings,
@NotNull final IcebergWriteInstructions writeInstructions) {
@NotNull final IcebergWriteInstructions writeInstructions,
@NotNull final List<SortOrder> sortOrders) {
final List<Table> dhTables = writeInstructions.tables();
final boolean isPartitioned = tableSpec.isPartitioned();
if (isPartitioned) {
Expand All @@ -554,6 +575,15 @@ private List<CompletedParquetWrite> writeParquet(
final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions(
onWriteCompleted, tableDefinition, fieldIdToColumnName);

final Collection<SortColumn> sortColumnNames;
if (writeInstructions.applySortOrder()) {
sortColumnNames = new ArrayList<>(getSortColumnNames());
} else {
sortColumnNames = List.of();
}

final SortOrder sortOrder = table.sortOrder();

// Write the data to parquet files
for (int idx = 0; idx < dhTables.size(); idx++) {
Table dhTable = dhTables.get(idx);
Expand All @@ -568,12 +598,54 @@ private List<CompletedParquetWrite> writeParquet(
} else {
newDataLocation = getDataLocation();
}

if (!sortColumnNames.isEmpty()) {
try {
dhTable = dhTable.sort(sortColumnNames);
} catch (final RuntimeException e) {
throw new IllegalArgumentException("Failed to sort table " + dhTable + " by columns " +
sortColumnNames + ", retry after disabling applying sort order in write instructions", e);
}
sortOrders.add(sortOrder);
}

// TODO (deephaven-core#6343): Set writeDefault() values for required columns that not present in the table
ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions);
}
return parquetFilesWritten;
}

/**
 * Map the Iceberg {@link SortOrder} of the table to an equivalent list of Deephaven {@link SortColumn}s.
 * <p>
 * Returns an empty list if the table is unsorted, or if any sort field cannot be represented in Deephaven:
 * Deephaven sorts nulls first for ascending order and nulls last for descending order, so any other
 * direction/null-order combination cannot be enforced. A field whose ID is not present in
 * {@code fieldIdToColumnName} also cannot be sorted on, since we have no column name for it.
 */
private List<SortColumn> getSortColumnNames() {
    final SortOrder sortOrder = table.sortOrder();
    if (sortOrder.isUnsorted()) {
        return List.of();
    }
    final List<SortField> sortFields = sortOrder.fields();
    final List<SortColumn> sortColumns = new ArrayList<>(sortFields.size());
    for (final SortField sortField : sortFields) {
        final boolean ascending;
        if (sortField.nullOrder() == NullOrder.NULLS_FIRST && sortField.direction() == SortDirection.ASC) {
            ascending = true;
        } else if (sortField.nullOrder() == NullOrder.NULLS_LAST && sortField.direction() == SortDirection.DESC) {
            ascending = false;
        } else {
            // Deephaven cannot enforce this combination of sort direction and null ordering
            // TODO Verify this assumption with Devin
            return List.of();
        }
        final int fieldId = sortField.sourceId();
        final String columnName = fieldIdToColumnName.get(fieldId);
        if (columnName == null) {
            // Could not find the column name in current schema, so we cannot sort by it
            return List.of();
        }
        sortColumns.add(ascending
                ? SortColumn.asc(ColumnName.of(columnName))
                : SortColumn.desc(ColumnName.of(columnName)));
    }
    return sortColumns;
}

/**
* Generate the location string for a new data file for the given partition data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public static Builder builder() {
*/
public abstract List<String> partitionPaths();

/**
 * Whether to apply the {@link org.apache.iceberg.Table#sortOrder()} of the Iceberg table being written to. If
 * {@code true}, the Deephaven tables will be sorted according to that sort order before being written. Defaults
 * to {@code false}.
 */
@Value.Default
public boolean applySortOrder() {
return false;
}

// @formatter:off
public interface Builder {
// @formatter:on
Expand All @@ -56,6 +65,8 @@ public interface Builder {

Builder addAllPartitionPaths(Iterable<String> elements);

Builder applySortOrder(boolean applySortOrder);

IcebergWriteInstructions build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,51 @@ private void verifySortOrder(
}
assertThat(actualSortOrders).containsExactlyInAnyOrderElementsOf(expectedSortOrders);
}

/**
 * Verify that {@code applySortOrder} in the write instructions causes data files to be written in the Iceberg
 * table's sort order, while writes without the flag remain unsorted.
 */
@Test
void testEnforceSortOrder() {
    final Table unsortedData = TableTools.newTable(
            intCol("intCol", 15, 0, 32, 33, 19),
            doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5),
            longCol("longCol", 20L, 50L, 0L, 10L, 5L));
    final TableIdentifier tableId = TableIdentifier.parse("MyNamespace.MyTable");
    final IcebergTableAdapter adapter = catalogAdapter.createTable(tableId, unsortedData.getDefinition());
    final IcebergTableWriter writer = adapter.tableWriter(writerOptionsBuilder()
            .tableDefinition(unsortedData.getDefinition())
            .build());
    writer.append(IcebergWriteInstructions.builder()
            .addTables(unsortedData)
            .build());

    // The first data file should be unsorted since no sort order is set on the table yet
    verifySortOrder(adapter, tableId, List.of(List.of()));

    // Set an ascending sort order on the underlying Iceberg table
    final org.apache.iceberg.Table icebergTable = adapter.icebergTable();
    assertThat(icebergTable.sortOrder().fields()).hasSize(0);
    icebergTable.replaceSortOrder().asc("intCol").commit();
    assertThat(icebergTable.sortOrder().fields()).hasSize(1);

    // Write the same unsorted data again, this time asking the writer to apply the table's sort order
    writer.append(IcebergWriteInstructions.builder()
            .addTables(unsortedData)
            .applySortOrder(true)
            .build());

    // The second data file should be sorted ascending on intCol
    verifySortOrder(adapter, tableId, List.of(
            List.of(),
            List.of(SortColumn.asc(ColumnName.of("intCol")))));

    // Write once more without the flag; the sort order must not be applied implicitly
    writer.append(IcebergWriteInstructions.builder()
            .addTables(unsortedData)
            .build());

    // The third data file should again be unsorted
    verifySortOrder(adapter, tableId, List.of(
            List.of(),
            List.of(SortColumn.asc(ColumnName.of("intCol"))),
            List.of()));
}
}

0 comments on commit 54e1f08

Please sign in to comment.