Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18143: Improve handling of sort order for Iceberg tables #6646

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,19 @@ public final class IcebergUtils {
*/
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull final Snapshot snapshot) {
return allManifestFiles(table, snapshot)
.map(manifestFile -> ManifestFiles.read(manifestFile, table.io()))
.flatMap(IcebergUtils::toStream);
.flatMap(manifestFile -> allDataFiles(table, manifestFile));
}

/**
* Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link ManifestFile}.
*
* @param table The {@link Table} to retrieve data files for.
* @param manifestFile The {@link ManifestFile} to retrieve data files from.
*
* @return A stream of {@link DataFile} objects.
*/
public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull final ManifestFile manifestFile) {
    // toStream closes the underlying CloseableIterable when the returned stream is closed.
    return toStream(ManifestFiles.read(manifestFile, table.io()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recently learned that the files themselves may have metadata, i.e. org.apache.iceberg.ManifestReader#spec. That makes me cautious about extending these helper methods too far. While we aren't passing along ManifestReader#spec today, we may need to in the future and might need to model it appropriately.

}

/**
Expand Down Expand Up @@ -117,6 +128,14 @@ static List<ManifestFile> allManifests(@NotNull final Table table, @NotNull fina
}
}

/**
 * Build a map from each {@link ManifestFile} in the given {@link Snapshot} to the {@link DataFile} objects it
 * references.
 *
 * @param table The {@link Table} to retrieve data files for.
 * @param snapshot The {@link Snapshot} whose manifest files are enumerated.
 *
 * @return A map from each {@link ManifestFile} to the list of its {@link DataFile} objects.
 */
public static Map<ManifestFile, List<DataFile>> manifestToDataFiles(
        @NotNull final Table table, @NotNull final Snapshot snapshot) {
    return allManifestFiles(table, snapshot)
            .collect(Collectors.toMap(
                    manifestFile -> manifestFile,
                    manifestFile -> allDataFiles(table, manifestFile).collect(Collectors.toList())));
}

/**
* Convert a {@link org.apache.iceberg.io.CloseableIterable} to a {@link Stream} that will close the iterable when
* the stream is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.engine.table.impl.locations.impl.TableLocationFactory;
import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocation;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* {@link TableLocationFactory} for Iceberg {@link TableLocation}s.
*/
public final class IcebergTableLocationFactory implements TableLocationFactory<TableKey, IcebergTableLocationKey> {
/** Adapter used to resolve Iceberg table metadata (e.g. sort orders) for the locations this factory creates. */
final IcebergTableAdapter tableAdapter;

/**
 * @param tableAdapter The {@link IcebergTableAdapter} for the table whose locations this factory will create.
 */
public IcebergTableLocationFactory(@NotNull final IcebergTableAdapter tableAdapter) {
    this.tableAdapter = tableAdapter;
}

@Override
@NotNull
public TableLocation makeLocation(@NotNull final TableKey tableKey,
public TableLocation makeLocation(
@NotNull final TableKey tableKey,
@NotNull final IcebergTableLocationKey locationKey,
@Nullable final TableDataRefreshService refreshService) {
if (locationKey instanceof IcebergTableParquetLocationKey) {
return new ParquetTableLocation(tableKey, (ParquetTableLocationKey) locationKey,
return new IcebergTableParquetLocation(tableAdapter, tableKey, (IcebergTableParquetLocationKey) locationKey,
(ParquetInstructions) locationKey.readInstructions());
}
throw new UnsupportedOperationException("Unsupported location key type: " + locationKey.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.location;

import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import io.deephaven.engine.table.impl.locations.TableKey;
import io.deephaven.engine.table.impl.locations.TableLocation;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocation;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class IcebergTableParquetLocation extends ParquetTableLocation implements TableLocation {

@Nullable
private final List<SortColumn> sortedColumns;

/**
 * Create a parquet table location that can additionally report sorted columns derived from the Iceberg
 * {@link SortOrder} metadata of its backing {@link DataFile}.
 *
 * @param tableAdapter The {@link IcebergTableAdapter} used to look up the table's sort orders.
 * @param tableKey The {@link TableKey} of the enclosing table.
 * @param tableLocationKey The location key; its {@link DataFile} supplies the sort order id.
 * @param readInstructions The parquet read instructions.
 */
public IcebergTableParquetLocation(
        @NotNull final IcebergTableAdapter tableAdapter,
        @NotNull final TableKey tableKey,
        @NotNull final IcebergTableParquetLocationKey tableLocationKey,
        @NotNull final ParquetInstructions readInstructions) {
    super(tableKey, tableLocationKey, readInstructions);
    // Computed once up front; null means "unknown from Iceberg metadata" (see getSortedColumns fallback).
    sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile());
}

/**
 * Report this location's sorted columns, preferring the ordering derived from Iceberg metadata; when that is
 * unavailable, defer to the superclass (which inspects the parquet file itself).
 */
@Override
@NotNull
public List<SortColumn> getSortedColumns() {
    if (sortedColumns != null) {
        return sortedColumns;
    }
    return super.getSortedColumns();
}

@Nullable
private static List<SortColumn> computeSortedColumns(
@NotNull final IcebergTableAdapter tableAdapter,
@NotNull final DataFile dataFile) {
final Integer sortOrderId = dataFile.sortOrderId();
Comment on lines +42 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think about behavior when unsorted (either because the sort order id is null or it is explicitly set to unsorted)...

// If sort order is missing or unknown, we cannot determine the sorted columns from the metadata and will
// check the underlying parquet file for the sorted columns, when the user asks for them.
if (sortOrderId == null) {
return null;
}
final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId);
if (sortOrder == null) {
return null;
}
if (sortOrder.isUnsorted()) {
return Collections.emptyList();
}
final Schema schema = sortOrder.schema();
final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size());
for (final SortField field : sortOrder.fields()) {
if (!field.transform().isIdentity()) {
// TODO (DH-18160): Improve support for handling non-identity transforms
break;
}
final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might throw an InvalidNameException; we might need to wait for some Resolver work I'm doing in https://deephaven.atlassian.net/browse/DH-18365 to land so we can properly map field ids.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not mark this as resolved yet

final SortColumn sortColumn;
if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) {
sortColumn = SortColumn.asc(columnName);
} else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) {
sortColumn = SortColumn.desc(columnName);
} else {
Comment on lines +72 to +76
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should raise the issue of null-first, nulls-last with the engine team. Arguably, this is something we should want to support.

Additionally, we may need to hold off on handling any floating-point columns.

-NaN < -Infinity < -value < -0 < 0 < value < Infinity < NaN, https://iceberg.apache.org/spec/#sorting

The -NaN vs. NaN distinction is something I have not seen before, but it is another issue to raise with the engine team.

In the meantime, I think the strategy of breaking and returning what we have so far should be OK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break;
}
sortColumns.add(sortColumn);
}
return Collections.unmodifiableList(sortColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl
@Nullable
private final TableIdentifier tableIdentifier;

@NotNull
private final DataFile dataFile;

/**
* The {@link DataFile#dataSequenceNumber()} of the data file backing this keyed location.
*/
Expand Down Expand Up @@ -102,6 +105,8 @@ public IcebergTableParquetLocationKey(
// tableUUID was null
this.tableIdentifier = tableUuid != null ? null : tableIdentifier;

this.dataFile = dataFile;

// Files with unknown sequence numbers should be ordered first
dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MIN_VALUE;
fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MIN_VALUE;
Expand All @@ -124,6 +129,11 @@ public ParquetInstructions readInstructions() {
return readInstructions;
}

/**
 * The Iceberg {@link DataFile} backing this location key.
 *
 * @return the backing {@link DataFile}; never {@code null}
 */
@NotNull
DataFile dataFile() {
    return dataFile;
}

/**
* When comparing with another {@link IcebergTableParquetLocationKey}, precedence-wise this implementation compares:
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction
new IcebergStaticTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
tableIdentifier);

return new IcebergTableImpl(
Expand All @@ -419,14 +419,14 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction
locationProvider = new IcebergManualRefreshTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
this,
tableIdentifier);
} else {
locationProvider = new IcebergAutoRefreshTableLocationProvider<>(
StandaloneTableKey.getInstance(),
keyFinder,
new IcebergTableLocationFactory(),
new IcebergTableLocationFactory(this),
TableDataRefreshService.getSharedRefreshService(),
updatedInstructions.updateMode().autoRefreshMs(),
this,
Expand Down
Loading