diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index fadd1a7062d..bd63a76d310 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -75,8 +75,19 @@ public final class IcebergUtils { */ public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull final Snapshot snapshot) { return allManifestFiles(table, snapshot) - .map(manifestFile -> ManifestFiles.read(manifestFile, table.io())) - .flatMap(IcebergUtils::toStream); + .flatMap(manifestFile -> allDataFiles(table, manifestFile)); + } + + /** + * Get a stream of all {@link DataFile} objects from the given {@link Table} and {@link ManifestFile}. + * + * @param table The {@link Table} to retrieve data files for. + * @param manifestFile The {@link ManifestFile} to retrieve data files from. + * + * @return A stream of {@link DataFile} objects. + */ + public static Stream<DataFile> allDataFiles(@NotNull final Table table, @NotNull ManifestFile manifestFile) { + return toStream(ManifestFiles.read(manifestFile, table.io())); } /** @@ -117,6 +128,14 @@ static List<ManifestFile> allManifests(@NotNull final Table table, @NotNull fina } } + public static Map<ManifestFile, List<DataFile>> manifestToDataFiles( + @NotNull final Table table, @NotNull final Snapshot snapshot) { + return allManifestFiles(table, snapshot) + .collect(Collectors.toMap( + manifestFile -> manifestFile, + manifestFile -> allDataFiles(table, manifestFile).collect(Collectors.toList()))); + } + /** * Convert a {@link org.apache.iceberg.io.CloseableIterable} to a {@link Stream} that will close the iterable when * the stream is closed. diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java index aa96e3ecffd..ce19dba47d9 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableLocationFactory.java @@ -7,9 +7,8 @@ import io.deephaven.engine.table.impl.locations.TableLocation; import io.deephaven.engine.table.impl.locations.impl.TableLocationFactory; import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService; +import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.parquet.table.ParquetInstructions; -import io.deephaven.parquet.table.location.ParquetTableLocation; -import io.deephaven.parquet.table.location.ParquetTableLocationKey; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -17,15 +16,21 @@ * {@link TableLocationFactory} for Iceberg {@link TableLocation}s.
*/ public final class IcebergTableLocationFactory implements TableLocationFactory<TableKey, IcebergTableLocationKey> { - public IcebergTableLocationFactory() {} + + final IcebergTableAdapter tableAdapter; + + public IcebergTableLocationFactory(@NotNull final IcebergTableAdapter tableAdapter) { + this.tableAdapter = tableAdapter; + } @Override @NotNull - public TableLocation makeLocation(@NotNull final TableKey tableKey, + public TableLocation makeLocation( + @NotNull final TableKey tableKey, @NotNull final IcebergTableLocationKey locationKey, @Nullable final TableDataRefreshService refreshService) { if (locationKey instanceof IcebergTableParquetLocationKey) { - return new ParquetTableLocation(tableKey, (ParquetTableLocationKey) locationKey, + return new IcebergTableParquetLocation(tableAdapter, tableKey, (IcebergTableParquetLocationKey) locationKey, (ParquetInstructions) locationKey.readInstructions()); } throw new UnsupportedOperationException("Unsupported location key type: " + locationKey.getClass()); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocation.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocation.java new file mode 100644 index 00000000000..c46c2ce95ed --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocation.java @@ -0,0 +1,83 @@ +// +// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.location; + +import io.deephaven.api.ColumnName; +import io.deephaven.api.SortColumn; +import io.deephaven.engine.table.impl.locations.TableKey; +import io.deephaven.engine.table.impl.locations.TableLocation; +import io.deephaven.iceberg.util.IcebergTableAdapter; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.location.ParquetTableLocation; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class IcebergTableParquetLocation extends ParquetTableLocation implements TableLocation { + + @Nullable + private final List<SortColumn> sortedColumns; + + public IcebergTableParquetLocation( + @NotNull final IcebergTableAdapter tableAdapter, + @NotNull final TableKey tableKey, + @NotNull final IcebergTableParquetLocationKey tableLocationKey, + @NotNull final ParquetInstructions readInstructions) { + super(tableKey, tableLocationKey, readInstructions); + sortedColumns = computeSortedColumns(tableAdapter, tableLocationKey.dataFile()); + } + + @Override + @NotNull + public List<SortColumn> getSortedColumns() { + return sortedColumns == null ? super.getSortedColumns() : sortedColumns; + } + + @Nullable + private static List<SortColumn> computeSortedColumns( + @NotNull final IcebergTableAdapter tableAdapter, + @NotNull final DataFile dataFile) { + final Integer sortOrderId = dataFile.sortOrderId(); + // If the sort order is missing or unknown, we cannot determine the sorted columns from the metadata and will + // check the underlying parquet file for the sorted columns when the user asks for them.
+ if (sortOrderId == null) { + return null; + } + final SortOrder sortOrder = tableAdapter.icebergTable().sortOrders().get(sortOrderId); + if (sortOrder == null) { + return null; + } + if (sortOrder.isUnsorted()) { + return Collections.emptyList(); + } + final Schema schema = sortOrder.schema(); + final List<SortColumn> sortColumns = new ArrayList<>(sortOrder.fields().size()); + for (final SortField field : sortOrder.fields()) { + if (!field.transform().isIdentity()) { + // TODO (DH-18160): Improve support for handling non-identity transforms + break; + } + final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId())); + final SortColumn sortColumn; + if (field.nullOrder() == NullOrder.NULLS_FIRST && field.direction() == SortDirection.ASC) { + sortColumn = SortColumn.asc(columnName); + } else if (field.nullOrder() == NullOrder.NULLS_LAST && field.direction() == SortDirection.DESC) { + sortColumn = SortColumn.desc(columnName); + } else { + break; + } + sortColumns.add(sortColumn); + } + return Collections.unmodifiableList(sortColumns); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java index 5998a748c21..8bde42151ef 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java @@ -41,6 +41,9 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl @Nullable private final TableIdentifier tableIdentifier; + @NotNull + private final DataFile dataFile; + /** * The {@link DataFile#dataSequenceNumber()} of the data file backing this keyed location. */ @@ -102,6 +105,8 @@ public IcebergTableParquetLocationKey( // tableUUID was null this.tableIdentifier = tableUuid != null ? null : tableIdentifier; + this.dataFile = dataFile; + // Files with unknown sequence numbers should be ordered first dataSequenceNumber = dataFile.dataSequenceNumber() != null ? dataFile.dataSequenceNumber() : Long.MIN_VALUE; fileSequenceNumber = dataFile.fileSequenceNumber() != null ? dataFile.fileSequenceNumber() : Long.MIN_VALUE; @@ -124,6 +129,11 @@ public ParquetInstructions readInstructions() { return readInstructions; } + @NotNull + DataFile dataFile() { + return dataFile; + } + /** * When comparing with another {@link IcebergTableParquetLocationKey}, precedence-wise this implementation compares: *
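For context, the SortField-to-SortColumn mapping that IcebergTableParquetLocation (and, on the write side, IcebergTableWriter) applies reduces to the following self-contained sketch. The two-column schema and field names are illustrative only; the rule itself — identity transforms only, and only the {ASC, NULLS FIRST} / {DESC, NULLS LAST} combinations — is the one implemented above.

```java
import io.deephaven.api.ColumnName;
import io.deephaven.api.SortColumn;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;

public class SortOrderMappingSketch {
    public static void main(String[] args) {
        // Hypothetical two-column schema; field IDs 1 and 2 are arbitrary.
        final Schema schema = new Schema(
                Types.NestedField.required(1, "intCol", Types.IntegerType.get()),
                Types.NestedField.required(2, "doubleCol", Types.DoubleType.get()));
        // Iceberg defaults: ASC pairs with NULLS_FIRST and DESC with NULLS_LAST, which are
        // exactly the two combinations Deephaven can honor.
        final SortOrder sortOrder = SortOrder.builderFor(schema)
                .asc("intCol")
                .desc("doubleCol")
                .build();
        final List<SortColumn> sortColumns = new ArrayList<>();
        for (final SortField field : sortOrder.fields()) {
            if (!field.transform().isIdentity()) {
                break; // non-identity transforms (bucket, truncate, ...) stop the mapping
            }
            final ColumnName columnName = ColumnName.of(schema.findColumnName(field.sourceId()));
            if (field.direction() == SortDirection.ASC && field.nullOrder() == NullOrder.NULLS_FIRST) {
                sortColumns.add(SortColumn.asc(columnName));
            } else if (field.direction() == SortDirection.DESC && field.nullOrder() == NullOrder.NULLS_LAST) {
                sortColumns.add(SortColumn.desc(columnName));
            } else {
                break; // only a prefix of the sort order is recoverable
            }
        }
        // Expect both fields to map: asc(intCol), then desc(doubleCol).
        System.out.println(sortColumns);
    }
}
```

Note that the read path keeps the longest mappable prefix, which is still a valid (weaker) sort guarantee for downstream operations.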
    diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index f2b4dbbd0dc..3fc7f9a2062 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -401,7 +401,7 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction new IcebergStaticTableLocationProvider<>( StandaloneTableKey.getInstance(), keyFinder, - new IcebergTableLocationFactory(), + new IcebergTableLocationFactory(this), tableIdentifier); return new IcebergTableImpl( @@ -419,14 +419,14 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction locationProvider = new IcebergManualRefreshTableLocationProvider<>( StandaloneTableKey.getInstance(), keyFinder, - new IcebergTableLocationFactory(), + new IcebergTableLocationFactory(this), this, tableIdentifier); } else { locationProvider = new IcebergAutoRefreshTableLocationProvider<>( StandaloneTableKey.getInstance(), keyFinder, - new IcebergTableLocationFactory(), + new IcebergTableLocationFactory(this), TableDataRefreshService.getSharedRefreshService(), updatedInstructions.updateMode().autoRefreshMs(), this, diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index 5121134c14f..35290f4a727 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -3,11 +3,14 @@ // package io.deephaven.iceberg.util; +import io.deephaven.api.ColumnName; +import io.deephaven.api.SortColumn; import io.deephaven.base.Pair; import io.deephaven.base.verify.Require; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.context.StandaloneQueryScope; +import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; @@ -22,10 +25,14 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; @@ -38,6 +45,7 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.time.Instant; import java.time.LocalDate; @@ -46,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import static io.deephaven.iceberg.base.IcebergUtils.convertToIcebergType; @@ -70,7 +79,7 @@ public class IcebergTableWriter { /** * Store the partition spec of the Iceberg table at the time of creation of this writer instance and use it for all - * writes, so that even if the table spec, the writer will 
still work. + * writes, so that even if the table spec changes, the writer will still work. */ private final PartitionSpec tableSpec; @@ -93,6 +102,9 @@ public class IcebergTableWriter { /** * Mapping from Iceberg field IDs to Deephaven column names, populated inside the parquet file. + * <p>
+ * Use this map instead of the {@link TableWriterOptions#fieldIdToColumnName()} map after initialization to ensure + * that all columns in the table definition are accounted for. */ private final Map<Integer, String> fieldIdToColumnName; @@ -101,6 +113,16 @@ public class IcebergTableWriter { */ private final OutputFileFactory outputFileFactory; + /** + * The sort order to write down for new data files. + */ + private final SortOrder sortOrderToWrite; + + /** + * The columns on which the tables will be sorted before writing to Iceberg. + */ + private final Collection<SortColumn> sortColumnNames; + /** * Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}. */ @@ -131,6 +153,12 @@ public class IcebergTableWriter { outputFileFactory = OutputFileFactory.builderFor(table, 0, 0) .format(FileFormat.PARQUET) .build(); + + final SortOrderProviderInternal.SortOrderProviderImpl sortOrderProvider = + ((SortOrderProviderInternal.SortOrderProviderImpl) tableWriterOptions.sortOrderProvider()); + sortColumnNames = + computeSortColumns(sortOrderProvider.getSortOrderToUse(table), sortOrderProvider.failOnUnmapped()); + sortOrderToWrite = sortOrderProvider.getSortOrderToWrite(table); } private static TableParquetWriterOptions verifyWriterOptions( @@ -260,6 +288,42 @@ private Map<Integer, String> readNameMappingDefault() { return nameMappingDefault; } + private List<SortColumn> computeSortColumns(@NotNull final SortOrder sortOrder, final boolean failOnUnmapped) { + if (sortOrder.isUnsorted()) { + return List.of(); + } + final List<SortField> sortFields = sortOrder.fields(); + final List<SortColumn> sortColumns = new ArrayList<>(sortFields.size()); + for (final SortField sortField : sortFields) { + final boolean ascending; + if (sortField.nullOrder() == NullOrder.NULLS_FIRST && sortField.direction() == SortDirection.ASC) { + ascending = true; + } else if (sortField.nullOrder() == NullOrder.NULLS_LAST && sortField.direction() == SortDirection.DESC) { + ascending = false; + } else { + if (failOnUnmapped) { + throw new IllegalArgumentException( + "Cannot apply sort order " + sortOrder + " since Deephaven currently only supports " + + "sorting by {ASC, NULLS FIRST} or {DESC, NULLS LAST}"); + } + return List.of(); + } + final int fieldId = sortField.sourceId(); + final String columnName = fieldIdToColumnName.get(fieldId); + if (columnName == null) { + if (failOnUnmapped) { + throw new IllegalArgumentException("Cannot apply sort order " + sortOrder + " since the column " + + "corresponding to field ID " + fieldId + " was not found in the schema"); + } + return List.of(); + } + final SortColumn sortColumn = + ascending ? SortColumn.asc(ColumnName.of(columnName)) : SortColumn.desc(ColumnName.of(columnName)); + sortColumns.add(sortColumn); + } + return sortColumns; + } + /** * Append the provided Deephaven {@link IcebergWriteInstructions#tables()} as new partitions to the existing Iceberg * table in a single snapshot.
This method will not perform any compatibility checks between the existing schema and @@ -293,7 +357,7 @@ public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions wri final Pair<List<PartitionData>, List<String[]>> ret = partitionDataFromPaths(tableSpec, partitionPaths); partitionData = ret.getFirst(); final List<String[]> dhTableUpdateStrings = ret.getSecond(); - parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); + parquetFileInfo = writeTables(partitionData, dhTableUpdateStrings, writeInstructions); } return dataFilesFromParquet(parquetFileInfo, partitionData); } @@ -438,8 +502,18 @@ private static String generateRandomAlphabetString(final int length) { return stringBuilder.toString(); } + /** + * Write the provided Deephaven tables to parquet files and return a list of {@link CompletedParquetWrite} objects + * for each table written. + * + * @param partitionDataList The list of {@link PartitionData} objects for each table, empty if the table is not + * partitioned. + * @param dhTableUpdateStrings The list of update strings to be applied using {@link Table#updateView}, empty if the + * table is not partitioned. + * @param writeInstructions The instructions for customizations while writing. + */ @NotNull - private List<CompletedParquetWrite> writeParquet( + private List<CompletedParquetWrite> writeTables( @NotNull final List<PartitionData> partitionDataList, @NotNull final List<String[]> dhTableUpdateStrings, @NotNull final IcebergWriteInstructions writeInstructions) { @@ -462,23 +536,59 @@ private List<CompletedParquetWrite> writeParquet( onWriteCompleted, tableDefinition, fieldIdToColumnName); // Write the data to parquet files - for (int idx = 0; idx < dhTables.size(); idx++) { - Table dhTable = dhTables.get(idx); - if (dhTable.numColumns() == 0) { - // Skip writing empty tables with no columns - continue; + final int numTables = dhTables.size(); + for (int idx = 0; idx < numTables; idx++) { + final Table dhTable = dhTables.get(idx); + final PartitionData partitionData; + final String[] dhTableUpdateString; + if (isPartitioned) { + partitionData = partitionDataList.get(idx); + dhTableUpdateString = dhTableUpdateStrings.get(idx); + } else { + partitionData = null; + dhTableUpdateString = null; } + writeTable(dhTable, isPartitioned, partitionData, dhTableUpdateString, parquetInstructions); + } + return parquetFilesWritten; + } + + /** + * Write the provided Deephaven table to a parquet file. + * + * @param dhTable The Deephaven table to write. + * @param isPartitioned Whether the Iceberg table is partitioned. + * @param partitionData The partition data for the table, null if the Iceberg table is not partitioned. + * @param dhTableUpdateString The update string to apply to the table, null if the Iceberg table is not partitioned. + * @param parquetInstructions The instructions for customizations while writing parquet.
+ */ + private void writeTable( + @NotNull final Table dhTable, + final boolean isPartitioned, + @Nullable final PartitionData partitionData, + @Nullable final String[] dhTableUpdateString, + @NotNull final ParquetInstructions parquetInstructions) { + if (dhTable.numColumns() == 0) { + // Skip writing empty tables with no columns + return; + } + try (final SafeCloseable ignored = LivenessScopeStack.open()) { final String newDataLocation; + Table dhTableToWrite = dhTable; if (isPartitioned) { - newDataLocation = getDataLocation(partitionDataList.get(idx)); - dhTable = dhTable.updateView(dhTableUpdateStrings.get(idx)); + newDataLocation = getDataLocation(Objects.requireNonNull(partitionData)); + dhTableToWrite = dhTableToWrite.updateView(Objects.requireNonNull(dhTableUpdateString)); } else { newDataLocation = getDataLocation(); } - // TODO (deephaven-core#6343): Set writeDefault() values for required columns that not present in the table - ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions); + + if (!sortColumnNames.isEmpty()) { + dhTableToWrite = dhTableToWrite.sort(sortColumnNames); + } + + // TODO (deephaven-core#6343): Set writeDefault() values for required columns that are not present in the table + ParquetTools.writeTable(dhTableToWrite, newDataLocation, parquetInstructions); } - return parquetFilesWritten; } /** @@ -500,8 +610,7 @@ private String getDataLocation() { /** * Commit the changes to the Iceberg table by creating a snapshot. */ - private void commit( - @NotNull final Iterable<DataFile> dataFiles) { + private void commit(@NotNull final Iterable<DataFile> dataFiles) { final Transaction icebergTransaction = table.newTransaction(); // Append the new data files to the table @@ -528,7 +637,8 @@ private List<DataFile> dataFilesFromParquet( .withPath(completedWrite.destination().toString()) .withFormat(FileFormat.PARQUET) .withRecordCount(completedWrite.numRows()) - .withFileSizeInBytes(completedWrite.numBytes()); + .withFileSizeInBytes(completedWrite.numBytes()) + .withSortOrder(sortOrderToWrite); if (partitionSpec.isPartitioned()) { dataFileBuilder.withPartition(partitionDataList.get(idx)); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProvider.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProvider.java index b4b5fbb3a79..02102bc4921 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProvider.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProvider.java @@ -11,23 +11,39 @@ public interface SchemaProvider { // Static factory methods for creating SchemaProvider instances + + /** + * Use the current schema from the table. + */ static SchemaProvider fromCurrent() { - return new SchemaProviderInternal.CurrentSchemaProvider(); + return SchemaProviderInternal.CurrentSchemaProvider.CURRENT_SCHEMA; } + /** + * Use the schema with the given ID from the table. + */ static SchemaProvider fromSchemaId(final int id) { return new SchemaProviderInternal.IdSchemaProvider(id); } + /** + * Use the given schema directly. + */ static SchemaProvider fromSchema(final Schema schema) { return new SchemaProviderInternal.DirectSchemaProvider(schema); } + /** + * Use the schema from the snapshot with the given ID. + */ static SchemaProvider fromSnapshotId(final int snapshotId) { return new SchemaProviderInternal.SnapshotIdSchemaProvider(snapshotId); } + /** + * Use the schema from the current snapshot of the table.
+ */ static SchemaProvider fromCurrentSnapshot() { - return new SchemaProviderInternal.CurrentSnapshotSchemaProvider(); + return SchemaProviderInternal.CurrentSnapshotSchemaProvider.CURRENT_SNAPSHOT; } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProviderInternal.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProviderInternal.java index 037405c29bc..22e7fe6334d 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProviderInternal.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SchemaProviderInternal.java @@ -12,7 +12,7 @@ */ class SchemaProviderInternal { - interface SchemaProviderImpl { + interface SchemaProviderImpl extends SchemaProvider { /** * Returns the schema for the given table based on this {@link SchemaProvider}. */ @@ -20,14 +20,16 @@ interface SchemaProviderImpl { } // Implementations of SchemaProvider - static class CurrentSchemaProvider implements SchemaProvider, SchemaProviderImpl { + enum CurrentSchemaProvider implements SchemaProviderImpl { + CURRENT_SCHEMA; + @Override public Schema getSchema(final Table table) { return getCurrentSchema(table); } } - static class IdSchemaProvider implements SchemaProvider, SchemaProviderImpl { + static class IdSchemaProvider implements SchemaProviderImpl { private final int schemaId; IdSchemaProvider(final int schemaId) { @@ -40,7 +42,7 @@ public Schema getSchema(final Table table) { } } - static class DirectSchemaProvider implements SchemaProvider, SchemaProviderImpl { + static class DirectSchemaProvider implements SchemaProviderImpl { private final Schema schema; DirectSchemaProvider(final Schema schema) { @@ -53,7 +55,7 @@ public Schema getSchema(final Table table) { } } - static class SnapshotIdSchemaProvider implements SchemaProvider, SchemaProviderImpl { + static class SnapshotIdSchemaProvider implements SchemaProviderImpl { private final int snapshotId; SnapshotIdSchemaProvider(final int snapshotId) { @@ -66,7 +68,9 @@ public Schema getSchema(final Table table) { } } - static class CurrentSnapshotSchemaProvider implements SchemaProvider, SchemaProviderImpl { + enum CurrentSnapshotSchemaProvider implements SchemaProviderImpl { + CURRENT_SNAPSHOT; + @Override public Schema getSchema(final Table table) { return getSchemaForCurrentSnapshot(table) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java new file mode 100644 index 00000000000..d5203ce4fb7 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProvider.java @@ -0,0 +1,65 @@ +// +// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import org.apache.iceberg.SortOrder; + +/** + * A specification for providing {@link org.apache.iceberg.SortOrder} while writing to an Iceberg table. + */ +public interface SortOrderProvider { + + // Static factory methods for creating SortOrderProvider instances + + /** + * Do not sort the data while writing new data to the Iceberg table. + */ + static SortOrderProvider unsorted() { + return SortOrderProviderInternal.DisableSorting.DISABLE_SORTING; + } + + /** + * Use the default {@link org.apache.iceberg.Table#sortOrder()} of the table while writing new data. If no sort + * order is set on the table, no sorting will be done.
+ */ + static SortOrderProvider useTableDefault() { + return new SortOrderProviderInternal.TableDefaultSortOrderProvider(); + } + + /** + * Use the sort order with the given ID to sort new data while writing to the Iceberg table. + */ + static SortOrderProvider fromSortId(final int id) { + return new SortOrderProviderInternal.IdSortOrderProvider(id); + } + + /** + * Use the given sort order directly to sort new data while writing to the Iceberg table. Note that the provided + * sort order must either have a valid {@link SortOrder#orderId()} or be chained with a + * {@link #withId(int)} call to set a valid order ID. + */ + static SortOrderProvider fromSortOrder(final SortOrder sortOrder) { + return new SortOrderProviderInternal.DirectSortOrderProvider(sortOrder); + } + + /** + * Returns a sort order provider that delegates to this provider for computing the columns to sort on, but writes a + * different sort order ID to the Iceberg table. Note that the sort order computed by this provider must + * {@link SortOrder#satisfies(SortOrder) satisfy} the sort order corresponding to the provided sort order ID. + * <p>
+ * For example, this provider might return fields {A, B, C} to sort on, but the ID written to Iceberg corresponds to + * a sort order with fields {A, B}. + * + * @param sortOrderId the sort order ID to write to the Iceberg table + */ + SortOrderProvider withId(final int sortOrderId); + + /** + * Returns a sort order provider that will fail if, for any reason, the sort order cannot be applied to the tables + * being written. By default, the provider will not fail if the sort order cannot be applied. + * + * @param failOnUnmapped whether to fail if the sort order cannot be applied to the tables being written + */ + SortOrderProvider withFailOnUnmapped(boolean failOnUnmapped); +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java new file mode 100644 index 00000000000..4e8e5b3d480 --- /dev/null +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/SortOrderProviderInternal.java @@ -0,0 +1,212 @@ +// +// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending +// +package io.deephaven.iceberg.util; + +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.jetbrains.annotations.NotNull; + +/** + * Internal class containing the implementations of {@link SortOrderProvider}. + */ +class SortOrderProviderInternal { + + interface SortOrderProviderImpl extends SortOrderProvider { + /** + * Returns the {@link SortOrder} to use for sorting the data when writing to the provided Iceberg table. + */ + @NotNull + SortOrder getSortOrderToUse(@NotNull Table table); + + /** + * Returns the sort order to write down to the Iceberg table when appending new data. This may differ from + * {@link #getSortOrderToUse(Table)} when the sort order ID has been customized, but the sort order returned by + * {@link #getSortOrderToUse(Table)} should always {@link SortOrder#satisfies(SortOrder) satisfy} this sort + * order. + */ + @NotNull + default SortOrder getSortOrderToWrite(@NotNull final Table table) { + return getSortOrderToUse(table); + } + + /** + * Whether to fail if the sort order cannot be applied to the tables being written. + */ + boolean failOnUnmapped(); + + @NotNull + default SortOrderProvider withId(final int sortOrderId) { + return new SortOrderProviderInternal.DelegatingSortOrderProvider(this, sortOrderId); + } + } + + // Implementations of SortOrderProvider + enum DisableSorting implements SortOrderProviderImpl { + DISABLE_SORTING; + + @Override + @NotNull + public SortOrder getSortOrderToUse(@NotNull final Table table) { + return SortOrder.unsorted(); + } + + @Override + @NotNull + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + throw new UnsupportedOperationException("Cannot set failOnUnmapped for unsorted sort order provider"); + } + + @Override + public boolean failOnUnmapped() { + return true; // Should never fail as we are unsorted + } + } + + static class TableDefaultSortOrderProvider implements SortOrderProviderImpl { + private boolean failOnUnmapped; + + TableDefaultSortOrderProvider() {} + + @Override + @NotNull + public SortOrder getSortOrderToUse(@NotNull final Table table) { + final SortOrder sortOrder = table.sortOrder(); + return sortOrder != null ?
sortOrder : SortOrder.unsorted(); + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + this.failOnUnmapped = failOnUnmapped; + return this; + } + + @Override + public boolean failOnUnmapped() { + return failOnUnmapped; + } + } + + static class IdSortOrderProvider implements SortOrderProviderImpl { + private boolean failOnUnmapped; + private final int sortOrderId; + + IdSortOrderProvider(final int sortOrderId) { + this.sortOrderId = sortOrderId; + } + + @Override + @NotNull + public SortOrder getSortOrderToUse(@NotNull final Table table) { + return getSortOrderForId(table, sortOrderId); + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + this.failOnUnmapped = failOnUnmapped; + return this; + } + + @Override + public boolean failOnUnmapped() { + return failOnUnmapped; + } + } + + static class DirectSortOrderProvider implements SortOrderProviderImpl { + private boolean failOnUnmapped; + private final SortOrder sortOrder; + + DirectSortOrderProvider(@NotNull final SortOrder sortOrder) { + this.sortOrder = sortOrder; + } + + @Override + @NotNull + public SortOrder getSortOrderToUse(@NotNull final Table table) { + return sortOrder; + } + + @Override + @NotNull + public SortOrder getSortOrderToWrite(@NotNull final Table table) { + // Check if provided sort order is included in the table's sort orders + if (!sortOrder.equals(getSortOrderForId(table, sortOrder.orderId()))) { + throw new IllegalArgumentException("Provided sort order with id " + sortOrder.orderId() + " is not " + + "included in the table's sort orders"); + } + return sortOrder; + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + this.failOnUnmapped = failOnUnmapped; + return this; + } + + @Override + public boolean failOnUnmapped() { + return failOnUnmapped; + } + } + + /** + * A {@link SortOrderProvider} that delegates to another {@link SortOrderProvider} for computing the sort order, + * while providing a custom sort order ID. 
+ */ + private static class DelegatingSortOrderProvider implements SortOrderProviderImpl { + private SortOrderProvider delegateProvider; + private final int sortOrderId; + + DelegatingSortOrderProvider(final SortOrderProvider sortOrderProvider, final int sortOrderId) { + this.delegateProvider = sortOrderProvider; + this.sortOrderId = sortOrderId; + } + + @Override + @NotNull + public SortOrder getSortOrderToUse(final @NotNull Table table) { + return ((SortOrderProviderImpl) delegateProvider).getSortOrderToUse(table); + } + + @Override + @NotNull + public SortOrder getSortOrderToWrite(final @NotNull Table table) { + final SortOrder sortOrderFromDelegate = getSortOrderToUse(table); + final SortOrder sortOrderForId = getSortOrderForId(table, sortOrderId); + if (!sortOrderFromDelegate.satisfies(sortOrderForId)) { + throw new IllegalArgumentException( + "Provided sort order " + sortOrderFromDelegate + " does not satisfy the table's sort order " + + "with id " + sortOrderId + ": " + sortOrderForId); + } + return sortOrderForId; + } + + @Override + @NotNull + public SortOrderProvider withId(final int sortOrderId) { + return new DelegatingSortOrderProvider(delegateProvider, sortOrderId); + } + + @Override + public SortOrderProvider withFailOnUnmapped(final boolean failOnUnmapped) { + delegateProvider = delegateProvider.withFailOnUnmapped(failOnUnmapped); + return this; + } + + @Override + public boolean failOnUnmapped() { + return ((SortOrderProviderImpl) delegateProvider).failOnUnmapped(); + } + } + + // -------------------------------------------------------------------------------------------------- + + private static SortOrder getSortOrderForId(final Table table, final int sortOrderId) { + if (!table.sortOrders().containsKey(sortOrderId)) { + throw new IllegalArgumentException("Sort order with ID " + sortOrderId + " not found for table " + + table); + } + return table.sortOrders().get(sortOrderId); + } +} diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java index 5f37c400b99..a6f6b27f61b 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java @@ -34,9 +34,7 @@ public abstract class TableWriterOptions { * {@link #fieldIdToColumnName()} to map Deephaven columns from {@link #tableDefinition()} to Iceberg columns. If * {@link #fieldIdToColumnName()} is not provided, the mapping is done by column name. * <p> - * Users can specify how to extract the schema in multiple ways (by schema ID, snapshot ID, etc.). - * <p>
- * Defaults to {@link SchemaProvider#fromCurrent()}, which means use the current schema from the table. + * Defaults to {@link SchemaProvider#fromCurrent()}. */ @Value.Default public SchemaProvider schemaProvider() { @@ -61,6 +59,18 @@ Map<String, Integer> dhColumnNameToFieldId() { return reversedMap; } + /** + * Used for providing {@link org.apache.iceberg.SortOrder} to sort new data while writing to an Iceberg table using + * this writer. Note that the sort order is selected at the time the writer is constructed and does not change if + * the table's sort order changes later. + * <p>
+ * Defaults to {@link SortOrderProvider#useTableDefault()}. + */ + @Value.Default + public SortOrderProvider sortOrderProvider() { + return SortOrderProvider.useTableDefault(); + } + // @formatter:off interface Builder<INSTRUCTIONS_BUILDER extends Builder<INSTRUCTIONS_BUILDER>> { // @formatter:on @@ -73,6 +83,8 @@ interface Builder<INSTRUCTIONS_BUILDER extends Builder<INSTRUCTIONS_BUILDER>> INSTRUCTIONS_BUILDER fieldIdToColumnName(Map<Integer, String> entries); + + INSTRUCTIONS_BUILDER sortOrderProvider(SortOrderProvider sortOrderProvider); } /** diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergRefreshingTestTable.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergRefreshingTestTable.java index ab4d15ce4a9..bc1c244a1ed 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergRefreshingTestTable.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/TestCatalog/IcebergRefreshingTestTable.java @@ -72,7 +72,6 @@ public SortOrder sortOrder() { @Override public Map<Integer, SortOrder> sortOrders() { return testTable.sortOrders(); - } @Override diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index b1b1062ae05..12196aa64c3 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -4,16 +4,22 @@ package io.deephaven.iceberg.junit5; import io.deephaven.UncheckedDeephavenException; +import io.deephaven.api.ColumnName; +import io.deephaven.api.SortColumn; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.PartitionAwareSourceTable; +import io.deephaven.engine.table.impl.locations.TableLocation; +import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.util.TableTools; import io.deephaven.iceberg.base.IcebergUtils; import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.iceberg.location.IcebergTableParquetLocation; +import io.deephaven.iceberg.location.IcebergTableParquetLocationKey; import io.deephaven.iceberg.sqlite.SqliteHelper; import io.deephaven.iceberg.util.IcebergCatalogAdapter; import io.deephaven.iceberg.util.IcebergReadInstructions; @@ -22,15 +28,21 @@ import io.deephaven.iceberg.util.IcebergTableWriter; import io.deephaven.iceberg.util.IcebergUpdateMode; import io.deephaven.iceberg.util.IcebergWriteInstructions; +import io.deephaven.iceberg.util.SortOrderProvider; import io.deephaven.iceberg.util.TableParquetWriterOptions; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.ParquetTools; import io.deephaven.parquet.table.location.ParquetTableLocationKey; import io.deephaven.qst.type.Type; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.NullOrder; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; @@ -47,6 +59,7 @@
import java.net.URISyntaxException; import java.nio.file.Path; import java.time.LocalDate; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -55,6 +68,11 @@ import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.util.TableTools.col; import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; +import static io.deephaven.engine.util.TableTools.merge; +import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri; +import static io.deephaven.iceberg.base.IcebergUtils.locationUri; import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; @@ -1036,4 +1054,298 @@ void testAutoRefreshingPartitionedAppend() throws InterruptedException { final Table expected2 = TableTools.merge(expected, part3.update("PC = `cat`")); assertTableEquals(expected2, fromIcebergRefreshing.select()); } + + /** + * Verify that the sort orders for the data files in the table match the expected sort orders. + */ + private void verifySortOrder( + final IcebergTableAdapter tableAdapter, + final TableIdentifier tableIdentifier, + final List<List<SortColumn>> expectedSortOrders) { + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + final String uriScheme = locationUri(icebergTable).getScheme(); + final SeekableChannelsProvider seekableChannelsProvider = + SeekableChannelsProviderLoader.getInstance().load(uriScheme, dataInstructions()); + + final Map<ManifestFile, List<DataFile>> manifestToDataFiles = + IcebergUtils.manifestToDataFiles(icebergTable, icebergTable.currentSnapshot()); + final List<List<SortColumn>> actualSortOrders = new ArrayList<>(); + for (final Map.Entry<ManifestFile, List<DataFile>> entry : manifestToDataFiles.entrySet()) { + final ManifestFile manifestFile = entry.getKey(); + final List<DataFile> dataFiles = entry.getValue(); + for (final DataFile dataFile : dataFiles) { + final TableLocation tableLocation = new IcebergTableParquetLocation( + tableAdapter, + StandaloneTableKey.getInstance(), + new IcebergTableParquetLocationKey( + null, null, tableIdentifier, manifestFile, dataFile, + dataFileUri(icebergTable, dataFile), 0, Map.of(), ParquetInstructions.EMPTY, + seekableChannelsProvider), + ParquetInstructions.EMPTY); + actualSortOrders.add(tableLocation.getSortedColumns()); + } + } + assertThat(actualSortOrders).containsExactlyInAnyOrderElementsOf(expectedSortOrders); + } + + @Test + void testApplyDefaultSortOrder() { + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); + final IcebergTableWriter tableWriterWithoutSorting = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriterWithoutSorting.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + + // Verify that the data file is not sorted + verifySortOrder(tableAdapter, tableIdentifier, List.of(List.of())); + + // Update the default sort order of the underlying Iceberg table + final org.apache.iceberg.Table icebergTable =
tableAdapter.icebergTable(); + assertThat(icebergTable.sortOrder().fields()).hasSize(0); + icebergTable.replaceSortOrder().asc("intCol").commit(); + assertThat(icebergTable.sortOrder().fields()).hasSize(1); + + // Append more unsorted data to the table while enforcing the sort order + final IcebergTableWriter tableWriterWithSorting = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.useTableDefault()) + .build()); + tableWriterWithSorting.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + + // Verify that the new data file is sorted + verifySortOrder(tableAdapter, tableIdentifier, List.of( + List.of(), + List.of(SortColumn.asc(ColumnName.of("intCol"))))); + + // Append more unsorted data to the table without enforcing the sort order + tableWriterWithoutSorting.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + + // Verify that the new data file is not sorted + verifySortOrder(tableAdapter, tableIdentifier, List.of( + List.of(), + List.of(SortColumn.asc(ColumnName.of("intCol"))), + List.of())); + } + + private IcebergTableAdapter buildTableToTestSortOrder( + final TableIdentifier tableIdentifier, + final TableDefinition tableDefinition) { + final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, tableDefinition); + + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + assertThat(icebergTable.sortOrders()).hasSize(1); // Default unsorted sort order + assertThat(icebergTable.sortOrder().fields()).hasSize(0); + + icebergTable.replaceSortOrder().asc("intCol").commit(); + icebergTable.replaceSortOrder().asc("doubleCol").desc("longCol").commit(); + assertThat(icebergTable.sortOrders()).hasSize(3); + assertThat(icebergTable.sortOrder().fields()).hasSize(2); + return tableAdapter; + } + + @Test + void testSortByDefaultSortOrder() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.useTableDefault()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + final List<SortColumn> expectedSortOrder = + List.of(SortColumn.asc(ColumnName.of("doubleCol")), SortColumn.desc(ColumnName.of("longCol"))); + verifySortOrder(tableAdapter, tableIdentifier, List.of(expectedSortOrder)); + final Table fromIceberg = tableAdapter.table(); + final Table expected = source.sort(expectedSortOrder); + assertTableEquals(expected, fromIceberg); + } + + @Test + void testSortBySortOrderId() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) +
.sortOrderProvider(SortOrderProvider.fromSortId(1)) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + final List<SortColumn> expectedSortOrder = List.of(SortColumn.asc(ColumnName.of("intCol"))); + verifySortOrder(tableAdapter, tableIdentifier, List.of(expectedSortOrder)); + final Table fromIceberg = tableAdapter.table(); + final Table expected = source.sort(expectedSortOrder); + assertTableEquals(expected, fromIceberg); + } + + @Test + void testSortByDisableSorting() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + try { + tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.unsorted().withFailOnUnmapped(true)) + .build()); + failBecauseExceptionWasNotThrown(UnsupportedOperationException.class); + } catch (UnsupportedOperationException e) { + assertThat(e).hasMessageContaining("Cannot set failOnUnmapped for unsorted sort order provider"); + } + + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.unsorted()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + final List<SortColumn> expectedSortOrder = List.of(); + verifySortOrder(tableAdapter, tableIdentifier, List.of(expectedSortOrder)); + final Table fromIceberg = tableAdapter.table(); + assertTableEquals(source, fromIceberg); + } + + @Test + void testSortBySortOrder() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + final SortOrder sortOrder = icebergTable.sortOrders().get(1); + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.fromSortOrder(sortOrder)) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + final List<SortColumn> expectedSortOrder = List.of(SortColumn.asc(ColumnName.of("intCol"))); + verifySortOrder(tableAdapter, tableIdentifier, List.of(expectedSortOrder)); + final Table fromIceberg = tableAdapter.table(); + final Table expected = source.sort(expectedSortOrder); + assertTableEquals(expected, fromIceberg); + } + + @Test + void testSortByDelegatingSortOrder() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + final SortOrder sortOrder =
SortOrder.builderFor(icebergTable.schema()) + .asc("doubleCol") + .desc("longCol") + .asc("intCol") + .build(); + + try { + tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.fromSortOrder(sortOrder)) + .build()); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining( + "Provided sort order with id 1 is not included in the table's sort orders"); + } + + try { + tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.fromSortOrder(sortOrder).withId(1)) + .build()); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("does not satisfy the table's sort order with id 1"); + } + + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.fromSortOrder(sortOrder).withId(2)) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + final List<SortColumn> expectedSortOrder = + List.of(SortColumn.asc(ColumnName.of("doubleCol")), SortColumn.desc(ColumnName.of("longCol"))); + verifySortOrder(tableAdapter, tableIdentifier, List.of(expectedSortOrder)); + final Table fromIceberg = tableAdapter.table(); + final Table expected = source.sort(expectedSortOrder); + assertTableEquals(expected, fromIceberg); + } + + @Test + void testFailIfSortOrderUnmapped() { + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); + final IcebergTableAdapter tableAdapter = buildTableToTestSortOrder(tableIdentifier, source.getDefinition()); + + final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable(); + + // Add a sort order which cannot be applied by Deephaven + icebergTable.replaceSortOrder().asc("doubleCol", NullOrder.NULLS_LAST).commit(); + + + try { + tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.useTableDefault().withFailOnUnmapped(true)) + .build()); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageContaining("Deephaven currently only supports sorting by " + + "{ASC, NULLS FIRST} or {DESC, NULLS LAST}"); + } + + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .sortOrderProvider(SortOrderProvider.useTableDefault()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + // Empty sort order since the sort order cannot be applied + verifySortOrder(tableAdapter, tableIdentifier, List.of(List.of())); + final Table fromIceberg = tableAdapter.table(); + assertTableEquals(source, fromIceberg); + } } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java index 4612299e8ba..f8836ff489a 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java @@ -9,8 +9,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; - /** * {@link SeekableChannelsProviderPlugin} implementation used for reading files from S3. */ diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 3265ab65e2a..fb2e95f85e5 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -18,6 +18,7 @@ _JIcebergReadInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergReadInstructions") _JIcebergWriteInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergWriteInstructions") _JSchemaProvider = jpy.get_type("io.deephaven.iceberg.util.SchemaProvider") +_JSortOrderProvider = jpy.get_type("io.deephaven.iceberg.util.SortOrderProvider") _JTableParquetWriterOptions = jpy.get_type("io.deephaven.iceberg.util.TableParquetWriterOptions") _JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter") _JIcebergTableAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergTableAdapter") @@ -38,7 +39,7 @@ class IcebergUpdateMode(JObjectWrapper): """ - :class:`.IcebergUpdateMode` specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes + `IcebergUpdateMode` specifies the update mode for an Iceberg table to be loaded into Deephaven. The modes are: - :py:func:`static() `: The table is loaded once and does not change @@ -86,7 +87,7 @@ def j_object(self) -> jpy.JType: class IcebergReadInstructions(JObjectWrapper): """ - :class:`.IcebergReadInstructions` specifies the instructions for reading an Iceberg table into Deephaven. These + `IcebergReadInstructions` specifies the instructions for reading an Iceberg table into Deephaven. These include column rename instructions and table definitions, as well as special data instructions for loading data files from the cloud. """ @@ -148,7 +149,7 @@ def j_object(self) -> jpy.JType: class IcebergWriteInstructions(JObjectWrapper): """ - :class:`.IcebergWriteInstructions` provides instructions intended for writing deephaven tables as partitions to Iceberg + `IcebergWriteInstructions` provides instructions intended for writing deephaven tables as partitions to Iceberg tables. """ @@ -163,13 +164,13 @@ def __init__(self, Args: tables (Union[Table, Sequence[Table]]): The deephaven tables to write. partition_paths (Optional[Union[str, Sequence[str]]]): The partition paths where each table will be written. - For example, if the iceberg table is partitioned by "year" and "month", a partition path could be + For example, if the Iceberg table is partitioned by "year" and "month", a partition path could be "year=2021/month=01". - If writing to a partitioned iceberg table, users must provide partition path for each table in tables + If writing to a partitioned Iceberg table, users must provide a partition path for each table in the tables argument in the same order. Else when writing to a non-partitioned table, users should not provide any partition paths. Defaults to `None`, which means the deephaven tables will be written to the root data directory of the - iceberg table. + Iceberg table. Raises: DHError: If unable to build the instructions object. @@ -203,7 +204,7 @@ def j_object(self) -> jpy.JType: class SchemaProvider(JObjectWrapper): """ - :class:`.SchemaProvider` is used to extract the schema from an Iceberg table. Users can specify multiple ways to do
Users can specify multiple ways to do + `SchemaProvider` is used to extract the schema from an Iceberg table. Users can specify multiple ways to do so, for example, by schema ID, snapshot ID, current schema, etc. This can be useful for passing a schema when writing to an Iceberg table. """ @@ -212,10 +213,10 @@ class SchemaProvider(JObjectWrapper): def __init__(self, _j_object: jpy.JType): """ - Initializes the :class:`.SchemaProvider` object. + Initializes the `SchemaProvider` object. Args: - _j_object (SchemaProvider): the Java :class:`.SchemaProvider` object. + _j_object (SchemaProvider): the Java `SchemaProvider` object. """ self._j_object = _j_object @@ -229,7 +230,7 @@ def from_current(cls) -> 'SchemaProvider': Used for extracting the current schema from the table. Returns: - the SchemaProvider object. + the `SchemaProvider` object. """ return cls(_JSchemaProvider.fromCurrent()) @@ -242,7 +243,7 @@ def from_schema_id(cls, schema_id: int) -> 'SchemaProvider': schema_id (int): the schema id to use. Returns: - the :class:`.SchemaProvider` object. + the `SchemaProvider` object. """ return cls(_JSchemaProvider.fromSchemaId(schema_id)) @@ -255,7 +256,7 @@ def from_snapshot_id(cls, snapshot_id: int) -> 'SchemaProvider': snapshot_id (int): the snapshot id to use. Returns: - the :class:`.SchemaProvider` object. + the `SchemaProvider` object. """ return cls(_JSchemaProvider.fromSnapshotId(snapshot_id)) @@ -270,9 +271,95 @@ def from_current_snapshot(cls) -> 'SchemaProvider': return cls(_JSchemaProvider.fromCurrentSnapshot()) +class SortOrderProvider(JObjectWrapper): + """ + `SortOrderProvider` is used to specify the sort order for new data when writing to an Iceberg table. More details + about sort order can be found in the Iceberg spec: https://iceberg.apache.org/spec/#sorting. + Users can specify the sort order in multiple ways, such as by providing a sort ID or using the table's default sort + order. This class consists of factory methods to create different sort order providers. + """ + + j_object_type = _JSortOrderProvider + + def __init__(self, _j_object: jpy.JType): + """ + Initializes the `SortOrderProvider` object. + + Args: + _j_object (SortOrderProvider): the Java `SortOrderProvider` object. + """ + self._j_object = _j_object + + @property + def j_object(self) -> jpy.JType: + return self._j_object + + @classmethod + def unsorted(cls) -> 'SortOrderProvider': + """ + Used to disable sorting while writing new data to the Iceberg table. + + Returns: + the `SortOrderProvider` object. + """ + return cls(_JSortOrderProvider.unsorted()) + + @classmethod + def use_table_default(cls) -> 'SortOrderProvider': + """ + Use the default sort order of the table while writing new data. If no sort order is set on the table, no sorting + will be done. + + Returns: + the `SortOrderProvider` object. + """ + return cls(_JSortOrderProvider.useTableDefault()) + + @classmethod + def from_sort_id(cls, sort_order_id: int) -> 'SortOrderProvider': + """ + Use the sort order with the given ID to sort new data while writing to the Iceberg table. + + Args: + sort_order_id (int): the id of the sort order to use. + + Returns: + the `.SortOrderProvider` object. + """ + return cls(_JSortOrderProvider.fromSortId(sort_order_id)) + + def with_id(self, sort_order_id: int) -> 'SortOrderProvider': + """ + Returns a sort order provider that uses the current provider to determine the columns to sort on, but writes a + different sort order ID to the Iceberg table. 
+        For example, this provider might sort by columns {A, B, C}, but the ID written to Iceberg corresponds to a sort
+        order with columns {A, B}.
+
+        Args:
+            sort_order_id (int): the sort order ID to write to the Iceberg table.
+
+        Returns:
+            the `SortOrderProvider` object.
+        """
+        return SortOrderProvider(self._j_object.withId(sort_order_id))
+
+    def with_fail_on_unmapped(self, fail_on_unmapped: bool) -> 'SortOrderProvider':
+        """
+        Returns a sort order provider that will fail if the sort order cannot be applied to the tables being written.
+        By default, if the sort order cannot be applied, the tables will be written without sorting.
+
+        Args:
+            fail_on_unmapped (bool): whether to fail if the sort order cannot be applied to the tables being written.
+
+        Returns:
+            the `SortOrderProvider` object.
+        """
+        return SortOrderProvider(self._j_object.withFailOnUnmapped(fail_on_unmapped))
+
+
 class TableParquetWriterOptions(JObjectWrapper):
     """
-    :class:`.TableParquetWriterOptions` provides specialized instructions for configuring :class:`.IcebergTableWriter`
+    `TableParquetWriterOptions` provides specialized instructions for configuring `IcebergTableWriter`
     instances.
     """
@@ -286,6 +373,7 @@ def __init__(self,
                  maximum_dictionary_keys: Optional[int] = None,
                  maximum_dictionary_size: Optional[int] = None,
                  target_page_size: Optional[int] = None,
+                 sort_order_provider: Optional[SortOrderProvider] = None,
                  data_instructions: Optional[s3.S3Instructions] = None):
         """
         Initializes the instructions using the provided parameters.
@@ -294,10 +382,9 @@ def __init__(self,
             table_definition: TableDefinitionLike: The table definition to use when writing Iceberg data files using
                 this writer instance. This definition can be used to skip some columns or add additional columns with
                 null values. The provided definition should have at least one column.
-            schema_provider: Optional[SchemaProvider]: Used to extract a Schema from a iceberg table. This schema will
+            schema_provider: Optional[SchemaProvider]: Used to extract a Schema from an Iceberg table. This schema will
                 be used in conjunction with the field_id_to_column_name to map Deephaven columns from table_definition
                 to Iceberg columns.
-                Users can specify how to extract the schema in multiple ways (by ID, snapshot ID, initial schema, etc.).
                 Defaults to `None`, which means use the current schema from the table.
             field_id_to_column_name: Optional[Dict[int, str]]: A one-to-one map from Iceberg field IDs from the
                 schema_spec to Deephaven column names from the table_definition.
@@ -313,6 +400,12 @@ def __init__(self,
                 `None`, which means use 2^20 (1,048,576)
             target_page_size (Optional[int]): the target Parquet file page size in bytes, if not specified. Defaults to
                 `None`, which means use 2^20 bytes (1 MiB)
+            sort_order_provider (Optional[SortOrderProvider]): Specifies the sort order to use for sorting new data
+                when writing to an Iceberg table with this writer. The sort order is determined at the time the writer
+                is created and does not change if the table's sort order changes later. Defaults to `None`, which means
+                the table's default sort order is used. More details about sort order can be found in the Iceberg
+                spec: https://iceberg.apache.org/spec/#sorting
+
         Raises:
             DHError: If unable to build the object.
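For context, a minimal usage sketch of the new sort_order_provider option follows; this is not part of the diff, and the sort order IDs, column names, and dtypes are illustrative assumptions:

    from deephaven import dtypes
    from deephaven.experimental.iceberg import SortOrderProvider, TableParquetWriterOptions

    # Sort new data using the (hypothetical) sort order with ID 1, record ID 2 in
    # the Iceberg metadata, and fail instead of silently skipping the sort if the
    # order cannot be applied to the tables being written.
    provider = SortOrderProvider.from_sort_id(1).with_id(2).with_fail_on_unmapped(True)

    # A mapping of column names to dtypes is one accepted TableDefinitionLike form.
    writer_options = TableParquetWriterOptions(
        table_definition={"Symbol": dtypes.string, "Price": dtypes.double},
        sort_order_provider=provider,
    )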
@@ -342,6 +435,9 @@ def __init__(self,
         if target_page_size:
             builder.targetPageSize(target_page_size)
 
+        if sort_order_provider:
+            builder.sortOrderProvider(sort_order_provider.j_object)
+
         if data_instructions:
             builder.dataInstructions(data_instructions.j_object)
 
@@ -357,7 +453,7 @@ def j_object(self) -> jpy.JType:
 class IcebergTable(Table):
     """
-    :class:`.IcebergTable` is a subclass of Table that allows users to dynamically update the table with new snapshots
+    `IcebergTable` is a subclass of Table that allows users to dynamically update the table with new snapshots
     from the Iceberg catalog.
     """
     j_object_type = _JIcebergTable
@@ -396,8 +492,8 @@ def j_object(self) -> jpy.JType:
 class IcebergTableWriter(JObjectWrapper):
     """
-    :class:`.IcebergTableWriter` is responsible for writing Deephaven tables to an Iceberg table. Each
-    :class:`.IcebergTableWriter` instance associated with a single :class:`.IcebergTableAdapter` and can be used to
+    `IcebergTableWriter` is responsible for writing Deephaven tables to an Iceberg table. Each
+    `IcebergTableWriter` instance is associated with a single `IcebergTableAdapter` and can be used to
     write multiple Deephaven tables to this Iceberg table.
     """
     j_object_type = _JIcebergTableWriter or type(None)
@@ -412,7 +508,7 @@ def append(self, instructions: IcebergWriteInstructions):
         partition paths where each table will be written using the :attr:`.IcebergWriteInstructions.partition_paths`
         parameter.
         This method will not perform any compatibility checks between the existing schema and the provided Deephaven
-        tables. All such checks happen at the time of creation of the :class:`.IcebergTableWriter` instance.
+        tables. All such checks happen at the time of creation of the `IcebergTableWriter` instance.
 
         Args:
             instructions (IcebergWriteInstructions): the customization instructions for the write.
@@ -426,7 +522,7 @@ def j_object(self) -> jpy.JType:
 class IcebergTableAdapter(JObjectWrapper):
     """
-    :class:`.IcebergTableAdapter` provides an interface for interacting with Iceberg tables. It allows the user to list
+    `IcebergTableAdapter` provides an interface for interacting with Iceberg tables. It allows the user to list
     snapshots, retrieve table definitions and read Iceberg tables into Deephaven tables.
     """
     j_object_type = _JIcebergTableAdapter or type(None)
@@ -487,7 +583,7 @@ def table(self, instructions: Optional[IcebergReadInstructions] = None) -> Icebe
     def table_writer(self, writer_options: TableParquetWriterOptions) -> IcebergTableWriter:
         """
-        Create a new :class:`.IcebergTableWriter` for this Iceberg table using the provided writer options.
+        Create a new `IcebergTableWriter` for this Iceberg table using the provided writer options.
         This method will perform schema validation to ensure that the provided table definition from the writer
         options is compatible with the Iceberg table schema. All further writes performed by the returned writer will
         not be validated against the table's schema, and thus will be faster.
@@ -507,7 +603,7 @@ def j_object(self) -> jpy.JType:
 class IcebergCatalogAdapter(JObjectWrapper):
     """
-    :class:`.IcebergCatalogAdapter` provides an interface for interacting with Iceberg catalogs. It allows listing
+    `IcebergCatalogAdapter` provides an interface for interacting with Iceberg catalogs. It allows listing
     namespaces, tables and snapshots, as well as reading Iceberg tables into Deephaven tables.
""" j_object_type = _JIcebergCatalogAdapter or type(None) @@ -568,7 +664,7 @@ def create_table(self, table_identifier: str, table_definition: TableDefinitionL table_definition (TableDefinitionLike): the table definition of the new table. Returns: - :class:`.IcebergTableAdapter`: the table adapter for the new Iceberg table. + `IcebergTableAdapter`: the table adapter for the new Iceberg table. """ return IcebergTableAdapter(self.j_object.createTable(table_identifier, @@ -608,7 +704,7 @@ def adapter_s3_rest( need to set this; it is most useful when connecting to non-AWS, S3-compatible APIs. Returns: - :class:`.IcebergCatalogAdapter`: the catalog adapter for the provided S3 REST catalog. + `IcebergCatalogAdapter`: the catalog adapter for the provided S3 REST catalog. Raises: DHError: If unable to build the catalog adapter. @@ -646,7 +742,7 @@ def adapter_aws_glue( catalog URI. Returns: - :class:`.IcebergCatalogAdapter`: the catalog adapter for the provided AWS Glue catalog. + `IcebergCatalogAdapter`: the catalog adapter for the provided AWS Glue catalog. Raises: DHError: If unable to build the catalog adapter. @@ -742,7 +838,7 @@ def adapter( hadoop_config (Optional[Dict[str, str]]): hadoop configuration properties for the catalog to load s3_instructions (Optional[s3.S3Instructions]): the S3 instructions if applicable Returns: - :class:`.IcebergCatalogAdapter`: the catalog adapter created from the provided properties + `IcebergCatalogAdapter`: the catalog adapter created from the provided properties Raises: DHError: If unable to build the catalog adapter