feat: DH-18143: Improve handling of sort order for Iceberg tables #6646

Open · wants to merge 13 commits into main
@@ -75,8 +75,19 @@ public final class IcebergUtils {
*/
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull final Snapshot snapshot) {
return allManifestFiles(table, snapshot)
.map(manifestFile -> ManifestFiles.read(manifestFile, table.io()))
.flatMap(IcebergUtils::toStream);
.flatMap(manifestFile -> allDataFiles(table, manifestFile));
}

/**
* Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link ManifestFile}.
*
* @param table The {@link Table} to retrieve data files for.
* @param manifestFile The {@link ManifestFile} to retrieve data files from.
*
* @return A stream of {@link DataFile} objects.
*/
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull ManifestFile manifestFile) {
return toStream(ManifestFiles.read(manifestFile, table.io()));
Review comment (Member):

I recently learned that the files themselves may have metadata, i.e. org.apache.iceberg.ManifestReader#spec. It makes me want to add a note of caution about extending these helper methods too far. While we aren't passing along ManifestReader#spec today, we may need to in the future and might need to model it appropriately.
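
A minimal sketch of what carrying that metadata forward might look like (an assumption, not part of this PR; `ManifestFiles.read` returns a `ManifestReader`, which exposes `spec()`):

```java
// Sketch only: capture the per-manifest partition spec that the current helpers drop.
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, table.io())) {
    final PartitionSpec manifestSpec = reader.spec(); // the per-manifest metadata noted above
    for (final DataFile dataFile : reader) {
        // ... combine dataFile with manifestSpec as needed
    }
} catch (IOException e) {
    throw new UncheckedIOException(e);
}
```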

}

/**
@@ -117,6 +128,14 @@ static List<ManifestFile> allManifests(@NotNull final Table table, @NotNull fina
}
}

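/**
 * Get a map from each {@link ManifestFile} in the given {@link Snapshot} to the {@link DataFile} objects it
 * references.
 *
 * @param table The {@link Table} to retrieve data files for.
 * @param snapshot The {@link Snapshot} to read manifest files from.
 *
 * @return A map from each manifest file to its list of data files.
 */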
public static Map<ManifestFile, List<DataFile>> manifestToDataFiles(
@NotNull final Table table, @NotNull final Snapshot snapshot) {
return allManifestFiles(table, snapshot)
.collect(Collectors.toMap(
manifestFile -> manifestFile,
manifestFile -> allDataFiles(table, manifestFile).collect(Collectors.toList())));
}
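
A quick usage sketch for the new helper (the `table` and `snapshot` variables are assumed):

```java
// Group a snapshot's data files by the manifest that references them.
final Map<ManifestFile, List<DataFile>> byManifest = IcebergUtils.manifestToDataFiles(table, snapshot);
byManifest.forEach((manifest, dataFiles) ->
        System.out.println(manifest.path() + " -> " + dataFiles.size() + " data files"));
```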

/**
* Convert a {@link org.apache.iceberg.io.CloseableIterable} to a {@link Stream} that will close the iterable when
* the stream is closed.
@@ -7,25 +7,30 @@
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.impl.TableLocationFactory;
import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocation;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* {@link TableLocationFactory} for Iceberg {@link TableLocation}s.
*/
public final class IcebergTableLocationFactory implements TableLocationFactory<TableKey, IcebergTableLocationKey> {
public IcebergTableLocationFactory() {}

final IcebergTableAdapter tableAdapter;

public IcebergTableLocationFactory(@NotNull final IcebergTableAdapter tableAdapter) {
this.tableAdapter = tableAdapter;
}

@Override
@NotNull
public TableLocation makeLocation(@NotNull final TableKey tableKey,
public TableLocation makeLocation(
@NotNull final TableKey tableKey,
@NotNull final IcebergTableLocationKey locationKey,
@Nullable final TableDataRefreshService refreshService) {
if (locationKey instanceof IcebergTableParquetLocationKey) {
return new ParquetTableLocation(tableKey, (ParquetTableLocationKey) locationKey,
return new IcebergTableParquetLocation(tableAdapter, tableKey, (IcebergTableParquetLocationKey) locationKey,
(ParquetInstructions) locationKey.readInstructions());
}
throw new UnsupportedOperationException("Unsupported location key type: " + locationKey.getClass());
@@ -0,0 +1,86 @@
//
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.location;

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocation;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class IcebergTableParquetLocation extends ParquetTableLocation implements TableLocation {

@Nullable
private final List<SortColumn> sortedColumns;

public IcebergTableParquetLocation(
@NotNull final IcebergTableAdapter tableAdapter,
@NotNull final TableKey tableKey,
@NotNull final IcebergTableParquetLocationKey tableLocationKey,
@NotNull final ParquetInstructions readInstructions) {
super(tableKey, tableLocationKey, readInstructions);
sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile());
}

@Override
@NotNull
public List<SortColumn> getSortedColumns() {
return sortedColumns == null ? super.getSortedColumns() : sortedColumns;
}

@Nullable
private static List<SortColumn> computeSortedColumns(
@NotNull final IcebergTableAdapter tableAdapter,
@NotNull final DataFile dataFile) {
final Integer sortOrderId = dataFile.sortOrderId();
Review comment on lines +42 to +50 (Member):

Need to think about behavior when unsorted (either because it is null or explicitly set to unsorted)...

// If the sort order is missing or unknown, we cannot determine the sorted columns from the metadata and will
// check the underlying parquet file for the sorted columns when the user asks for them.
if (sortOrderId == null) {
return null;
}
final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId);
if (sortOrder == null) {
return null;
}
if (sortOrder.isUnsorted()) {
return Collections.emptyList();
}
final Schema schema = sortOrder.schema();
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
for (final SortField field : sortOrder.fields()) {
if (!field.transform().isIdentity()) {
// TODO (DH-18160): Improve support for handling non-identity transforms
break;
}
final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
Review comment (Member):

This might throw an InvalidNameException; we might need to wait for some Resolver work I'm doing in https://deephaven.atlassian.net/browse/DH-18365 to land so we can properly map field ids.
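
For illustration, a hedged sketch of that failure mode (the dotted name is an assumed example for a nested field):

```java
// Nested Iceberg fields resolve to dotted paths, which are not valid Deephaven column names.
final String name = schema.findColumnName(field.sourceId()); // e.g. "location.latitude" (assumed)
final ColumnName columnName = ColumnName.of(name);           // may throw InvalidNameException
```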

Review comment (Member):

I would not mark this as resolved yet

final SortColumn sortColumn;
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
sortColumn = SortColumn.asc(columnName);
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
sortColumn = SortColumn.desc(columnName);
} else {
Review comment on lines +72 to +76 (Member):


We should raise the issue of nulls-first vs. nulls-last ordering with the engine team. Arguably, this is something we should want to support.

Additionally, we may need to hold off on handling any floating point columns:

-NaN < -Infinity < -value < -0 < 0 < value < Infinity < NaN (https://iceberg.apache.org/spec/#sorting)

The -NaN vs. NaN ordering is something I have not seen before, but it is another issue to raise with the engine team.

In the meantime, I think the strategy of breaking and returning what we have so far should be OK.
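
For reference, a minimal sketch of the null placement the code below assumes (standard Deephaven `TableTools` and `QueryConstants` APIs):

```java
// Deephaven places nulls first on ascending sorts and last on descending sorts.
final Table t = TableTools.newTable(TableTools.intCol("X", 2, QueryConstants.NULL_INT, 1));
final Table asc = t.sort("X");            // X: null, 1, 2
final Table desc = t.sortDescending("X"); // X: 2, 1, null
```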

Review comment (Member):

// TODO Check with Devin if this is okay. The assumption here is that Deephaven sorts nulls first for
// ascending order and nulls last for descending, so if we don't have the correct null order, we
// cannot use the column as a sort column.
break;
}
sortColumns.add(sortColumn);
}
return Collections.unmodifiableList(sortColumns);
}
}
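
To make the supported mapping concrete, here is an illustrative Iceberg sort order (schema and field names assumed) and the SortColumns it would translate to:

```java
// Both fields below satisfy the supported combinations; anything else stops translation at that field.
final Schema schema = new Schema(
        Types.NestedField.required(1, "Ticker", Types.StringType.get()),
        Types.NestedField.required(2, "Price", Types.DoubleType.get()));
final SortOrder sortOrder = SortOrder.builderFor(schema)
        .asc("Ticker", NullOrder.NULLS_FIRST) // -> SortColumn.asc(ColumnName.of("Ticker"))
        .desc("Price", NullOrder.NULLS_LAST)  // -> SortColumn.desc(ColumnName.of("Price"))
        .build();
```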
@@ -41,6 +41,9 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl
@Nullable
private final TableIdentifier tableIdentifier;

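/**
 * The {@link DataFile} backing this keyed location, used to derive its sort order.
 */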
@NotNull
private final DataFile dataFile;

/**
* The {@link DataFile#dataSequenceNumber()} of the data file backing this keyed location.
*/
@@ -102,6 +105,8 @@ public IcebergTableParquetLocationKey(
// tableUUID was null
this.tableIdentifier = tableUuid != null ? null : tableIdentifier;

this.dataFile = dataFile;

// Files with unknown sequence numbers should be ordered first
dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MIN_VALUE;
fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MIN_VALUE;
@@ -124,6 +129,11 @@ public ParquetInstructions readInstructions() {
return readInstructions;
}

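/**
 * The {@link DataFile} backing this keyed location.
 */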
@NotNull
DataFile dataFile() {
return dataFile;
}

/**
* When comparing with another {@link IcebergTableParquetLocationKey}, precedence-wise this implementation compares:
* <ul>
@@ -401,7 +401,7 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction
new IcebergStaticTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
tableIdentifier);

return new IcebergTableImpl(
@@ -419,14 +419,14 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction
locationProvider = new IcebergManualRefreshTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
this,
tableIdentifier);
} else {
locationProvider = new IcebergAutoRefreshTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
TableDataRefreshService.getSharedRefreshService(),
updatedInstructions.updateMode().autoRefreshMs(),
this,
@@ -3,6 +3,8 @@
//
package io.deephaven.iceberg.util;

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.context.ExecutionContext;
@@ -22,10 +24,14 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
@@ -70,7 +76,7 @@ public class IcebergTableWriter {

/**
* Store the partition spec of the Iceberg table at the time of creation of this writer instance and use it for all
* writes, so that even if the table spec, the writer will still work.
* writes, so that even if the table spec changes, the writer will still work.
*/
private final PartitionSpec tableSpec;

@@ -93,6 +99,9 @@ public class IcebergTableWriter {

/**
* Mapping from Iceberg field IDs to Deephaven column names, populated inside the parquet file.
* <p>
* Use this map instead of the {@link TableWriterOptions#fieldIdToColumnName()} map after initialization to ensure
* that all columns in the table definition are accounted for.
*/
private final Map<Integer, String> fieldIdToColumnName;

@@ -101,6 +110,16 @@
*/
private final OutputFileFactory outputFileFactory;

/**
* The sort order to write down for new data files.
*/
private final SortOrder sortOrderToWrite;

/**
* The names of columns on which the tables will be sorted before writing to Iceberg.
*/
private final Collection<SortColumn> sortColumnNames;

/**
* Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
*/
@@ -131,6 +150,12 @@ public class IcebergTableWriter {
outputFileFactory = OutputFileFactory.builderFor(table, 0, 0)
.format(FileFormat.PARQUET)
.build();

final SortOrderProviderInternal.SortOrderProviderImpl sortOrderProvider =
((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider());
sortColumnNames =
computeSortColumns(sortOrderProvider.getSortOrderToUse(table), sortOrderProvider.failOnUnmapped());
sortOrderToWrite = sortOrderProvider.getSortOrderToWrite(table);
}

private static TableParquetWriterOptions verifyWriterOptions(
@@ -260,6 +285,42 @@ private Map<String, Integer> readNameMappingDefault() {
return nameMappingDefault;
}

private List<SortColumn> computeSortColumns(@NotNull final SortOrder sortOrder, final boolean failOnUnmapped) {
if (sortOrder.isUnsorted()) {
return List.of();
}
final List<SortField> sortFields = sortOrder.fields();
final List<SortColumn> sortColumns = new ArrayList<>(sortFields.size());
for (final SortField sortField : sortOrder.fields()) {
final boolean ascending;
if (sortField.nullOrder() == NullOrder.NULLS_FIRST && sortField.direction() == SortDirection.ASC) {
ascending = true;
} else if (sortField.nullOrder() == NullOrder.NULLS_LAST && sortField.direction() == SortDirection.DESC) {
ascending = false;
} else {
if (failOnUnmapped) {
throw new UnsupportedOperationException(
"Cannot apply sort order " + sortOrder + " since Deephaven currently only supports " +
"sorting by {ASC, NULLS FIRST} or {DESC, NULLS LAST}");
}
return List.of();
}
final int fieldId = sortField.sourceId();
final String columnName = fieldIdToColumnName.get(fieldId);
if (columnName == null) {
if (failOnUnmapped) {
throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since column " +
"corresponding to field ID " + fieldId + " not found in schema");
}
return List.of();
}
final SortColumn sortColumn =
ascending ? SortColumn.asc(ColumnName.of(columnName)) : SortColumn.desc(ColumnName.of(columnName));
sortColumns.add(sortColumn);
}
return sortColumns;
}
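
And a sketch of the failure path for an unsupported combination (assumed schema; `computeSortColumns` is the helper above):

```java
// {ASC, NULLS LAST} cannot be expressed as a Deephaven SortColumn.
final SortOrder unsupported = SortOrder.builderFor(schema)
        .asc("Price", NullOrder.NULLS_LAST)
        .build();
// computeSortColumns(unsupported, true)  -> throws UnsupportedOperationException
// computeSortColumns(unsupported, false) -> List.of(); the pre-write sort is skipped
```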

/**
* Append the provided Deephaven {@link IcebergWriteInstructions#tables()} as new partitions to the existing Iceberg
* table in a single snapshot. This method will not perform any compatibility checks between the existing schema and
@@ -438,6 +499,16 @@ private static String generateRandomAlphabetString(final int length) {
return stringBuilder.toString();
}

/**
* Write the provided Deephaven tables to parquet files and return a list of {@link CompletedParquetWrite} objects
* for each table written.
*
* @param partitionDataList The list of {@link PartitionData} objects for each table, empty if the table is not
* partitioned.
* @param dhTableUpdateStrings The list of update strings to be applied using {@link Table#updateView}, empty if the
* table is not partitioned.
* @param writeInstructions The instructions for customizations while writing.
*/
@NotNull
private List<CompletedParquetWrite> writeParquet(
@NotNull final List<PartitionData> partitionDataList,
@@ -475,6 +546,16 @@ private List<CompletedParquetWrite> writeParquet(
} else {
newDataLocation = getDataLocation();
}

if (!sortColumnNames.isEmpty()) {
try {
dhTable = dhTable.sort(sortColumnNames);
} catch (final RuntimeException e) {
throw new IllegalArgumentException("Failed to sort table " + dhTable + " by columns " +
sortColumnNames + "; retry after disabling sort order application in the write instructions", e);
}
}

// TODO (deephaven-core#6343): Set writeDefault() values for required columns that are not present in the table
ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions);
}
Expand All @@ -500,8 +581,7 @@ private String getDataLocation() {
/**
* Commit the changes to the Iceberg table by creating a snapshot.
*/
private void commit(
@NotNull final Iterable<DataFile> dataFiles) {
private void commit(@NotNull final Iterable<DataFile> dataFiles) {
final Transaction icebergTransaction = table.newTransaction();

// Append the new data files to the table
@@ -528,7 +608,8 @@ private List<DataFile> dataFilesFromParquet(
.withPath(completedWrite.destination().toString())
.withFormat(FileFormat.PARQUET)
.withRecordCount(completedWrite.numRows())
.withFileSizeInBytes(completedWrite.numBytes());
.withFileSizeInBytes(completedWrite.numBytes())
.withSortOrder(sortOrderToWrite);
if (partitionSpec.isPartitioned()) {
dataFileBuilder.withPartition(partitionDataList.get(idx));
}
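
Taken together with the reader-side changes above, the write path closes the loop. A hedged round-trip sketch (identifiers from this PR; the wiring is assumed):

```java
// Write side: the sort order is recorded on the committed data file.
final DataFile written = dataFileBuilder.withSortOrder(sortOrderToWrite).build();
// Read side: IcebergTableParquetLocation recovers the same order from table metadata.
final Integer sortOrderId = written.sortOrderId();
final SortOrder recovered = table.sortOrders().get(sortOrderId);
```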